ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • [Kafka] Consumer가 추가되면 Rebalance는 어떻게 동작할까?
    Kafka/Kafka Consumer 2024. 6. 30. 14:56
    반응형

     

    - 목차

     

    들어가며.

    카프카 컨슈머는 실행되면서 Consumer Group 을 형성합니다.

    동일한 group.id 를 가지는 여러 컨슈머들은 하나의 Consumer Group 에 소속되게 됩니다.

    이 과정에서 JoinGroup 과 SyncGroup API 를 활용하게 되고, Consumer Group 의 형성과 파티션 할당이 이루어집니다.

    그리고 여러가지 상황에 의해서 컨슈머가 추가되거나 종료될 수 있는데요.

    이 과정에서 리밸런싱이 발생합니다.

     

    이번 글에서는 컨슈머의 여러가지 동작에 따른 리밸런싱이 어떻게 발생하는지 알아보는 시간을 가지려고 합니다.

     

     

    Rebalance 1) 새로운 컨슈머가 추가되는 상황.

    기존의 Consumer Group 이 존재하는 상황에서 새로운 컨슈머가 추가됩니다.

    이때에 새로운 컨슈머는 JoinGroup API Request 를 브로커에게 요청하게 됩니다.

    이러한 상황이 발생하면 Group Coordinator 는 기존의 Consumer Group 에 소속된 컨슈머들에게 REBALANCE_IN_PROGRESS 를 전달하게 됩니다.

     

    일반적으로 Group Coordinator 는 컨슈머가 요청한 Heartbeat Request 의 응답으로써 REBALANCE_IN_PROGRESS 상태임을 전파합니다.

    Consumer 는 보통 3초 단위로 Heartbeat 를 요청하게 되므로 컨슈머들은 최대 3초 내로 리밸런싱 상황을 감지할 수 있습니다.

     

    아래는 이와 관련된 TCP 패킷의 동작입니다.

    내용이 복잡한 관계로 간단히 정리를 하자면,

    1. heartbeat.interval.ms 를 주기로 3개의 Consumer 들은 Heartbeat 를 브로커에게 전송합니다.
    2. 그러다가 4번째 Consumer 가 브로커에게 JoinGroup 요청을 보냅니다.
    3. 그럼 Group Coordinator 는 새로운 Consumer 가 Consumer Group 에 참여함을 인지하고
    4. 모든 Consumer 들에게 REBALANCE_IN_PROGRESS 에러 응답을 전송합니다.
    5. 리밸런싱을 컨슈머에게 전달하는 방식을 Consumer 가 보낸 Heartbeat 요청의 응답을 통해서 전달합니다.

    이렇게 리밸런싱 상황임을 전달받은 브로커들은 JoinGroup 과 SyncGroup 요청을 다시 전달하면서 리밸런싱을 수행합니다.

     

    // Consumer 1 의 Heartbeat 요청	
    22:23:20.985039 IP sharp_nash.kafka-net.39810 > kafka1.9091: Flags [P.], seq 336:448, ack 43, win 501, options [nop,nop,TS val 3620269773 ecr 2925910942], length 112
    	0x0000:  4500 00a4 5d93 4000 4006 8491 ac13 0006  E...].@.@.......
    	0x0010:  ac13 0003 9b82 2383 8f30 dce5 b22f d4d4  ......#..0.../..
    	0x0020:  8018 01f5 58c6 0000 0101 080a d7c8 eecd  ....X...........
    	0x0030:  ae65 db9e 0000 006c 000c 0003 0000 0017  .e.....l........
    	0x0040:  0011 7468 6973 5f69 735f 636c 6965 6e74  ..this_is_client
    	0x0050:  5f69 6400 116d 792d 636f 6e73 756d 6572  _id..my-consumer
    	0x0060:  2d67 726f 7570 0000 0002 0036 7468 6973  -group.....6this
    	0x0070:  5f69 735f 636c 6965 6e74 5f69 642d 3234  _is_client_id-24
    	0x0080:  3965 3133 6139 2d32 6639 302d 3461 6362  9e13a9-2f90-4acb
    	0x0090:  2d39 3934 332d 6137 3265 6639 6564 3532  -9943-a72ef9ed52
    	0x00a0:  3061 ffff                                0a..
    
    // Consumer 2 의 Heartbeat 요청
    22:23:20.985627 IP ecstatic_ramanujan.kafka-net.53762 > kafka1.9091: Flags [P.], seq 336:448, ack 43, win 501, options [nop,nop,TS val 904303057 ecr 523460100], length 112
    	0x0000:  4500 00a4 29f1 4000 4006 b831 ac13 0008  E...).@.@..1....
    	0x0010:  ac13 0003 d202 2383 6b7a d459 248e 9929  ......#.kz.Y$..)
    	0x0020:  8018 01f5 58c8 0000 0101 080a 35e6 91d1  ....X.......5...
    	0x0030:  1f33 5e04 0000 006c 000c 0003 0000 0011  .3^....l........
    	0x0040:  0011 7468 6973 5f69 735f 636c 6965 6e74  ..this_is_client
    	0x0050:  5f69 6400 116d 792d 636f 6e73 756d 6572  _id..my-consumer
    	0x0060:  2d67 726f 7570 0000 0002 0036 7468 6973  -group.....6this
    	0x0070:  5f69 735f 636c 6965 6e74 5f69 642d 3964  _is_client_id-9d
    	0x0080:  3131 3932 3535 2d30 6264 362d 3431 3031  119255-0bd6-4101
    	0x0090:  2d39 3666 362d 6138 6235 3064 6636 6565  -96f6-a8b50df6ee
    	0x00a0:  6331 ffff                                c1..
    
    // Consumer 3 의 Heartbeat 요청
    22:23:20.985772 IP cranky_herschel.kafka-net.32900 > kafka1.9091: Flags [P.], seq 336:448, ack 43, win 501, options [nop,nop,TS val 984611383 ecr 4171233825], length 112
    	0x0000:  4500 00a4 46da 4000 4006 9b47 ac13 0009  E...F.@.@..G....
    	0x0010:  ac13 0003 8084 2383 cb9b 8bba 8bdf 79d2  ......#.......y.
    	0x0020:  8018 01f5 58c9 0000 0101 080a 3aaf fa37  ....X.......:..7
    	0x0030:  f89f fa21 0000 006c 000c 0003 0000 0011  ...!...l........
    	0x0040:  0011 7468 6973 5f69 735f 636c 6965 6e74  ..this_is_client
    	0x0050:  5f69 6400 116d 792d 636f 6e73 756d 6572  _id..my-consumer
    	0x0060:  2d67 726f 7570 0000 0002 0036 7468 6973  -group.....6this
    	0x0070:  5f69 735f 636c 6965 6e74 5f69 642d 3239  _is_client_id-29
    	0x0080:  3863 6139 3861 2d34 3163 392d 3434 3436  8ca98a-41c9-4446
    	0x0090:  2d61 3334 332d 3265 3335 3163 6561 3062  -a343-2e351cea0b
    	0x00a0:  6265 ffff                                be..
    
    // Consumer 4 의 JoinGroup 요청	
    22:23:24.371426 IP jovial_jackson.kafka-net.51922 > kafka1.9091: Flags [P.], seq 3293203651:3293203818, ack 1672368758, win 501, options [nop,nop,TS val 737703532 ecr 2072872991], length 167
    	0x0000:  4500 00db a2cc 4000 4006 3f1d ac13 000a  E.....@.@.?.....
    	0x0010:  ac13 0003 cad2 2383 c44a 4cc3 63ae 5276  ......#..JL.c.Rv
    	0x0020:  8018 01f5 5901 0000 0101 080a 2bf8 766c  ....Y.......+.vl
    	0x0030:  7b8d 881f 0000 00a3 000b 0005 0000 0002  {...............
    	0x0040:  0011 7468 6973 5f69 735f 636c 6965 6e74  ..this_is_client
    	0x0050:  5f69 6400 116d 792d 636f 6e73 756d 6572  _id..my-consumer
    	0x0060:  2d67 726f 7570 0000 1770 0004 93e0 0000  -group...p......
    	0x0070:  ffff 0008 636f 6e73 756d 6572 0000 0002  ....consumer....
    	0x0080:  0005 7261 6e67 6500 0000 2000 0300 0000  ..range.........
    	0x0090:  0100 0a74 6573 742d 746f 7069 6300 0000  ...test-topic...
    	0x00a0:  0000 0000 00ff ffff ff00 0000 0a72 6f75  .............rou
    	0x00b0:  6e64 726f 6269 6e00 0000 2000 0300 0000  ndrobin.........
    	0x00c0:  0100 0a74 6573 742d 746f 7069 6300 0000  ...test-topic...
    	0x00d0:  0000 0000 00ff ffff ff00 00              ...........
    
    // Consumer 4 의 JoinGroup 요청
    22:23:24.374787 IP jovial_jackson.kafka-net.51922 > kafka1.9091: Flags [P.], seq 167:388, ack 83, win 501, options [nop,nop,TS val 737703535 ecr 2072873002], length 221
    	0x0000:  4500 0111 a2cd 4000 4006 3ee6 ac13 000a  E.....@.@.>.....
    	0x0010:  ac13 0003 cad2 2383 c44a 4d6a 63ae 52c8  ......#..JMjc.R.
    	0x0020:  8018 01f5 5937 0000 0101 080a 2bf8 766f  ....Y7......+.vo
    	0x0030:  7b8d 882a 0000 00d9 000b 0005 0000 0003  {..*............
    	0x0040:  0011 7468 6973 5f69 735f 636c 6965 6e74  ..this_is_client
    	0x0050:  5f69 6400 116d 792d 636f 6e73 756d 6572  _id..my-consumer
    	0x0060:  2d67 726f 7570 0000 1770 0004 93e0 0036  -group...p.....6
    	0x0070:  7468 6973 5f69 735f 636c 6965 6e74 5f69  this_is_client_i
    	0x0080:  642d 6131 6662 3530 3163 2d63 3061 332d  d-a1fb501c-c0a3-
    	0x0090:  3435 3262 2d62 6634 632d 3937 6330 6662  452b-bf4c-97c0fb
    	0x00a0:  3437 3030 3838 ffff 0008 636f 6e73 756d  470088....consum
    	0x00b0:  6572 0000 0002 0005 7261 6e67 6500 0000  er......range...
    	0x00c0:  2000 0300 0000 0100 0a74 6573 742d 746f  .........test-to
    	0x00d0:  7069 6300 0000 0000 0000 00ff ffff ff00  pic.............
    	0x00e0:  0000 0a72 6f75 6e64 726f 6269 6e00 0000  ...roundrobin...
    	0x00f0:  2000 0300 0000 0100 0a74 6573 742d 746f  .........test-to
    	0x0100:  7069 6300 0000 0000 0000 00ff ffff ff00  pic.............
    	0x0110:  00                                       .
    
    // Consumer 2 의 Heartbeat 요청
    22:23:25.994514 IP ecstatic_ramanujan.kafka-net.53762 > kafka1.9091: Flags [P.], seq 448:560, ack 57, win 501, options [nop,nop,TS val 904308066 ecr 523465112], length 112
    	0x0000:  4500 00a4 29f3 4000 4006 b82f ac13 0008  E...).@.@../....
    	0x0010:  ac13 0003 d202 2383 6b7a d4c9 248e 9937  ......#.kz..$..7
    	0x0020:  8018 01f5 58c8 0000 0101 080a 35e6 a562  ....X.......5..b
    	0x0030:  1f33 7198 0000 006c 000c 0003 0000 0012  .3q....l........
    	0x0040:  0011 7468 6973 5f69 735f 636c 6965 6e74  ..this_is_client
    	0x0050:  5f69 6400 116d 792d 636f 6e73 756d 6572  _id..my-consumer
    	0x0060:  2d67 726f 7570 0000 0002 0036 7468 6973  -group.....6this
    	0x0070:  5f69 735f 636c 6965 6e74 5f69 642d 3964  _is_client_id-9d
    	0x0080:  3131 3932 3535 2d30 6264 362d 3431 3031  119255-0bd6-4101
    	0x0090:  2d39 3666 362d 6138 6235 3064 6636 6565  -96f6-a8b50df6ee
    	0x00a0:  6331 ffff                                c1..
    
    // Consumer 1 의 Heartbeat 요청
    22:23:25.995465 IP sharp_nash.kafka-net.39810 > kafka1.9091: Flags [P.], seq 448:560, ack 57, win 501, options [nop,nop,TS val 3620274784 ecr 2925915962], length 112
    	0x0000:  4500 00a4 5d95 4000 4006 848f ac13 0006  E...].@.@.......
    	0x0010:  ac13 0003 9b82 2383 8f30 dd55 b22f d4e2  ......#..0.U./..
    	0x0020:  8018 01f5 58c6 0000 0101 080a d7c9 0260  ....X..........`
    	0x0030:  ae65 ef3a 0000 006c 000c 0003 0000 0018  .e.:...l........
    	0x0040:  0011 7468 6973 5f69 735f 636c 6965 6e74  ..this_is_client
    	0x0050:  5f69 6400 116d 792d 636f 6e73 756d 6572  _id..my-consumer
    	0x0060:  2d67 726f 7570 0000 0002 0036 7468 6973  -group.....6this
    	0x0070:  5f69 735f 636c 6965 6e74 5f69 642d 3234  _is_client_id-24
    	0x0080:  3965 3133 6139 2d32 6639 302d 3461 6362  9e13a9-2f90-4acb
    	0x0090:  2d39 3934 332d 6137 3265 6639 6564 3532  -9943-a72ef9ed52
    	0x00a0:  3061 ffff                                0a..
    
    // Consumer 3 의 Heartbeat 요청
    22:23:25.998548 IP cranky_herschel.kafka-net.32900 > kafka1.9091: Flags [P.], seq 448:560, ack 57, win 501, options [nop,nop,TS val 984616396 ecr 4171238859], length 112
    	0x0000:  4500 00a4 46dc 4000 4006 9b45 ac13 0009  E...F.@.@..E....
    	0x0010:  ac13 0003 8084 2383 cb9b 8c2a 8bdf 79e0  ......#....*..y.
    	0x0020:  8018 01f5 58c9 0000 0101 080a 3ab0 0dcc  ....X.......:...
    	0x0030:  f8a0 0dcb 0000 006c 000c 0003 0000 0012  .......l........
    	0x0040:  0011 7468 6973 5f69 735f 636c 6965 6e74  ..this_is_client
    	0x0050:  5f69 6400 116d 792d 636f 6e73 756d 6572  _id..my-consumer
    	0x0060:  2d67 726f 7570 0000 0002 0036 7468 6973  -group.....6this
    	0x0070:  5f69 735f 636c 6965 6e74 5f69 642d 3239  _is_client_id-29
    	0x0080:  3863 6139 3861 2d34 3163 392d 3434 3436  8ca98a-41c9-4446
    	0x0090:  2d61 3334 332d 3265 3335 3163 6561 3062  -a343-2e351cea0b
    	0x00a0:  6265 ffff                                be..
    
    // 브로커가 Consumer 2 에게 Rebalance In Progress 알림.
    22:23:26.003516 IP kafka1.9091 > ecstatic_ramanujan.kafka-net.53762: Flags [P.], seq 57:71, ack 560, win 50470, options [nop,nop,TS val 523470105 ecr 904308066], length 14
    	0x0000:  4500 0042 d176 4000 4006 110e ac13 0003  E..B.v@.@.......
    	0x0010:  ac13 0008 2383 d202 248e 9937 6b7a d539  ....#...$..7kz.9
    	0x0020:  8018 c526 5866 0000 0101 080a 1f33 8519  ...&Xf.......3..
    	0x0030:  35e6 a562 0000 000a 0000 0012 0000 0000  5..b............
    	0x0040:  001b                                     ..
    
    // Consumer 2 의 JoinGroup 요청
    22:23:26.004135 IP ecstatic_ramanujan.kafka-net.53762 > kafka1.9091: Flags [P.], seq 560:781, ack 71, win 501, options [nop,nop,TS val 904308075 ecr 523470105], length 221
    	0x0000:  4500 0111 29f5 4000 4006 b7c0 ac13 0008  E...).@.@.......
    	0x0010:  ac13 0003 d202 2383 6b7a d539 248e 9945  ......#.kz.9$..E
    	0x0020:  8018 01f5 5935 0000 0101 080a 35e6 a56b  ....Y5......5..k
    	0x0030:  1f33 8519 0000 00d9 000b 0005 0000 0013  .3..............
    	0x0040:  0011 7468 6973 5f69 735f 636c 6965 6e74  ..this_is_client
    	0x0050:  5f69 6400 116d 792d 636f 6e73 756d 6572  _id..my-consumer
    	0x0060:  2d67 726f 7570 0000 1770 0004 93e0 0036  -group...p.....6
    	0x0070:  7468 6973 5f69 735f 636c 6965 6e74 5f69  this_is_client_i
    	0x0080:  642d 3964 3131 3932 3535 2d30 6264 362d  d-9d119255-0bd6-
    	0x0090:  3431 3031 2d39 3666 362d 6138 6235 3064  4101-96f6-a8b50d
    	0x00a0:  6636 6565 6331 ffff 0008 636f 6e73 756d  f6eec1....consum
    	0x00b0:  6572 0000 0002 0005 7261 6e67 6500 0000  er......range...
    	0x00c0:  2000 0300 0000 0100 0a74 6573 742d 746f  .........test-to
    	0x00d0:  7069 6300 0000 0000 0000 00ff ffff ff00  pic.............
    	0x00e0:  0000 0a72 6f75 6e64 726f 6269 6e00 0000  ...roundrobin...
    	0x00f0:  2000 0300 0000 0100 0a74 6573 742d 746f  .........test-to
    	0x0100:  7069 6300 0000 0000 0000 00ff ffff ff00  pic.............
    	0x0110:  00                                       .
    
    // 브로커가 Consumer 1 에게 Rebalance In Progress 알림.
    22:23:26.011282 IP kafka1.9091 > sharp_nash.kafka-net.39810: Flags [P.], seq 57:71, ack 560, win 50470, options [nop,nop,TS val 2925920954 ecr 3620274784], length 14
    	0x0000:  4500 0042 1d59 4000 4006 c52d ac13 0003  E..B.Y@.@..-....
    	0x0010:  ac13 0006 2383 9b82 b22f d4e2 8f30 ddc5  ....#..../...0..
    	0x0020:  8018 c526 5864 0000 0101 080a ae66 02ba  ...&Xd.......f..
    	0x0030:  d7c9 0260 0000 000a 0000 0018 0000 0000  ...`............
    	0x0040:  001b                                     ..
    
    // Consumer 1 의 JoinGroup 요청
    22:23:26.011686 IP sharp_nash.kafka-net.39810 > kafka1.9091: Flags [P.], seq 560:781, ack 71, win 501, options [nop,nop,TS val 3620274800 ecr 2925920954], length 221
    	0x0000:  4500 0111 5d97 4000 4006 8420 ac13 0006  E...].@.@.......
    	0x0010:  ac13 0003 9b82 2383 8f30 ddc5 b22f d4f0  ......#..0.../..
    	0x0020:  8018 01f5 5933 0000 0101 080a d7c9 0270  ....Y3.........p
    	0x0030:  ae66 02ba 0000 00d9 000b 0005 0000 0019  .f..............
    	0x0040:  0011 7468 6973 5f69 735f 636c 6965 6e74  ..this_is_client
    	0x0050:  5f69 6400 116d 792d 636f 6e73 756d 6572  _id..my-consumer
    	0x0060:  2d67 726f 7570 0000 1770 0004 93e0 0036  -group...p.....6
    	0x0070:  7468 6973 5f69 735f 636c 6965 6e74 5f69  this_is_client_i
    	0x0080:  642d 3234 3965 3133 6139 2d32 6639 302d  d-249e13a9-2f90-
    	0x0090:  3461 6362 2d39 3934 332d 6137 3265 6639  4acb-9943-a72ef9
    	0x00a0:  6564 3532 3061 ffff 0008 636f 6e73 756d  ed520a....consum
    	0x00b0:  6572 0000 0002 0005 7261 6e67 6500 0000  er......range...
    	0x00c0:  2000 0300 0000 0100 0a74 6573 742d 746f  .........test-to
    	0x00d0:  7069 6300 0000 0000 0000 00ff ffff ff00  pic.............
    	0x00e0:  0000 0a72 6f75 6e64 726f 6269 6e00 0000  ...roundrobin...
    	0x00f0:  2000 0300 0000 0100 0a74 6573 742d 746f  .........test-to
    	0x0100:  7069 6300 0000 0000 0000 00ff ffff ff00  pic.............
    	0x0110:  00                                       .
    
    // 브로커가 Consumer 3 에게 Rebalance In Progress 알림.
    22:23:26.016836 IP kafka1.9091 > cranky_herschel.kafka-net.32900: Flags [P.], seq 57:71, ack 560, win 50470, options [nop,nop,TS val 4171243856 ecr 984616396], length 14
    	0x0000:  4500 0042 fd92 4000 4006 e4f0 ac13 0003  E..B..@.@.......
    	0x0010:  ac13 0009 2383 8084 8bdf 79e0 cb9b 8c9a  ....#.....y.....
    	0x0020:  8018 c526 5867 0000 0101 080a f8a0 2150  ...&Xg........!P
    	0x0030:  3ab0 0dcc 0000 000a 0000 0012 0000 0000  :...............
    	0x0040:  001b                                     ..
    
    
    // Consumer 3 의 JoinGroup 요청
    22:23:26.017284 IP cranky_herschel.kafka-net.32900 > kafka1.9091: Flags [P.], seq 560:781, ack 71, win 501, options [nop,nop,TS val 984616414 ecr 4171243856], length 221
    	0x0000:  4500 0111 46de 4000 4006 9ad6 ac13 0009  E...F.@.@.......
    	0x0010:  ac13 0003 8084 2383 cb9b 8c9a 8bdf 79ee  ......#.......y.
    	0x0020:  8018 01f5 5936 0000 0101 080a 3ab0 0dde  ....Y6......:...
    	0x0030:  f8a0 2150 0000 00d9 000b 0005 0000 0013  ..!P............
    	0x0040:  0011 7468 6973 5f69 735f 636c 6965 6e74  ..this_is_client
    	0x0050:  5f69 6400 116d 792d 636f 6e73 756d 6572  _id..my-consumer
    	0x0060:  2d67 726f 7570 0000 1770 0004 93e0 0036  -group...p.....6
    	0x0070:  7468 6973 5f69 735f 636c 6965 6e74 5f69  this_is_client_i
    	0x0080:  642d 3239 3863 6139 3861 2d34 3163 392d  d-298ca98a-41c9-
    	0x0090:  3434 3436 2d61 3334 332d 3265 3335 3163  4446-a343-2e351c
    	0x00a0:  6561 3062 6265 ffff 0008 636f 6e73 756d  ea0bbe....consum
    	0x00b0:  6572 0000 0002 0005 7261 6e67 6500 0000  er......range...
    	0x00c0:  2000 0300 0000 0100 0a74 6573 742d 746f  .........test-to
    	0x00d0:  7069 6300 0000 0000 0000 00ff ffff ff00  pic.............
    	0x00e0:  0000 0a72 6f75 6e64 726f 6269 6e00 0000  ...roundrobin...
    	0x00f0:  2000 0300 0000 0100 0a74 6573 742d 746f  .........test-to
    	0x0100:  7069 6300 0000 0000 0000 00ff ffff ff00  pic.............
    	0x0110:  00                                       .
    
    // Consumer 4 의 SyncGroup 요청
    22:23:26.027169 IP jovial_jackson.kafka-net.51922 > kafka1.9091: Flags [P.], seq 388:504, ack 224, win 501, options [nop,nop,TS val 737705187 ecr 2072874654], length 116
    	0x0000:  4500 00a8 a2ce 4000 4006 3f4e ac13 000a  E.....@.@.?N....
    	0x0010:  ac13 0003 cad2 2383 c44a 4e47 63ae 5355  ......#..JNGc.SU
    	0x0020:  8018 01f5 58ce 0000 0101 080a 2bf8 7ce3  ....X.......+.|.
    	0x0030:  7b8d 8e9e 0000 0070 000e 0003 0000 0004  {......p........
    	0x0040:  0011 7468 6973 5f69 735f 636c 6965 6e74  ..this_is_client
    	0x0050:  5f69 6400 116d 792d 636f 6e73 756d 6572  _id..my-consumer
    	0x0060:  2d67 726f 7570 0000 0003 0036 7468 6973  -group.....6this
    	0x0070:  5f69 735f 636c 6965 6e74 5f69 642d 6131  _is_client_id-a1
    	0x0080:  6662 3530 3163 2d63 3061 332d 3435 3262  fb501c-c0a3-452b
    	0x0090:  2d62 6634 632d 3937 6330 6662 3437 3030  -bf4c-97c0fb4700
    	0x00a0:  3838 ffff 0000 0000                      88......
    
    // Consumer 2 의 SyncGroup 요청	
    22:23:26.027769 IP ecstatic_ramanujan.kafka-net.53762 > kafka1.9091: Flags [P.], seq 781:897, ack 212, win 501, options [nop,nop,TS val 904308099 ecr 523470129], length 116
    	0x0000:  4500 00a8 29f6 4000 4006 b828 ac13 0008  E...).@.@..(....
    	0x0010:  ac13 0003 d202 2383 6b7a d616 248e 99d2  ......#.kz..$...
    	0x0020:  8018 01f5 58cc 0000 0101 080a 35e6 a583  ....X.......5...
    	0x0030:  1f33 8531 0000 0070 000e 0003 0000 0014  .3.1...p........
    	0x0040:  0011 7468 6973 5f69 735f 636c 6965 6e74  ..this_is_client
    	0x0050:  5f69 6400 116d 792d 636f 6e73 756d 6572  _id..my-consumer
    	0x0060:  2d67 726f 7570 0000 0003 0036 7468 6973  -group.....6this
    	0x0070:  5f69 735f 636c 6965 6e74 5f69 642d 3964  _is_client_id-9d
    	0x0080:  3131 3932 3535 2d30 6264 362d 3431 3031  119255-0bd6-4101
    	0x0090:  2d39 3666 362d 6138 6235 3064 6636 6565  -96f6-a8b50df6ee
    	0x00a0:  6331 ffff 0000 0000                      c1......
    
    // Consumer 3 의 SyncGroup 요청		
    22:23:26.027772 IP cranky_herschel.kafka-net.32900 > kafka1.9091: Flags [P.], seq 781:897, ack 212, win 501, options [nop,nop,TS val 984616425 ecr 4171243867], length 116
    	0x0000:  4500 00a8 46df 4000 4006 9b3e ac13 0009  E...F.@.@..>....
    	0x0010:  ac13 0003 8084 2383 cb9b 8d77 8bdf 7a7b  ......#....w..z{
    	0x0020:  8018 01f5 58cd 0000 0101 080a 3ab0 0de9  ....X.......:...
    	0x0030:  f8a0 215b 0000 0070 000e 0003 0000 0014  ..![...p........
    	0x0040:  0011 7468 6973 5f69 735f 636c 6965 6e74  ..this_is_client
    	0x0050:  5f69 6400 116d 792d 636f 6e73 756d 6572  _id..my-consumer
    	0x0060:  2d67 726f 7570 0000 0003 0036 7468 6973  -group.....6this
    	0x0070:  5f69 735f 636c 6965 6e74 5f69 642d 3239  _is_client_id-29
    	0x0080:  3863 6139 3861 2d34 3163 392d 3434 3436  8ca98a-41c9-4446
    	0x0090:  2d61 3334 332d 3265 3335 3163 6561 3062  -a343-2e351cea0b
    	0x00a0:  6265 ffff 0000 0000                      be......
    
    // Consumer 1 의 SyncGroup 요청	
    22:23:26.031485 IP sharp_nash.kafka-net.39810 > kafka1.9091: Flags [P.], seq 845:1301, ack 844, win 501, options [nop,nop,TS val 3620274820 ecr 2925920974], length 456
    	0x0000:  4500 01fc 5d99 4000 4006 8333 ac13 0006  E...].@.@..3....
    	0x0010:  ac13 0003 9b82 2383 8f30 dee2 b22f d7f5  ......#..0.../..
    	0x0020:  8018 01f5 5a1e 0000 0101 080a d7c9 0284  ....Z...........
    	0x0030:  ae66 02ce 0000 01c4 000e 0003 0000 001b  .f..............
    	0x0040:  0011 7468 6973 5f69 735f 636c 6965 6e74  ..this_is_client
    	0x0050:  5f69 6400 116d 792d 636f 6e73 756d 6572  _id..my-consumer
    	0x0060:  2d67 726f 7570 0000 0003 0036 7468 6973  -group.....6this
    	0x0070:  5f69 735f 636c 6965 6e74 5f69 642d 3234  _is_client_id-24
    	0x0080:  3965 3133 6139 2d32 6639 302d 3461 6362  9e13a9-2f90-4acb
    	0x0090:  2d39 3934 332d 6137 3265 6639 6564 3532  -9943-a72ef9ed52
    	0x00a0:  3061 ffff 0000 0004 0036 7468 6973 5f69  0a.......6this_i
    	0x00b0:  735f 636c 6965 6e74 5f69 642d 3234 3965  s_client_id-249e
    	0x00c0:  3133 6139 2d32 6639 302d 3461 6362 2d39  13a9-2f90-4acb-9
    	0x00d0:  3934 332d 6137 3265 6639 6564 3532 3061  943-a72ef9ed520a
    	0x00e0:  0000 001e 0000 0000 0001 000a 7465 7374  ............test
    	0x00f0:  2d74 6f70 6963 0000 0001 0000 0000 0000  -topic..........
    	0x0100:  0000 0036 7468 6973 5f69 735f 636c 6965  ...6this_is_clie
    	0x0110:  6e74 5f69 642d 6131 6662 3530 3163 2d63  nt_id-a1fb501c-c
    	0x0120:  3061 332d 3435 3262 2d62 6634 632d 3937  0a3-452b-bf4c-97
    	0x0130:  6330 6662 3437 3030 3838 0000 000a 0000  c0fb470088......
    	0x0140:  0000 0000 0000 0000 0036 7468 6973 5f69  .........6this_i
    	0x0150:  735f 636c 6965 6e74 5f69 642d 3964 3131  s_client_id-9d11
    	0x0160:  3932 3535 2d30 6264 362d 3431 3031 2d39  9255-0bd6-4101-9
    	0x0170:  3666 362d 6138 6235 3064 6636 6565 6331  6f6-a8b50df6eec1
    	0x0180:  0000 001e 0000 0000 0001 000a 7465 7374  ............test
    	0x0190:  2d74 6f70 6963 0000 0001 0000 0002 0000  -topic..........
    	0x01a0:  0000 0036 7468 6973 5f69 735f 636c 6965  ...6this_is_clie
    	0x01b0:  6e74 5f69 642d 3239 3863 6139 3861 2d34  nt_id-298ca98a-4
    	0x01c0:  3163 392d 3434 3436 2d61 3334 332d 3265  1c9-4446-a343-2e
    	0x01d0:  3335 3163 6561 3062 6265 0000 001e 0000  351cea0bbe......
    	0x01e0:  0000 0001 000a 7465 7374 2d74 6f70 6963  ......test-topic
    	0x01f0:  0000 0001 0000 0001 0000 0000            ............
    
    // 다시 모든 Consumer 들이 Heartbeat 전송
    22:23:26.051939 IP jovial_jackson.kafka-net.51922 > kafka1.9091: Flags [P.], seq 504:616, ack 252, win 501, options [nop,nop,TS val 737705212 ecr 2072874679], length 112
    	0x0000:  4500 00a4 a2cf 4000 4006 3f51 ac13 000a  E.....@.@.?Q....
    	0x0010:  ac13 0003 cad2 2383 c44a 4ebb 63ae 5371  ......#..JN.c.Sq
    	0x0020:  8018 01f5 58ca 0000 0101 080a 2bf8 7cfc  ....X.......+.|.
    	0x0030:  7b8d 8eb7 0000 006c 000c 0003 0000 0005  {......l........
    	0x0040:  0011 7468 6973 5f69 735f 636c 6965 6e74  ..this_is_client
    	0x0050:  5f69 6400 116d 792d 636f 6e73 756d 6572  _id..my-consumer
    	0x0060:  2d67 726f 7570 0000 0003 0036 7468 6973  -group.....6this
    	0x0070:  5f69 735f 636c 6965 6e74 5f69 642d 6131  _is_client_id-a1
    	0x0080:  6662 3530 3163 2d63 3061 332d 3435 3262  fb501c-c0a3-452b
    	0x0090:  2d62 6634 632d 3937 6330 6662 3437 3030  -bf4c-97c0fb4700
    	0x00a0:  3838 ffff                                88..
    22:23:31.067122 IP jovial_jackson.kafka-net.51922 > kafka1.9091: Flags [P.], seq 616:728, ack 266, win 501, options [nop,nop,TS val 737710227 ecr 2072874706], length 112
    	0x0000:  4500 00a4 a2d1 4000 4006 3f4f ac13 000a  E.....@.@.?O....
    	0x0010:  ac13 0003 cad2 2383 c44a 4f2b 63ae 537f  ......#..JO+c.S.
    	0x0020:  8018 01f5 58ca 0000 0101 080a 2bf8 9093  ....X.......+...
    	0x0030:  7b8d 8ed2 0000 006c 000c 0003 0000 0006  {......l........
    	0x0040:  0011 7468 6973 5f69 735f 636c 6965 6e74  ..this_is_client
    	0x0050:  5f69 6400 116d 792d 636f 6e73 756d 6572  _id..my-consumer
    	0x0060:  2d67 726f 7570 0000 0003 0036 7468 6973  -group.....6this
    	0x0070:  5f69 735f 636c 6965 6e74 5f69 642d 6131  _is_client_id-a1
    	0x0080:  6662 3530 3163 2d63 3061 332d 3435 3262  fb501c-c0a3-452b
    	0x0090:  2d62 6634 632d 3937 6330 6662 3437 3030  -bf4c-97c0fb4700
    	0x00a0:  3838 ffff                                88..
    22:23:31.070401 IP sharp_nash.kafka-net.39810 > kafka1.9091: Flags [P.], seq 1373:1485, ack 962, win 501, options [nop,nop,TS val 3620279858 ecr 2925921022], length 112
    	0x0000:  4500 00a4 5d9c 4000 4006 8488 ac13 0006  E...].@.@.......
    	0x0010:  ac13 0003 9b82 2383 8f30 e0f2 b22f d86b  ......#..0.../.k
    	0x0020:  8018 01f5 58c6 0000 0101 080a d7c9 1632  ....X..........2
    	0x0030:  ae66 02fe 0000 006c 000c 0003 0000 001d  .f.....l........
    	0x0040:  0011 7468 6973 5f69 735f 636c 6965 6e74  ..this_is_client
    	0x0050:  5f69 6400 116d 792d 636f 6e73 756d 6572  _id..my-consumer
    	0x0060:  2d67 726f 7570 0000 0003 0036 7468 6973  -group.....6this
    	0x0070:  5f69 735f 636c 6965 6e74 5f69 642d 3234  _is_client_id-24
    	0x0080:  3965 3133 6139 2d32 6639 302d 3461 6362  9e13a9-2f90-4acb
    	0x0090:  2d39 3934 332d 6137 3265 6639 6564 3532  -9943-a72ef9ed52
    	0x00a0:  3061 ffff                                0a..
    22:23:31.071841 IP ecstatic_ramanujan.kafka-net.53762 > kafka1.9091: Flags [P.], seq 969:1081, ack 330, win 501, options [nop,nop,TS val 904313143 ecr 523470180], length 112
    	0x0000:  4500 00a4 29f9 4000 4006 b829 ac13 0008  E...).@.@..)....
    	0x0010:  ac13 0003 d202 2383 6b7a d6d2 248e 9a48  ......#.kz..$..H
    	0x0020:  8018 01f5 58c8 0000 0101 080a 35e6 b937  ....X.......5..7
    	0x0030:  1f33 8564 0000 006c 000c 0003 0000 0016  .3.d...l........
    	0x0040:  0011 7468 6973 5f69 735f 636c 6965 6e74  ..this_is_client
    	0x0050:  5f69 6400 116d 792d 636f 6e73 756d 6572  _id..my-consumer
    	0x0060:  2d67 726f 7570 0000 0003 0036 7468 6973  -group.....6this
    	0x0070:  5f69 735f 636c 6965 6e74 5f69 642d 3964  _is_client_id-9d
    	0x0080:  3131 3932 3535 2d30 6264 362d 3431 3031  119255-0bd6-4101
    	0x0090:  2d39 3666 362d 6138 6235 3064 6636 6565  -96f6-a8b50df6ee
    	0x00a0:  6331 ffff                                c1..
    22:23:31.080844 IP cranky_herschel.kafka-net.32900 > kafka1.9091: Flags [P.], seq 969:1081, ack 330, win 501, options [nop,nop,TS val 984621478 ecr 4171243919], length 112
    	0x0000:  4500 00a4 46e2 4000 4006 9b3f ac13 0009  E...F.@.@..?....
    	0x0010:  ac13 0003 8084 2383 cb9b 8e33 8bdf 7af1  ......#....3..z.
    	0x0020:  8018 01f5 58c9 0000 0101 080a 3ab0 21a6  ....X.......:.!.
    	0x0030:  f8a0 218f 0000 006c 000c 0003 0000 0016  ..!....l........
    	0x0040:  0011 7468 6973 5f69 735f 636c 6965 6e74  ..this_is_client
    	0x0050:  5f69 6400 116d 792d 636f 6e73 756d 6572  _id..my-consumer
    	0x0060:  2d67 726f 7570 0000 0003 0036 7468 6973  -group.....6this
    	0x0070:  5f69 735f 636c 6965 6e74 5f69 642d 3239  _is_client_id-29
    	0x0080:  3863 6139 3861 2d34 3163 392d 3434 3436  8ca98a-41c9-4446
    	0x0090:  2d61 3334 332d 3265 3335 3163 6561 3062  -a343-2e351cea0b
    	0x00a0:  6265 ffff                                be..

     

     

    Rebalance In Progress 에러 코드.

    REBALANCE_IN_PROGRESS 의 코드는 27 번으로 16진수로 0x1b 로 표현됩니다.

    브로커는 아래와 같은 형식으로 컨슈머에게 리밸런싱임을 알리게 됩니다.

    "kafka1.9091 > sharp_nash.kafka-net.39810" 이 브로커에서 클라이언트로 패킷이 전달되는 방향을 의미하며,

    "0x0040, 0x0041" 위치의 바이트가 REBALANCE_IN_PROGRESS 에러 코드를 의미합니다.

    22:23:26.011282 IP kafka1.9091 > sharp_nash.kafka-net.39810: Flags [P.], seq 57:71, ack 560, win 50470, options [nop,nop,TS val 2925920954 ecr 3620274784], length 14
    	0x0000:  4500 0042 1d59 4000 4006 c52d ac13 0003  E..B.Y@.@..-....
    	0x0010:  ac13 0006 2383 9b82 b22f d4e2 8f30 ddc5  ....#..../...0..
    	0x0020:  8018 c526 5864 0000 0101 080a ae66 02ba  ...&Xd.......f..
    	0x0030:  d7c9 0260 0000 000a 0000 0018 0000 0000  ...`............
    	0x0040:  001b

     

     

    Rebalance 2) Consumer 의 종료 상황 ( LeaveGroup ).

    컨슈너는 Graceful Shutdown 방식으로 종료될 때에 LeaveGroup API 를 호출하게 됩니다.

    아래는 종료되는 Kafka Consumer 가 브로커에게 LeaveGroup API 를 요청하는 TCP Packet 입니다.

    0x0038,0x0039 바이트에서 표시된 000d 가 13번에 해당하는 LeaveGroup API 의 Key 입니다.

     

    그리고 Group Coordinator 는 컨슈머들에서 Rebalance In Process (001b) 에러 응답을 전송하고 리밸런싱을 시작합니다.

     

    21:53:01.351590 IP competent_rubin.kafka-net.40950 > kafka2.9091: Flags [P.], seq 2484:2590, ack 604, win 501, options [nop,nop,TS val 1006060414 ecr 2406605886], length 106
    	0x0000:  4500 009e bad6 4000 4006 2751 ac13 0008  E.....@.@.'Q....
    	0x0010:  ac13 0004 9ff6 2383 5966 07eb 8775 8b5a  ......#.Yf...u.Z
    	0x0020:  8018 01f5 58c3 0000 0101 080a 3bf7 437e  ....X.......;.C~
    	0x0030:  8f71 e43e 0000 0066 000d 0001 0000 0017  .q.>...f........
    	0x0040:  0011 7468 6973 5f69 735f 636c 6965 6e74  ..this_is_client
    	0x0050:  5f69 6400 116d 792d 636f 6e73 756d 6572  _id..my-consumer
    	0x0060:  2d67 726f 7570 0036 7468 6973 5f69 735f  -group.6this_is_
    	0x0070:  636c 6965 6e74 5f69 642d 3131 3038 6636  client_id-1108f6
    	0x0080:  6432 2d66 3362 322d 3431 6630 2d62 3635  d2-f3b2-41f0-b65
    	0x0090:  392d 3438 3231 3833 6365 6137 3231       9-482183cea721
        
    21:53:02.449809 IP kafka2.9091 > affectionate_babbage.kafka-net.44196: Flags [P.], seq 1709:1723, ack 3643, win 50470, options [nop,nop,TS val 1315012574 ecr 692413949], length 14
    	0x0000:  4500 0042 4c4a 4000 4006 963b ac13 0004  E..BLJ@.@..;....
    	0x0010:  ac13 0006 2383 aca4 2384 5ad9 f567 f74d  ....#...#.Z..g.M
    	0x0020:  8018 c526 5865 0000 0101 080a 4e61 7fde  ...&Xe......Na..
    	0x0030:  2945 65fd 0000 000a 0000 001e 0000 0000  )Ee.............
    	0x0040:  001b

     

     

    Rebalance 3) Consumer 가 강제 종료되는 상황.

    컨슈머가 만약에 강제로 종료된다면 이 상황에서는 즉시 리밸런싱이 발생하지 않습니다.

    강제로 종료된 Consumer 가 session.timeout.ms 동안에 Heartbeat 를 전송하지 않을 것이고,

    session.timeout.ms 시간 이후에 리밸런싱이 시작됩니다.

     

    아래는 2개의 Consumer 들이 Heartbeat 를 전송하는 상황인데요.

    22:33:18 시점에 하나의 Consumer 가 종료되었고, 22:33:28 인 10초 경과 이후에 리밸런싱이 본격적으로 시작됩니다.

    ( heartbeat.interval.ms : 5초, session.timeout.ms : 6초 )

     

    // Consumer 1 Heartbeat 전송
    22:33:18.489497 IP inspiring_torvalds.kafka-net.43690 > kafka2.9091: Flags [P.], seq 1963:2075, ack 1513, win 501, options [nop,nop,TS val 694827921 ecr 1317421521], length 112
    	0x0000:  4500 00a4 5fdf 4000 4006 8244 ac13 0006  E..._.@.@..D....
    	0x0010:  ac13 0004 aaaa 2383 573e c929 0632 84c1  ......#.W>.).2..
    	0x0020:  8018 01f5 58c7 0000 0101 080a 296a 3b91  ....X.......)j;.
    	0x0030:  4e86 41d1 0000 006c 000c 0003 0000 0010  N.A....l........
    	0x0040:  0011 7468 6973 5f69 735f 636c 6965 6e74  ..this_is_client
    	0x0050:  5f69 6400 116d 792d 636f 6e73 756d 6572  _id..my-consumer
    	0x0060:  2d67 726f 7570 0000 0013 0036 7468 6973  -group.....6this
    	0x0070:  5f69 735f 636c 6965 6e74 5f69 642d 6462  _is_client_id-db
    	0x0080:  3939 6261 3462 2d35 3239 632d 3430 6131  99ba4b-529c-40a1
    	0x0090:  2d61 6666 662d 3163 3666 3365 6139 6266  -afff-1c6f3ea9bf
    	0x00a0:  3762 ffff                                7b..
    
    // Consumer 2 Heartbeat 전송
    22:33:18.493039 IP distracted_elgamal.kafka-net.36652 > kafka2.9091: Flags [P.], seq 1028:1140, ack 422, win 501, options [nop,nop,TS val 1008475469 ecr 2409019861], length 112
    	0x0000:  4500 00a4 471a 4000 4006 9b07 ac13 0008  E...G.@.@.......
    	0x0010:  ac13 0004 8f2c 2383 342f 7efc d795 8535  .....,#.4/~....5
    	0x0020:  8018 01f5 58c9 0000 0101 080a 3c1c 1d4d  ....X.......<..M
    	0x0030:  8f96 b9d5 0000 006c 000c 0003 0000 000a  .......l........
    	0x0040:  0011 7468 6973 5f69 735f 636c 6965 6e74  ..this_is_client
    	0x0050:  5f69 6400 116d 792d 636f 6e73 756d 6572  _id..my-consumer
    	0x0060:  2d67 726f 7570 0000 0013 0036 7468 6973  -group.....6this
    	0x0070:  5f69 735f 636c 6965 6e74 5f69 642d 3762  _is_client_id-7b
    	0x0080:  3630 3164 6631 2d62 3431 302d 3438 3434  601df1-b410-4844
    	0x0090:  2d39 6233 642d 3438 3837 6530 3864 3731  -9b3d-4887e08d71
    	0x00a0:  3663 ffff                                6c..
    
    // Consumer 2 Heartbeat 전송
    22:33:23.525129 IP distracted_elgamal.kafka-net.36652 > kafka2.9091: Flags [P.], seq 1140:1252, ack 436, win 501, options [nop,nop,TS val 1008480501 ecr 2409024893], length 112
    	0x0000:  4500 00a4 471c 4000 4006 9b05 ac13 0008  E...G.@.@.......
    	0x0010:  ac13 0004 8f2c 2383 342f 7f6c d795 8543  .....,#.4/.l...C
    	0x0020:  8018 01f5 58c9 0000 0101 080a 3c1c 30f5  ....X.......<.0.
    	0x0030:  8f96 cd7d 0000 006c 000c 0003 0000 000b  ...}...l........
    	0x0040:  0011 7468 6973 5f69 735f 636c 6965 6e74  ..this_is_client
    	0x0050:  5f69 6400 116d 792d 636f 6e73 756d 6572  _id..my-consumer
    	0x0060:  2d67 726f 7570 0000 0013 0036 7468 6973  -group.....6this
    	0x0070:  5f69 735f 636c 6965 6e74 5f69 642d 3762  _is_client_id-7b
    	0x0080:  3630 3164 6631 2d62 3431 302d 3438 3434  601df1-b410-4844
    	0x0090:  2d39 6233 642d 3438 3837 6530 3864 3731  -9b3d-4887e08d71
    	0x00a0:  3663 ffff                                6c..
    
    // Consumer 2 Heartbeat 전송
    22:33:28.545167 IP distracted_elgamal.kafka-net.36652 > kafka2.9091: Flags [P.], seq 1252:1364, ack 450, win 501, options [nop,nop,TS val 1008485522 ecr 2409029908], length 112
    	0x0000:  4500 00a4 471e 4000 4006 9b03 ac13 0008  E...G.@.@.......
    	0x0010:  ac13 0004 8f2c 2383 342f 7fdc d795 8551  .....,#.4/.....Q
    	0x0020:  8018 01f5 58c9 0000 0101 080a 3c1c 4492  ....X.......<.D.
    	0x0030:  8f96 e114 0000 006c 000c 0003 0000 000c  .......l........
    	0x0040:  0011 7468 6973 5f69 735f 636c 6965 6e74  ..this_is_client
    	0x0050:  5f69 6400 116d 792d 636f 6e73 756d 6572  _id..my-consumer
    	0x0060:  2d67 726f 7570 0000 0013 0036 7468 6973  -group.....6this
    	0x0070:  5f69 735f 636c 6965 6e74 5f69 642d 3762  _is_client_id-7b
    	0x0080:  3630 3164 6631 2d62 3431 302d 3438 3434  601df1-b410-4844
    	0x0090:  2d39 6233 642d 3438 3837 6530 3864 3731  -9b3d-4887e08d71
    	0x00a0:  3663 ffff                                6c..
        
    // Broker 가 Rebalance In Progress 를 Consumer 에게 알림.
    22:33:28.560668 IP kafka2.9091 > distracted_elgamal.kafka-net.36652: Flags [P.], seq 450:464, ack 1364, win 50470, options [nop,nop,TS val 2409034929 ecr 1008485522], length 14
    	0x0000:  4500 0042 a43a 4000 4006 3e49 ac13 0004  E..B.:@.@.>I....
    	0x0010:  ac13 0008 2383 8f2c d795 8551 342f 804c  ....#..,...Q4/.L
    	0x0020:  8018 c526 5867 0000 0101 080a 8f96 f4b1  ...&Xg..........
    	0x0030:  3c1c 4492 0000 000a 0000 000c 0000 0000  <.D.............
    	0x0040:  001b

     

     

    Rebalance 4) Partition 이 추가되는 상황.

    토픽의 파티션이 추가되는 상황에서 리밸런싱이 발생합니다.

    아래의 명령어는 기존의 Topic 에 Partition 을 변경하는 명령어의 예시입니다.

    kafka-topics.sh \
        --bootstrap-server kafka1:9092,kafka2:9092,kafka3:9092 \
        --alter \
        --topic test-topic \
        --partitions 4

     

    파티션이  추가되는 경우에는 Metadata 의 갱신과 리밸런싱이 관련됩니다.

    Consumer 는 metadata.max.age.ms 를 주기로 메타데이터를 갱신하는데요.

    기본적으로 metadata.max.age.ms 는 5분으로 설정됩니다.

    그래서 metadata 를 갱신한 이후 Partition 의 정보가 기존의 정보와 일치하지 않는 경우에

    JoinGroup, SyncGroup 를 새롭게 요청함으로써 리밸런싱이 발생합니다.

     

    이는 모듈마다 차이가 존재합니다.

    python 의 confluent-kafka 모듈의 경우에는 5분 이상의 시간이 소요된 이후에 Rebalancing 이 시작되었고,

    java 의 kafka-clients 모듈의 경우에는 2분 이내에 Rebalacing 이 시작되었습니다.

     

    파티션이 변경되는 경우에는 Kafka Consumer 를 재실행하는 방식으로 파티션-컨슈머의 할당을 시도하는 것이 효율적이라고 판단됩니다.

     

     

     

     

     

    카프카 클러스터 실행하기.

     

    아래는 카프카 클러스터를 실행하기 위한 Zookeeper 와 Kafka Broker 의 도커 명령어입니다.

    docker network create kafka-net
    
    docker run -d --name zookeeper --hostname zookeeper --net kafka-net \
      -e ZOOKEEPER_CLIENT_PORT=2181 \
      confluentinc/cp-zookeeper:7.6.4
    
    docker run -d --name kafka1 --hostname kafka1 --net kafka-net \
      -e KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181 \
      -e KAFKA_CFG_BROKER_ID=1 \
      -e ALLOW_PLAINTEXT_LISTENER=yes \
      -e KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR=1 \
      -e KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,INNER://:9091,OUTER://:19092 \
      -e KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka1:9092,INNER://kafka1:9091,OUTER://localhost:19092 \
      -e KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,INNER:PLAINTEXT,OUTER:PLAINTEXT \
      -e KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT \
      -p 19092:19092 \
      bitnami/kafka:3.1.2
    
    docker run -d --name kafka2 --hostname kafka2 --net kafka-net \
      -e KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181 \
      -e KAFKA_CFG_BROKER_ID=2 \
      -e ALLOW_PLAINTEXT_LISTENER=yes \
      -e KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR=3 \
      -e KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,INNER://:9091,OUTER://:29092 \
      -e KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka2:9092,INNER://kafka2:9091,OUTER://localhost:29092 \
      -e KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,INNER:PLAINTEXT,OUTER:PLAINTEXT \
      -e KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT \
      -p 29092:29092 \
      bitnami/kafka:3.1.2
    
    docker run -d --name kafka3 --hostname kafka3 --net kafka-net \
      -e KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181 \
      -e KAFKA_CFG_BROKER_ID=3 \
      -e ALLOW_PLAINTEXT_LISTENER=yes \
      -e KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR=3 \
      -e KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,INNER://:9091,OUTER://:39092 \
      -e KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka3:9092,INNER://kafka3:9091,OUTER://localhost:39092 \
      -e KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,INNER:PLAINTEXT,OUTER:PLAINTEXT \
      -e KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT \
      -p 39092:39092 \
      bitnami/kafka:3.1.2

     

     

    컨슈머 실행 스크립트.

    cat <<EOF> /tmp/code.py
    import signal
    import sys
    from confluent_kafka import Consumer, KafkaError
    
    config = {
        'client.id': 'this_is_client_id',
        'bootstrap.servers': 'kafka1:9091,kafka2:9091,kafka3:9091',
        'group.id': 'my-consumer-group',
        'auto.offset.reset': 'earliest',
        'enable.auto.commit': False,
        'heartbeat.interval.ms': 5000,
        'session.timeout.ms': 6000,
        'metadata.max.age.ms': 10000
    }
    
    consumer = Consumer(config)
    consumer.subscribe(['test-topic'])
    
    def signal_handler(sig, frame):
        print("\nShutdown signal received. Closing Kafka consumer...")
        consumer.close()
        sys.exit(0)
    
    signal.signal(signal.SIGINT, signal_handler)
    signal.signal(signal.SIGTERM, signal_handler)
    
    try:
        while True:
            msg = consumer.poll(1)
            if msg is not None and not msg.error():
                print(f"Received message: {msg.value().decode('utf-8')}")
    except Exception as e:
        print(f"Unexpected error: {e}")
    finally:
        print("Closing Kafka consumer...")
        consumer.close()
    EOF
    
    docker run -d --rm --network kafka-net -v /tmp/code.py:/tmp/code.py python:3.9-bullseye sh -c 'pip install confluent-kafka && python /tmp/code.py'
    docker run -it --rm --network kafka-net openjdk:11 bash -c '
    
    
    
    wget https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/3.5.1/kafka-clients-3.5.1.jar -P /app/libs/
    wget https://repo1.maven.org/maven2/org/slf4j/slf4j-api/1.7.36/slf4j-api-1.7.36.jar -P /app/libs/
    
    
    cat <<EOF > /app/KafkaConsumerExample.java
    
    import org.apache.kafka.clients.consumer.*;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;
    
    public class KafkaConsumerExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "kafka1:9092");
            props.put("group.id", "my-consumer-group");
            props.put("metadata.max.age.ms", 10000L);
            props.put("key.deserializer", StringDeserializer.class.getName());
            props.put("value.deserializer", StringDeserializer.class.getName());
            props.put("auto.offset.reset", "earliest");
    
            Consumer<String, String> consumer = new KafkaConsumer<>(props);
            consumer.subscribe(Collections.singletonList("test-topic"));  // 토픽 구독
    
            System.out.println("Kafka Consumer started. Waiting for messages...");
    
            try {
                while (true) {
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                    for (ConsumerRecord<String, String> record : records) {
                        System.out.printf("Received message: key=%s, value=%s, partition=%d, offset=%d%n",
                                record.key(), record.value(), record.partition(), record.offset());
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                consumer.close();
            }
        }
    }
    
    EOF
    
    javac -cp "/app/libs/kafka-clients-3.5.1.jar:/app/libs/slf4j-api-1.7.36.jar:/app/libs/slf4j-nop-1.7.36.jar" /app/KafkaConsumerExample.java
    cd /app && java -cp "/app/libs/kafka-clients-3.5.1.jar:/app/libs/slf4j-api-1.7.36.jar:/app/libs/slf4j-nop-1.7.36.jar:." KafkaConsumerExample
    '

     

    반응형
Designed by Tistory.