ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • [Kafka] Consumer Heartbeat 의 내부 동작 원리
    Kafka/Kafka Consumer 2024. 6. 30. 14:56
    반응형

    - 목차

     

    들어가며.

    카프카 컨슈머는 실행 후 Consumer Group 을 형성합니다.

    이 과정에서 컨슈머들은 브로커와 FindCoordinator, JoinGroup, SyncGroup 의 API 통신을 수행합니다. 

    이렇게 여러 컨슈머들을 하나의 그룹이 됩니다.

    하나의 Consumer Group 된 컨슈머들은 SyncGroup API 를 마무리로 각자 자신이 소비해야할 파티션 목록을 부여받게 됩니다.

    그리고 본격적으로 데이터의 소비를 시작합니다. ( Fetch API 를 통해서 )

     

    카프카 컨슈머는 Fetch API 를 통해서 데이터를 소비하는 작업 뿐만 아니라 Heartbeat 요청을 브로커에게 꾸준히 전달합니다.

    이때 Heartbeat 를 전달받는 브로커를 Group Coordinator 라고 부릅니다.

    여러 컨슈머들은 자신에게 부여된 하나의 Group Coordinator 에게 Heartbeat 요청을 전송합니다.

     

    Heartbeat 는 heartbeat.interval.ms 와 session.timeout.ms 설정에 영향을 받습니다.

    heartbeat.interval.ms 를 주기로 컨슈머는 브로커에게 Heartbeat 요청을 전송합니다.

    그리고 session.timeout.ms 으로 설정된 제한 기간동안 컨슈머가 브로커에게 Heartbeat 요청을 전송하지 않으면,

    브로커는 해당 컨슈머가 비정상적이라고 판단하고 리밸런싱을 시작합니다.

     

    출처 : https://www.lydtechconsulting.com/blog-kafka-rebalance-part1.html

     

    Heartbeat API 알아보기.

    아래는 컨슈머와 브로커가 서로 주고받는 Heartbeat API 의 요청/응답 포맷입니다.

     

    Heartbeat Request (Version: 4) => group_id generation_id member_id group_instance_id TAG_BUFFER 
      group_id => COMPACT_STRING
      generation_id => INT32
      member_id => COMPACT_STRING
      group_instance_id => COMPACT_NULLABLE_STRING
      
    Heartbeat Response (Version: 3) => throttle_time_ms error_code 
      throttle_time_ms => INT32
      error_code => INT16

     

    Heartbeat API 요청의 Format 을 살펴보면 아래와 같습니다.

     

    • group_id
      • group_id 는 카프카 컨슈머가 소속된 Consumer Group 의 group.id 입니다.

     

    • generation_id
      • generation_id 는 Consumer Group 이 몇 번의 리밸런싱이 발생하였는지에 대한 숫자 값입니다.
      • 새로운 컨슈머가 추가/제거되는 것처럼 Consumer Group 의 재구성된 횟수를 의미합니다.

     

    • member_id
      • member_id 는 카프카 컨슈머가 JoinGroup API Request 를 통해서 브로커로부터 부여받은 식별값입니다.
      • 이 값은 Group 내에서 Consumer 가 가지는 고유한 값입니다.

     

    • group.instance.id
      • group.instance.id 는 Consumer Group 의 Static Membership 과 관련된 데이터입니다.
      • 다른 글에서 Static Membership Assignment 에 대해서 설명하도록 하겠습니다.

     

    위와 같은 포맷으로 컨슈머는 브로커에게 Heartbeat 를 요청합니다.

     

    Heartbeat 의 TCP Packet 살펴보기.

    아래의 정보는 컨슈머가 브로커에게 전송하는 Heartbeat API Request 와 Response 의 패킷입니다.

    tcpdump 명령어를 통해서 아래의 패킷을 포착하였습니다.

     

    // Heartbeat API Request
    22:21:55.326856 IP romantic_blackburn.kafka-net.51460 > kafka2.9091: Flags [P.], seq 279:391, ack 547, win 501, options [nop,nop,TS val 3631458414 ecr 569066083], length 112
    	0x0000:  4500 00a4 76a1 4000 4006 6b81 ac13 0007  E...v.@.@.k.....
    	0x0010:  ac13 0004 c904 2383 e6a0 8ba9 2619 0fbe  ......#.....&...
    	0x0020:  8018 01f5 58c8 0000 0101 080a d873 a86e  ....X........s.n
    	0x0030:  21eb 4263 0000 006c 000c 0003 0000 0006  !.Bc...l........
    	0x0040:  0011 7468 6973 5f69 735f 636c 6965 6e74  ..this_is_client
    	0x0050:  5f69 6400 116d 792d 636f 6e73 756d 6572  _id..my-consumer
    	0x0060:  2d67 726f 7570 0000 0002 0036 7468 6973  -group.....6this
    	0x0070:  5f69 735f 636c 6965 6e74 5f69 642d 3761  _is_client_id-7a
    	0x0080:  3033 3064 3139 2d61 3631 382d 3430 3237  030d19-a618-4027
    	0x0090:  2d39 6433 332d 3830 3937 6462 3034 6332  -9d33-8097db04c2
    	0x00a0:  3466 ffff                                4f..
        
    // Heartbeat API Response
    22:21:55.332608 IP kafka2.9091 > romantic_blackburn.kafka-net.51460: Flags [P.], seq 547:561, ack 391, win 50470, options [nop,nop,TS val 569066088 ecr 3631458414], length 14
    	0x0000:  4500 0042 e474 4000 4006 fe0f ac13 0004  E..B.t@.@.......
    	0x0010:  ac13 0007 2383 c904 2619 0fbe e6a0 8c19  ....#...&.......
    	0x0020:  8018 c526 5866 0000 0101 080a 21eb 4268  ...&Xf......!.Bh
    	0x0030:  d873 a86e 0000 000a 0000 0006 0000 0000  .s.n............
    	0x0040:  0000

     

    이를 간단히 살펴보면 0x0038 ~ 0x0039 에 해당하는 "000c" 의 값이 Heartbeat API 의 Key 에 해당합니다.

    Heartbeat 의 API Key 는 12 번으로 "000c" 와 같이 표시됩니다.

     

    "this_is_client_id" 와 "my-consumer-group" 그리고 "this_is_client_id-58dc67b4-e153-4a9d-b197-84c5c36c4b0b" 와 같이 client.id, group.id, member.id 등도 함께 표현됩니다.

     

    그리고 브로커는 컨슈머에게 두번째 Packet 으로 응답을 합니다.

    두번째 응답은 Heartbeat API Response 의 패킷입니다.

    이들은 Correlation Id 를 통해서 서로의 요청/응답을 연결합니다.

    첫번째 Packet 의 0x003e & 0x003f 의 "0006" 은 Heartbeat API Request 의 Correlation ID 입니다.

    그리고 이와 연결되는 두번째 응답 패킷의 0x003a & 0x003fb 의 "0006"Heartbeat API Response 의 Correlation ID 입니다.

    이 값을 통해서 Request 와 Response 는 서로 연결됩니다.

     

    heartbeat.interval.ms 알아보기.

    heartbeat.interval.ms 의 기본값은 3초입니다.

    이 시간설정을 변경해서 Heartbeat 전송 주기를 살펴봅니다.

    아래와 같이 heartbeat.interval.ms 를 5초로 설정합니다.

    그리고 Kafka Consumer 를 실행하게 되면, 5초 간격으로 Heartbeat API Request 가 전송됨을 확인할 수 있습니다.

     

    from confluent_kafka import Consumer, KafkaError
    
    config = {
        'client.id': 'this_is_client_id',
        'bootstrap.servers': 'kafka1:9091',
        'group.id': 'my-consumer-group',
        'auto.offset.reset': 'earliest',
        'enable.auto.commit': False,
        'heartbeat.interval.ms': 5000
    }
    
    consumer = Consumer(config)
    consumer.subscribe(['test-topic'])

     

     

    아래처럼 5초 간격으로 Heartbeat API Request 들이 요청됩니다.

     

    // Heartbeat API Request
    22:30:53.686116 IP eager_khorana.kafka-net.39132 > kafka2.9091: Flags [P.], seq 2839933330:2839933442, ack 2406593168, win 501, options [nop,nop,TS val 3631996763 ecr 569604432], length 112
    	0x0000:  4500 00a4 a9fe 4000 4006 3824 ac13 0007  E.....@.@.8$....
    	0x0010:  ac13 0004 98dc 2383 a945 f192 8f71 b290  ......#..E...q..
    	0x0020:  8018 01f5 58c8 0000 0101 080a d87b df5b  ....X........{.[
    	0x0030:  21f3 7950 0000 006c 000c 0003 0000 0006  !.yP...l........
    	0x0040:  0011 7468 6973 5f69 735f 636c 6965 6e74  ..this_is_client
    	0x0050:  5f69 6400 116d 792d 636f 6e73 756d 6572  _id..my-consumer
    	0x0060:  2d67 726f 7570 0000 0003 0036 7468 6973  -group.....6this
    	0x0070:  5f69 735f 636c 6965 6e74 5f69 642d 6339  _is_client_id-c9
    	0x0080:  3365 6331 6364 2d32 6433 342d 3436 6233  3ec1cd-2d34-46b3
    	0x0090:  2d61 3931 342d 3266 6337 3834 6530 3631  -a914-2fc784e061
    	0x00a0:  3237 ffff                                27..
        
    // Heartbeat API Request
    22:30:58.711047 IP eager_khorana.kafka-net.39132 > kafka2.9091: Flags [P.], seq 192:304, ack 125, win 501, options [nop,nop,TS val 3632001788 ecr 569604441], length 112
    	0x0000:  4500 00a4 aa01 4000 4006 3821 ac13 0007  E.....@.@.8!....
    	0x0010:  ac13 0004 98dc 2383 a945 f252 8f71 b30c  ......#..E.R.q..
    	0x0020:  8018 01f5 58c8 0000 0101 080a d87b f2fc  ....X........{..
    	0x0030:  21f3 7959 0000 006c 000c 0003 0000 0008  !.yY...l........
    	0x0040:  0011 7468 6973 5f69 735f 636c 6965 6e74  ..this_is_client
    	0x0050:  5f69 6400 116d 792d 636f 6e73 756d 6572  _id..my-consumer
    	0x0060:  2d67 726f 7570 0000 0003 0036 7468 6973  -group.....6this
    	0x0070:  5f69 735f 636c 6965 6e74 5f69 642d 6339  _is_client_id-c9
    	0x0080:  3365 6331 6364 2d32 6433 342d 3436 6233  3ec1cd-2d34-46b3
    	0x0090:  2d61 3931 342d 3266 6337 3834 6530 3631  -a914-2fc784e061
    	0x00a0:  3237 ffff                                27..
        
    // Heartbeat API Request
    22:31:03.734301 IP eager_khorana.kafka-net.39132 > kafka2.9091: Flags [P.], seq 304:416, ack 139, win 501, options [nop,nop,TS val 3632006811 ecr 569609472], length 112
    	0x0000:  4500 00a4 aa03 4000 4006 381f ac13 0007  E.....@.@.8.....
    	0x0010:  ac13 0004 98dc 2383 a945 f2c2 8f71 b31a  ......#..E...q..
    	0x0020:  8018 01f5 58c8 0000 0101 080a d87c 069b  ....X........|..
    	0x0030:  21f3 8d00 0000 006c 000c 0003 0000 0009  !......l........
    	0x0040:  0011 7468 6973 5f69 735f 636c 6965 6e74  ..this_is_client
    	0x0050:  5f69 6400 116d 792d 636f 6e73 756d 6572  _id..my-consumer
    	0x0060:  2d67 726f 7570 0000 0003 0036 7468 6973  -group.....6this
    	0x0070:  5f69 735f 636c 6965 6e74 5f69 642d 6339  _is_client_id-c9
    	0x0080:  3365 6331 6364 2d32 6433 342d 3436 6233  3ec1cd-2d34-46b3
    	0x0090:  2d61 3931 342d 3266 6337 3834 6530 3631  -a914-2fc784e061
    	0x00a0:  3237 ffff                                27..
        
    // Heartbeat API Request
    22:31:08.753856 IP eager_khorana.kafka-net.39132 > kafka2.9091: Flags [P.], seq 416:528, ack 153, win 501, options [nop,nop,TS val 3632011831 ecr 569614485], length 112
    	0x0000:  4500 00a4 aa05 4000 4006 381d ac13 0007  E.....@.@.8.....
    	0x0010:  ac13 0004 98dc 2383 a945 f332 8f71 b328  ......#..E.2.q.(
    	0x0020:  8018 01f5 58c8 0000 0101 080a d87c 1a37  ....X........|.7
    	0x0030:  21f3 a095 0000 006c 000c 0003 0000 000a  !......l........
    	0x0040:  0011 7468 6973 5f69 735f 636c 6965 6e74  ..this_is_client
    	0x0050:  5f69 6400 116d 792d 636f 6e73 756d 6572  _id..my-consumer
    	0x0060:  2d67 726f 7570 0000 0003 0036 7468 6973  -group.....6this
    	0x0070:  5f69 735f 636c 6965 6e74 5f69 642d 6339  _is_client_id-c9
    	0x0080:  3365 6331 6364 2d32 6433 342d 3436 6233  3ec1cd-2d34-46b3
    	0x0090:  2d61 3931 342d 3266 6337 3834 6530 3631  -a914-2fc784e061
    	0x00a0:  3237 ffff                                27..

     

     

     

    session.timeout.ms 와 Rebalance 살펴보기.

    session.timeout.ms 시간 이내에 Consumer 는 Broker 에게 Heartbeat 를 전송하지 못하면 Rebalance 가 발생합니다.

    아래 예시는 session.timeout.ms 의 경과와 리밸런싱을 테스트하기 위해서 heartbeat.interval.ms 를 5초로 설정하고,

    session.timeout.ms 를 6초로 설정합니다.

     

    from confluent_kafka import Consumer, KafkaError
    
    config = {
        'client.id': 'this_is_client_id',
        'bootstrap.servers': 'kafka1:9091',
        'group.id': 'my-consumer-group',
        'auto.offset.reset': 'earliest',
        'enable.auto.commit': False,
        'heartbeat.interval.ms': 5000,
        'session.timeout.ms': 6000
    }
    
    consumer = Consumer(config)
    consumer.subscribe(['test-topic'])

     

    아래의 결과는 Heartbeat, JoinGroup, SyncGroup API Request 에 대한 패킷들입니다.

    의도적으로 Heartbeat 의 주기를 session.timeout.ms 보다 길게주었고, 그로 인해 리밸런싱이 발생합니다.

    12:57:43 -> 12:57:50 이 Heartbeat API Request 가 발생하지만, 이는 7초의 간격으로 전송되었습니다.

    따라서 session.timeout.ms 인 6초를 초과하여 리밸런싱이 발생합니다.

    리밸런싱 동안에 Consumer 는 JoinGroup 과 SyncGroup API 를 요청하여 Consumer Group 을 다시 형성하게 되죠.

     

    // Heartbeat API Request 
    12:57:43.589490 IP sweet_kapitsa.kafka-net.55476 > kafka1.9091: Flags [P.], seq 2522:2634, ack 782, win 501, options [nop,nop,TS val 3196531016 ecr 3651053683], length 112
    	0x0000:  4500 00a4 76ff 4000 4006 6b27 ac13 0004  E...v.@.@.k'....
    	0x0010:  ac13 0003 d8b4 2383 d12d 70cb c81e d6a8  ......#..-p.....
    	0x0020:  8018 01f5 58c4 0000 0101 080a be87 3148  ....X.........1H
    	0x0030:  d99e a873 0000 006c 000c 0003 0000 006d  ...s...l.......m
    	0x0040:  0011 7468 6973 5f69 735f 636c 6965 6e74  ..this_is_client
    	0x0050:  5f69 6400 116d 792d 636f 6e73 756d 6572  _id..my-consumer
    	0x0060:  2d67 726f 7570 0000 0005 0036 7468 6973  -group.....6this
    	0x0070:  5f69 735f 636c 6965 6e74 5f69 642d 6633  _is_client_id-f3
    	0x0080:  6630 3639 3136 2d39 3730 642d 3464 6566  f06916-970d-4def
    	0x0090:  2d39 6633 392d 3430 3265 3030 3337 6564  -9f39-402e0037ed
    	0x00a0:  3431 ffff                                41..
        
    // Heartbeat API Request 
    12:57:50.628733 IP sweet_kapitsa.kafka-net.55476 > kafka1.9091: Flags [P.], seq 2634:2746, ack 796, win 501, options [nop,nop,TS val 3196536053 ecr 3651058699], length 112
    	0x0000:  4500 00a4 7701 4000 4006 6b25 ac13 0004  E...w.@.@.k%....
    	0x0010:  ac13 0003 d8b4 2383 d12d 713b c81e d6b6  ......#..-q;....
    	0x0020:  8018 01f5 58c4 0000 0101 080a be87 44f5  ....X.........D.
    	0x0030:  d99e bc0b 0000 006c 000c 0003 0000 006e  .......l.......n
    	0x0040:  0011 7468 6973 5f69 735f 636c 6965 6e74  ..this_is_client
    	0x0050:  5f69 6400 116d 792d 636f 6e73 756d 6572  _id..my-consumer
    	0x0060:  2d67 726f 7570 0000 0005 0036 7468 6973  -group.....6this
    	0x0070:  5f69 735f 636c 6965 6e74 5f69 642d 6633  _is_client_id-f3
    	0x0080:  6630 3639 3136 2d39 3730 642d 3464 6566  f06916-970d-4def
    	0x0090:  2d39 6633 392d 3430 3265 3030 3337 6564  -9f39-402e0037ed
    	0x00a0:  3431 ffff                                41..
        
    // JoinGroup API Request 
    12:57:51.661621 IP sweet_kapitsa.kafka-net.55476 > kafka1.9091: Flags [P.], seq 2746:2913, ack 796, win 501, options [nop,nop,TS val 3196537087 ecr 3651058699], length 167
    	0x0000:  4500 00db 7702 4000 4006 6aed ac13 0004  E...w.@.@.j.....
    	0x0010:  ac13 0003 d8b4 2383 d12d 71ab c81e d6b6  ......#..-q.....
    	0x0020:  8018 01f5 58fb 0000 0101 080a be87 48ff  ....X.........H.
    	0x0030:  d99e bc0b 0000 00a3 000b 0005 0000 006f  ...............o
    	0x0040:  0011 7468 6973 5f69 735f 636c 6965 6e74  ..this_is_client
    	0x0050:  5f69 6400 116d 792d 636f 6e73 756d 6572  _id..my-consumer
    	0x0060:  2d67 726f 7570 0000 1770 0004 93e0 0000  -group...p......
    	0x0070:  ffff 0008 636f 6e73 756d 6572 0000 0002  ....consumer....
    	0x0080:  0005 7261 6e67 6500 0000 2000 0300 0000  ..range.........
    	0x0090:  0100 0a74 6573 742d 746f 7069 6300 0000  ...test-topic...
    	0x00a0:  0000 0000 00ff ffff ff00 0000 0a72 6f75  .............rou
    	0x00b0:  6e64 726f 6269 6e00 0000 2000 0300 0000  ndrobin.........
    	0x00c0:  0100 0a74 6573 742d 746f 7069 6300 0000  ...test-topic...
    	0x00d0:  0000 0000 00ff ffff ff00 00              ...........
        
    // JoinGroup API Request 
    12:57:53.670231 IP sweet_kapitsa.kafka-net.55476 > kafka1.9091: Flags [P.], seq 2913:3134, ack 892, win 501, options [nop,nop,TS val 3196539096 ecr 3651066774], length 221
    	0x0000:  4500 0111 7705 4000 4006 6ab4 ac13 0004  E...w.@.@.j.....
    	0x0010:  ac13 0003 d8b4 2383 d12d 7252 c81e d716  ......#..-rR....
    	0x0020:  8018 01f5 5931 0000 0101 080a be87 50d8  ....Y1........P.
    	0x0030:  d99e db96 0000 00d9 000b 0005 0000 0070  ...............p
    	0x0040:  0011 7468 6973 5f69 735f 636c 6965 6e74  ..this_is_client
    	0x0050:  5f69 6400 116d 792d 636f 6e73 756d 6572  _id..my-consumer
    	0x0060:  2d67 726f 7570 0000 1770 0004 93e0 0036  -group...p.....6
    	0x0070:  7468 6973 5f69 735f 636c 6965 6e74 5f69  this_is_client_i
    	0x0080:  642d 3130 3661 6638 6563 2d31 3665 352d  d-106af8ec-16e5-
    	0x0090:  3431 3937 2d61 3735 352d 6639 6333 3032  4197-a755-f9c302
    	0x00a0:  3936 6537 3431 ffff 0008 636f 6e73 756d  96e741....consum
    	0x00b0:  6572 0000 0002 0005 7261 6e67 6500 0000  er......range...
    	0x00c0:  2000 0300 0000 0100 0a74 6573 742d 746f  .........test-to
    	0x00d0:  7069 6300 0000 0000 0000 00ff ffff ff00  pic.............
    	0x00e0:  0000 0a72 6f75 6e64 726f 6269 6e00 0000  ...roundrobin...
    	0x00f0:  2000 0300 0000 0100 0a74 6573 742d 746f  .........test-to
    	0x0100:  7069 6300 0000 0000 0000 00ff ffff ff00  pic.............
    	0x0110:  00                                       .
        
    // SyncGroup API Request 
    12:57:57.714625 IP sweet_kapitsa.kafka-net.55476 > kafka1.9091: Flags [P.], seq 3198:3404, ack 1249, win 501, options [nop,nop,TS val 3196543140 ecr 3651070819], length 206
    	0x0000:  4500 0102 770b 4000 4006 6abd ac13 0004  E...w.@.@.j.....
    	0x0010:  ac13 0003 d8b4 2383 d12d 736f c81e d87b  ......#..-so...{
    	0x0020:  8018 01f5 5922 0000 0101 080a be87 60a4  ....Y"........`.
    	0x0030:  d99e eb63 0000 00ca 000e 0003 0000 0072  ...c...........r
    	0x0040:  0011 7468 6973 5f69 735f 636c 6965 6e74  ..this_is_client
    	0x0050:  5f69 6400 116d 792d 636f 6e73 756d 6572  _id..my-consumer
    	0x0060:  2d67 726f 7570 0000 0007 0036 7468 6973  -group.....6this
    	0x0070:  5f69 735f 636c 6965 6e74 5f69 642d 3130  _is_client_id-10
    	0x0080:  3661 6638 6563 2d31 3665 352d 3431 3937  6af8ec-16e5-4197
    	0x0090:  2d61 3735 352d 6639 6333 3032 3936 6537  -a755-f9c30296e7
    	0x00a0:  3431 ffff 0000 0001 0036 7468 6973 5f69  41.......6this_i
    	0x00b0:  735f 636c 6965 6e74 5f69 642d 3130 3661  s_client_id-106a
    	0x00c0:  6638 6563 2d31 3665 352d 3431 3937 2d61  f8ec-16e5-4197-a
    	0x00d0:  3735 352d 6639 6333 3032 3936 6537 3431  755-f9c30296e741
    	0x00e0:  0000 001e 0000 0000 0001 000a 7465 7374  ............test
    	0x00f0:  2d74 6f70 6963 0000 0001 0000 0000 0000  -topic..........
    	0x0100:  0000                                     ..
        
    // Heartbeat API Request 
    12:57:59.733611 IP sweet_kapitsa.kafka-net.55476 > kafka1.9091: Flags [P.], seq 3404:3516, ack 1297, win 501, options [nop,nop,TS val 3196545158 ecr 3651072837], length 112
    	0x0000:  4500 00a4 770c 4000 4006 6b1a ac13 0004  E...w.@.@.k.....
    	0x0010:  ac13 0003 d8b4 2383 d12d 743d c81e d8ab  ......#..-t=....
    	0x0020:  8018 01f5 58c4 0000 0101 080a be87 6886  ....X.........h.
    	0x0030:  d99e f345 0000 006c 000c 0003 0000 0073  ...E...l.......s
    	0x0040:  0011 7468 6973 5f69 735f 636c 6965 6e74  ..this_is_client
    	0x0050:  5f69 6400 116d 792d 636f 6e73 756d 6572  _id..my-consumer
    	0x0060:  2d67 726f 7570 0000 0007 0036 7468 6973  -group.....6this
    	0x0070:  5f69 735f 636c 6965 6e74 5f69 642d 3130  _is_client_id-10
    	0x0080:  3661 6638 6563 2d31 3665 352d 3431 3937  6af8ec-16e5-4197
    	0x0090:  2d61 3735 352d 6639 6333 3032 3936 6537  -a755-f9c30296e7
    	0x00a0:  3431 ffff                                41..
        
    // Heartbeat API Request 
    12:58:04.762392 IP sweet_kapitsa.kafka-net.55476 > kafka1.9091: Flags [P.], seq 3588:3700, ack 1381, win 501, options [nop,nop,TS val 3196550185 ecr 3651076864], length 112
    	0x0000:  4500 00a4 770f 4000 4006 6b17 ac13 0004  E...w.@.@.k.....
    	0x0010:  ac13 0003 d8b4 2383 d12d 74f5 c81e d8ff  ......#..-t.....
    	0x0020:  8018 01f5 58c4 0000 0101 080a be87 7c29  ....X.........|)
    	0x0030:  d99f 0300 0000 006c 000c 0003 0000 0075  .......l.......u
    	0x0040:  0011 7468 6973 5f69 735f 636c 6965 6e74  ..this_is_client
    	0x0050:  5f69 6400 116d 792d 636f 6e73 756d 6572  _id..my-consumer
    	0x0060:  2d67 726f 7570 0000 0007 0036 7468 6973  -group.....6this
    	0x0070:  5f69 735f 636c 6965 6e74 5f69 642d 3130  _is_client_id-10
    	0x0080:  3661 6638 6563 2d31 3665 352d 3431 3937  6af8ec-16e5-4197
    	0x0090:  2d61 3735 352d 6639 6333 3032 3936 6537  -a755-f9c30296e7
    	0x00a0:  3431 ffff                                41..

     

     

    관련 자료들.

     

    카프카 API 문서.

     

    https://kafka.apache.org/protocol.html#protocol_api_keys

     

    Apache Kafka

    Apache Kafka: A Distributed Streaming Platform.

    kafka.apache.org

     

    Docker Commands.

    docker network create kafka-net
    
    docker run -d --name zookeeper --hostname zookeeper --net kafka-net \
      -e ZOOKEEPER_CLIENT_PORT=2181 \
      confluentinc/cp-zookeeper:7.6.4
    
    docker run -d --name kafka1 --hostname kafka1 --net kafka-net \
      -e KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181 \
      -e KAFKA_CFG_BROKER_ID=1 \
      -e ALLOW_PLAINTEXT_LISTENER=yes \
      -e KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR=1 \
      -e KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,INNER://:9091,OUTER://:19092 \
      -e KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka1:9092,INNER://kafka1:9091,OUTER://localhost:19092 \
      -e KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,INNER:PLAINTEXT,OUTER:PLAINTEXT \
      -e KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT \
      -p 19092:19092 \
      bitnami/kafka:3.1.2

     

     

    반응형
Designed by Tistory.