-
[Kafka] Consumer Heartbeat 의 내부 동작 원리Kafka/Kafka Consumer 2024. 6. 30. 14:56반응형
- 목차
들어가며.
카프카 컨슈머는 실행 후 Consumer Group 을 형성합니다.
이 과정에서 컨슈머들은 브로커와 FindCoordinator, JoinGroup, SyncGroup 의 API 통신을 수행합니다.
이렇게 여러 컨슈머들을 하나의 그룹이 됩니다.
하나의 Consumer Group 된 컨슈머들은 SyncGroup API 를 마무리로 각자 자신이 소비해야할 파티션 목록을 부여받게 됩니다.
그리고 본격적으로 데이터의 소비를 시작합니다. ( Fetch API 를 통해서 )
카프카 컨슈머는 Fetch API 를 통해서 데이터를 소비하는 작업 뿐만 아니라 Heartbeat 요청을 브로커에게 꾸준히 전달합니다.
이때 Heartbeat 를 전달받는 브로커를 Group Coordinator 라고 부릅니다.
여러 컨슈머들은 자신에게 부여된 하나의 Group Coordinator 에게 Heartbeat 요청을 전송합니다.
Heartbeat 는 heartbeat.interval.ms 와 session.timeout.ms 설정에 영향을 받습니다.
heartbeat.interval.ms 를 주기로 컨슈머는 브로커에게 Heartbeat 요청을 전송합니다.
그리고 session.timeout.ms 으로 설정된 제한 기간동안 컨슈머가 브로커에게 Heartbeat 요청을 전송하지 않으면,
브로커는 해당 컨슈머가 비정상적이라고 판단하고 리밸런싱을 시작합니다.
출처 : https://www.lydtechconsulting.com/blog-kafka-rebalance-part1.html Heartbeat API 알아보기.
아래는 컨슈머와 브로커가 서로 주고받는 Heartbeat API 의 요청/응답 포맷입니다.
Heartbeat Request (Version: 4) => group_id generation_id member_id group_instance_id TAG_BUFFER group_id => COMPACT_STRING generation_id => INT32 member_id => COMPACT_STRING group_instance_id => COMPACT_NULLABLE_STRING Heartbeat Response (Version: 3) => throttle_time_ms error_code throttle_time_ms => INT32 error_code => INT16
Heartbeat API 요청의 Format 을 살펴보면 아래와 같습니다.
- group_id
- group_id 는 카프카 컨슈머가 소속된 Consumer Group 의 group.id 입니다.
- generation_id
- generation_id 는 Consumer Group 이 몇 번의 리밸런싱이 발생하였는지에 대한 숫자 값입니다.
- 새로운 컨슈머가 추가/제거되는 것처럼 Consumer Group 의 재구성된 횟수를 의미합니다.
- member_id
- member_id 는 카프카 컨슈머가 JoinGroup API Request 를 통해서 브로커로부터 부여받은 식별값입니다.
- 이 값은 Group 내에서 Consumer 가 가지는 고유한 값입니다.
- group.instance.id
- group.instance.id 는 Consumer Group 의 Static Membership 과 관련된 데이터입니다.
- 다른 글에서 Static Membership Assignment 에 대해서 설명하도록 하겠습니다.
위와 같은 포맷으로 컨슈머는 브로커에게 Heartbeat 를 요청합니다.
Heartbeat 의 TCP Packet 살펴보기.
아래의 정보는 컨슈머가 브로커에게 전송하는 Heartbeat API Request 와 Response 의 패킷입니다.
tcpdump 명령어를 통해서 아래의 패킷을 포착하였습니다.
// Heartbeat API Request 22:21:55.326856 IP romantic_blackburn.kafka-net.51460 > kafka2.9091: Flags [P.], seq 279:391, ack 547, win 501, options [nop,nop,TS val 3631458414 ecr 569066083], length 112 0x0000: 4500 00a4 76a1 4000 4006 6b81 ac13 0007 E...v.@.@.k..... 0x0010: ac13 0004 c904 2383 e6a0 8ba9 2619 0fbe ......#.....&... 0x0020: 8018 01f5 58c8 0000 0101 080a d873 a86e ....X........s.n 0x0030: 21eb 4263 0000 006c 000c 0003 0000 0006 !.Bc...l........ 0x0040: 0011 7468 6973 5f69 735f 636c 6965 6e74 ..this_is_client 0x0050: 5f69 6400 116d 792d 636f 6e73 756d 6572 _id..my-consumer 0x0060: 2d67 726f 7570 0000 0002 0036 7468 6973 -group.....6this 0x0070: 5f69 735f 636c 6965 6e74 5f69 642d 3761 _is_client_id-7a 0x0080: 3033 3064 3139 2d61 3631 382d 3430 3237 030d19-a618-4027 0x0090: 2d39 6433 332d 3830 3937 6462 3034 6332 -9d33-8097db04c2 0x00a0: 3466 ffff 4f.. // Heartbeat API Response 22:21:55.332608 IP kafka2.9091 > romantic_blackburn.kafka-net.51460: Flags [P.], seq 547:561, ack 391, win 50470, options [nop,nop,TS val 569066088 ecr 3631458414], length 14 0x0000: 4500 0042 e474 4000 4006 fe0f ac13 0004 E..B.t@.@....... 0x0010: ac13 0007 2383 c904 2619 0fbe e6a0 8c19 ....#...&....... 0x0020: 8018 c526 5866 0000 0101 080a 21eb 4268 ...&Xf......!.Bh 0x0030: d873 a86e 0000 000a 0000 0006 0000 0000 .s.n............ 0x0040: 0000
이를 간단히 살펴보면 0x0038 ~ 0x0039 에 해당하는 "000c" 의 값이 Heartbeat API 의 Key 에 해당합니다.
Heartbeat 의 API Key 는 12 번으로 "000c" 와 같이 표시됩니다.
"this_is_client_id" 와 "my-consumer-group" 그리고 "this_is_client_id-58dc67b4-e153-4a9d-b197-84c5c36c4b0b" 와 같이 client.id, group.id, member.id 등도 함께 표현됩니다.
그리고 브로커는 컨슈머에게 두번째 Packet 으로 응답을 합니다.
두번째 응답은 Heartbeat API Response 의 패킷입니다.
이들은 Correlation Id 를 통해서 서로의 요청/응답을 연결합니다.
첫번째 Packet 의 0x003e & 0x003f 의 "0006" 은 Heartbeat API Request 의 Correlation ID 입니다.
그리고 이와 연결되는 두번째 응답 패킷의 0x003a & 0x003fb 의 "0006" 은 Heartbeat API Response 의 Correlation ID 입니다.
이 값을 통해서 Request 와 Response 는 서로 연결됩니다.
heartbeat.interval.ms 알아보기.
heartbeat.interval.ms 의 기본값은 3초입니다.
이 시간설정을 변경해서 Heartbeat 전송 주기를 살펴봅니다.
아래와 같이 heartbeat.interval.ms 를 5초로 설정합니다.
그리고 Kafka Consumer 를 실행하게 되면, 5초 간격으로 Heartbeat API Request 가 전송됨을 확인할 수 있습니다.
from confluent_kafka import Consumer, KafkaError config = { 'client.id': 'this_is_client_id', 'bootstrap.servers': 'kafka1:9091', 'group.id': 'my-consumer-group', 'auto.offset.reset': 'earliest', 'enable.auto.commit': False, 'heartbeat.interval.ms': 5000 } consumer = Consumer(config) consumer.subscribe(['test-topic'])
아래처럼 5초 간격으로 Heartbeat API Request 들이 요청됩니다.
// Heartbeat API Request 22:30:53.686116 IP eager_khorana.kafka-net.39132 > kafka2.9091: Flags [P.], seq 2839933330:2839933442, ack 2406593168, win 501, options [nop,nop,TS val 3631996763 ecr 569604432], length 112 0x0000: 4500 00a4 a9fe 4000 4006 3824 ac13 0007 E.....@.@.8$.... 0x0010: ac13 0004 98dc 2383 a945 f192 8f71 b290 ......#..E...q.. 0x0020: 8018 01f5 58c8 0000 0101 080a d87b df5b ....X........{.[ 0x0030: 21f3 7950 0000 006c 000c 0003 0000 0006 !.yP...l........ 0x0040: 0011 7468 6973 5f69 735f 636c 6965 6e74 ..this_is_client 0x0050: 5f69 6400 116d 792d 636f 6e73 756d 6572 _id..my-consumer 0x0060: 2d67 726f 7570 0000 0003 0036 7468 6973 -group.....6this 0x0070: 5f69 735f 636c 6965 6e74 5f69 642d 6339 _is_client_id-c9 0x0080: 3365 6331 6364 2d32 6433 342d 3436 6233 3ec1cd-2d34-46b3 0x0090: 2d61 3931 342d 3266 6337 3834 6530 3631 -a914-2fc784e061 0x00a0: 3237 ffff 27.. // Heartbeat API Request 22:30:58.711047 IP eager_khorana.kafka-net.39132 > kafka2.9091: Flags [P.], seq 192:304, ack 125, win 501, options [nop,nop,TS val 3632001788 ecr 569604441], length 112 0x0000: 4500 00a4 aa01 4000 4006 3821 ac13 0007 E.....@.@.8!.... 0x0010: ac13 0004 98dc 2383 a945 f252 8f71 b30c ......#..E.R.q.. 0x0020: 8018 01f5 58c8 0000 0101 080a d87b f2fc ....X........{.. 0x0030: 21f3 7959 0000 006c 000c 0003 0000 0008 !.yY...l........ 0x0040: 0011 7468 6973 5f69 735f 636c 6965 6e74 ..this_is_client 0x0050: 5f69 6400 116d 792d 636f 6e73 756d 6572 _id..my-consumer 0x0060: 2d67 726f 7570 0000 0003 0036 7468 6973 -group.....6this 0x0070: 5f69 735f 636c 6965 6e74 5f69 642d 6339 _is_client_id-c9 0x0080: 3365 6331 6364 2d32 6433 342d 3436 6233 3ec1cd-2d34-46b3 0x0090: 2d61 3931 342d 3266 6337 3834 6530 3631 -a914-2fc784e061 0x00a0: 3237 ffff 27.. // Heartbeat API Request 22:31:03.734301 IP eager_khorana.kafka-net.39132 > kafka2.9091: Flags [P.], seq 304:416, ack 139, win 501, options [nop,nop,TS val 3632006811 ecr 569609472], length 112 0x0000: 4500 00a4 aa03 4000 4006 381f ac13 0007 E.....@.@.8..... 0x0010: ac13 0004 98dc 2383 a945 f2c2 8f71 b31a ......#..E...q.. 0x0020: 8018 01f5 58c8 0000 0101 080a d87c 069b ....X........|.. 0x0030: 21f3 8d00 0000 006c 000c 0003 0000 0009 !......l........ 0x0040: 0011 7468 6973 5f69 735f 636c 6965 6e74 ..this_is_client 0x0050: 5f69 6400 116d 792d 636f 6e73 756d 6572 _id..my-consumer 0x0060: 2d67 726f 7570 0000 0003 0036 7468 6973 -group.....6this 0x0070: 5f69 735f 636c 6965 6e74 5f69 642d 6339 _is_client_id-c9 0x0080: 3365 6331 6364 2d32 6433 342d 3436 6233 3ec1cd-2d34-46b3 0x0090: 2d61 3931 342d 3266 6337 3834 6530 3631 -a914-2fc784e061 0x00a0: 3237 ffff 27.. // Heartbeat API Request 22:31:08.753856 IP eager_khorana.kafka-net.39132 > kafka2.9091: Flags [P.], seq 416:528, ack 153, win 501, options [nop,nop,TS val 3632011831 ecr 569614485], length 112 0x0000: 4500 00a4 aa05 4000 4006 381d ac13 0007 E.....@.@.8..... 0x0010: ac13 0004 98dc 2383 a945 f332 8f71 b328 ......#..E.2.q.( 0x0020: 8018 01f5 58c8 0000 0101 080a d87c 1a37 ....X........|.7 0x0030: 21f3 a095 0000 006c 000c 0003 0000 000a !......l........ 0x0040: 0011 7468 6973 5f69 735f 636c 6965 6e74 ..this_is_client 0x0050: 5f69 6400 116d 792d 636f 6e73 756d 6572 _id..my-consumer 0x0060: 2d67 726f 7570 0000 0003 0036 7468 6973 -group.....6this 0x0070: 5f69 735f 636c 6965 6e74 5f69 642d 6339 _is_client_id-c9 0x0080: 3365 6331 6364 2d32 6433 342d 3436 6233 3ec1cd-2d34-46b3 0x0090: 2d61 3931 342d 3266 6337 3834 6530 3631 -a914-2fc784e061 0x00a0: 3237 ffff 27..
session.timeout.ms 와 Rebalance 살펴보기.
session.timeout.ms 시간 이내에 Consumer 는 Broker 에게 Heartbeat 를 전송하지 못하면 Rebalance 가 발생합니다.
아래 예시는 session.timeout.ms 의 경과와 리밸런싱을 테스트하기 위해서 heartbeat.interval.ms 를 5초로 설정하고,
session.timeout.ms 를 6초로 설정합니다.
from confluent_kafka import Consumer, KafkaError config = { 'client.id': 'this_is_client_id', 'bootstrap.servers': 'kafka1:9091', 'group.id': 'my-consumer-group', 'auto.offset.reset': 'earliest', 'enable.auto.commit': False, 'heartbeat.interval.ms': 5000, 'session.timeout.ms': 6000 } consumer = Consumer(config) consumer.subscribe(['test-topic'])
아래의 결과는 Heartbeat, JoinGroup, SyncGroup API Request 에 대한 패킷들입니다.
의도적으로 Heartbeat 의 주기를 session.timeout.ms 보다 길게주었고, 그로 인해 리밸런싱이 발생합니다.
12:57:43 -> 12:57:50 이 Heartbeat API Request 가 발생하지만, 이는 7초의 간격으로 전송되었습니다.
따라서 session.timeout.ms 인 6초를 초과하여 리밸런싱이 발생합니다.
리밸런싱 동안에 Consumer 는 JoinGroup 과 SyncGroup API 를 요청하여 Consumer Group 을 다시 형성하게 되죠.
// Heartbeat API Request 12:57:43.589490 IP sweet_kapitsa.kafka-net.55476 > kafka1.9091: Flags [P.], seq 2522:2634, ack 782, win 501, options [nop,nop,TS val 3196531016 ecr 3651053683], length 112 0x0000: 4500 00a4 76ff 4000 4006 6b27 ac13 0004 E...v.@.@.k'.... 0x0010: ac13 0003 d8b4 2383 d12d 70cb c81e d6a8 ......#..-p..... 0x0020: 8018 01f5 58c4 0000 0101 080a be87 3148 ....X.........1H 0x0030: d99e a873 0000 006c 000c 0003 0000 006d ...s...l.......m 0x0040: 0011 7468 6973 5f69 735f 636c 6965 6e74 ..this_is_client 0x0050: 5f69 6400 116d 792d 636f 6e73 756d 6572 _id..my-consumer 0x0060: 2d67 726f 7570 0000 0005 0036 7468 6973 -group.....6this 0x0070: 5f69 735f 636c 6965 6e74 5f69 642d 6633 _is_client_id-f3 0x0080: 6630 3639 3136 2d39 3730 642d 3464 6566 f06916-970d-4def 0x0090: 2d39 6633 392d 3430 3265 3030 3337 6564 -9f39-402e0037ed 0x00a0: 3431 ffff 41.. // Heartbeat API Request 12:57:50.628733 IP sweet_kapitsa.kafka-net.55476 > kafka1.9091: Flags [P.], seq 2634:2746, ack 796, win 501, options [nop,nop,TS val 3196536053 ecr 3651058699], length 112 0x0000: 4500 00a4 7701 4000 4006 6b25 ac13 0004 E...w.@.@.k%.... 0x0010: ac13 0003 d8b4 2383 d12d 713b c81e d6b6 ......#..-q;.... 0x0020: 8018 01f5 58c4 0000 0101 080a be87 44f5 ....X.........D. 0x0030: d99e bc0b 0000 006c 000c 0003 0000 006e .......l.......n 0x0040: 0011 7468 6973 5f69 735f 636c 6965 6e74 ..this_is_client 0x0050: 5f69 6400 116d 792d 636f 6e73 756d 6572 _id..my-consumer 0x0060: 2d67 726f 7570 0000 0005 0036 7468 6973 -group.....6this 0x0070: 5f69 735f 636c 6965 6e74 5f69 642d 6633 _is_client_id-f3 0x0080: 6630 3639 3136 2d39 3730 642d 3464 6566 f06916-970d-4def 0x0090: 2d39 6633 392d 3430 3265 3030 3337 6564 -9f39-402e0037ed 0x00a0: 3431 ffff 41.. // JoinGroup API Request 12:57:51.661621 IP sweet_kapitsa.kafka-net.55476 > kafka1.9091: Flags [P.], seq 2746:2913, ack 796, win 501, options [nop,nop,TS val 3196537087 ecr 3651058699], length 167 0x0000: 4500 00db 7702 4000 4006 6aed ac13 0004 E...w.@.@.j..... 0x0010: ac13 0003 d8b4 2383 d12d 71ab c81e d6b6 ......#..-q..... 0x0020: 8018 01f5 58fb 0000 0101 080a be87 48ff ....X.........H. 0x0030: d99e bc0b 0000 00a3 000b 0005 0000 006f ...............o 0x0040: 0011 7468 6973 5f69 735f 636c 6965 6e74 ..this_is_client 0x0050: 5f69 6400 116d 792d 636f 6e73 756d 6572 _id..my-consumer 0x0060: 2d67 726f 7570 0000 1770 0004 93e0 0000 -group...p...... 0x0070: ffff 0008 636f 6e73 756d 6572 0000 0002 ....consumer.... 0x0080: 0005 7261 6e67 6500 0000 2000 0300 0000 ..range......... 0x0090: 0100 0a74 6573 742d 746f 7069 6300 0000 ...test-topic... 0x00a0: 0000 0000 00ff ffff ff00 0000 0a72 6f75 .............rou 0x00b0: 6e64 726f 6269 6e00 0000 2000 0300 0000 ndrobin......... 0x00c0: 0100 0a74 6573 742d 746f 7069 6300 0000 ...test-topic... 0x00d0: 0000 0000 00ff ffff ff00 00 ........... // JoinGroup API Request 12:57:53.670231 IP sweet_kapitsa.kafka-net.55476 > kafka1.9091: Flags [P.], seq 2913:3134, ack 892, win 501, options [nop,nop,TS val 3196539096 ecr 3651066774], length 221 0x0000: 4500 0111 7705 4000 4006 6ab4 ac13 0004 E...w.@.@.j..... 0x0010: ac13 0003 d8b4 2383 d12d 7252 c81e d716 ......#..-rR.... 0x0020: 8018 01f5 5931 0000 0101 080a be87 50d8 ....Y1........P. 0x0030: d99e db96 0000 00d9 000b 0005 0000 0070 ...............p 0x0040: 0011 7468 6973 5f69 735f 636c 6965 6e74 ..this_is_client 0x0050: 5f69 6400 116d 792d 636f 6e73 756d 6572 _id..my-consumer 0x0060: 2d67 726f 7570 0000 1770 0004 93e0 0036 -group...p.....6 0x0070: 7468 6973 5f69 735f 636c 6965 6e74 5f69 this_is_client_i 0x0080: 642d 3130 3661 6638 6563 2d31 3665 352d d-106af8ec-16e5- 0x0090: 3431 3937 2d61 3735 352d 6639 6333 3032 4197-a755-f9c302 0x00a0: 3936 6537 3431 ffff 0008 636f 6e73 756d 96e741....consum 0x00b0: 6572 0000 0002 0005 7261 6e67 6500 0000 er......range... 0x00c0: 2000 0300 0000 0100 0a74 6573 742d 746f .........test-to 0x00d0: 7069 6300 0000 0000 0000 00ff ffff ff00 pic............. 0x00e0: 0000 0a72 6f75 6e64 726f 6269 6e00 0000 ...roundrobin... 0x00f0: 2000 0300 0000 0100 0a74 6573 742d 746f .........test-to 0x0100: 7069 6300 0000 0000 0000 00ff ffff ff00 pic............. 0x0110: 00 . // SyncGroup API Request 12:57:57.714625 IP sweet_kapitsa.kafka-net.55476 > kafka1.9091: Flags [P.], seq 3198:3404, ack 1249, win 501, options [nop,nop,TS val 3196543140 ecr 3651070819], length 206 0x0000: 4500 0102 770b 4000 4006 6abd ac13 0004 E...w.@.@.j..... 0x0010: ac13 0003 d8b4 2383 d12d 736f c81e d87b ......#..-so...{ 0x0020: 8018 01f5 5922 0000 0101 080a be87 60a4 ....Y"........`. 0x0030: d99e eb63 0000 00ca 000e 0003 0000 0072 ...c...........r 0x0040: 0011 7468 6973 5f69 735f 636c 6965 6e74 ..this_is_client 0x0050: 5f69 6400 116d 792d 636f 6e73 756d 6572 _id..my-consumer 0x0060: 2d67 726f 7570 0000 0007 0036 7468 6973 -group.....6this 0x0070: 5f69 735f 636c 6965 6e74 5f69 642d 3130 _is_client_id-10 0x0080: 3661 6638 6563 2d31 3665 352d 3431 3937 6af8ec-16e5-4197 0x0090: 2d61 3735 352d 6639 6333 3032 3936 6537 -a755-f9c30296e7 0x00a0: 3431 ffff 0000 0001 0036 7468 6973 5f69 41.......6this_i 0x00b0: 735f 636c 6965 6e74 5f69 642d 3130 3661 s_client_id-106a 0x00c0: 6638 6563 2d31 3665 352d 3431 3937 2d61 f8ec-16e5-4197-a 0x00d0: 3735 352d 6639 6333 3032 3936 6537 3431 755-f9c30296e741 0x00e0: 0000 001e 0000 0000 0001 000a 7465 7374 ............test 0x00f0: 2d74 6f70 6963 0000 0001 0000 0000 0000 -topic.......... 0x0100: 0000 .. // Heartbeat API Request 12:57:59.733611 IP sweet_kapitsa.kafka-net.55476 > kafka1.9091: Flags [P.], seq 3404:3516, ack 1297, win 501, options [nop,nop,TS val 3196545158 ecr 3651072837], length 112 0x0000: 4500 00a4 770c 4000 4006 6b1a ac13 0004 E...w.@.@.k..... 0x0010: ac13 0003 d8b4 2383 d12d 743d c81e d8ab ......#..-t=.... 0x0020: 8018 01f5 58c4 0000 0101 080a be87 6886 ....X.........h. 0x0030: d99e f345 0000 006c 000c 0003 0000 0073 ...E...l.......s 0x0040: 0011 7468 6973 5f69 735f 636c 6965 6e74 ..this_is_client 0x0050: 5f69 6400 116d 792d 636f 6e73 756d 6572 _id..my-consumer 0x0060: 2d67 726f 7570 0000 0007 0036 7468 6973 -group.....6this 0x0070: 5f69 735f 636c 6965 6e74 5f69 642d 3130 _is_client_id-10 0x0080: 3661 6638 6563 2d31 3665 352d 3431 3937 6af8ec-16e5-4197 0x0090: 2d61 3735 352d 6639 6333 3032 3936 6537 -a755-f9c30296e7 0x00a0: 3431 ffff 41.. // Heartbeat API Request 12:58:04.762392 IP sweet_kapitsa.kafka-net.55476 > kafka1.9091: Flags [P.], seq 3588:3700, ack 1381, win 501, options [nop,nop,TS val 3196550185 ecr 3651076864], length 112 0x0000: 4500 00a4 770f 4000 4006 6b17 ac13 0004 E...w.@.@.k..... 0x0010: ac13 0003 d8b4 2383 d12d 74f5 c81e d8ff ......#..-t..... 0x0020: 8018 01f5 58c4 0000 0101 080a be87 7c29 ....X.........|) 0x0030: d99f 0300 0000 006c 000c 0003 0000 0075 .......l.......u 0x0040: 0011 7468 6973 5f69 735f 636c 6965 6e74 ..this_is_client 0x0050: 5f69 6400 116d 792d 636f 6e73 756d 6572 _id..my-consumer 0x0060: 2d67 726f 7570 0000 0007 0036 7468 6973 -group.....6this 0x0070: 5f69 735f 636c 6965 6e74 5f69 642d 3130 _is_client_id-10 0x0080: 3661 6638 6563 2d31 3665 352d 3431 3937 6af8ec-16e5-4197 0x0090: 2d61 3735 352d 6639 6333 3032 3936 6537 -a755-f9c30296e7 0x00a0: 3431 ffff 41..
관련 자료들.
카프카 API 문서.
https://kafka.apache.org/protocol.html#protocol_api_keys
Apache Kafka
Apache Kafka: A Distributed Streaming Platform.
kafka.apache.org
Docker Commands.
docker network create kafka-net docker run -d --name zookeeper --hostname zookeeper --net kafka-net \ -e ZOOKEEPER_CLIENT_PORT=2181 \ confluentinc/cp-zookeeper:7.6.4 docker run -d --name kafka1 --hostname kafka1 --net kafka-net \ -e KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181 \ -e KAFKA_CFG_BROKER_ID=1 \ -e ALLOW_PLAINTEXT_LISTENER=yes \ -e KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR=1 \ -e KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,INNER://:9091,OUTER://:19092 \ -e KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka1:9092,INNER://kafka1:9091,OUTER://localhost:19092 \ -e KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,INNER:PLAINTEXT,OUTER:PLAINTEXT \ -e KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT \ -p 19092:19092 \ bitnami/kafka:3.1.2
반응형'Kafka > Kafka Consumer' 카테고리의 다른 글
[Kafka] Consumer 의 max.poll.records 와 Offset Commit 알아보기 (0) 2024.06.30 [Kafka] Consumer가 추가되면 Rebalance는 어떻게 동작할까? (0) 2024.06.30 [Kafka] Consumer 의 Fetch Request 와 max_wait_ms 관계 알아보기 ( fetch.wait.max.ms ) (0) 2024.06.30 [Kafka] SyncGroup API 알아보기 (0) 2024.06.30 [Kafka] Consumer 는 JoinGroup API 를 어떻게 요청할까 ? (0) 2024.06.30 - group_id