ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • [Kafka] FindCoordinator API 와 Group Coordinator 알아보기
    Kafka/Kafka Consumer 2024. 6. 29. 14:46
    반응형

    - 목차

     

    들어가며.

    Kafka Consumer 는 group.id 라는 설정을 통해서 Consumer Group 을 형성합니다.

    동일한 group.id 를 가지는 여러 Kafka Consumer 들은 하나의 Consumer Group 의 구성원이 됩니다.

    이 과정에서 Group Coordinator 라는 브로커가 선출됩니다.

    Group Coordinator 는 동일한 group.id 를 가지는 여러 Consumer 들이 하나의 Group 으로써 유기적인 동작을 할 수 있도록 도움을 제공합니다.

    이번 글에서는 Kafka Consumer 가 FindCoordinator API 를 통해서 어떻게 Group Coordinator 와 연결되는지에 대해서 알아보려고 합니다.

     

    FindCoordinator API 의 TCP Packet 분석.

    Kafka Consumer 를 실행시키기 위해서 기본적으로 또는 필수적으로 group.id 속성이 요구됩니다.

    일반적으로 아래와 같은 형식을 Kafka Consumer 는 기본적으로 동작합니다.

     

    from confluent_kafka import Consumer, KafkaError
    
    config = {
        'client.id': 'this_is_my_client',
        'bootstrap.servers': 'localhost:19092',
        'group.id': 'my-consumer-group',
        'auto.offset.reset': 'earliest',
        'enable.auto.commit': False,
    }
    
    consumer = Consumer(config)

     

    Consumer 객체가 생성되는 과정에서 Consumer 는 bootstrap.servers 에 등록된 브로커에게 "FindCoordinator API" 를 요청합니다.

    이를 TCP Packet 관점에서 살펴보면 아래와 같습니다.

     

    IP 172.19.0.1.58448 > kafka1.19092: Flags [P.], seq 128:179, ack 479, win 512, options [nop,nop,TS val 2758270431 ecr 1950747460], length 51
    	0x0000:  4500 0067 20a8 4000 4006 c1be ac13 0001  E..g..@.@.......
    	0x0010:  ac13 0003 e450 4a94 a7cc 135d 7ffb b3d5  .....PJ....]....
    	0x0020:  8018 0200 5884 0000 0101 080a a467 dddf  ....X........g..
    	0x0030:  7446 0b44 0000 002f 000a 0002 0000 0003  tF.D.../........
    	0x0040:  0011 7468 6973 5f69 735f 6d79 5f63 6c69  ..this_is_my_cli
    	0x0050:  656e 7400 116d 792d 636f 6e73 756d 6572  ent..my-consumer
    	0x0060:  2d67 726f 7570 00                        -group.

     

    위 TCP Packet 은 tcpdump 을 통해서 포착한 Packet 이구요.

    "172.0.19.1:58448" 인 Consumer 가 kafka1.19092 인 브로커에게 FindCoordinator API 를 요청합니다.

    그리고 0x0034 에 해당하는 Bytes 인 "000a" 는 10 번 API 를 의미하며 이는 FindCoordinator 의 API Key 에 해당합니다.

     

    이러한 방식으로 Consumer 는 Bootstrap Server 인 Broker 에게 FindCoordinator API 를 요청하게 되구요.

    Broker 는 Consumer 에게 응답으로 Group Coordinator 에 해당하는 Broker 의 주소 정보를 응답합니다.

     

    FindCoordinator 응답.

    아래의 TCP Packet 은 FindCoordinator 요청의 대한 응답에 해당합니다.

    IP kafka1.19092 > 172.19.0.1.58448: Flags [P.], seq 479:518, ack 179, win 50592, options [nop,nop,TS val 1950747461 ecr 2758270431], length 39
    	0x0000:  4500 005b fafc 4000 4006 e775 ac13 0003  E..[..@.@..u....
    	0x0010:  ac13 0001 4a94 e450 7ffb b3d5 a7cc 1390  ....J..P........
    	0x0020:  8018 c5a0 5878 0000 0101 080a 7446 0b45  ....Xx......tF.E
    	0x0030:  a467 dddf 0000 0023 0000 0003 0000 0000  .g.....#........
    	0x0040:  0000 0004 4e4f 4e45 0000 0001 0009 6c6f  ....NONE......lo
    	0x0050:  6361 6c68 6f73 7400 004a 94              calhost..J.

     

    이를 분석해보면,

    Consumer 로부터 FindCoordinator 요청을 받은 "kafka1:19092" 브로커가 "172.0.19.1:58448" 인 Consumer 에게 Group Coordinator 의 정보를 응답합니다.

    Packet 의 Payload 를 중에서 가장 마지막 부분인 "6c6f 6361 6c68 6f73 7400 004a 94" 이 Group Coordinator 의 주소 정보에 해당하는데요.

    이를 아스키코드와 십진수로 변환하게 되면, "localhost:19092" 로 해독됩니다.

    이러한 방식으로 Broker 는 Group Coordinator 의 정보를 Broker 에게 제공합니다.

     

    __consumer_offsets Topic 은 언제 생성되는가 ?

    Kafka 는 내장 토픽을 가집니다.

    대표적으로 Consumer 의 Consume Offset 을 관리하는 __consumer_offsets Topic 과 __transaction_state Topic 이 있습니다.

    이들은 Lazy 하게 생성되는 특징이 있습니다.

    __consumer_offsets Topic 은 최초의 Consumer 가 실행되는 시점에 생성됩니다.

    카프카 클러스터가 구축되는 시점에 미리 생성되지 않습니다.

    __transaction_state Topic 도 이하동문으로 Transactional Producer 의 실행 시점에 생성됩니다.

     

    특히 __consumer_offsets Topic 이 생성되는 구체적인 시점은 FindCoordinator API 와 관련이 깊습니다.

    Consumer 가 FindCoordinator API 를 요청할 때에 "__consumer_offsets" Topic 이 존재하지 않다면,"__consumer_offsets" Topic 을 생성합니다.

     

    아래의 TCP Packet 은 "FindCoordinator" 요청의 응답입니다.

    혹시 "The.coordinator.is.not.available" 이라는 표현이 보이시나요 ?

    이것은 아직까지 "__consumer_offsets" 토픽이 존재하지 않기 때문에 Coordinator 를 사용할 수 없다는 의미의 Error 응답입니다.

    그리고 "0x0040" 에 해당하는 "000f" 가 바로 "COORDINATOR_NOT_AVAILABLE" 에 해당하는 15번 코드입니다.

    IP kafka1.19092 > 172.19.0.1.56788: Flags [P.], seq 503:562, ack 164, win 50599, options [nop,nop,TS val 1952187354 ecr 2759710287], length 59
    	0x0000:  4500 006f 52ef 4000 4006 8f6f ac13 0003  E..oR.@.@..o....
    	0x0010:  ac13 0001 4a94 ddd4 56eb bb96 c0aa 4afb  ....J...V.....J.
    	0x0020:  8018 c5a7 588c 0000 0101 080a 745c 03da  ....X.......t\..
    	0x0030:  a47d d64f 0000 0037 0000 0003 0000 0000  .}.O...7........
    	0x0040:  000f 0021 5468 6520 636f 6f72 6469 6e61  ...!The.coordina
    	0x0050:  746f 7220 6973 206e 6f74 2061 7661 696c  tor.is.not.avail
    	0x0060:  6162 6c65 2eff ffff ff00 00ff ffff ff    able...........

     

    그리고 Consumer 는 Coordinator 가 정상적으로 복구될 때까지 FindCoordinator 요청을 반복합니다.

    브로커에서 "__consumer_offsets" 토픽이 생성 완료되면, FindCoordinator 요청은 정상적으로 동작합니다.

     

     

    카프카 클러스터 실행을 위한 Docker Command.

    docker network create kafka-net
    
    docker run -d --name zookeeper --hostname zookeeper --net kafka-net \
      -e ZOOKEEPER_CLIENT_PORT=2181 \
      confluentinc/cp-zookeeper:7.6.4
    
    docker run -d --name kafka1 --hostname kafka1 --net kafka-net \
      -e KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181 \
      -e KAFKA_CFG_BROKER_ID=1 \
      -e ALLOW_PLAINTEXT_LISTENER=yes \
      -e KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR=1 \
      -e KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,OUTER://:19092 \
      -e KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka1:9092,OUTER://localhost:19092 \
      -e KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,OUTER:PLAINTEXT \
      -e KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT \
      -p 19092:19092 \
      bitnami/kafka:3.1.2

     

    반응형
Designed by Tistory.