-
[Kafka] FindCoordinator API 와 Group Coordinator 알아보기Kafka/Kafka Consumer 2024. 6. 29. 14:46반응형
- 목차
들어가며.
Kafka Consumer 는 group.id 라는 설정을 통해서 Consumer Group 을 형성합니다.
동일한 group.id 를 가지는 여러 Kafka Consumer 들은 하나의 Consumer Group 의 구성원이 됩니다.
이 과정에서 Group Coordinator 라는 브로커가 선출됩니다.
Group Coordinator 는 동일한 group.id 를 가지는 여러 Consumer 들이 하나의 Group 으로써 유기적인 동작을 할 수 있도록 도움을 제공합니다.
이번 글에서는 Kafka Consumer 가 FindCoordinator API 를 통해서 어떻게 Group Coordinator 와 연결되는지에 대해서 알아보려고 합니다.
FindCoordinator API 의 TCP Packet 분석.
Kafka Consumer 를 실행시키기 위해서 기본적으로 또는 필수적으로 group.id 속성이 요구됩니다.
일반적으로 아래와 같은 형식을 Kafka Consumer 는 기본적으로 동작합니다.
from confluent_kafka import Consumer, KafkaError config = { 'client.id': 'this_is_my_client', 'bootstrap.servers': 'localhost:19092', 'group.id': 'my-consumer-group', 'auto.offset.reset': 'earliest', 'enable.auto.commit': False, } consumer = Consumer(config)
Consumer 객체가 생성되는 과정에서 Consumer 는 bootstrap.servers 에 등록된 브로커에게 "FindCoordinator API" 를 요청합니다.
이를 TCP Packet 관점에서 살펴보면 아래와 같습니다.
IP 172.19.0.1.58448 > kafka1.19092: Flags [P.], seq 128:179, ack 479, win 512, options [nop,nop,TS val 2758270431 ecr 1950747460], length 51 0x0000: 4500 0067 20a8 4000 4006 c1be ac13 0001 E..g..@.@....... 0x0010: ac13 0003 e450 4a94 a7cc 135d 7ffb b3d5 .....PJ....].... 0x0020: 8018 0200 5884 0000 0101 080a a467 dddf ....X........g.. 0x0030: 7446 0b44 0000 002f 000a 0002 0000 0003 tF.D.../........ 0x0040: 0011 7468 6973 5f69 735f 6d79 5f63 6c69 ..this_is_my_cli 0x0050: 656e 7400 116d 792d 636f 6e73 756d 6572 ent..my-consumer 0x0060: 2d67 726f 7570 00 -group.
위 TCP Packet 은 tcpdump 을 통해서 포착한 Packet 이구요.
"172.0.19.1:58448" 인 Consumer 가 kafka1.19092 인 브로커에게 FindCoordinator API 를 요청합니다.
그리고 0x0034 에 해당하는 Bytes 인 "000a" 는 10 번 API 를 의미하며 이는 FindCoordinator 의 API Key 에 해당합니다.
이러한 방식으로 Consumer 는 Bootstrap Server 인 Broker 에게 FindCoordinator API 를 요청하게 되구요.
Broker 는 Consumer 에게 응답으로 Group Coordinator 에 해당하는 Broker 의 주소 정보를 응답합니다.
FindCoordinator 응답.
아래의 TCP Packet 은 FindCoordinator 요청의 대한 응답에 해당합니다.
IP kafka1.19092 > 172.19.0.1.58448: Flags [P.], seq 479:518, ack 179, win 50592, options [nop,nop,TS val 1950747461 ecr 2758270431], length 39 0x0000: 4500 005b fafc 4000 4006 e775 ac13 0003 E..[..@.@..u.... 0x0010: ac13 0001 4a94 e450 7ffb b3d5 a7cc 1390 ....J..P........ 0x0020: 8018 c5a0 5878 0000 0101 080a 7446 0b45 ....Xx......tF.E 0x0030: a467 dddf 0000 0023 0000 0003 0000 0000 .g.....#........ 0x0040: 0000 0004 4e4f 4e45 0000 0001 0009 6c6f ....NONE......lo 0x0050: 6361 6c68 6f73 7400 004a 94 calhost..J.
이를 분석해보면,
Consumer 로부터 FindCoordinator 요청을 받은 "kafka1:19092" 브로커가 "172.0.19.1:58448" 인 Consumer 에게 Group Coordinator 의 정보를 응답합니다.
Packet 의 Payload 를 중에서 가장 마지막 부분인 "6c6f 6361 6c68 6f73 7400 004a 94" 이 Group Coordinator 의 주소 정보에 해당하는데요.
이를 아스키코드와 십진수로 변환하게 되면, "localhost:19092" 로 해독됩니다.
이러한 방식으로 Broker 는 Group Coordinator 의 정보를 Broker 에게 제공합니다.
__consumer_offsets Topic 은 언제 생성되는가 ?
Kafka 는 내장 토픽을 가집니다.
대표적으로 Consumer 의 Consume Offset 을 관리하는 __consumer_offsets Topic 과 __transaction_state Topic 이 있습니다.
이들은 Lazy 하게 생성되는 특징이 있습니다.
__consumer_offsets Topic 은 최초의 Consumer 가 실행되는 시점에 생성됩니다.
카프카 클러스터가 구축되는 시점에 미리 생성되지 않습니다.
__transaction_state Topic 도 이하동문으로 Transactional Producer 의 실행 시점에 생성됩니다.
특히 __consumer_offsets Topic 이 생성되는 구체적인 시점은 FindCoordinator API 와 관련이 깊습니다.
Consumer 가 FindCoordinator API 를 요청할 때에 "__consumer_offsets" Topic 이 존재하지 않다면,"__consumer_offsets" Topic 을 생성합니다.
아래의 TCP Packet 은 "FindCoordinator" 요청의 응답입니다.
혹시 "The.coordinator.is.not.available" 이라는 표현이 보이시나요 ?
이것은 아직까지 "__consumer_offsets" 토픽이 존재하지 않기 때문에 Coordinator 를 사용할 수 없다는 의미의 Error 응답입니다.
그리고 "0x0040" 에 해당하는 "000f" 가 바로 "COORDINATOR_NOT_AVAILABLE" 에 해당하는 15번 코드입니다.
IP kafka1.19092 > 172.19.0.1.56788: Flags [P.], seq 503:562, ack 164, win 50599, options [nop,nop,TS val 1952187354 ecr 2759710287], length 59 0x0000: 4500 006f 52ef 4000 4006 8f6f ac13 0003 E..oR.@.@..o.... 0x0010: ac13 0001 4a94 ddd4 56eb bb96 c0aa 4afb ....J...V.....J. 0x0020: 8018 c5a7 588c 0000 0101 080a 745c 03da ....X.......t\.. 0x0030: a47d d64f 0000 0037 0000 0003 0000 0000 .}.O...7........ 0x0040: 000f 0021 5468 6520 636f 6f72 6469 6e61 ...!The.coordina 0x0050: 746f 7220 6973 206e 6f74 2061 7661 696c tor.is.not.avail 0x0060: 6162 6c65 2eff ffff ff00 00ff ffff ff able...........
그리고 Consumer 는 Coordinator 가 정상적으로 복구될 때까지 FindCoordinator 요청을 반복합니다.
브로커에서 "__consumer_offsets" 토픽이 생성 완료되면, FindCoordinator 요청은 정상적으로 동작합니다.
카프카 클러스터 실행을 위한 Docker Command.
docker network create kafka-net docker run -d --name zookeeper --hostname zookeeper --net kafka-net \ -e ZOOKEEPER_CLIENT_PORT=2181 \ confluentinc/cp-zookeeper:7.6.4 docker run -d --name kafka1 --hostname kafka1 --net kafka-net \ -e KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181 \ -e KAFKA_CFG_BROKER_ID=1 \ -e ALLOW_PLAINTEXT_LISTENER=yes \ -e KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR=1 \ -e KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,OUTER://:19092 \ -e KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka1:9092,OUTER://localhost:19092 \ -e KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,OUTER:PLAINTEXT \ -e KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT \ -p 19092:19092 \ bitnami/kafka:3.1.2
반응형'Kafka > Kafka Consumer' 카테고리의 다른 글
[Kafka] Consumer가 추가되면 Rebalance는 어떻게 동작할까? (0) 2024.06.30 [Kafka] Consumer Heartbeat 의 내부 동작 원리 (0) 2024.06.30 [Kafka] Consumer 의 Fetch Request 와 max_wait_ms 관계 알아보기 ( fetch.wait.max.ms ) (0) 2024.06.30 [Kafka] SyncGroup API 알아보기 (0) 2024.06.30 [Kafka] Consumer 는 JoinGroup API 를 어떻게 요청할까 ? (0) 2024.06.30