ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • [Kafka Streams] Config 알아보기
    Kafka 2023. 12. 29. 06:31
    728x90
    반응형

    - 목차

     

    소개.

    Kafka Streams 를 실행하기 위한 기본적은 설정들에 대해서 알아보도록 하겠습니다.

     

     

    StreamsConfig.

    APPLICATION_ID_CONFIG.

    APPLICATION_ID_CONFIG 설정은 Kafka Consumer 의 client IDconsumer group 에 관한 설정입니다.

    Kafka Streams 는 스트림 프로세싱 어플리케이션으로써 Kafka Consumer 이기도 합니다.

    그리고 Kafka Consumer 는 반드시 consumer group 이 필요하죠.

    그리고 Broker 의 Client 이기 때문에 client ID 또한 필요합니다.

    APPLICATION_ID_CONFIG 의 값은 client ID prefix 로 사용되며, consumer group 이름이기도 합니다.

     

    APPLICATION_ID_CONFIG 을 "words-processing-stream" 로 설정하였습니다.

    그리고 Kafka Streams 를 실행시키면 consumer group 은 words-processing-stream 이고

    Client ID 는 words-processing-stream-XXX 로 시작하는 값으로 설정됩니다.

    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "words-processing-stream");

     

    < Kafka CLI kafka-consumer-groups >

    아래의 명령어를 통해서 consumer group 을 조회할 수 있습니다.

    kafka-consumer-groups --bootstrap-server localhost:9092 --describe --all-groups
    GROUP                   TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                                                                                               HOST            CLIENT-ID
    words-processing-stream words           0          -               0               -               words-processing-stream-6e5f80ea-f8e9-4d73-ab34-cbc217095d74-StreamThread-1-consumer-6d2c0859-3977-4389-b239-504e2f8dea59 /192.168.65.1   words-processing-stream-6e5f80ea-f8e9-4d73-ab34-cbc217095d74-StreamThread-1-consumer
    words-processing-stream words           1          11090           11098           8               words-processing-stream-6e5f80ea-f8e9-4d73-ab34-cbc217095d74-StreamThread-1-consumer-6d2c0859-3977-4389-b239-504e2f8dea59 /192.168.65.1   words-processing-stream-6e5f80ea-f8e9-4d73-ab34-cbc217095d74-StreamThread-1-consumer
    words-processing-stream words           2          -               0               -               words-processing-stream-6e5f80ea-f8e9-4d73-ab34-cbc217095d74-StreamThread-1-consumer-6d2c0859-3977-4389-b239-504e2f8dea59 /192.168.65.1   words-processing-stream-6e5f80ea-f8e9-4d73-ab34-cbc217095d74-StreamThread-1-consumer

     

     

     

    BOOTSTRAP_SERVERS_CONFIGS.

    BOOTSTRAP_SERVERS_CONFIGS"bootstrap.servers" 의 상수 표현입니다.

    일반적인 카프카 컨슈머, 프로듀서에서 설정하던 "bootstrap.servers" 와 동일합니다.

    BOOTSTRAP_SERVERS_CONFIGS 설정의 값의 대상은 카프카 브로커들의 주소 정보이구요.

    더 정확히 표현하자면, 카프카 브로커의 advertised.listeners 에 등록된 IP 와 Port 가 필요합니다.

    클라이언트인 카프카 스트림 어플리케이션이 BOOTSTRAP_SERVERS_CONFIGS 에 등록된 IP:Port 를 통해서 브로커에 접근할 수 있습니다.

     

    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "words-processing-stream");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:29091,localhost:29093,localhost:29093");

     

    DEFAULT_KEY_SERDE_CLASS_CONFIG.

    DEDAULT_KEY_SERDE_CLASS_CONFIG 설정은 Serialization 을 수행하는 Class 의 Fully Qualified Package 값이 필요합니다.

    이름에서 알 수 있듯이, KStream 별로 전달되는 Key-Value Pair Record 의 Key 를 직렬화/역직렬화 하는데에 사용되는 클래스입니다.

     

    DEDAULT_KEY_SERDE_CLASS_CONFIG 에 설정한 직렬화 클래스는 기본적으로 사용하는 직렬화/역직렬화 방식입니다.

    만약 KStream 을 생성할 때에 Produced.with(Serdes.String(),Serdes.String()) 와 같은 설정을 선언하지 않는다면,

    DEDAULT_KEY_SERDE_CLASS_CONFIG 에 설정된 직렬화/역직렬화 방식이 적용됩니다.

     

    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "words-processing-stream");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:29091,localhost:29093,localhost:29093");
    props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

     

    DEFAULT_VALUE_SERDE_CLASS_CONFIG.

    DEDAULT_VALUE_SERDE_CLASS_CONFIG 설정은 DEDAULT_KEY_SERDE_CLASS_CONFIG 와 동일합니다.

    다만, 직렬화/역직렬화의 대상이 Record 의 Key 이냐 Value 이냐의 차이가 있습니다.

     

     

     

     

    반응형
Designed by Tistory.