-
[Kafka Streams] Config 알아보기Kafka 2023. 12. 29. 06:31728x90반응형
- 목차
소개.
Kafka Streams 를 실행하기 위한 기본적은 설정들에 대해서 알아보도록 하겠습니다.
StreamsConfig.
APPLICATION_ID_CONFIG.
APPLICATION_ID_CONFIG 설정은 Kafka Consumer 의 client ID 와 consumer group 에 관한 설정입니다.
Kafka Streams 는 스트림 프로세싱 어플리케이션으로써 Kafka Consumer 이기도 합니다.
그리고 Kafka Consumer 는 반드시 consumer group 이 필요하죠.
그리고 Broker 의 Client 이기 때문에 client ID 또한 필요합니다.
APPLICATION_ID_CONFIG 의 값은 client ID prefix 로 사용되며, consumer group 이름이기도 합니다.
APPLICATION_ID_CONFIG 을 "words-processing-stream" 로 설정하였습니다.
그리고 Kafka Streams 를 실행시키면 consumer group 은 words-processing-stream 이고
Client ID 는 words-processing-stream-XXX 로 시작하는 값으로 설정됩니다.
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "words-processing-stream");
< Kafka CLI kafka-consumer-groups >
아래의 명령어를 통해서 consumer group 을 조회할 수 있습니다.
kafka-consumer-groups --bootstrap-server localhost:9092 --describe --all-groups
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID words-processing-stream words 0 - 0 - words-processing-stream-6e5f80ea-f8e9-4d73-ab34-cbc217095d74-StreamThread-1-consumer-6d2c0859-3977-4389-b239-504e2f8dea59 /192.168.65.1 words-processing-stream-6e5f80ea-f8e9-4d73-ab34-cbc217095d74-StreamThread-1-consumer words-processing-stream words 1 11090 11098 8 words-processing-stream-6e5f80ea-f8e9-4d73-ab34-cbc217095d74-StreamThread-1-consumer-6d2c0859-3977-4389-b239-504e2f8dea59 /192.168.65.1 words-processing-stream-6e5f80ea-f8e9-4d73-ab34-cbc217095d74-StreamThread-1-consumer words-processing-stream words 2 - 0 - words-processing-stream-6e5f80ea-f8e9-4d73-ab34-cbc217095d74-StreamThread-1-consumer-6d2c0859-3977-4389-b239-504e2f8dea59 /192.168.65.1 words-processing-stream-6e5f80ea-f8e9-4d73-ab34-cbc217095d74-StreamThread-1-consumer
BOOTSTRAP_SERVERS_CONFIGS.
BOOTSTRAP_SERVERS_CONFIGS 는 "bootstrap.servers" 의 상수 표현입니다.
일반적인 카프카 컨슈머, 프로듀서에서 설정하던 "bootstrap.servers" 와 동일합니다.
BOOTSTRAP_SERVERS_CONFIGS 설정의 값의 대상은 카프카 브로커들의 주소 정보이구요.
더 정확히 표현하자면, 카프카 브로커의 advertised.listeners 에 등록된 IP 와 Port 가 필요합니다.
클라이언트인 카프카 스트림 어플리케이션이 BOOTSTRAP_SERVERS_CONFIGS 에 등록된 IP:Port 를 통해서 브로커에 접근할 수 있습니다.
Properties props = new Properties(); props.put(StreamsConfig.APPLICATION_ID_CONFIG, "words-processing-stream"); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:29091,localhost:29093,localhost:29093");
DEFAULT_KEY_SERDE_CLASS_CONFIG.
DEDAULT_KEY_SERDE_CLASS_CONFIG 설정은 Serialization 을 수행하는 Class 의 Fully Qualified Package 값이 필요합니다.
이름에서 알 수 있듯이, KStream 별로 전달되는 Key-Value Pair Record 의 Key 를 직렬화/역직렬화 하는데에 사용되는 클래스입니다.
DEDAULT_KEY_SERDE_CLASS_CONFIG 에 설정한 직렬화 클래스는 기본적으로 사용하는 직렬화/역직렬화 방식입니다.
만약 KStream 을 생성할 때에 Produced.with(Serdes.String(),Serdes.String()) 와 같은 설정을 선언하지 않는다면,
DEDAULT_KEY_SERDE_CLASS_CONFIG 에 설정된 직렬화/역직렬화 방식이 적용됩니다.
Properties props = new Properties(); props.put(StreamsConfig.APPLICATION_ID_CONFIG, "words-processing-stream"); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:29091,localhost:29093,localhost:29093"); props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
DEFAULT_VALUE_SERDE_CLASS_CONFIG.
DEDAULT_VALUE_SERDE_CLASS_CONFIG 설정은 DEDAULT_KEY_SERDE_CLASS_CONFIG 와 동일합니다.
다만, 직렬화/역직렬화의 대상이 Record 의 Key 이냐 Value 이냐의 차이가 있습니다.
반응형'Kafka' 카테고리의 다른 글
[Kafka] listeners, advertised.listeners 알아보기 (2) 2024.01.01 [Kafka] Partition Leader Election 알아보기 (파티션 리더 선출) (0) 2023.12.31 Kafka Log Segment 알아보기 (0) 2023.12.25 [Kafka] Zookeeper 는 Broker 를 어떻게 관리할까 ? (0) 2023.12.22 Kafka Repartition 알아보기. (0) 2023.12.22