-
[Kafka] Consumer 는 JoinGroup API 를 어떻게 요청할까 ?Kafka/Kafka Consumer 2024. 6. 30. 14:56반응형
- 목차
들어가며.
Kafka Consumer 는 group.id 라는 속성을 가집니다.
group.id 라는 속성은 원격에서 실행되는 여러 개의 Kafka Consumer 들을 하나의 Group 으로 동작하게 만듭니다.
오늘의 주제인 "JoinGroup API" 는 여러 개의 Kafka Consumer 들을 하나의 Group 으로 동작하게 만드는 API 중의 하나인데요.
카프카 브로커는 JoinGroup API 를 요청한 여러 개의 Kafka Consumer 들을 모아서 하나의 Consumer Group 으로 동작하게 만듭니다.
이어지는 내용에서 JoinGroup 과 관련된 상세한 이야기를 설명해보겠습니다.
JoinGroup API .
아래의 정보는 Kafka Consumer 가 브로커에게 요청하는 JoinGroup Request 의 포맷입니다.
( Version 은 현재 시점의 최신 버전인 Version 5 로 설명을 진행합니다. )
JoinGroup Request (Version: 5) => group_id session_timeout_ms rebalance_timeout_ms member_id group_instance_id protocol_type [protocols] group_id => STRING session_timeout_ms => INT32 rebalance_timeout_ms => INT32 member_id => STRING group_instance_id => NULLABLE_STRING protocol_type => STRING protocols => name metadata name => STRING metadata => BYTES
Consumer 는 브로커에게 위와 같은 형식으로 브로커에게 Consumer Group 에 참여 의사를 밝히게 됩니다.
JoinGroup Request Data 의 개별적인 속성을 알아보도록 하겠습니다.
- group_id
- group_id 는 Consumer Group 의 식별값을 나타내는 그룹명입니다.
- group_id 를 JoinGroup Request 와 함께 Group Coordinator 에게 제출해야합니다.
- session_timeout.ms
- session_timeout_ms 는 Group Coordinator 가 Consumer 의 생존 여부를 판단하는 제한 시간입니다.
- Consumer 는 주기적으로 Coordinator 에게 Heartbeat 요청을 전달해야합니다.
- 만약 session_timeout_ms 동안 Coordinator 는 Consumer 의 Heartbeat 를 전달받지 못하면 Rabalance 가 시작됩니다.
- rebalance_timeout_ms
- rebalance_timeout_ms 는 리밸런싱가 허용되는 기간을 의미합니다.
- 즉, Consumer 들이 Consumer Group 에 참여할 수 있는 시간 범위를 의미하며,
이 기간 내에 Consumer 들은 JoinGroup 과 SyncGroup 을 요청해야합니다.
- group_instance_id
- group_instance_id 는 Consumer Group 의 Consumer 들이 Static Membership 를 유지하기 위한 설정인데요.
- ( 이 내용은 추후에 새로운 글로 설명드리겠습니다. )
- member_id
- Consumer Group 에 참여하는 Consumer 의 고유 ID 를 의미합니다.
- 즉, Consumer 를 식별할 수 있는 ID 로써 Group Coordinator 가 부여해주게 됩니다.
- protocol_type
- consumer 또는 connect 중의 하나에 해당합니다.
- Kafka Connect 의 Worker 들도 Kafka Consumer 와 같은 Join/Sync 의 과정을 수행하게 되며, 동일한 API 를 사용합니다.
- protocols
- Partition Assignment Strategy 을 지정합니다.
- Kafka 는 일반적인 시스템들과 다르게 client-side 에서 Partition 이 할당됩니다.
- 그래서 클라이언트는 어떠한 방식으로 Consumer 와 Partition 을 매핑할지에 대한 방식을 브로커에게 제출합니다.
- range, roundrobin, sticky 등의 방식이 존재합니다.
JoinGroup Request 의 TCP Packet 살펴보기.
실제로 JoinGroup Request 가 어떠한 방식으로 브로커에게 전달되는지 확인해봅니다.
아래는 간단하게 구현된 Kafka Consumer 스크립트입니다.
Consumer 를 실행하여 JoinGroup 의 과정을 살펴볼 예정이라 Record 를 Polling 하는 로직은 존재하지 않습니다.
from confluent_kafka import Consumer, KafkaError config = { 'client.id': 'this_is_client_id', 'bootstrap.servers': 'kafka1:9091', 'group.id': 'my-consumer-group', 'auto.offset.reset': 'earliest', 'enable.auto.commit': False, } consumer = Consumer(config) consumer.subscribe(['test-topic'])
위 스크립트를 실행하고, tcpdump 를 통해서 아래의 TCP Packet 들을 확인할 수 있습니다.
총 4개의 Packet 이 존재합니다.
2개의 Packet 은 JoinGroup Request Packet, 2개의 Packet 은 JoinGroup Response 에 대한 패킷입니다.
IP wizardly_thompson.kafka-net.54392 > kafka1.9091: Flags [P.], seq 77:244, ack 440, win 501, options [nop,nop,TS val 82477355 ecr 1296764498], length 167 0x0000: 4500 00db c9f4 4000 4006 17f8 ac13 0007 E.....@.@....... 0x0010: ac13 0003 d478 2383 ce8e 550c 0d00 7979 .....x#...U...yy 0x0020: 8018 01f5 58fe 0000 0101 080a 04ea 812b ....X..........+ 0x0030: 4d4b 0e52 0000 00a3 000b 0005 0000 0002 MK.R............ 0x0040: 0011 7468 6973 5f69 735f 636c 6965 6e74 ..this_is_client 0x0050: 5f69 6400 116d 792d 636f 6e73 756d 6572 _id..my-consumer 0x0060: 2d67 726f 7570 0000 afc8 0004 93e0 0000 -group.......... 0x0070: ffff 0008 636f 6e73 756d 6572 0000 0002 ....consumer.... 0x0080: 0005 7261 6e67 6500 0000 2000 0300 0000 ..range......... 0x0090: 0100 0a74 6573 742d 746f 7069 6300 0000 ...test-topic... 0x00a0: 0000 0000 00ff ffff ff00 0000 0a72 6f75 .............rou 0x00b0: 6e64 726f 6269 6e00 0000 2000 0300 0000 ndrobin......... 0x00c0: 0100 0a74 6573 742d 746f 7069 6300 0000 ...test-topic... 0x00d0: 0000 0000 00ff ffff ff00 00 ........... IP kafka1.9091 > wizardly_thompson.kafka-net.54392: Flags [P.], seq 440:522, ack 244, win 50559, options [nop,nop,TS val 1296764502 ecr 82477355], length 82 0x0000: 4500 0086 3891 4000 4006 a9b0 ac13 0003 E...8.@.@....... 0x0010: ac13 0007 2383 d478 0d00 7979 ce8e 55b3 ....#..x..yy..U. 0x0020: 8018 c57f 58a9 0000 0101 080a 4d4b 0e56 ....X.......MK.V 0x0030: 04ea 812b 0000 004e 0000 0002 0000 0000 ...+...N........ 0x0040: 004f ffff ffff 0000 0000 0036 7468 6973 .O.........6this 0x0050: 5f69 735f 636c 6965 6e74 5f69 642d 6366 _is_client_id-cf 0x0060: 6438 3262 6336 2d33 3139 612d 3464 6431 d82bc6-319a-4dd1 0x0070: 2d38 3962 382d 6464 3436 3335 3238 6465 -89b8-dd463528de 0x0080: 3433 0000 0000 43.... IP wizardly_thompson.kafka-net.54392 > kafka1.9091: Flags [P.], seq 244:465, ack 522, win 501, options [nop,nop,TS val 82477357 ecr 1296764502], length 221 0x0000: 4500 0111 c9f5 4000 4006 17c1 ac13 0007 E.....@.@....... 0x0010: ac13 0003 d478 2383 ce8e 55b3 0d00 79cb .....x#...U...y. 0x0020: 8018 01f5 5934 0000 0101 080a 04ea 812d ....Y4.........- 0x0030: 4d4b 0e56 0000 00d9 000b 0005 0000 0003 MK.V............ 0x0040: 0011 7468 6973 5f69 735f 636c 6965 6e74 ..this_is_client 0x0050: 5f69 6400 116d 792d 636f 6e73 756d 6572 _id..my-consumer 0x0060: 2d67 726f 7570 0000 afc8 0004 93e0 0036 -group.........6 0x0070: 7468 6973 5f69 735f 636c 6965 6e74 5f69 this_is_client_i 0x0080: 642d 6366 6438 3262 6336 2d33 3139 612d d-cfd82bc6-319a- 0x0090: 3464 6431 2d38 3962 382d 6464 3436 3335 4dd1-89b8-dd4635 0x00a0: 3238 6465 3433 ffff 0008 636f 6e73 756d 28de43....consum 0x00b0: 6572 0000 0002 0005 7261 6e67 6500 0000 er......range... 0x00c0: 2000 0300 0000 0100 0a74 6573 742d 746f .........test-to 0x00d0: 7069 6300 0000 0000 0000 00ff ffff ff00 pic............. 0x00e0: 0000 0a72 6f75 6e64 726f 6269 6e00 0000 ...roundrobin... 0x00f0: 2000 0300 0000 0100 0a74 6573 742d 746f .........test-to 0x0100: 7069 6300 0000 0000 0000 00ff ffff ff00 pic............. 0x0110: 00 IP kafka1.9091 > wizardly_thompson.kafka-net.54392: Flags [FP.], seq 522:757, ack 466, win 50470, options [nop,nop,TS val 1296811801 ecr 82498866], length 235 0x0000: 4500 011f 389c 4000 4006 a90c ac13 0003 E...8.@.@....... 0x0010: ac13 0007 2383 d478 0d00 79cb ce8e 5691 ....#..x..y...V. 0x0020: 8019 c526 5942 0000 0101 080a 4d4b c719 ...&YB......MK.. 0x0030: 04ea d532 0000 00e7 0000 0003 0000 0000 ...2............ 0x0040: 0000 0000 0003 0005 7261 6e67 6500 3674 ........range.6t 0x0050: 6869 735f 6973 5f63 6c69 656e 745f 6964 his_is_client_id 0x0060: 2d63 6664 3832 6263 362d 3331 3961 2d34 -cfd82bc6-319a-4 0x0070: 6464 312d 3839 6238 2d64 6434 3633 3532 dd1-89b8-dd46352 0x0080: 3864 6534 3300 3674 6869 735f 6973 5f63 8de43.6this_is_c 0x0090: 6c69 656e 745f 6964 2d63 6664 3832 6263 lient_id-cfd82bc 0x00a0: 362d 3331 3961 2d34 6464 312d 3839 6238 6-319a-4dd1-89b8 0x00b0: 2d64 6434 3633 3532 3864 6534 3300 0000 -dd463528de43... 0x00c0: 0100 3674 6869 735f 6973 5f63 6c69 656e ..6this_is_clien 0x00d0: 745f 6964 2d63 6664 3832 6263 362d 3331 t_id-cfd82bc6-31 0x00e0: 3961 2d34 6464 312d 3839 6238 2d64 6434 9a-4dd1-89b8-dd4 0x00f0: 3633 3532 3864 6534 33ff ff00 0000 2000 63528de43....... 0x0100: 0300 0000 0100 0a74 6573 742d 746f 7069 .......test-topi 0x0110: 6300 0000 0000 0000 00ff ffff ff00 00 c..............
첫번째 JoinGroup Request.
아래의 TCP Packet 은 Consumer 가 Broker 에게 요청하는 JoinGroup Request 입니다.
IP wizardly_thompson.kafka-net.54392 > kafka1.9091: Flags [P.], seq 77:244, ack 440, win 501, options [nop,nop,TS val 82477355 ecr 1296764498], length 167 0x0000: 4500 00db c9f4 4000 4006 17f8 ac13 0007 E.....@.@....... 0x0010: ac13 0003 d478 2383 ce8e 550c 0d00 7979 .....x#...U...yy 0x0020: 8018 01f5 58fe 0000 0101 080a 04ea 812b ....X..........+ 0x0030: 4d4b 0e52 0000 00a3 000b 0005 0000 0002 MK.R............ 0x0040: 0011 7468 6973 5f69 735f 636c 6965 6e74 ..this_is_client 0x0050: 5f69 6400 116d 792d 636f 6e73 756d 6572 _id..my-consumer 0x0060: 2d67 726f 7570 0000 afc8 0004 93e0 0000 -group.......... 0x0070: ffff 0008 636f 6e73 756d 6572 0000 0002 ....consumer.... 0x0080: 0005 7261 6e67 6500 0000 2000 0300 0000 ..range......... 0x0090: 0100 0a74 6573 742d 746f 7069 6300 0000 ...test-topic... 0x00a0: 0000 0000 00ff ffff ff00 0000 0a72 6f75 .............rou 0x00b0: 6e64 726f 6269 6e00 0000 2000 0300 0000 ndrobin......... 0x00c0: 0100 0a74 6573 742d 746f 7069 6300 0000 ...test-topic... 0x00d0: 0000 0000 00ff ffff ff00 00 ...........
- wizardly_thompson.kafka-net.54392 > kafka1.9091
- "wizardly_thompson.kafka-net.54392" 은 Docker 로 실행된 Consumer 입니다.
- "kafka1.9091" 는 Group Coordinator 에 해당하는 브로커입니다.
- 0x0030: 4d4b 0e52 0000 00a3 000b 0005 0000 0002 MK.R............
- 0x0038 ~ 0x0039 에 해당하는 Byte 의 데이터는 "000b" 입니다.
- "000b" 는 십진수로 11 에 해당하는 값으로 이는 JoinGroup API Key 에 해당합니다.
- 0x0040: 0011 7468 6973 5f69 735f 636c 6965 6e74 ..this_is_client
- 0x0043 바이트부터 "this_is_client_id" 라는 텍스트가 확인됩니다.
- 이는 Client ID 에 해당하는 정보입니다.
- 0x0043 바이트부터 "this_is_client_id" 라는 텍스트가 확인됩니다.
- 0x0050: 5f69 6400 116d 792d 636f 6e73 756d 6572 _id..my-consumer
- 0x0055 바이트부터 "my-consumer-group" 텍스트가 확인됩니다.
- 이는 group.id 에 해당하는 정보입니다.
- 0x0055 바이트부터 "my-consumer-group" 텍스트가 확인됩니다.
- 0x0070: ffff 0008 636f 6e73 756d 6572 0000 0002 ....consumer....
- 0x0074 바이트부터 "consumer" 라는 텍스트가 확인됩니다.
- 이는 Protocol Type 에 해당하는 정보입니다.
- 0x00b0: 6e64 726f 6269 6e00 0000 2000 0300 0000 ndrobin.........
- 0x00ad 바이트부터 "roundrobin" 라는 텍스트가 확인됩니다.
- 이는 RoundRobin 방식으로 Consumer 와 Partition 을 할당함을 의미합니다.
이를 요약하면 Consumer 가 Group Coordinator 에게 첫번째 JoinGroup Request 를 전달합니다.
여기서 중요한 점은 아직까지 member id 가 없다는 사실입니다.
Consumer 는 Group Coordinator 에게 2번의 JoinGroup Request 를 전달합니다.
첫번째 JoinGroup Request 에서는 member id 를 제외한 정보를 전달하여 Consumer Group 의 생성을 시작하게 됩니다.
첫번째 JoinGroup Response.
아래는 JoinGroup Request 의 응답입니다.
아래에서 주목할 점은 "this_is_client_id-cfd82bc6-319a-4dd1-89b8-dd463528de43" 인 member id 정보입니다.
Group Coordinator 는 Consumer 의 첫번째 JoinGroup Request 의 응답으로써 Member Id 를 생성하여 제공합니다.
이로써 Consumer 는 자신의 member id 를 가지게 됩니다.
마치 Kafka Producer 가 InitProducerId 요청을 통해서 Transaction Coordinator 로부터 Producer Id 와 Epoch 를 부여받는 과정과 유사합니다.
IP kafka1.9091 > wizardly_thompson.kafka-net.54392: Flags [P.], seq 440:522, ack 244, win 50559, options [nop,nop,TS val 1296764502 ecr 82477355], length 82 0x0000: 4500 0086 3891 4000 4006 a9b0 ac13 0003 E...8.@.@....... 0x0010: ac13 0007 2383 d478 0d00 7979 ce8e 55b3 ....#..x..yy..U. 0x0020: 8018 c57f 58a9 0000 0101 080a 4d4b 0e56 ....X.......MK.V 0x0030: 04ea 812b 0000 004e 0000 0002 0000 0000 ...+...N........ 0x0040: 004f ffff ffff 0000 0000 0036 7468 6973 .O.........6this 0x0050: 5f69 735f 636c 6965 6e74 5f69 642d 6366 _is_client_id-cf 0x0060: 6438 3262 6336 2d33 3139 612d 3464 6431 d82bc6-319a-4dd1 0x0070: 2d38 3962 382d 6464 3436 3335 3238 6465 -89b8-dd463528de 0x0080: 3433 0000 0000 43....
두번째 JoinGroup Request.
아래의 요청은 Consumer 가 Group Coordinator 에서 두번째로 요청하는 JoinGroup Request 입니다.
이전 JoinGroup Request 의 응답으로써 전달받은 member_id 가 추가되어 있음을 확인할 수 있습니다.
IP wizardly_thompson.kafka-net.54392 > kafka1.9091: Flags [P.], seq 244:465, ack 522, win 501, options [nop,nop,TS val 82477357 ecr 1296764502], length 221 0x0000: 4500 0111 c9f5 4000 4006 17c1 ac13 0007 E.....@.@....... 0x0010: ac13 0003 d478 2383 ce8e 55b3 0d00 79cb .....x#...U...y. 0x0020: 8018 01f5 5934 0000 0101 080a 04ea 812d ....Y4.........- 0x0030: 4d4b 0e56 0000 00d9 000b 0005 0000 0003 MK.V............ 0x0040: 0011 7468 6973 5f69 735f 636c 6965 6e74 ..this_is_client 0x0050: 5f69 6400 116d 792d 636f 6e73 756d 6572 _id..my-consumer 0x0060: 2d67 726f 7570 0000 afc8 0004 93e0 0036 -group.........6 0x0070: 7468 6973 5f69 735f 636c 6965 6e74 5f69 this_is_client_i 0x0080: 642d 6366 6438 3262 6336 2d33 3139 612d d-cfd82bc6-319a- 0x0090: 3464 6431 2d38 3962 382d 6464 3436 3335 4dd1-89b8-dd4635 0x00a0: 3238 6465 3433 ffff 0008 636f 6e73 756d 28de43....consum 0x00b0: 6572 0000 0002 0005 7261 6e67 6500 0000 er......range... 0x00c0: 2000 0300 0000 0100 0a74 6573 742d 746f .........test-to 0x00d0: 7069 6300 0000 0000 0000 00ff ffff ff00 pic............. 0x00e0: 0000 0a72 6f75 6e64 726f 6269 6e00 0000 ...roundrobin... 0x00f0: 2000 0300 0000 0100 0a74 6573 742d 746f .........test-to 0x0100: 7069 6300 0000 0000 0000 00ff ffff ff00 pic............. 0x0110: 00
두번째 JoinGroup Response.
아래는 두번째 JoinGroup Request 에 대한 응답입니다.
아래에서 확인되는 응답 중에서 중요한 포인트는 leader_id, member_id, members 입니다.
IP kafka1.9091 > wizardly_thompson.kafka-net.54392: Flags [FP.], seq 522:757, ack 466, win 50470, options [nop,nop,TS val 1296811801 ecr 82498866], length 235 0x0000: 4500 011f 389c 4000 4006 a90c ac13 0003 E...8.@.@....... 0x0010: ac13 0007 2383 d478 0d00 79cb ce8e 5691 ....#..x..y...V. 0x0020: 8019 c526 5942 0000 0101 080a 4d4b c719 ...&YB......MK.. 0x0030: 04ea d532 0000 00e7 0000 0003 0000 0000 ...2............ 0x0040: 0000 0000 0003 0005 7261 6e67 6500 3674 ........range.6t 0x0050: 6869 735f 6973 5f63 6c69 656e 745f 6964 his_is_client_id 0x0060: 2d63 6664 3832 6263 362d 3331 3961 2d34 -cfd82bc6-319a-4 0x0070: 6464 312d 3839 6238 2d64 6434 3633 3532 dd1-89b8-dd46352 0x0080: 3864 6534 3300 3674 6869 735f 6973 5f63 8de43.6this_is_c 0x0090: 6c69 656e 745f 6964 2d63 6664 3832 6263 lient_id-cfd82bc 0x00a0: 362d 3331 3961 2d34 6464 312d 3839 6238 6-319a-4dd1-89b8 0x00b0: 2d64 6434 3633 3532 3864 6534 3300 0000 -dd463528de43... 0x00c0: 0100 3674 6869 735f 6973 5f63 6c69 656e ..6this_is_clien 0x00d0: 745f 6964 2d63 6664 3832 6263 362d 3331 t_id-cfd82bc6-31 0x00e0: 3961 2d34 6464 312d 3839 6238 2d64 6434 9a-4dd1-89b8-dd4 0x00f0: 3633 3532 3864 6534 33ff ff00 0000 2000 63528de43....... 0x0100: 0300 0000 0100 0a74 6573 742d 746f 7069 .......test-topi 0x0110: 6300 0000 0000 0000 00ff ffff ff00 00 c..............
먼저 JoinGroup Response 의 포맷을 확인해봅니다.
JoinGroup Response (Version: 5) => throttle_time_ms error_code generation_id protocol_name leader member_id [members] throttle_time_ms => INT32 error_code => INT16 generation_id => INT32 protocol_name => STRING leader => STRING member_id => STRING members => member_id group_instance_id metadata member_id => STRING group_instance_id => NULLABLE_STRING metadata => BYTES
- leader
- Group Coordinator 는 Consumer Group 에서 어떤 Consumer 가 Leader 인지 선정합니다.
- 일반적으로 가장 먼저 Group 에 참여한 Group Leader 가 됩니다.
- member_id
- JoinGroup API 를 요청한 Consumer 의 member_id 입니다.
- members
- Consumer Group 에 참여한 모든 Consumer 정보를 반환합니다.
두번째 JoinGroup 의 Response 의 경우에는 하나의 Consumer 만이 실행되었기 때문에 leader, member_id, members 가 모두 동일합니다.
Leader Consumer 는 JoinGroup 응답으로 member list 정보를 획득.
아래는 3개의 Consumer 를 실행시킨 후 포착한 JoinGroup Response Packet 들입니다.
첫번째 Packet 이 Group Coordinator 에 의해서 Leader 로 선정된 Consumer 의 JoinGroup Response 입니다.
그리고 나머지 Packet 은 일반 Consumer 의 JoinGroup Response Packet 인데요.
주목할 점은 오직 Leader Consumer 만이 모든 Consumer 들의 member id 를 전달받습니다.
그리고 Leader Consumer 는 이렇게 전달받은 member id 를 기준으로 Consumer 와 Partition 할당을 적용하게 됩니다.
IP kafka1.9091 > cranky_hugle.kafka-net.58818: Flags [P.], seq 2685707895:2685708318, ack 4113933153, win 50470, options [nop,nop,TS val 1301467559 ecr 87145182], length 423 0x0000: 4500 01db c6fb 4000 4006 19f1 ac13 0003 E.....@.@....... 0x0010: ac13 0007 2383 e5c2 a014 a677 f535 a361 ....#......w.5.a 0x0020: 8018 c526 59fe 0000 0101 080a 4d92 d1a7 ...&Y.......M... 0x0030: 0531 bade 0000 01a3 0000 0003 0000 0000 .1.............. 0x0040: 0000 0000 0006 0005 7261 6e67 6500 3674 ........range.6t 0x0050: 6869 735f 6973 5f63 6c69 656e 745f 6964 his_is_client_id 0x0060: 2d61 6631 6161 3236 322d 6662 3165 2d34 -af1aa262-fb1e-4 0x0070: 3133 302d 6234 3838 2d62 6237 6262 3762 130-b488-bb7bb7b 0x0080: 3330 6632 6600 3674 6869 735f 6973 5f63 30f2f.6this_is_c 0x0090: 6c69 656e 745f 6964 2d61 6631 6161 3236 lient_id-af1aa26 0x00a0: 322d 6662 3165 2d34 3133 302d 6234 3838 2-fb1e-4130-b488 0x00b0: 2d62 6237 6262 3762 3330 6632 6600 0000 -bb7bb7b30f2f... 0x00c0: 0300 3674 6869 735f 6973 5f63 6c69 656e ..6this_is_clien 0x00d0: 745f 6964 2d61 6631 6161 3236 322d 6662 t_id-af1aa262-fb 0x00e0: 3165 2d34 3133 302d 6234 3838 2d62 6237 1e-4130-b488-bb7 0x00f0: 6262 3762 3330 6632 66ff ff00 0000 2000 bb7b30f2f....... 0x0100: 0300 0000 0100 0a74 6573 742d 746f 7069 .......test-topi 0x0110: 6300 0000 0000 0000 00ff ffff ff00 0000 c............... 0x0120: 3674 6869 735f 6973 5f63 6c69 656e 745f 6this_is_client_ 0x0130: 6964 2d34 3234 6431 3131 652d 6664 3334 id-424d111e-fd34 0x0140: 2d34 3531 662d 3930 3461 2d66 3361 3138 -451f-904a-f3a18 0x0150: 6237 6236 3165 66ff ff00 0000 2000 0300 b7b61ef......... 0x0160: 0000 0100 0a74 6573 742d 746f 7069 6300 .....test-topic. 0x0170: 0000 0000 0000 00ff ffff ff00 0000 3674 ..............6t 0x0180: 6869 735f 6973 5f63 6c69 656e 745f 6964 his_is_client_id 0x0190: 2d63 6132 3232 3762 652d 6433 6163 2d34 -ca2227be-d3ac-4 0x01a0: 3530 312d 6162 3131 2d62 3634 6135 3535 501-ab11-b64a555 0x01b0: 6332 3265 61ff ff00 0000 2000 0300 0000 c22ea........... 0x01c0: 0100 0a74 6573 742d 746f 7069 6300 0000 ...test-topic... 0x01d0: 0000 0000 00ff ffff ff00 00 ........... IP kafka1.9091 > nice_feistel.kafka-net.38422: Flags [P.], seq 1062799684:1062799825, ack 3118343773, win 50470, options [nop,nop,TS val 464464616 ecr 845268787], length 141 0x0000: 4500 00c1 3d33 4000 4006 a4d2 ac13 0003 E...=3@.@....... 0x0010: ac13 0008 2383 9616 3f59 0944 b9de 265d ....#...?Y.D..&] 0x0020: 8018 c526 58e5 0000 0101 080a 1baf 2ae8 ...&X.........*. 0x0030: 3261 c733 0000 0089 0000 0003 0000 0000 2a.3............ 0x0040: 0000 0000 0006 0005 7261 6e67 6500 3674 ........range.6t 0x0050: 6869 735f 6973 5f63 6c69 656e 745f 6964 his_is_client_id 0x0060: 2d61 6631 6161 3236 322d 6662 3165 2d34 -af1aa262-fb1e-4 0x0070: 3133 302d 6234 3838 2d62 6237 6262 3762 130-b488-bb7bb7b 0x0080: 3330 6632 6600 3674 6869 735f 6973 5f63 30f2f.6this_is_c 0x0090: 6c69 656e 745f 6964 2d63 6132 3232 3762 lient_id-ca2227b 0x00a0: 652d 6433 6163 2d34 3530 312d 6162 3131 e-d3ac-4501-ab11 0x00b0: 2d62 3634 6135 3535 6332 3265 6100 0000 -b64a555c22ea... 0x00c0: 00 . IP kafka1.9091 > unruffled_faraday.kafka-net.47152: Flags [P.], seq 1558751371:1558751512, ack 4110337188, win 50470, options [nop,nop,TS val 4112238354 ecr 925576294], length 141 0x0000: 4500 00c1 2c7a 4000 4006 b58a ac13 0003 E...,z@.@....... 0x0010: ac13 0009 2383 b830 5ce8 a88b f4fe c4a4 ....#..0\....... 0x0020: 8018 c526 58e6 0000 0101 080a f51b c712 ...&X........... 0x0030: 372b 2c66 0000 0089 0000 0003 0000 0000 7+,f............ 0x0040: 0000 0000 0006 0005 7261 6e67 6500 3674 ........range.6t 0x0050: 6869 735f 6973 5f63 6c69 656e 745f 6964 his_is_client_id 0x0060: 2d61 6631 6161 3236 322d 6662 3165 2d34 -af1aa262-fb1e-4 0x0070: 3133 302d 6234 3838 2d62 6237 6262 3762 130-b488-bb7bb7b 0x0080: 3330 6632 6600 3674 6869 735f 6973 5f63 30f2f.6this_is_c 0x0090: 6c69 656e 745f 6964 2d34 3234 6431 3131 lient_id-424d111 0x00a0: 652d 6664 3334 2d34 3531 662d 3930 3461 e-fd34-451f-904a 0x00b0: 2d66 3361 3138 6237 6236 3165 6600 0000 -f3a18b7b61ef... 0x00c0: 00 .
카프카 클러스터를 실행하기 위한 Docker Command.
docker network create kafka-net docker run -d --name zookeeper --hostname zookeeper --net kafka-net \ -e ZOOKEEPER_CLIENT_PORT=2181 \ confluentinc/cp-zookeeper:7.6.4 docker run -d --name kafka1 --hostname kafka1 --net kafka-net \ -e KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181 \ -e KAFKA_CFG_BROKER_ID=1 \ -e ALLOW_PLAINTEXT_LISTENER=yes \ -e KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR=1 \ -e KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,OUTER://:19092 \ -e KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka1:9092,OUTER://localhost:19092 \ -e KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,OUTER:PLAINTEXT \ -e KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT \ -p 19092:19092 \ bitnami/kafka:3.1.2 docker run -d --name kafka2 --hostname kafka2 --net kafka-net \ -e KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181 \ -e KAFKA_CFG_BROKER_ID=2 \ -e ALLOW_PLAINTEXT_LISTENER=yes \ -e KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR=3 \ -e KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,OUTER://:29092 \ -e KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka2:9092,OUTER://localhost:29092 \ -e KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,OUTER:PLAINTEXT \ -e KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT \ -p 29092:29092 \ bitnami/kafka:3.1.2 docker run -d --name kafka3 --hostname kafka3 --net kafka-net \ -e KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181 \ -e KAFKA_CFG_BROKER_ID=3 \ -e ALLOW_PLAINTEXT_LISTENER=yes \ -e KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR=3 \ -e KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,OUTER://:39092 \ -e KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka1:9092,OUTER://localhost:39092 \ -e KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,OUTER:PLAINTEXT \ -e KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT \ -p 39092:39092 \ bitnami/kafka:3.1.2
반응형'Kafka > Kafka Consumer' 카테고리의 다른 글
[Kafka] Consumer가 추가되면 Rebalance는 어떻게 동작할까? (0) 2024.06.30 [Kafka] Consumer Heartbeat 의 내부 동작 원리 (0) 2024.06.30 [Kafka] Consumer 의 Fetch Request 와 max_wait_ms 관계 알아보기 ( fetch.wait.max.ms ) (0) 2024.06.30 [Kafka] SyncGroup API 알아보기 (0) 2024.06.30 [Kafka] FindCoordinator API 와 Group Coordinator 알아보기 (0) 2024.06.29 - group_id