-
[Kafka] Consumer가 추가되면 Rebalance는 어떻게 동작할까?Kafka/Kafka Consumer 2024. 6. 30. 14:56반응형
- 목차
들어가며.
카프카 컨슈머는 실행되면서 Consumer Group 을 형성합니다.
동일한 group.id 를 가지는 여러 컨슈머들은 하나의 Consumer Group 에 소속되게 됩니다.
이 과정에서 JoinGroup 과 SyncGroup API 를 활용하게 되고, Consumer Group 의 형성과 파티션 할당이 이루어집니다.
그리고 여러가지 상황에 의해서 컨슈머가 추가되거나 종료될 수 있는데요.
이 과정에서 리밸런싱이 발생합니다.
이번 글에서는 컨슈머의 여러가지 동작에 따른 리밸런싱이 어떻게 발생하는지 알아보는 시간을 가지려고 합니다.
Rebalance 1) 새로운 컨슈머가 추가되는 상황.
기존의 Consumer Group 이 존재하는 상황에서 새로운 컨슈머가 추가됩니다.
이때에 새로운 컨슈머는 JoinGroup API Request 를 브로커에게 요청하게 됩니다.
이러한 상황이 발생하면 Group Coordinator 는 기존의 Consumer Group 에 소속된 컨슈머들에게 REBALANCE_IN_PROGRESS 를 전달하게 됩니다.
일반적으로 Group Coordinator 는 컨슈머가 요청한 Heartbeat Request 의 응답으로써 REBALANCE_IN_PROGRESS 상태임을 전파합니다.
Consumer 는 보통 3초 단위로 Heartbeat 를 요청하게 되므로 컨슈머들은 최대 3초 내로 리밸런싱 상황을 감지할 수 있습니다.
아래는 이와 관련된 TCP 패킷의 동작입니다.
내용이 복잡한 관계로 간단히 정리를 하자면,
- heartbeat.interval.ms 를 주기로 3개의 Consumer 들은 Heartbeat 를 브로커에게 전송합니다.
- 그러다가 4번째 Consumer 가 브로커에게 JoinGroup 요청을 보냅니다.
- 그럼 Group Coordinator 는 새로운 Consumer 가 Consumer Group 에 참여함을 인지하고
- 모든 Consumer 들에게 REBALANCE_IN_PROGRESS 에러 응답을 전송합니다.
- 리밸런싱을 컨슈머에게 전달하는 방식을 Consumer 가 보낸 Heartbeat 요청의 응답을 통해서 전달합니다.
이렇게 리밸런싱 상황임을 전달받은 브로커들은 JoinGroup 과 SyncGroup 요청을 다시 전달하면서 리밸런싱을 수행합니다.
// Consumer 1 의 Heartbeat 요청 22:23:20.985039 IP sharp_nash.kafka-net.39810 > kafka1.9091: Flags [P.], seq 336:448, ack 43, win 501, options [nop,nop,TS val 3620269773 ecr 2925910942], length 112 0x0000: 4500 00a4 5d93 4000 4006 8491 ac13 0006 E...].@.@....... 0x0010: ac13 0003 9b82 2383 8f30 dce5 b22f d4d4 ......#..0.../.. 0x0020: 8018 01f5 58c6 0000 0101 080a d7c8 eecd ....X........... 0x0030: ae65 db9e 0000 006c 000c 0003 0000 0017 .e.....l........ 0x0040: 0011 7468 6973 5f69 735f 636c 6965 6e74 ..this_is_client 0x0050: 5f69 6400 116d 792d 636f 6e73 756d 6572 _id..my-consumer 0x0060: 2d67 726f 7570 0000 0002 0036 7468 6973 -group.....6this 0x0070: 5f69 735f 636c 6965 6e74 5f69 642d 3234 _is_client_id-24 0x0080: 3965 3133 6139 2d32 6639 302d 3461 6362 9e13a9-2f90-4acb 0x0090: 2d39 3934 332d 6137 3265 6639 6564 3532 -9943-a72ef9ed52 0x00a0: 3061 ffff 0a.. // Consumer 2 의 Heartbeat 요청 22:23:20.985627 IP ecstatic_ramanujan.kafka-net.53762 > kafka1.9091: Flags [P.], seq 336:448, ack 43, win 501, options [nop,nop,TS val 904303057 ecr 523460100], length 112 0x0000: 4500 00a4 29f1 4000 4006 b831 ac13 0008 E...).@.@..1.... 0x0010: ac13 0003 d202 2383 6b7a d459 248e 9929 ......#.kz.Y$..) 0x0020: 8018 01f5 58c8 0000 0101 080a 35e6 91d1 ....X.......5... 0x0030: 1f33 5e04 0000 006c 000c 0003 0000 0011 .3^....l........ 0x0040: 0011 7468 6973 5f69 735f 636c 6965 6e74 ..this_is_client 0x0050: 5f69 6400 116d 792d 636f 6e73 756d 6572 _id..my-consumer 0x0060: 2d67 726f 7570 0000 0002 0036 7468 6973 -group.....6this 0x0070: 5f69 735f 636c 6965 6e74 5f69 642d 3964 _is_client_id-9d 0x0080: 3131 3932 3535 2d30 6264 362d 3431 3031 119255-0bd6-4101 0x0090: 2d39 3666 362d 6138 6235 3064 6636 6565 -96f6-a8b50df6ee 0x00a0: 6331 ffff c1.. // Consumer 3 의 Heartbeat 요청 22:23:20.985772 IP cranky_herschel.kafka-net.32900 > kafka1.9091: Flags [P.], seq 336:448, ack 43, win 501, options [nop,nop,TS val 984611383 ecr 4171233825], length 112 0x0000: 4500 00a4 46da 4000 4006 9b47 ac13 0009 E...F.@.@..G.... 0x0010: ac13 0003 8084 2383 cb9b 8bba 8bdf 79d2 ......#.......y. 0x0020: 8018 01f5 58c9 0000 0101 080a 3aaf fa37 ....X.......:..7 0x0030: f89f fa21 0000 006c 000c 0003 0000 0011 ...!...l........ 0x0040: 0011 7468 6973 5f69 735f 636c 6965 6e74 ..this_is_client 0x0050: 5f69 6400 116d 792d 636f 6e73 756d 6572 _id..my-consumer 0x0060: 2d67 726f 7570 0000 0002 0036 7468 6973 -group.....6this 0x0070: 5f69 735f 636c 6965 6e74 5f69 642d 3239 _is_client_id-29 0x0080: 3863 6139 3861 2d34 3163 392d 3434 3436 8ca98a-41c9-4446 0x0090: 2d61 3334 332d 3265 3335 3163 6561 3062 -a343-2e351cea0b 0x00a0: 6265 ffff be.. // Consumer 4 의 JoinGroup 요청 22:23:24.371426 IP jovial_jackson.kafka-net.51922 > kafka1.9091: Flags [P.], seq 3293203651:3293203818, ack 1672368758, win 501, options [nop,nop,TS val 737703532 ecr 2072872991], length 167 0x0000: 4500 00db a2cc 4000 4006 3f1d ac13 000a E.....@.@.?..... 0x0010: ac13 0003 cad2 2383 c44a 4cc3 63ae 5276 ......#..JL.c.Rv 0x0020: 8018 01f5 5901 0000 0101 080a 2bf8 766c ....Y.......+.vl 0x0030: 7b8d 881f 0000 00a3 000b 0005 0000 0002 {............... 0x0040: 0011 7468 6973 5f69 735f 636c 6965 6e74 ..this_is_client 0x0050: 5f69 6400 116d 792d 636f 6e73 756d 6572 _id..my-consumer 0x0060: 2d67 726f 7570 0000 1770 0004 93e0 0000 -group...p...... 0x0070: ffff 0008 636f 6e73 756d 6572 0000 0002 ....consumer.... 0x0080: 0005 7261 6e67 6500 0000 2000 0300 0000 ..range......... 0x0090: 0100 0a74 6573 742d 746f 7069 6300 0000 ...test-topic... 0x00a0: 0000 0000 00ff ffff ff00 0000 0a72 6f75 .............rou 0x00b0: 6e64 726f 6269 6e00 0000 2000 0300 0000 ndrobin......... 0x00c0: 0100 0a74 6573 742d 746f 7069 6300 0000 ...test-topic... 0x00d0: 0000 0000 00ff ffff ff00 00 ........... // Consumer 4 의 JoinGroup 요청 22:23:24.374787 IP jovial_jackson.kafka-net.51922 > kafka1.9091: Flags [P.], seq 167:388, ack 83, win 501, options [nop,nop,TS val 737703535 ecr 2072873002], length 221 0x0000: 4500 0111 a2cd 4000 4006 3ee6 ac13 000a E.....@.@.>..... 0x0010: ac13 0003 cad2 2383 c44a 4d6a 63ae 52c8 ......#..JMjc.R. 0x0020: 8018 01f5 5937 0000 0101 080a 2bf8 766f ....Y7......+.vo 0x0030: 7b8d 882a 0000 00d9 000b 0005 0000 0003 {..*............ 0x0040: 0011 7468 6973 5f69 735f 636c 6965 6e74 ..this_is_client 0x0050: 5f69 6400 116d 792d 636f 6e73 756d 6572 _id..my-consumer 0x0060: 2d67 726f 7570 0000 1770 0004 93e0 0036 -group...p.....6 0x0070: 7468 6973 5f69 735f 636c 6965 6e74 5f69 this_is_client_i 0x0080: 642d 6131 6662 3530 3163 2d63 3061 332d d-a1fb501c-c0a3- 0x0090: 3435 3262 2d62 6634 632d 3937 6330 6662 452b-bf4c-97c0fb 0x00a0: 3437 3030 3838 ffff 0008 636f 6e73 756d 470088....consum 0x00b0: 6572 0000 0002 0005 7261 6e67 6500 0000 er......range... 0x00c0: 2000 0300 0000 0100 0a74 6573 742d 746f .........test-to 0x00d0: 7069 6300 0000 0000 0000 00ff ffff ff00 pic............. 0x00e0: 0000 0a72 6f75 6e64 726f 6269 6e00 0000 ...roundrobin... 0x00f0: 2000 0300 0000 0100 0a74 6573 742d 746f .........test-to 0x0100: 7069 6300 0000 0000 0000 00ff ffff ff00 pic............. 0x0110: 00 . // Consumer 2 의 Heartbeat 요청 22:23:25.994514 IP ecstatic_ramanujan.kafka-net.53762 > kafka1.9091: Flags [P.], seq 448:560, ack 57, win 501, options [nop,nop,TS val 904308066 ecr 523465112], length 112 0x0000: 4500 00a4 29f3 4000 4006 b82f ac13 0008 E...).@.@../.... 0x0010: ac13 0003 d202 2383 6b7a d4c9 248e 9937 ......#.kz..$..7 0x0020: 8018 01f5 58c8 0000 0101 080a 35e6 a562 ....X.......5..b 0x0030: 1f33 7198 0000 006c 000c 0003 0000 0012 .3q....l........ 0x0040: 0011 7468 6973 5f69 735f 636c 6965 6e74 ..this_is_client 0x0050: 5f69 6400 116d 792d 636f 6e73 756d 6572 _id..my-consumer 0x0060: 2d67 726f 7570 0000 0002 0036 7468 6973 -group.....6this 0x0070: 5f69 735f 636c 6965 6e74 5f69 642d 3964 _is_client_id-9d 0x0080: 3131 3932 3535 2d30 6264 362d 3431 3031 119255-0bd6-4101 0x0090: 2d39 3666 362d 6138 6235 3064 6636 6565 -96f6-a8b50df6ee 0x00a0: 6331 ffff c1.. // Consumer 1 의 Heartbeat 요청 22:23:25.995465 IP sharp_nash.kafka-net.39810 > kafka1.9091: Flags [P.], seq 448:560, ack 57, win 501, options [nop,nop,TS val 3620274784 ecr 2925915962], length 112 0x0000: 4500 00a4 5d95 4000 4006 848f ac13 0006 E...].@.@....... 0x0010: ac13 0003 9b82 2383 8f30 dd55 b22f d4e2 ......#..0.U./.. 0x0020: 8018 01f5 58c6 0000 0101 080a d7c9 0260 ....X..........` 0x0030: ae65 ef3a 0000 006c 000c 0003 0000 0018 .e.:...l........ 0x0040: 0011 7468 6973 5f69 735f 636c 6965 6e74 ..this_is_client 0x0050: 5f69 6400 116d 792d 636f 6e73 756d 6572 _id..my-consumer 0x0060: 2d67 726f 7570 0000 0002 0036 7468 6973 -group.....6this 0x0070: 5f69 735f 636c 6965 6e74 5f69 642d 3234 _is_client_id-24 0x0080: 3965 3133 6139 2d32 6639 302d 3461 6362 9e13a9-2f90-4acb 0x0090: 2d39 3934 332d 6137 3265 6639 6564 3532 -9943-a72ef9ed52 0x00a0: 3061 ffff 0a.. // Consumer 3 의 Heartbeat 요청 22:23:25.998548 IP cranky_herschel.kafka-net.32900 > kafka1.9091: Flags [P.], seq 448:560, ack 57, win 501, options [nop,nop,TS val 984616396 ecr 4171238859], length 112 0x0000: 4500 00a4 46dc 4000 4006 9b45 ac13 0009 E...F.@.@..E.... 0x0010: ac13 0003 8084 2383 cb9b 8c2a 8bdf 79e0 ......#....*..y. 0x0020: 8018 01f5 58c9 0000 0101 080a 3ab0 0dcc ....X.......:... 0x0030: f8a0 0dcb 0000 006c 000c 0003 0000 0012 .......l........ 0x0040: 0011 7468 6973 5f69 735f 636c 6965 6e74 ..this_is_client 0x0050: 5f69 6400 116d 792d 636f 6e73 756d 6572 _id..my-consumer 0x0060: 2d67 726f 7570 0000 0002 0036 7468 6973 -group.....6this 0x0070: 5f69 735f 636c 6965 6e74 5f69 642d 3239 _is_client_id-29 0x0080: 3863 6139 3861 2d34 3163 392d 3434 3436 8ca98a-41c9-4446 0x0090: 2d61 3334 332d 3265 3335 3163 6561 3062 -a343-2e351cea0b 0x00a0: 6265 ffff be.. // 브로커가 Consumer 2 에게 Rebalance In Progress 알림. 22:23:26.003516 IP kafka1.9091 > ecstatic_ramanujan.kafka-net.53762: Flags [P.], seq 57:71, ack 560, win 50470, options [nop,nop,TS val 523470105 ecr 904308066], length 14 0x0000: 4500 0042 d176 4000 4006 110e ac13 0003 E..B.v@.@....... 0x0010: ac13 0008 2383 d202 248e 9937 6b7a d539 ....#...$..7kz.9 0x0020: 8018 c526 5866 0000 0101 080a 1f33 8519 ...&Xf.......3.. 0x0030: 35e6 a562 0000 000a 0000 0012 0000 0000 5..b............ 0x0040: 001b .. // Consumer 2 의 JoinGroup 요청 22:23:26.004135 IP ecstatic_ramanujan.kafka-net.53762 > kafka1.9091: Flags [P.], seq 560:781, ack 71, win 501, options [nop,nop,TS val 904308075 ecr 523470105], length 221 0x0000: 4500 0111 29f5 4000 4006 b7c0 ac13 0008 E...).@.@....... 0x0010: ac13 0003 d202 2383 6b7a d539 248e 9945 ......#.kz.9$..E 0x0020: 8018 01f5 5935 0000 0101 080a 35e6 a56b ....Y5......5..k 0x0030: 1f33 8519 0000 00d9 000b 0005 0000 0013 .3.............. 0x0040: 0011 7468 6973 5f69 735f 636c 6965 6e74 ..this_is_client 0x0050: 5f69 6400 116d 792d 636f 6e73 756d 6572 _id..my-consumer 0x0060: 2d67 726f 7570 0000 1770 0004 93e0 0036 -group...p.....6 0x0070: 7468 6973 5f69 735f 636c 6965 6e74 5f69 this_is_client_i 0x0080: 642d 3964 3131 3932 3535 2d30 6264 362d d-9d119255-0bd6- 0x0090: 3431 3031 2d39 3666 362d 6138 6235 3064 4101-96f6-a8b50d 0x00a0: 6636 6565 6331 ffff 0008 636f 6e73 756d f6eec1....consum 0x00b0: 6572 0000 0002 0005 7261 6e67 6500 0000 er......range... 0x00c0: 2000 0300 0000 0100 0a74 6573 742d 746f .........test-to 0x00d0: 7069 6300 0000 0000 0000 00ff ffff ff00 pic............. 0x00e0: 0000 0a72 6f75 6e64 726f 6269 6e00 0000 ...roundrobin... 0x00f0: 2000 0300 0000 0100 0a74 6573 742d 746f .........test-to 0x0100: 7069 6300 0000 0000 0000 00ff ffff ff00 pic............. 0x0110: 00 . // 브로커가 Consumer 1 에게 Rebalance In Progress 알림. 22:23:26.011282 IP kafka1.9091 > sharp_nash.kafka-net.39810: Flags [P.], seq 57:71, ack 560, win 50470, options [nop,nop,TS val 2925920954 ecr 3620274784], length 14 0x0000: 4500 0042 1d59 4000 4006 c52d ac13 0003 E..B.Y@.@..-.... 0x0010: ac13 0006 2383 9b82 b22f d4e2 8f30 ddc5 ....#..../...0.. 0x0020: 8018 c526 5864 0000 0101 080a ae66 02ba ...&Xd.......f.. 0x0030: d7c9 0260 0000 000a 0000 0018 0000 0000 ...`............ 0x0040: 001b .. // Consumer 1 의 JoinGroup 요청 22:23:26.011686 IP sharp_nash.kafka-net.39810 > kafka1.9091: Flags [P.], seq 560:781, ack 71, win 501, options [nop,nop,TS val 3620274800 ecr 2925920954], length 221 0x0000: 4500 0111 5d97 4000 4006 8420 ac13 0006 E...].@.@....... 0x0010: ac13 0003 9b82 2383 8f30 ddc5 b22f d4f0 ......#..0.../.. 0x0020: 8018 01f5 5933 0000 0101 080a d7c9 0270 ....Y3.........p 0x0030: ae66 02ba 0000 00d9 000b 0005 0000 0019 .f.............. 0x0040: 0011 7468 6973 5f69 735f 636c 6965 6e74 ..this_is_client 0x0050: 5f69 6400 116d 792d 636f 6e73 756d 6572 _id..my-consumer 0x0060: 2d67 726f 7570 0000 1770 0004 93e0 0036 -group...p.....6 0x0070: 7468 6973 5f69 735f 636c 6965 6e74 5f69 this_is_client_i 0x0080: 642d 3234 3965 3133 6139 2d32 6639 302d d-249e13a9-2f90- 0x0090: 3461 6362 2d39 3934 332d 6137 3265 6639 4acb-9943-a72ef9 0x00a0: 6564 3532 3061 ffff 0008 636f 6e73 756d ed520a....consum 0x00b0: 6572 0000 0002 0005 7261 6e67 6500 0000 er......range... 0x00c0: 2000 0300 0000 0100 0a74 6573 742d 746f .........test-to 0x00d0: 7069 6300 0000 0000 0000 00ff ffff ff00 pic............. 0x00e0: 0000 0a72 6f75 6e64 726f 6269 6e00 0000 ...roundrobin... 0x00f0: 2000 0300 0000 0100 0a74 6573 742d 746f .........test-to 0x0100: 7069 6300 0000 0000 0000 00ff ffff ff00 pic............. 0x0110: 00 . // 브로커가 Consumer 3 에게 Rebalance In Progress 알림. 22:23:26.016836 IP kafka1.9091 > cranky_herschel.kafka-net.32900: Flags [P.], seq 57:71, ack 560, win 50470, options [nop,nop,TS val 4171243856 ecr 984616396], length 14 0x0000: 4500 0042 fd92 4000 4006 e4f0 ac13 0003 E..B..@.@....... 0x0010: ac13 0009 2383 8084 8bdf 79e0 cb9b 8c9a ....#.....y..... 0x0020: 8018 c526 5867 0000 0101 080a f8a0 2150 ...&Xg........!P 0x0030: 3ab0 0dcc 0000 000a 0000 0012 0000 0000 :............... 0x0040: 001b .. // Consumer 3 의 JoinGroup 요청 22:23:26.017284 IP cranky_herschel.kafka-net.32900 > kafka1.9091: Flags [P.], seq 560:781, ack 71, win 501, options [nop,nop,TS val 984616414 ecr 4171243856], length 221 0x0000: 4500 0111 46de 4000 4006 9ad6 ac13 0009 E...F.@.@....... 0x0010: ac13 0003 8084 2383 cb9b 8c9a 8bdf 79ee ......#.......y. 0x0020: 8018 01f5 5936 0000 0101 080a 3ab0 0dde ....Y6......:... 0x0030: f8a0 2150 0000 00d9 000b 0005 0000 0013 ..!P............ 0x0040: 0011 7468 6973 5f69 735f 636c 6965 6e74 ..this_is_client 0x0050: 5f69 6400 116d 792d 636f 6e73 756d 6572 _id..my-consumer 0x0060: 2d67 726f 7570 0000 1770 0004 93e0 0036 -group...p.....6 0x0070: 7468 6973 5f69 735f 636c 6965 6e74 5f69 this_is_client_i 0x0080: 642d 3239 3863 6139 3861 2d34 3163 392d d-298ca98a-41c9- 0x0090: 3434 3436 2d61 3334 332d 3265 3335 3163 4446-a343-2e351c 0x00a0: 6561 3062 6265 ffff 0008 636f 6e73 756d ea0bbe....consum 0x00b0: 6572 0000 0002 0005 7261 6e67 6500 0000 er......range... 0x00c0: 2000 0300 0000 0100 0a74 6573 742d 746f .........test-to 0x00d0: 7069 6300 0000 0000 0000 00ff ffff ff00 pic............. 0x00e0: 0000 0a72 6f75 6e64 726f 6269 6e00 0000 ...roundrobin... 0x00f0: 2000 0300 0000 0100 0a74 6573 742d 746f .........test-to 0x0100: 7069 6300 0000 0000 0000 00ff ffff ff00 pic............. 0x0110: 00 . // Consumer 4 의 SyncGroup 요청 22:23:26.027169 IP jovial_jackson.kafka-net.51922 > kafka1.9091: Flags [P.], seq 388:504, ack 224, win 501, options [nop,nop,TS val 737705187 ecr 2072874654], length 116 0x0000: 4500 00a8 a2ce 4000 4006 3f4e ac13 000a E.....@.@.?N.... 0x0010: ac13 0003 cad2 2383 c44a 4e47 63ae 5355 ......#..JNGc.SU 0x0020: 8018 01f5 58ce 0000 0101 080a 2bf8 7ce3 ....X.......+.|. 0x0030: 7b8d 8e9e 0000 0070 000e 0003 0000 0004 {......p........ 0x0040: 0011 7468 6973 5f69 735f 636c 6965 6e74 ..this_is_client 0x0050: 5f69 6400 116d 792d 636f 6e73 756d 6572 _id..my-consumer 0x0060: 2d67 726f 7570 0000 0003 0036 7468 6973 -group.....6this 0x0070: 5f69 735f 636c 6965 6e74 5f69 642d 6131 _is_client_id-a1 0x0080: 6662 3530 3163 2d63 3061 332d 3435 3262 fb501c-c0a3-452b 0x0090: 2d62 6634 632d 3937 6330 6662 3437 3030 -bf4c-97c0fb4700 0x00a0: 3838 ffff 0000 0000 88...... // Consumer 2 의 SyncGroup 요청 22:23:26.027769 IP ecstatic_ramanujan.kafka-net.53762 > kafka1.9091: Flags [P.], seq 781:897, ack 212, win 501, options [nop,nop,TS val 904308099 ecr 523470129], length 116 0x0000: 4500 00a8 29f6 4000 4006 b828 ac13 0008 E...).@.@..(.... 0x0010: ac13 0003 d202 2383 6b7a d616 248e 99d2 ......#.kz..$... 0x0020: 8018 01f5 58cc 0000 0101 080a 35e6 a583 ....X.......5... 0x0030: 1f33 8531 0000 0070 000e 0003 0000 0014 .3.1...p........ 0x0040: 0011 7468 6973 5f69 735f 636c 6965 6e74 ..this_is_client 0x0050: 5f69 6400 116d 792d 636f 6e73 756d 6572 _id..my-consumer 0x0060: 2d67 726f 7570 0000 0003 0036 7468 6973 -group.....6this 0x0070: 5f69 735f 636c 6965 6e74 5f69 642d 3964 _is_client_id-9d 0x0080: 3131 3932 3535 2d30 6264 362d 3431 3031 119255-0bd6-4101 0x0090: 2d39 3666 362d 6138 6235 3064 6636 6565 -96f6-a8b50df6ee 0x00a0: 6331 ffff 0000 0000 c1...... // Consumer 3 의 SyncGroup 요청 22:23:26.027772 IP cranky_herschel.kafka-net.32900 > kafka1.9091: Flags [P.], seq 781:897, ack 212, win 501, options [nop,nop,TS val 984616425 ecr 4171243867], length 116 0x0000: 4500 00a8 46df 4000 4006 9b3e ac13 0009 E...F.@.@..>.... 0x0010: ac13 0003 8084 2383 cb9b 8d77 8bdf 7a7b ......#....w..z{ 0x0020: 8018 01f5 58cd 0000 0101 080a 3ab0 0de9 ....X.......:... 0x0030: f8a0 215b 0000 0070 000e 0003 0000 0014 ..![...p........ 0x0040: 0011 7468 6973 5f69 735f 636c 6965 6e74 ..this_is_client 0x0050: 5f69 6400 116d 792d 636f 6e73 756d 6572 _id..my-consumer 0x0060: 2d67 726f 7570 0000 0003 0036 7468 6973 -group.....6this 0x0070: 5f69 735f 636c 6965 6e74 5f69 642d 3239 _is_client_id-29 0x0080: 3863 6139 3861 2d34 3163 392d 3434 3436 8ca98a-41c9-4446 0x0090: 2d61 3334 332d 3265 3335 3163 6561 3062 -a343-2e351cea0b 0x00a0: 6265 ffff 0000 0000 be...... // Consumer 1 의 SyncGroup 요청 22:23:26.031485 IP sharp_nash.kafka-net.39810 > kafka1.9091: Flags [P.], seq 845:1301, ack 844, win 501, options [nop,nop,TS val 3620274820 ecr 2925920974], length 456 0x0000: 4500 01fc 5d99 4000 4006 8333 ac13 0006 E...].@.@..3.... 0x0010: ac13 0003 9b82 2383 8f30 dee2 b22f d7f5 ......#..0.../.. 0x0020: 8018 01f5 5a1e 0000 0101 080a d7c9 0284 ....Z........... 0x0030: ae66 02ce 0000 01c4 000e 0003 0000 001b .f.............. 0x0040: 0011 7468 6973 5f69 735f 636c 6965 6e74 ..this_is_client 0x0050: 5f69 6400 116d 792d 636f 6e73 756d 6572 _id..my-consumer 0x0060: 2d67 726f 7570 0000 0003 0036 7468 6973 -group.....6this 0x0070: 5f69 735f 636c 6965 6e74 5f69 642d 3234 _is_client_id-24 0x0080: 3965 3133 6139 2d32 6639 302d 3461 6362 9e13a9-2f90-4acb 0x0090: 2d39 3934 332d 6137 3265 6639 6564 3532 -9943-a72ef9ed52 0x00a0: 3061 ffff 0000 0004 0036 7468 6973 5f69 0a.......6this_i 0x00b0: 735f 636c 6965 6e74 5f69 642d 3234 3965 s_client_id-249e 0x00c0: 3133 6139 2d32 6639 302d 3461 6362 2d39 13a9-2f90-4acb-9 0x00d0: 3934 332d 6137 3265 6639 6564 3532 3061 943-a72ef9ed520a 0x00e0: 0000 001e 0000 0000 0001 000a 7465 7374 ............test 0x00f0: 2d74 6f70 6963 0000 0001 0000 0000 0000 -topic.......... 0x0100: 0000 0036 7468 6973 5f69 735f 636c 6965 ...6this_is_clie 0x0110: 6e74 5f69 642d 6131 6662 3530 3163 2d63 nt_id-a1fb501c-c 0x0120: 3061 332d 3435 3262 2d62 6634 632d 3937 0a3-452b-bf4c-97 0x0130: 6330 6662 3437 3030 3838 0000 000a 0000 c0fb470088...... 0x0140: 0000 0000 0000 0000 0036 7468 6973 5f69 .........6this_i 0x0150: 735f 636c 6965 6e74 5f69 642d 3964 3131 s_client_id-9d11 0x0160: 3932 3535 2d30 6264 362d 3431 3031 2d39 9255-0bd6-4101-9 0x0170: 3666 362d 6138 6235 3064 6636 6565 6331 6f6-a8b50df6eec1 0x0180: 0000 001e 0000 0000 0001 000a 7465 7374 ............test 0x0190: 2d74 6f70 6963 0000 0001 0000 0002 0000 -topic.......... 0x01a0: 0000 0036 7468 6973 5f69 735f 636c 6965 ...6this_is_clie 0x01b0: 6e74 5f69 642d 3239 3863 6139 3861 2d34 nt_id-298ca98a-4 0x01c0: 3163 392d 3434 3436 2d61 3334 332d 3265 1c9-4446-a343-2e 0x01d0: 3335 3163 6561 3062 6265 0000 001e 0000 351cea0bbe...... 0x01e0: 0000 0001 000a 7465 7374 2d74 6f70 6963 ......test-topic 0x01f0: 0000 0001 0000 0001 0000 0000 ............ // 다시 모든 Consumer 들이 Heartbeat 전송 22:23:26.051939 IP jovial_jackson.kafka-net.51922 > kafka1.9091: Flags [P.], seq 504:616, ack 252, win 501, options [nop,nop,TS val 737705212 ecr 2072874679], length 112 0x0000: 4500 00a4 a2cf 4000 4006 3f51 ac13 000a E.....@.@.?Q.... 0x0010: ac13 0003 cad2 2383 c44a 4ebb 63ae 5371 ......#..JN.c.Sq 0x0020: 8018 01f5 58ca 0000 0101 080a 2bf8 7cfc ....X.......+.|. 0x0030: 7b8d 8eb7 0000 006c 000c 0003 0000 0005 {......l........ 0x0040: 0011 7468 6973 5f69 735f 636c 6965 6e74 ..this_is_client 0x0050: 5f69 6400 116d 792d 636f 6e73 756d 6572 _id..my-consumer 0x0060: 2d67 726f 7570 0000 0003 0036 7468 6973 -group.....6this 0x0070: 5f69 735f 636c 6965 6e74 5f69 642d 6131 _is_client_id-a1 0x0080: 6662 3530 3163 2d63 3061 332d 3435 3262 fb501c-c0a3-452b 0x0090: 2d62 6634 632d 3937 6330 6662 3437 3030 -bf4c-97c0fb4700 0x00a0: 3838 ffff 88.. 22:23:31.067122 IP jovial_jackson.kafka-net.51922 > kafka1.9091: Flags [P.], seq 616:728, ack 266, win 501, options [nop,nop,TS val 737710227 ecr 2072874706], length 112 0x0000: 4500 00a4 a2d1 4000 4006 3f4f ac13 000a E.....@.@.?O.... 0x0010: ac13 0003 cad2 2383 c44a 4f2b 63ae 537f ......#..JO+c.S. 0x0020: 8018 01f5 58ca 0000 0101 080a 2bf8 9093 ....X.......+... 0x0030: 7b8d 8ed2 0000 006c 000c 0003 0000 0006 {......l........ 0x0040: 0011 7468 6973 5f69 735f 636c 6965 6e74 ..this_is_client 0x0050: 5f69 6400 116d 792d 636f 6e73 756d 6572 _id..my-consumer 0x0060: 2d67 726f 7570 0000 0003 0036 7468 6973 -group.....6this 0x0070: 5f69 735f 636c 6965 6e74 5f69 642d 6131 _is_client_id-a1 0x0080: 6662 3530 3163 2d63 3061 332d 3435 3262 fb501c-c0a3-452b 0x0090: 2d62 6634 632d 3937 6330 6662 3437 3030 -bf4c-97c0fb4700 0x00a0: 3838 ffff 88.. 22:23:31.070401 IP sharp_nash.kafka-net.39810 > kafka1.9091: Flags [P.], seq 1373:1485, ack 962, win 501, options [nop,nop,TS val 3620279858 ecr 2925921022], length 112 0x0000: 4500 00a4 5d9c 4000 4006 8488 ac13 0006 E...].@.@....... 0x0010: ac13 0003 9b82 2383 8f30 e0f2 b22f d86b ......#..0.../.k 0x0020: 8018 01f5 58c6 0000 0101 080a d7c9 1632 ....X..........2 0x0030: ae66 02fe 0000 006c 000c 0003 0000 001d .f.....l........ 0x0040: 0011 7468 6973 5f69 735f 636c 6965 6e74 ..this_is_client 0x0050: 5f69 6400 116d 792d 636f 6e73 756d 6572 _id..my-consumer 0x0060: 2d67 726f 7570 0000 0003 0036 7468 6973 -group.....6this 0x0070: 5f69 735f 636c 6965 6e74 5f69 642d 3234 _is_client_id-24 0x0080: 3965 3133 6139 2d32 6639 302d 3461 6362 9e13a9-2f90-4acb 0x0090: 2d39 3934 332d 6137 3265 6639 6564 3532 -9943-a72ef9ed52 0x00a0: 3061 ffff 0a.. 22:23:31.071841 IP ecstatic_ramanujan.kafka-net.53762 > kafka1.9091: Flags [P.], seq 969:1081, ack 330, win 501, options [nop,nop,TS val 904313143 ecr 523470180], length 112 0x0000: 4500 00a4 29f9 4000 4006 b829 ac13 0008 E...).@.@..).... 0x0010: ac13 0003 d202 2383 6b7a d6d2 248e 9a48 ......#.kz..$..H 0x0020: 8018 01f5 58c8 0000 0101 080a 35e6 b937 ....X.......5..7 0x0030: 1f33 8564 0000 006c 000c 0003 0000 0016 .3.d...l........ 0x0040: 0011 7468 6973 5f69 735f 636c 6965 6e74 ..this_is_client 0x0050: 5f69 6400 116d 792d 636f 6e73 756d 6572 _id..my-consumer 0x0060: 2d67 726f 7570 0000 0003 0036 7468 6973 -group.....6this 0x0070: 5f69 735f 636c 6965 6e74 5f69 642d 3964 _is_client_id-9d 0x0080: 3131 3932 3535 2d30 6264 362d 3431 3031 119255-0bd6-4101 0x0090: 2d39 3666 362d 6138 6235 3064 6636 6565 -96f6-a8b50df6ee 0x00a0: 6331 ffff c1.. 22:23:31.080844 IP cranky_herschel.kafka-net.32900 > kafka1.9091: Flags [P.], seq 969:1081, ack 330, win 501, options [nop,nop,TS val 984621478 ecr 4171243919], length 112 0x0000: 4500 00a4 46e2 4000 4006 9b3f ac13 0009 E...F.@.@..?.... 0x0010: ac13 0003 8084 2383 cb9b 8e33 8bdf 7af1 ......#....3..z. 0x0020: 8018 01f5 58c9 0000 0101 080a 3ab0 21a6 ....X.......:.!. 0x0030: f8a0 218f 0000 006c 000c 0003 0000 0016 ..!....l........ 0x0040: 0011 7468 6973 5f69 735f 636c 6965 6e74 ..this_is_client 0x0050: 5f69 6400 116d 792d 636f 6e73 756d 6572 _id..my-consumer 0x0060: 2d67 726f 7570 0000 0003 0036 7468 6973 -group.....6this 0x0070: 5f69 735f 636c 6965 6e74 5f69 642d 3239 _is_client_id-29 0x0080: 3863 6139 3861 2d34 3163 392d 3434 3436 8ca98a-41c9-4446 0x0090: 2d61 3334 332d 3265 3335 3163 6561 3062 -a343-2e351cea0b 0x00a0: 6265 ffff be..
Rebalance In Progress 에러 코드.
REBALANCE_IN_PROGRESS 의 코드는 27 번으로 16진수로 0x1b 로 표현됩니다.
브로커는 아래와 같은 형식으로 컨슈머에게 리밸런싱임을 알리게 됩니다.
"kafka1.9091 > sharp_nash.kafka-net.39810" 이 브로커에서 클라이언트로 패킷이 전달되는 방향을 의미하며,
"0x0040, 0x0041" 위치의 바이트가 REBALANCE_IN_PROGRESS 에러 코드를 의미합니다.
22:23:26.011282 IP kafka1.9091 > sharp_nash.kafka-net.39810: Flags [P.], seq 57:71, ack 560, win 50470, options [nop,nop,TS val 2925920954 ecr 3620274784], length 14 0x0000: 4500 0042 1d59 4000 4006 c52d ac13 0003 E..B.Y@.@..-.... 0x0010: ac13 0006 2383 9b82 b22f d4e2 8f30 ddc5 ....#..../...0.. 0x0020: 8018 c526 5864 0000 0101 080a ae66 02ba ...&Xd.......f.. 0x0030: d7c9 0260 0000 000a 0000 0018 0000 0000 ...`............ 0x0040: 001b
Rebalance 2) Consumer 의 종료 상황 ( LeaveGroup ).
컨슈너는 Graceful Shutdown 방식으로 종료될 때에 LeaveGroup API 를 호출하게 됩니다.
아래는 종료되는 Kafka Consumer 가 브로커에게 LeaveGroup API 를 요청하는 TCP Packet 입니다.
0x0038,0x0039 바이트에서 표시된 000d 가 13번에 해당하는 LeaveGroup API 의 Key 입니다.
그리고 Group Coordinator 는 컨슈머들에서 Rebalance In Process (001b) 에러 응답을 전송하고 리밸런싱을 시작합니다.
21:53:01.351590 IP competent_rubin.kafka-net.40950 > kafka2.9091: Flags [P.], seq 2484:2590, ack 604, win 501, options [nop,nop,TS val 1006060414 ecr 2406605886], length 106 0x0000: 4500 009e bad6 4000 4006 2751 ac13 0008 E.....@.@.'Q.... 0x0010: ac13 0004 9ff6 2383 5966 07eb 8775 8b5a ......#.Yf...u.Z 0x0020: 8018 01f5 58c3 0000 0101 080a 3bf7 437e ....X.......;.C~ 0x0030: 8f71 e43e 0000 0066 000d 0001 0000 0017 .q.>...f........ 0x0040: 0011 7468 6973 5f69 735f 636c 6965 6e74 ..this_is_client 0x0050: 5f69 6400 116d 792d 636f 6e73 756d 6572 _id..my-consumer 0x0060: 2d67 726f 7570 0036 7468 6973 5f69 735f -group.6this_is_ 0x0070: 636c 6965 6e74 5f69 642d 3131 3038 6636 client_id-1108f6 0x0080: 6432 2d66 3362 322d 3431 6630 2d62 3635 d2-f3b2-41f0-b65 0x0090: 392d 3438 3231 3833 6365 6137 3231 9-482183cea721 21:53:02.449809 IP kafka2.9091 > affectionate_babbage.kafka-net.44196: Flags [P.], seq 1709:1723, ack 3643, win 50470, options [nop,nop,TS val 1315012574 ecr 692413949], length 14 0x0000: 4500 0042 4c4a 4000 4006 963b ac13 0004 E..BLJ@.@..;.... 0x0010: ac13 0006 2383 aca4 2384 5ad9 f567 f74d ....#...#.Z..g.M 0x0020: 8018 c526 5865 0000 0101 080a 4e61 7fde ...&Xe......Na.. 0x0030: 2945 65fd 0000 000a 0000 001e 0000 0000 )Ee............. 0x0040: 001b
Rebalance 3) Consumer 가 강제 종료되는 상황.
컨슈머가 만약에 강제로 종료된다면 이 상황에서는 즉시 리밸런싱이 발생하지 않습니다.
강제로 종료된 Consumer 가 session.timeout.ms 동안에 Heartbeat 를 전송하지 않을 것이고,
session.timeout.ms 시간 이후에 리밸런싱이 시작됩니다.
아래는 2개의 Consumer 들이 Heartbeat 를 전송하는 상황인데요.
22:33:18 시점에 하나의 Consumer 가 종료되었고, 22:33:28 인 10초 경과 이후에 리밸런싱이 본격적으로 시작됩니다.
( heartbeat.interval.ms : 5초, session.timeout.ms : 6초 )
// Consumer 1 Heartbeat 전송 22:33:18.489497 IP inspiring_torvalds.kafka-net.43690 > kafka2.9091: Flags [P.], seq 1963:2075, ack 1513, win 501, options [nop,nop,TS val 694827921 ecr 1317421521], length 112 0x0000: 4500 00a4 5fdf 4000 4006 8244 ac13 0006 E..._.@.@..D.... 0x0010: ac13 0004 aaaa 2383 573e c929 0632 84c1 ......#.W>.).2.. 0x0020: 8018 01f5 58c7 0000 0101 080a 296a 3b91 ....X.......)j;. 0x0030: 4e86 41d1 0000 006c 000c 0003 0000 0010 N.A....l........ 0x0040: 0011 7468 6973 5f69 735f 636c 6965 6e74 ..this_is_client 0x0050: 5f69 6400 116d 792d 636f 6e73 756d 6572 _id..my-consumer 0x0060: 2d67 726f 7570 0000 0013 0036 7468 6973 -group.....6this 0x0070: 5f69 735f 636c 6965 6e74 5f69 642d 6462 _is_client_id-db 0x0080: 3939 6261 3462 2d35 3239 632d 3430 6131 99ba4b-529c-40a1 0x0090: 2d61 6666 662d 3163 3666 3365 6139 6266 -afff-1c6f3ea9bf 0x00a0: 3762 ffff 7b.. // Consumer 2 Heartbeat 전송 22:33:18.493039 IP distracted_elgamal.kafka-net.36652 > kafka2.9091: Flags [P.], seq 1028:1140, ack 422, win 501, options [nop,nop,TS val 1008475469 ecr 2409019861], length 112 0x0000: 4500 00a4 471a 4000 4006 9b07 ac13 0008 E...G.@.@....... 0x0010: ac13 0004 8f2c 2383 342f 7efc d795 8535 .....,#.4/~....5 0x0020: 8018 01f5 58c9 0000 0101 080a 3c1c 1d4d ....X.......<..M 0x0030: 8f96 b9d5 0000 006c 000c 0003 0000 000a .......l........ 0x0040: 0011 7468 6973 5f69 735f 636c 6965 6e74 ..this_is_client 0x0050: 5f69 6400 116d 792d 636f 6e73 756d 6572 _id..my-consumer 0x0060: 2d67 726f 7570 0000 0013 0036 7468 6973 -group.....6this 0x0070: 5f69 735f 636c 6965 6e74 5f69 642d 3762 _is_client_id-7b 0x0080: 3630 3164 6631 2d62 3431 302d 3438 3434 601df1-b410-4844 0x0090: 2d39 6233 642d 3438 3837 6530 3864 3731 -9b3d-4887e08d71 0x00a0: 3663 ffff 6c.. // Consumer 2 Heartbeat 전송 22:33:23.525129 IP distracted_elgamal.kafka-net.36652 > kafka2.9091: Flags [P.], seq 1140:1252, ack 436, win 501, options [nop,nop,TS val 1008480501 ecr 2409024893], length 112 0x0000: 4500 00a4 471c 4000 4006 9b05 ac13 0008 E...G.@.@....... 0x0010: ac13 0004 8f2c 2383 342f 7f6c d795 8543 .....,#.4/.l...C 0x0020: 8018 01f5 58c9 0000 0101 080a 3c1c 30f5 ....X.......<.0. 0x0030: 8f96 cd7d 0000 006c 000c 0003 0000 000b ...}...l........ 0x0040: 0011 7468 6973 5f69 735f 636c 6965 6e74 ..this_is_client 0x0050: 5f69 6400 116d 792d 636f 6e73 756d 6572 _id..my-consumer 0x0060: 2d67 726f 7570 0000 0013 0036 7468 6973 -group.....6this 0x0070: 5f69 735f 636c 6965 6e74 5f69 642d 3762 _is_client_id-7b 0x0080: 3630 3164 6631 2d62 3431 302d 3438 3434 601df1-b410-4844 0x0090: 2d39 6233 642d 3438 3837 6530 3864 3731 -9b3d-4887e08d71 0x00a0: 3663 ffff 6c.. // Consumer 2 Heartbeat 전송 22:33:28.545167 IP distracted_elgamal.kafka-net.36652 > kafka2.9091: Flags [P.], seq 1252:1364, ack 450, win 501, options [nop,nop,TS val 1008485522 ecr 2409029908], length 112 0x0000: 4500 00a4 471e 4000 4006 9b03 ac13 0008 E...G.@.@....... 0x0010: ac13 0004 8f2c 2383 342f 7fdc d795 8551 .....,#.4/.....Q 0x0020: 8018 01f5 58c9 0000 0101 080a 3c1c 4492 ....X.......<.D. 0x0030: 8f96 e114 0000 006c 000c 0003 0000 000c .......l........ 0x0040: 0011 7468 6973 5f69 735f 636c 6965 6e74 ..this_is_client 0x0050: 5f69 6400 116d 792d 636f 6e73 756d 6572 _id..my-consumer 0x0060: 2d67 726f 7570 0000 0013 0036 7468 6973 -group.....6this 0x0070: 5f69 735f 636c 6965 6e74 5f69 642d 3762 _is_client_id-7b 0x0080: 3630 3164 6631 2d62 3431 302d 3438 3434 601df1-b410-4844 0x0090: 2d39 6233 642d 3438 3837 6530 3864 3731 -9b3d-4887e08d71 0x00a0: 3663 ffff 6c.. // Broker 가 Rebalance In Progress 를 Consumer 에게 알림. 22:33:28.560668 IP kafka2.9091 > distracted_elgamal.kafka-net.36652: Flags [P.], seq 450:464, ack 1364, win 50470, options [nop,nop,TS val 2409034929 ecr 1008485522], length 14 0x0000: 4500 0042 a43a 4000 4006 3e49 ac13 0004 E..B.:@.@.>I.... 0x0010: ac13 0008 2383 8f2c d795 8551 342f 804c ....#..,...Q4/.L 0x0020: 8018 c526 5867 0000 0101 080a 8f96 f4b1 ...&Xg.......... 0x0030: 3c1c 4492 0000 000a 0000 000c 0000 0000 <.D............. 0x0040: 001b
Rebalance 4) Partition 이 추가되는 상황.
토픽의 파티션이 추가되는 상황에서 리밸런싱이 발생합니다.
아래의 명령어는 기존의 Topic 에 Partition 을 변경하는 명령어의 예시입니다.
kafka-topics.sh \ --bootstrap-server kafka1:9092,kafka2:9092,kafka3:9092 \ --alter \ --topic test-topic \ --partitions 4
파티션이 추가되는 경우에는 Metadata 의 갱신과 리밸런싱이 관련됩니다.
Consumer 는 metadata.max.age.ms 를 주기로 메타데이터를 갱신하는데요.
기본적으로 metadata.max.age.ms 는 5분으로 설정됩니다.
그래서 metadata 를 갱신한 이후 Partition 의 정보가 기존의 정보와 일치하지 않는 경우에
JoinGroup, SyncGroup 를 새롭게 요청함으로써 리밸런싱이 발생합니다.
이는 모듈마다 차이가 존재합니다.
python 의 confluent-kafka 모듈의 경우에는 5분 이상의 시간이 소요된 이후에 Rebalancing 이 시작되었고,
java 의 kafka-clients 모듈의 경우에는 2분 이내에 Rebalacing 이 시작되었습니다.
파티션이 변경되는 경우에는 Kafka Consumer 를 재실행하는 방식으로 파티션-컨슈머의 할당을 시도하는 것이 효율적이라고 판단됩니다.
카프카 클러스터 실행하기.
아래는 카프카 클러스터를 실행하기 위한 Zookeeper 와 Kafka Broker 의 도커 명령어입니다.
docker network create kafka-net docker run -d --name zookeeper --hostname zookeeper --net kafka-net \ -e ZOOKEEPER_CLIENT_PORT=2181 \ confluentinc/cp-zookeeper:7.6.4 docker run -d --name kafka1 --hostname kafka1 --net kafka-net \ -e KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181 \ -e KAFKA_CFG_BROKER_ID=1 \ -e ALLOW_PLAINTEXT_LISTENER=yes \ -e KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR=1 \ -e KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,INNER://:9091,OUTER://:19092 \ -e KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka1:9092,INNER://kafka1:9091,OUTER://localhost:19092 \ -e KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,INNER:PLAINTEXT,OUTER:PLAINTEXT \ -e KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT \ -p 19092:19092 \ bitnami/kafka:3.1.2 docker run -d --name kafka2 --hostname kafka2 --net kafka-net \ -e KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181 \ -e KAFKA_CFG_BROKER_ID=2 \ -e ALLOW_PLAINTEXT_LISTENER=yes \ -e KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR=3 \ -e KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,INNER://:9091,OUTER://:29092 \ -e KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka2:9092,INNER://kafka2:9091,OUTER://localhost:29092 \ -e KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,INNER:PLAINTEXT,OUTER:PLAINTEXT \ -e KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT \ -p 29092:29092 \ bitnami/kafka:3.1.2 docker run -d --name kafka3 --hostname kafka3 --net kafka-net \ -e KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181 \ -e KAFKA_CFG_BROKER_ID=3 \ -e ALLOW_PLAINTEXT_LISTENER=yes \ -e KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR=3 \ -e KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,INNER://:9091,OUTER://:39092 \ -e KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka3:9092,INNER://kafka3:9091,OUTER://localhost:39092 \ -e KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,INNER:PLAINTEXT,OUTER:PLAINTEXT \ -e KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT \ -p 39092:39092 \ bitnami/kafka:3.1.2
컨슈머 실행 스크립트.
cat <<EOF> /tmp/code.py import signal import sys from confluent_kafka import Consumer, KafkaError config = { 'client.id': 'this_is_client_id', 'bootstrap.servers': 'kafka1:9091,kafka2:9091,kafka3:9091', 'group.id': 'my-consumer-group', 'auto.offset.reset': 'earliest', 'enable.auto.commit': False, 'heartbeat.interval.ms': 5000, 'session.timeout.ms': 6000, 'metadata.max.age.ms': 10000 } consumer = Consumer(config) consumer.subscribe(['test-topic']) def signal_handler(sig, frame): print("\nShutdown signal received. Closing Kafka consumer...") consumer.close() sys.exit(0) signal.signal(signal.SIGINT, signal_handler) signal.signal(signal.SIGTERM, signal_handler) try: while True: msg = consumer.poll(1) if msg is not None and not msg.error(): print(f"Received message: {msg.value().decode('utf-8')}") except Exception as e: print(f"Unexpected error: {e}") finally: print("Closing Kafka consumer...") consumer.close() EOF docker run -d --rm --network kafka-net -v /tmp/code.py:/tmp/code.py python:3.9-bullseye sh -c 'pip install confluent-kafka && python /tmp/code.py'
docker run -it --rm --network kafka-net openjdk:11 bash -c ' wget https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/3.5.1/kafka-clients-3.5.1.jar -P /app/libs/ wget https://repo1.maven.org/maven2/org/slf4j/slf4j-api/1.7.36/slf4j-api-1.7.36.jar -P /app/libs/ cat <<EOF > /app/KafkaConsumerExample.java import org.apache.kafka.clients.consumer.*; import org.apache.kafka.common.serialization.StringDeserializer; import java.time.Duration; import java.util.Collections; import java.util.Properties; public class KafkaConsumerExample { public static void main(String[] args) { Properties props = new Properties(); props.put("bootstrap.servers", "kafka1:9092"); props.put("group.id", "my-consumer-group"); props.put("metadata.max.age.ms", 10000L); props.put("key.deserializer", StringDeserializer.class.getName()); props.put("value.deserializer", StringDeserializer.class.getName()); props.put("auto.offset.reset", "earliest"); Consumer<String, String> consumer = new KafkaConsumer<>(props); consumer.subscribe(Collections.singletonList("test-topic")); // 토픽 구독 System.out.println("Kafka Consumer started. Waiting for messages..."); try { while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000)); for (ConsumerRecord<String, String> record : records) { System.out.printf("Received message: key=%s, value=%s, partition=%d, offset=%d%n", record.key(), record.value(), record.partition(), record.offset()); } } } catch (Exception e) { e.printStackTrace(); } finally { consumer.close(); } } } EOF javac -cp "/app/libs/kafka-clients-3.5.1.jar:/app/libs/slf4j-api-1.7.36.jar:/app/libs/slf4j-nop-1.7.36.jar" /app/KafkaConsumerExample.java cd /app && java -cp "/app/libs/kafka-clients-3.5.1.jar:/app/libs/slf4j-api-1.7.36.jar:/app/libs/slf4j-nop-1.7.36.jar:." KafkaConsumerExample '
반응형'Kafka > Kafka Consumer' 카테고리의 다른 글
[Kafka] Static Membership 과 Partition Assignment 관계 알아보기 ( group.instance.id ) (0) 2024.06.30 [Kafka] Consumer 의 max.poll.records 와 Offset Commit 알아보기 (0) 2024.06.30 [Kafka] Consumer Heartbeat 의 내부 동작 원리 (0) 2024.06.30 [Kafka] Consumer 의 Fetch Request 와 max_wait_ms 관계 알아보기 ( fetch.wait.max.ms ) (0) 2024.06.30 [Kafka] SyncGroup API 알아보기 (0) 2024.06.30