-
[Kafka] SyncGroup API 알아보기Kafka/Kafka Consumer 2024. 6. 30. 14:56반응형
- 목차
들어가며.
Kafka Consumer 는 리밸런싱이라는 과정을 통해서 Consumer Group 을 형성합니다.
이 과정에서 FindCoordinator, JoinGroup, SyncGroup 등의 API 가 사용됩니다.
SyncGroup 은 각 Consumer 와 Topic-Partition 의 할당을 마무리짓고, 본격적인 Record Fetch 를 수행하는 마지막 과정입니다.
이번 글에서 SyncGroup API 에 대해서 상세히 알아보도록 하겠습니다.
https://westlife0615.tistory.com/944
[Kafka] FindCoordinator API 와 Group Coordinator 알아보기
- 목차 들어가며.Kafka Consumer 는 group.id 라는 설정을 통해서 Consumer Group 을 형성합니다.동일한 group.id 를 가지는 여러 Kafka Consumer 들은 하나의 Consumer Group 의 구성원이 됩니다.이 과정에서 Group Coordi
westlife0615.tistory.com
https://westlife0615.tistory.com/951
[Kafka] Consumer 는 JoinGroup API 를 어떻게 요청할까 ?
- 목차 들어가며.Kafka Consumer 는 group.id 라는 속성을 가집니다.group.id 라는 속성은 원격에서 실행되는 여러 개의 Kafka Consumer 들을 하나의 Group 으로 동작하게 만듭니다.오늘의 주제인 "JoinGroup API"
westlife0615.tistory.com
Consumer - Partition Assignment.
Consumer 들은 Group Coordinator 에게 JoinGroup API 를 요청하여 Consumer Group 에 참여 의사를 밝힙니다.
이 과정에서 Leader Consumer 가 선정되고, 나머지 Consumer 들은 Follower Consumer 가 됩니다.
Leader Consumer 는 JoinGroup API 의 응답으로써 모든 Consumer 들의 member id 정보를 획득합니다.
그리고 Leader Consumer 는 Group Coordinator 로부터 전달받은 Consumer 의 정보를 기반으로 Consumer - Partition 의 할당을 수행합니다.
이 과정에서 Leader Consumer 는 2개의 API 를 필요로합니다.
먼저 Metadata API 를 통해서 토픽과 파티션의 정보를 획득합니다.
그리고 JoinGroup API 를 통해서 모든 Consumer 들의 member-id 를 가지게 됩니다.
이제 Partition Assignment 전략에 의거해서 파티션을 Consumer 에게 할당할 수 있게 됩니다.
할당 방식은 크게 Range, Sticky, RoundRobin 으로 나뉩니다.
이러한 방식 중 하나를 통해서 Consumer 와 Partition 을 할당합니다.
그리고 SyncGroup API 의 요청으로 Member-Partition 할당 정보는 Group Coordinator 에게 제공합니다.
Leader Consumer 는 대략 아래와 같은 형식으로 Group Coordinator 에게 할당 정보를 전달합니다.
{ "group_id": "my-consumer-group", "generation_id": 5, "member_id": "consumer-1", "assignments": [ { "member_id": "consumer-1", "assignment": "topicA-0, topicA-1" }, { "member_id": "consumer-2", "assignment": "topicA-2" } ] }
그리고 Group Coordinator 는 Leader Consumer 가 할당한 정보를 Follower Consumer 들에게 제공합니다.
Group Coordinator 가 Follower Consumer 에게 Assignment 정보를 제공하는 방식을 SyncGroup 의 응답을 통해서 전달합니다.
이러한 방식이 가능한 이유는 모든 Consumer 들은 의무적으로 SyncGroup API 를 Group Coordinator 에게 전달하기 때문입니다.
Leader Consumer 와 Follower Consumer 의 SyncGroup API 요청의 차이점이 있다면,
Leader Consumer 는 Assignment 정보를 Group Coordinator 에게 제공하지만,
Follower Consumer 는 Assignment 정보를 제공하지 않습니다.
그 이유는 Assignment 행위는 Leader Consumer 만이 가능하기 때문이죠.
이렇게 Follower Consumer 는 Leader Consumer 가 제출한 Assignment 정보를 Group Coordinator 에 의해서 전달받을 수 있습니다.
그리고 Follower Consumer 는 자신에게 할당된 Partition 을 Fetch 할 수 있게 됩니다.
SyncGroup API Request 살펴보기.
우선 SyncGroup Request 의 포맷에 대해서 알아보도록 하겠습니다.
( 이 글에서는 Version 5 를 기준으로 설명합니다. )
SyncGroup Request (Version: 5) => group_id generation_id member_id group_instance_id protocol_type protocol_name [assignments] TAG_BUFFER group_id => COMPACT_STRING generation_id => INT32 member_id => COMPACT_STRING group_instance_id => COMPACT_NULLABLE_STRING protocol_type => COMPACT_NULLABLE_STRING protocol_name => COMPACT_NULLABLE_STRING assignments => member_id assignment TAG_BUFFER member_id => COMPACT_STRING assignment => COMPACT_BYTES
- group_id
- group_id 는 Consumer Group 을 식별할 수 있는 고유값입니다.
- group_id 를 기준으로 하나의 Consumer Group 이 생성됩니다.
- generation_id
- generation_id 는 얼마나 리밸런싱이 발생하였는지에 대한 수치입니다.
- Consumer 의 추가/제거 또는 파티션의 변경 등 여러가지 요소로 인해 리밸런싱이 발생하면 generation_id 의 값을 증가합니다.
- member_id
- member_id 는 JoinGroup API 를 통해서 Group Coordinator 로부터 부여받은 고유 식별값입니다.
- protocol_type
- protocol_type 은 consumer 또는 connect 중 한가지 값이 사용됩니다.
- Kafka Connect 또한 SyncGroup API 를 사용하기 때문에 구분을 위해서 protocol_type 값이 활용됩니다.
- protocol_name
- protocol_name 은 Consumer 와 Partition 의 할당 방식을 의미합니다.
- range, sticky, roundrobin 등의 값이 사용됩니다.
- assignments
- SyncGroup API 의 목적이라고도 할 수 있는 Member 와 Partition 의 할당 정보입니다.
- 이 데이터는 오로지 Leader Consumer 만이 사용합니다.
- 그 이유는 Member Partition Assignment 는 Leader Consumer 의 역할이자 책임이기 때문입니다.
- Follower Consumer 는 빈값으로 SyncGroup API 를 요청합니다.
SyncGroup Request TCP Packet 분석하기.
테스트 상황은 다음과 같습니다.
"test-topic" 이라는 이름의 Topic 이 존재하며, 이는 3개의 Partition 을 가집니다.
그리고 3개의 Consumer 들이 실행됩니다.
아래에는 3개의 TCP Packet 을 포착하여 작성하였구요.
첫 2개의 TCP Packet 은 Follower Consumer 의 SyncGroup API Request 이며,
마지막 TCP Packet 은 Leader Consumer 의 SyncGroup API Request 입니다.
IP sharp_hypatia.kafka-net.58928 > kafka1.9091: Flags [P.], seq 3074615952:3074616068, ack 879954872, win 501, options [nop,nop,TS val 848193994 ecr 467356024], length 116 0x0000: 4500 00a8 054a 4000 4006 dcd4 ac13 0008 E....J@.@....... 0x0010: ac13 0003 e630 2383 b742 ea90 3473 0bb8 .....0#..B..4s.. 0x0020: 8018 01f5 58cc 0000 0101 080a 328e 69ca ....X.......2.i. 0x0030: 1bdb 4978 0000 0070 000e 0003 0000 0004 ..Ix...p........ 0x0040: 0011 7468 6973 5f69 735f 636c 6965 6e74 ..this_is_client 0x0050: 5f69 6400 116d 792d 636f 6e73 756d 6572 _id..my-consumer 0x0060: 2d67 726f 7570 0000 0003 0036 7468 6973 -group.....6this 0x0070: 5f69 735f 636c 6965 6e74 5f69 642d 3466 _is_client_id-4f 0x0080: 3335 6639 6436 2d66 6235 662d 3433 6436 35f9d6-fb5f-43d6 0x0090: 2d38 3336 342d 6431 6361 3438 3233 6135 -8364-d1ca4823a5 0x00a0: 3563 ffff 0000 0000 5c...... IP condescending_lehmann.kafka-net.59688 > kafka1.9091: Flags [P.], seq 3288129610:3288129726, ack 3850761461, win 501, options [nop,nop,TS val 90071823 ecr 1304358968], length 116 0x0000: 4500 00a8 9bbc 4000 4006 4663 ac13 0007 E.....@.@.Fc.... 0x0010: ac13 0003 e928 2383 c3fc e04a e585 f4f5 .....(#....J.... 0x0020: 8018 01f5 58cb 0000 0101 080a 055e 630f ....X........^c. 0x0030: 4dbe f038 0000 0070 000e 0003 0000 0004 M..8...p........ 0x0040: 0011 7468 6973 5f69 735f 636c 6965 6e74 ..this_is_client 0x0050: 5f69 6400 116d 792d 636f 6e73 756d 6572 _id..my-consumer 0x0060: 2d67 726f 7570 0000 0003 0036 7468 6973 -group.....6this 0x0070: 5f69 735f 636c 6965 6e74 5f69 642d 6561 _is_client_id-ea 0x0080: 3861 3866 3236 2d64 3263 332d 3463 3964 8a8f26-d2c3-4c9d 0x0090: 2d61 3238 612d 3233 3539 3662 6632 6136 -a28a-23596bf2a6 0x00a0: 6433 ffff 0000 0000 d3...... IP happy_booth.kafka-net.59782 > kafka1.9091: Flags [P.], seq 2743252020:2743252406, ack 3754466135, win 501, options [nop,nop,TS val 928502324 ecr 4115129766], length 386 0x0000: 4500 01b6 a7f1 4000 4006 391e ac13 0009 E.....@.@.9..... 0x0010: ac13 0003 e986 2383 a382 b434 dfc8 9b57 ......#....4...W 0x0020: 8018 01f5 59db 0000 0101 080a 3757 d234 ....Y.......7W.4 0x0030: f547 e5a6 0000 017e 000e 0003 0000 0005 .G.....~........ 0x0040: 0011 7468 6973 5f69 735f 636c 6965 6e74 ..this_is_client 0x0050: 5f69 6400 116d 792d 636f 6e73 756d 6572 _id..my-consumer 0x0060: 2d67 726f 7570 0000 0003 0036 7468 6973 -group.....6this 0x0070: 5f69 735f 636c 6965 6e74 5f69 642d 6534 _is_client_id-e4 0x0080: 6661 3662 6239 2d65 3032 382d 3464 3963 fa6bb9-e028-4d9c 0x0090: 2d61 6366 652d 6663 3034 6533 3831 6563 -acfe-fc04e381ec 0x00a0: 3835 ffff 0000 0003 0036 7468 6973 5f69 85.......6this_i 0x00b0: 735f 636c 6965 6e74 5f69 642d 6534 6661 s_client_id-e4fa 0x00c0: 3662 6239 2d65 3032 382d 3464 3963 2d61 6bb9-e028-4d9c-a 0x00d0: 6366 652d 6663 3034 6533 3831 6563 3835 cfe-fc04e381ec85 0x00e0: 0000 001e 0000 0000 0001 000a 7465 7374 ............test 0x00f0: 2d74 6f70 6963 0000 0001 0000 0001 0000 -topic.......... 0x0100: 0000 0036 7468 6973 5f69 735f 636c 6965 ...6this_is_clie 0x0110: 6e74 5f69 642d 3466 3335 6639 6436 2d66 nt_id-4f35f9d6-f 0x0120: 6235 662d 3433 6436 2d38 3336 342d 6431 b5f-43d6-8364-d1 0x0130: 6361 3438 3233 6135 3563 0000 001e 0000 ca4823a55c...... 0x0140: 0000 0001 000a 7465 7374 2d74 6f70 6963 ......test-topic 0x0150: 0000 0001 0000 0000 0000 0000 0036 7468 .............6th 0x0160: 6973 5f69 735f 636c 6965 6e74 5f69 642d is_is_client_id- 0x0170: 6561 3861 3866 3236 2d64 3263 332d 3463 ea8a8f26-d2c3-4c 0x0180: 3964 2d61 3238 612d 3233 3539 3662 6632 9d-a28a-23596bf2 0x0190: 6136 6433 0000 001e 0000 0000 0001 000a a6d3............ 0x01a0: 7465 7374 2d74 6f70 6963 0000 0001 0000 test-topic...... 0x01b0: 0002 0000 0000 ......
Leader Consumer 의 SyncGroup API Request 인 마지막 TCP Packet 에 주목하면,
Follower Consumer 의 SyncGroup API Request 에 비해서 길이가 더 깁니다.
Length 는 바이트의 갯수가 386 개로 Follower Consumer 의 Packet Payload 의 116 보다 훨씬 깁니다.
이렇게 길이의 차이가 발생하는 이유는 Assignments 정보의 유무입니다.
Leader Consumer 는 Group Coordinator 에게 Assignments 정보를 제공합니다.
Packet 의 Payload 를 디코딩하면 아래와 같이 표현되며,
각 Consumer 의 member id 와 Topic-Partition 의 연결정보를 나타냅니다.
this_is_client_id-e4fa6bb9-e028-4d9c-acfe-fc04e381ec85 - test-topic 2 this_is_client_id-4f35f9d6-fb5f-43d6-8364-d1ca4823a55c - test-topic 0 this_is_client_id-ea8a8f26-d2c3-4c9d-a28a-23596bf2a6d3 - test-topic 1
SyncGroup Response 살펴보기.
SyncGroup Response 의 포맷은 단순합니다.
Group Coordinator 가 Consumer 들에게 자신에게 할당된 Partition 의 정보를 제공합니다.
SyncGroup Response (Version: 5) => throttle_time_ms error_code protocol_type protocol_name assignment TAG_BUFFER throttle_time_ms => INT32 error_code => INT16 protocol_type => COMPACT_NULLABLE_STRING protocol_name => COMPACT_NULLABLE_STRING assignment => COMPACT_BYTES
아래의 세개의 Packet 는 각 Consumer 가 자신에게 할당된 Partition 정보를 SyncGroup Response 를 통해서 전달받습니다.
Leader / Follower Consumer 에 구분없이 자신에 할당된 Partition 을 전달받게 됩니다.
IP kafka1.9091 > loving_morse.kafka-net.37940: Flags [P.], seq 1201:1249, ack 915, win 50470, options [nop,nop,TS val 468485600 ecr 849323560], length 48 0x0000: 4500 0064 5028 4000 4006 923a ac13 0003 E..dP(@.@..:.... 0x0010: ac13 0008 2383 9434 ba89 964b 04cb 9248 ....#..4...K...H 0x0020: 8018 c526 5888 0000 0101 080a 1bec 85e0 ...&X........... 0x0030: 329f a628 0000 002c 0000 0005 0000 0000 2..(...,........ 0x0040: 0000 0000 001e 0000 0000 0001 000a 7465 ..............te 0x0050: 7374 2d74 6f70 6963 0000 0001 0000 0000 st-topic........ 0x0060: 0000 0000 IP kafka1.9091 > sharp_williams.kafka-net.47790: Flags [P.], seq 663:711, ack 581, win 50470, options [nop,nop,TS val 4116259338 ecr 929631885], length 48 0x0000: 4500 0064 c69b 4000 4006 1bc6 ac13 0003 E..d..@.@....... 0x0010: ac13 0009 2383 baae e454 af40 3cfe 00ab ....#....T.@<... 0x0020: 8018 c526 5889 0000 0101 080a f559 220a ...&X........Y". 0x0030: 3769 0e8d 0000 002c 0000 0004 0000 0000 7i.....,........ 0x0040: 0000 0000 001e 0000 0000 0001 000a 7465 ..............te 0x0050: 7374 2d74 6f70 6963 0000 0001 0000 0002 st-topic........ 0x0060: 0000 0000 .... IP kafka1.9091 > priceless_cannon.kafka-net.51794: Flags [P.], seq 663:711, ack 581, win 50470, options [nop,nop,TS val 1305488544 ecr 91201388], length 48 0x0000: 4500 0064 84b1 4000 4006 5db2 ac13 0003 E..d..@.@.]..... 0x0010: ac13 0007 2383 ca52 167c 42b1 6577 b109 ....#..R.|B.ew.. 0x0020: 8018 c526 5887 0000 0101 080a 4dd0 2ca0 ...&X.......M.,. 0x0030: 056f 9f6c 0000 002c 0000 0004 0000 0000 .o.l...,........ 0x0040: 0000 0000 001e 0000 0000 0001 000a 7465 ..............te 0x0050: 7374 2d74 6f70 6963 0000 0001 0000 0001 st-topic........ 0x0060: 0000 0000 ....
Kafka Cluster 를 실행하기 위한 Docker Command.
docker network create kafka-net docker run -d --name zookeeper --hostname zookeeper --net kafka-net \ -e ZOOKEEPER_CLIENT_PORT=2181 \ confluentinc/cp-zookeeper:7.6.4 docker run -d --name kafka1 --hostname kafka1 --net kafka-net \ -e KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181 \ -e KAFKA_CFG_BROKER_ID=1 \ -e ALLOW_PLAINTEXT_LISTENER=yes \ -e KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR=1 \ -e KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,OUTER://:19092 \ -e KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka1:9092,OUTER://localhost:19092 \ -e KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,OUTER:PLAINTEXT \ -e KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT \ -p 19092:19092 \ bitnami/kafka:3.1.2 docker run -d --name kafka2 --hostname kafka2 --net kafka-net \ -e KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181 \ -e KAFKA_CFG_BROKER_ID=2 \ -e ALLOW_PLAINTEXT_LISTENER=yes \ -e KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR=3 \ -e KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,OUTER://:29092 \ -e KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka2:9092,OUTER://localhost:29092 \ -e KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,OUTER:PLAINTEXT \ -e KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT \ -p 29092:29092 \ bitnami/kafka:3.1.2 docker run -d --name kafka3 --hostname kafka3 --net kafka-net \ -e KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181 \ -e KAFKA_CFG_BROKER_ID=3 \ -e ALLOW_PLAINTEXT_LISTENER=yes \ -e KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR=3 \ -e KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,OUTER://:39092 \ -e KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka1:9092,OUTER://localhost:39092 \ -e KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,OUTER:PLAINTEXT \ -e KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT \ -p 39092:39092 \ bitnami/kafka:3.1.2
반응형'Kafka > Kafka Consumer' 카테고리의 다른 글
[Kafka] Consumer가 추가되면 Rebalance는 어떻게 동작할까? (0) 2024.06.30 [Kafka] Consumer Heartbeat 의 내부 동작 원리 (0) 2024.06.30 [Kafka] Consumer 의 Fetch Request 와 max_wait_ms 관계 알아보기 ( fetch.wait.max.ms ) (0) 2024.06.30 [Kafka] Consumer 는 JoinGroup API 를 어떻게 요청할까 ? (0) 2024.06.30 [Kafka] FindCoordinator API 와 Group Coordinator 알아보기 (0) 2024.06.29 - group_id