ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • [Kafka] SyncGroup API 알아보기
    Kafka/Kafka Consumer 2024. 6. 30. 14:56
    반응형

    - 목차

     

    들어가며.

    Kafka Consumer 는 리밸런싱이라는 과정을 통해서 Consumer Group 을 형성합니다.

    이 과정에서 FindCoordinator, JoinGroup, SyncGroup 등의 API 가 사용됩니다.

    SyncGroup 은 각 Consumer 와 Topic-Partition 의 할당을 마무리짓고, 본격적인 Record Fetch 를 수행하는 마지막 과정입니다.

    이번 글에서 SyncGroup API 에 대해서 상세히 알아보도록 하겠습니다.

     

    https://westlife0615.tistory.com/944

     

    [Kafka] FindCoordinator API 와 Group Coordinator 알아보기

    - 목차 들어가며.Kafka Consumer 는 group.id 라는 설정을 통해서 Consumer Group 을 형성합니다.동일한 group.id 를 가지는 여러 Kafka Consumer 들은 하나의 Consumer Group 의 구성원이 됩니다.이 과정에서 Group Coordi

    westlife0615.tistory.com

    https://westlife0615.tistory.com/951

     

    [Kafka] Consumer 는 JoinGroup API 를 어떻게 요청할까 ?

    - 목차 들어가며.Kafka Consumer 는 group.id 라는 속성을 가집니다.group.id 라는 속성은 원격에서 실행되는 여러 개의 Kafka Consumer 들을 하나의 Group 으로 동작하게 만듭니다.오늘의 주제인 "JoinGroup API"

    westlife0615.tistory.com

     

     

    Consumer - Partition Assignment.

     

    Consumer 들은 Group Coordinator 에게 JoinGroup API 를 요청하여 Consumer Group 에 참여 의사를 밝힙니다.

    이 과정에서 Leader Consumer 가 선정되고, 나머지 Consumer 들은 Follower Consumer 가 됩니다.

    Leader Consumer 는 JoinGroup API 의 응답으로써 모든 Consumer 들의 member id 정보를 획득합니다.

    그리고 Leader Consumer 는 Group Coordinator 로부터 전달받은 Consumer 의 정보를 기반으로 Consumer - Partition 의 할당을 수행합니다.

     

    이 과정에서 Leader Consumer 는 2개의 API 를 필요로합니다.

    먼저 Metadata API 를 통해서 토픽과 파티션의 정보를 획득합니다.

    그리고 JoinGroup API 를 통해서 모든 Consumer 들의 member-id 를 가지게 됩니다.

    이제 Partition Assignment 전략에 의거해서 파티션을 Consumer 에게 할당할 수 있게 됩니다.

     

    할당 방식은 크게 Range, Sticky, RoundRobin 으로 나뉩니다.

    이러한 방식 중 하나를 통해서 Consumer 와 Partition 을 할당합니다.

    그리고 SyncGroup API 의 요청으로 Member-Partition 할당 정보는 Group Coordinator 에게 제공합니다.

    Leader Consumer 는 대략 아래와 같은 형식으로 Group Coordinator 에게 할당 정보를 전달합니다.

     

    {
      "group_id": "my-consumer-group",
      "generation_id": 5,
      "member_id": "consumer-1",
      "assignments": [
        { "member_id": "consumer-1", "assignment": "topicA-0, topicA-1" },
        { "member_id": "consumer-2", "assignment": "topicA-2" }
      ]
    }

     

    그리고 Group Coordinator 는 Leader Consumer 가 할당한 정보를 Follower Consumer 들에게 제공합니다.

    Group Coordinator 가 Follower Consumer 에게 Assignment 정보를 제공하는 방식을 SyncGroup 의 응답을 통해서 전달합니다.

    이러한 방식이 가능한 이유는 모든 Consumer 들은 의무적으로 SyncGroup API 를 Group Coordinator 에게 전달하기 때문입니다.

     

    Leader Consumer 와 Follower Consumer 의 SyncGroup API 요청의 차이점이 있다면,

    Leader Consumer 는 Assignment 정보를 Group Coordinator 에게 제공하지만,

    Follower Consumer 는 Assignment 정보를 제공하지 않습니다.

    그 이유는 Assignment 행위는 Leader Consumer 만이 가능하기 때문이죠.

     

    이렇게 Follower Consumer 는 Leader Consumer 가 제출한 Assignment 정보를 Group Coordinator 에 의해서 전달받을 수 있습니다.

    그리고 Follower Consumer 는 자신에게 할당된 Partition 을 Fetch 할 수 있게 됩니다.

     

    SyncGroup API Request 살펴보기.

    우선 SyncGroup Request 의 포맷에 대해서 알아보도록 하겠습니다.

    ( 이 글에서는 Version 5 를 기준으로 설명합니다. )

    SyncGroup Request (Version: 5) => group_id generation_id member_id group_instance_id protocol_type protocol_name [assignments] TAG_BUFFER 
      group_id => COMPACT_STRING
      generation_id => INT32
      member_id => COMPACT_STRING
      group_instance_id => COMPACT_NULLABLE_STRING
      protocol_type => COMPACT_NULLABLE_STRING
      protocol_name => COMPACT_NULLABLE_STRING
      assignments => member_id assignment TAG_BUFFER 
        member_id => COMPACT_STRING
        assignment => COMPACT_BYTES

     

    • group_id
      • group_id 는 Consumer Group 을 식별할 수 있는 고유값입니다.
      • group_id 를 기준으로 하나의 Consumer Group 이 생성됩니다. 

     

    • generation_id
      • generation_id 는 얼마나 리밸런싱이 발생하였는지에 대한 수치입니다.
      • Consumer 의 추가/제거 또는 파티션의 변경 등 여러가지 요소로 인해 리밸런싱이 발생하면 generation_id 의 값을 증가합니다.

     

    • member_id
      • member_id 는 JoinGroup API 를 통해서 Group Coordinator 로부터 부여받은 고유 식별값입니다.

     

    • protocol_type
      • protocol_type 은 consumer 또는 connect 중 한가지 값이 사용됩니다.
      • Kafka Connect 또한 SyncGroup API 를 사용하기 때문에 구분을 위해서 protocol_type 값이 활용됩니다.

     

    • protocol_name
      • protocol_name 은 Consumer 와 Partition 의 할당 방식을 의미합니다.
      • range, sticky, roundrobin 등의 값이 사용됩니다.

     

    • assignments
      • SyncGroup API 의 목적이라고도 할 수 있는 Member 와 Partition 의 할당 정보입니다.
      • 이 데이터는 오로지 Leader Consumer 만이 사용합니다.
      • 그 이유는 Member Partition Assignment 는 Leader Consumer 의 역할이자 책임이기 때문입니다.
      • Follower Consumer 는 빈값으로 SyncGroup API 를 요청합니다.

     

    SyncGroup Request TCP Packet 분석하기.

    테스트 상황은 다음과 같습니다.

    "test-topic" 이라는 이름의 Topic 이 존재하며, 이는 3개의 Partition 을 가집니다.

    그리고 3개의 Consumer 들이 실행됩니다.

    아래에는 3개의 TCP Packet 을 포착하여 작성하였구요.

    첫 2개의 TCP Packet 은 Follower Consumer 의 SyncGroup API Request 이며,

    마지막 TCP Packet 은 Leader Consumer 의 SyncGroup API Request 입니다.

    IP sharp_hypatia.kafka-net.58928 > kafka1.9091: Flags [P.], seq 3074615952:3074616068, ack 879954872, win 501, options [nop,nop,TS val 848193994 ecr 467356024], length 116
    	0x0000:  4500 00a8 054a 4000 4006 dcd4 ac13 0008  E....J@.@.......
    	0x0010:  ac13 0003 e630 2383 b742 ea90 3473 0bb8  .....0#..B..4s..
    	0x0020:  8018 01f5 58cc 0000 0101 080a 328e 69ca  ....X.......2.i.
    	0x0030:  1bdb 4978 0000 0070 000e 0003 0000 0004  ..Ix...p........
    	0x0040:  0011 7468 6973 5f69 735f 636c 6965 6e74  ..this_is_client
    	0x0050:  5f69 6400 116d 792d 636f 6e73 756d 6572  _id..my-consumer
    	0x0060:  2d67 726f 7570 0000 0003 0036 7468 6973  -group.....6this
    	0x0070:  5f69 735f 636c 6965 6e74 5f69 642d 3466  _is_client_id-4f
    	0x0080:  3335 6639 6436 2d66 6235 662d 3433 6436  35f9d6-fb5f-43d6
    	0x0090:  2d38 3336 342d 6431 6361 3438 3233 6135  -8364-d1ca4823a5
    	0x00a0:  3563 ffff 0000 0000                      5c......
    
    
    IP condescending_lehmann.kafka-net.59688 > kafka1.9091: Flags [P.], seq 3288129610:3288129726, ack 3850761461, win 501, options [nop,nop,TS val 90071823 ecr 1304358968], length 116
    	0x0000:  4500 00a8 9bbc 4000 4006 4663 ac13 0007  E.....@.@.Fc....
    	0x0010:  ac13 0003 e928 2383 c3fc e04a e585 f4f5  .....(#....J....
    	0x0020:  8018 01f5 58cb 0000 0101 080a 055e 630f  ....X........^c.
    	0x0030:  4dbe f038 0000 0070 000e 0003 0000 0004  M..8...p........
    	0x0040:  0011 7468 6973 5f69 735f 636c 6965 6e74  ..this_is_client
    	0x0050:  5f69 6400 116d 792d 636f 6e73 756d 6572  _id..my-consumer
    	0x0060:  2d67 726f 7570 0000 0003 0036 7468 6973  -group.....6this
    	0x0070:  5f69 735f 636c 6965 6e74 5f69 642d 6561  _is_client_id-ea
    	0x0080:  3861 3866 3236 2d64 3263 332d 3463 3964  8a8f26-d2c3-4c9d
    	0x0090:  2d61 3238 612d 3233 3539 3662 6632 6136  -a28a-23596bf2a6
    	0x00a0:  6433 ffff 0000 0000                      d3......
    
    
    IP happy_booth.kafka-net.59782 > kafka1.9091: Flags [P.], seq 2743252020:2743252406, ack 3754466135, win 501, options [nop,nop,TS val 928502324 ecr 4115129766], length 386
    	0x0000:  4500 01b6 a7f1 4000 4006 391e ac13 0009  E.....@.@.9.....
    	0x0010:  ac13 0003 e986 2383 a382 b434 dfc8 9b57  ......#....4...W
    	0x0020:  8018 01f5 59db 0000 0101 080a 3757 d234  ....Y.......7W.4
    	0x0030:  f547 e5a6 0000 017e 000e 0003 0000 0005  .G.....~........
    	0x0040:  0011 7468 6973 5f69 735f 636c 6965 6e74  ..this_is_client
    	0x0050:  5f69 6400 116d 792d 636f 6e73 756d 6572  _id..my-consumer
    	0x0060:  2d67 726f 7570 0000 0003 0036 7468 6973  -group.....6this
    	0x0070:  5f69 735f 636c 6965 6e74 5f69 642d 6534  _is_client_id-e4
    	0x0080:  6661 3662 6239 2d65 3032 382d 3464 3963  fa6bb9-e028-4d9c
    	0x0090:  2d61 6366 652d 6663 3034 6533 3831 6563  -acfe-fc04e381ec
    	0x00a0:  3835 ffff 0000 0003 0036 7468 6973 5f69  85.......6this_i
    	0x00b0:  735f 636c 6965 6e74 5f69 642d 6534 6661  s_client_id-e4fa
    	0x00c0:  3662 6239 2d65 3032 382d 3464 3963 2d61  6bb9-e028-4d9c-a
    	0x00d0:  6366 652d 6663 3034 6533 3831 6563 3835  cfe-fc04e381ec85
    	0x00e0:  0000 001e 0000 0000 0001 000a 7465 7374  ............test
    	0x00f0:  2d74 6f70 6963 0000 0001 0000 0001 0000  -topic..........
    	0x0100:  0000 0036 7468 6973 5f69 735f 636c 6965  ...6this_is_clie
    	0x0110:  6e74 5f69 642d 3466 3335 6639 6436 2d66  nt_id-4f35f9d6-f
    	0x0120:  6235 662d 3433 6436 2d38 3336 342d 6431  b5f-43d6-8364-d1
    	0x0130:  6361 3438 3233 6135 3563 0000 001e 0000  ca4823a55c......
    	0x0140:  0000 0001 000a 7465 7374 2d74 6f70 6963  ......test-topic
    	0x0150:  0000 0001 0000 0000 0000 0000 0036 7468  .............6th
    	0x0160:  6973 5f69 735f 636c 6965 6e74 5f69 642d  is_is_client_id-
    	0x0170:  6561 3861 3866 3236 2d64 3263 332d 3463  ea8a8f26-d2c3-4c
    	0x0180:  3964 2d61 3238 612d 3233 3539 3662 6632  9d-a28a-23596bf2
    	0x0190:  6136 6433 0000 001e 0000 0000 0001 000a  a6d3............
    	0x01a0:  7465 7374 2d74 6f70 6963 0000 0001 0000  test-topic......
    	0x01b0:  0002 0000 0000                           ......

     

    Leader Consumer 의 SyncGroup API Request 인 마지막 TCP Packet 에 주목하면,

    Follower Consumer 의 SyncGroup API Request 에 비해서 길이가 더 깁니다.

    Length 는 바이트의 갯수가 386 개로 Follower Consumer 의 Packet Payload 의 116 보다 훨씬 깁니다.

    이렇게 길이의 차이가 발생하는 이유는 Assignments 정보의 유무입니다.

    Leader Consumer 는 Group Coordinator 에게 Assignments 정보를 제공합니다.

     

    Packet 의 Payload 를 디코딩하면 아래와 같이 표현되며,

    각 Consumer 의 member id 와 Topic-Partition 의 연결정보를 나타냅니다.

    this_is_client_id-e4fa6bb9-e028-4d9c-acfe-fc04e381ec85 - test-topic 2
    this_is_client_id-4f35f9d6-fb5f-43d6-8364-d1ca4823a55c - test-topic 0
    this_is_client_id-ea8a8f26-d2c3-4c9d-a28a-23596bf2a6d3 - test-topic 1

     

    SyncGroup Response 살펴보기.

    SyncGroup Response 의 포맷은 단순합니다.

    Group Coordinator 가 Consumer 들에게 자신에게 할당된 Partition 의 정보를 제공합니다.

    SyncGroup Response (Version: 5) => throttle_time_ms error_code protocol_type protocol_name assignment TAG_BUFFER 
      throttle_time_ms => INT32
      error_code => INT16
      protocol_type => COMPACT_NULLABLE_STRING
      protocol_name => COMPACT_NULLABLE_STRING
      assignment => COMPACT_BYTES

     

     

    아래의 세개의 Packet 는 각 Consumer 가 자신에게 할당된 Partition 정보를 SyncGroup Response 를 통해서 전달받습니다. 

    Leader / Follower Consumer 에 구분없이 자신에 할당된 Partition 을 전달받게 됩니다.

     

    IP kafka1.9091 > loving_morse.kafka-net.37940: Flags [P.], seq 1201:1249, ack 915, win 50470, options [nop,nop,TS val 468485600 ecr 849323560], length 48
    	0x0000:  4500 0064 5028 4000 4006 923a ac13 0003  E..dP(@.@..:....
    	0x0010:  ac13 0008 2383 9434 ba89 964b 04cb 9248  ....#..4...K...H
    	0x0020:  8018 c526 5888 0000 0101 080a 1bec 85e0  ...&X...........
    	0x0030:  329f a628 0000 002c 0000 0005 0000 0000  2..(...,........
    	0x0040:  0000 0000 001e 0000 0000 0001 000a 7465  ..............te
    	0x0050:  7374 2d74 6f70 6963 0000 0001 0000 0000  st-topic........
    	0x0060:  0000 0000     
    
    IP kafka1.9091 > sharp_williams.kafka-net.47790: Flags [P.], seq 663:711, ack 581, win 50470, options [nop,nop,TS val 4116259338 ecr 929631885], length 48
    	0x0000:  4500 0064 c69b 4000 4006 1bc6 ac13 0003  E..d..@.@.......
    	0x0010:  ac13 0009 2383 baae e454 af40 3cfe 00ab  ....#....T.@<...
    	0x0020:  8018 c526 5889 0000 0101 080a f559 220a  ...&X........Y".
    	0x0030:  3769 0e8d 0000 002c 0000 0004 0000 0000  7i.....,........
    	0x0040:  0000 0000 001e 0000 0000 0001 000a 7465  ..............te
    	0x0050:  7374 2d74 6f70 6963 0000 0001 0000 0002  st-topic........
    	0x0060:  0000 0000   
    	                             ....
    IP kafka1.9091 > priceless_cannon.kafka-net.51794: Flags [P.], seq 663:711, ack 581, win 50470, options [nop,nop,TS val 1305488544 ecr 91201388], length 48
    	0x0000:  4500 0064 84b1 4000 4006 5db2 ac13 0003  E..d..@.@.].....
    	0x0010:  ac13 0007 2383 ca52 167c 42b1 6577 b109  ....#..R.|B.ew..
    	0x0020:  8018 c526 5887 0000 0101 080a 4dd0 2ca0  ...&X.......M.,.
    	0x0030:  056f 9f6c 0000 002c 0000 0004 0000 0000  .o.l...,........
    	0x0040:  0000 0000 001e 0000 0000 0001 000a 7465  ..............te
    	0x0050:  7374 2d74 6f70 6963 0000 0001 0000 0001  st-topic........
    	0x0060:  0000 0000                                ....

     

     

     

     

    Kafka Cluster 를 실행하기 위한 Docker Command.

    docker network create kafka-net
    
    docker run -d --name zookeeper --hostname zookeeper --net kafka-net \
      -e ZOOKEEPER_CLIENT_PORT=2181 \
      confluentinc/cp-zookeeper:7.6.4
    
    docker run -d --name kafka1 --hostname kafka1 --net kafka-net \
      -e KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181 \
      -e KAFKA_CFG_BROKER_ID=1 \
      -e ALLOW_PLAINTEXT_LISTENER=yes \
      -e KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR=1 \
      -e KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,OUTER://:19092 \
      -e KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka1:9092,OUTER://localhost:19092 \
      -e KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,OUTER:PLAINTEXT \
      -e KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT \
      -p 19092:19092 \
      bitnami/kafka:3.1.2
    
    docker run -d --name kafka2 --hostname kafka2 --net kafka-net \
      -e KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181 \
      -e KAFKA_CFG_BROKER_ID=2 \
      -e ALLOW_PLAINTEXT_LISTENER=yes \
      -e KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR=3 \
      -e KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,OUTER://:29092 \
      -e KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka2:9092,OUTER://localhost:29092 \
      -e KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,OUTER:PLAINTEXT \
      -e KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT \
      -p 29092:29092 \
      bitnami/kafka:3.1.2
    
    docker run -d --name kafka3 --hostname kafka3 --net kafka-net \
      -e KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181 \
      -e KAFKA_CFG_BROKER_ID=3 \
      -e ALLOW_PLAINTEXT_LISTENER=yes \
      -e KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR=3 \
      -e KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,OUTER://:39092 \
      -e KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka1:9092,OUTER://localhost:39092 \
      -e KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,OUTER:PLAINTEXT \
      -e KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT \
      -p 39092:39092 \
      bitnami/kafka:3.1.2

     

     

    반응형
Designed by Tistory.