  • [Kafka] Exploring the Connect Worker and the FindCoordinator API


     

    What is the FindCoordinator API ?

    FindCoordinator refers to the operation of locating a Group Coordinator or a Transaction Coordinator.

    A Coordinator is one particular broker in the cluster, and it exists to support the operation of Kafka Consumers and Producers.

    For example, a Kafka Producer has a broker matched to its transactional.id, called the Transaction Coordinator.

    This Transaction Coordinator issues the Producer's Producer ID and manages the Producer Epoch.

    It also records the state of each transaction in the __transaction_state topic.

    A Kafka Consumer likewise interacts with a Coordinator called the Group Coordinator, which is matched to the group.id property.

    In short, FindCoordinator is the API for locating the dedicated broker that supports Transaction- or Group-related operations.

     

    Kafka Connect Workers also operate as a single Group, just like Kafka Consumers.

    They therefore need a Group Coordinator, and they use the FindCoordinator API to locate it.
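
    Before looking at raw packets, it helps to know how the broker picks the Coordinator: the Group Coordinator of a group is the broker that leads the __consumer_offsets partition the group.id hashes to (abs(hash(group.id)) % number of partitions). As a quick sketch, listing that topic's partition leaders shows every broker that could act as a Group Coordinator. (The image and addresses below assume the cluster setup at the end of this post.)

    # List the partition leaders of __consumer_offsets; the leader of the
    # partition that "connect" hashes to is that group's Coordinator.
    docker run --rm --net kafka-net bitnami/kafka:3.1.2 \
      kafka-topics.sh --bootstrap-server kafka1:9091 \
      --describe --topic __consumer_offsets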

     

    A closer look at the FindCoordinator API.

    Below is the format of a FindCoordinator API request.

    A Connect Worker sends a request in this format to one of the brokers listed in bootstrap.servers to ask for the Group Coordinator's information. Here key_type is 0 when looking up a Group Coordinator and 1 when looking up a Transaction Coordinator, and coordinator_keys carries the group.id (or transactional.id) values to resolve.

    FindCoordinator Request (Version: 4) => key_type [coordinator_keys] TAG_BUFFER 
      key_type => INT8
      coordinator_keys => COMPACT_STRING

     

    Below is a captured request/response pair for the FindCoordinator API.

    In the request packet, the bytes at offsets 0x0038 ~ 0x0039 hold the hex value "000a", which is 10 in decimal.

    ( The API key of the FindCoordinator API is 10. )

     

    And in the response packet we can find "6b61666b6131 2383": "6b61666b6131" is the ASCII string "kafka1", and 0x2383 is 9091 in decimal, i.e. kafka1:9091.

    In other words, the response carries the Group Coordinator's address.

     

    // Worker sends a FindCoordinator API request to the Broker.
    13:03:35.653416 IP worker1.kafka-net.60090 > kafka1.9091: Flags [P.], seq 94:137, ack 440, win 31681, options [nop,nop,TS val 826916165 ecr 1414126673], length 43
    	0x0000:  4500 005f a0b5 4000 4006 41b5 ac13 0005  E.._..@.@.A.....
    	0x0010:  ac13 0003 eaba 2383 3f23 0806 2d88 0ae2  ......#.?#..-...
    	0x0020:  8018 7bc1 5880 0000 0101 080a 3149 bd45  ..{.X.......1I.E
    	0x0030:  5449 dc51 0000 0027 000a 0004 0000 0000  TI.Q...'........
    	0x0040:  0011 7468 6973 5f69 735f 636c 6965 6e74  ..this_is_client
    	0x0050:  5f69 6400 0002 0863 6f6e 6e65 6374 00    _id....connect.
        
    // Broker responds to the Worker with the Group Coordinator's information.
    13:03:35.657955 IP kafka1.9091 > worker1.kafka-net.60090: Flags [P.], seq 2787:2829, ack 137, win 50613, options [nop,nop,TS val 1414126679 ecr 826916167], length 42
    	0x0000:  4500 005e aa2a 4000 4006 3841 ac13 0003  E..^.*@.@.8A....
    	0x0010:  ac13 0005 2383 eaba 2d88 140d 3f23 0831  ....#...-...?#.1
    	0x0020:  8018 c5b5 587f 0000 0101 080a 5449 dc57  ....X.......TI.W
    	0x0030:  3149 bd47 0000 0026 0000 0000 0000 0000  1I.G...&........
    	0x0040:  0002 0863 6f6e 6e65 6374 0000 0001 076b  ...connect.....k
    	0x0050:  6166 6b61 3100 0023 8300 0001 0000       afka1..#......
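
    The decoding above is easy to reproduce with ordinary shell tools; a small sketch:

    # "6b61666b6131" are the ASCII bytes of the host name.
    echo 6b61666b6131 | xxd -r -p    # prints: kafka1
    # The following INT32 0x00002383 is the port.
    printf '%d\n' 0x2383             # prints: 9091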

     

     

    What does a Kafka Connect Worker store in the __consumer_offsets topic ?

    I started a Worker with group.id set to "connect", using the configuration below.

    bootstrap.servers=kafka1:9091,kafka2:9091
    group.id=connect
    client.id=this_is_client_id
    rest.port=8083
    
    offset.storage.topic=test-connect-offsets
    config.storage.topic=test-connect-configs
    status.storage.topic=test-connect-status
    
    offset.storage.replication.factor=1
    config.storage.replication.factor=1
    status.storage.replication.factor=1
    
    plugin.path=/tmp/
    tasks.max=1
    
    key.converter=org.apache.kafka.connect.json.JsonConverter
    value.converter=org.apache.kafka.connect.json.JsonConverter
    key.converter.schemas.enable=false
    value.converter.schemas.enable=false

     

    Since group.id is "connect", several Records whose Key is "connect" are created in the __consumer_offsets topic; they can be read with the command below.
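
    One way to read these records directly is the group-metadata message formatter that ships with the broker. This is a sketch; the JSON records shown below were rendered by a UI tool, so the formatter's exact output will look different.

    docker run --rm --net kafka-net bitnami/kafka:3.1.2 \
      kafka-console-consumer.sh --bootstrap-server kafka1:9091 \
      --topic __consumer_offsets --from-beginning \
      --formatter 'kafka.coordinator.group.GroupMetadataManager$GroupMetadataMessageFormatter'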

     

    Below is the record written to __consumer_offsets after a single Worker has started.

    Since only one Worker exists, it is both the Leader and the sole member.

    Key : 
    {
    	"group": "connect"
    }
    
    Value : 
    {
    	"protocol_type": "connect",
    	"generation": 5,
    	"protocol": "sessioned",
    	"leader": "this_is_client_id-41f8da84-2220-4fa3-9690-dc2ba8460f50",
    	"current_state_timestamp": 1610703734342,
    	"members": [
    		{
    			"member_id": "this_is_client_id-41f8da84-2220-4fa3-9690-dc2ba8460f50",
    			"group_instance_id": null,
    			"client_id": "this_is_client_id",
    			"client_host": "/172.19.0.6",
    			"rebalance_timeout": 60000,
    			"session_timeout": 10000,
    			"subscription": "AAIAF2h0dHA6Ly8xNzIuMTkuMC42OjgwODMvAAAAAAAAAAL/////",
    			"assignment": "AAEAAAA2dGhpc19pc19jbGllbnRfaWQtNDFmOGRhODQtMjIyMC00ZmEzLTk2OTAtZGMyYmE4NDYwZjUwABdodHRwOi8vMTcyLjE5LjAuNjo4MDgzLwAAAAAAAAACAAAAAAAAAAAAAAAA"
    		}
    	]
    }
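
    The subscription and assignment values are base64-encoded binary in Connect's internal group protocol format. Fully parsing them is out of scope here, but even a crude decode (a sketch that just keeps the printable bytes) reveals the Worker's advertised REST endpoint inside, which is how each Worker announces its REST URL (rest.port=8083) to the rest of the group:

    echo 'AAIAF2h0dHA6Ly8xNzIuMTkuMC42OjgwODMvAAAAAAAAAAL/////' \
      | base64 -d | strings    # prints: http://172.19.0.6:8083/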

     

     

    Below is the __consumer_offsets record after a total of 4 Workers have started. Note that the leader is still the first Worker (the same member_id as above).

     

    {
    	"protocol_type": "connect",
    	"generation": 7,
    	"protocol": "sessioned",
    	"leader": "this_is_client_id-41f8da84-2220-4fa3-9690-dc2ba8460f50",
    	"current_state_timestamp": 1610704896695,
    	"members": [
    		{
    			"member_id": "this_is_client_id-e3d196a7-acca-41de-b1c4-6e9de6d1ef03",
    			"group_instance_id": null,
    			"client_id": "this_is_client_id",
    			"client_host": "/172.19.0.8",
    			"rebalance_timeout": 60000,
    			"session_timeout": 10000,
    			"subscription": "AAIAF2h0dHA6Ly8xNzIuMTkuMC44OjgwODMvAAAAAAAAAAMAAABpAAEAAAA2dGhpc19pc19jbGllbnRfaWQtNDFmOGRhODQtMjIyMC00ZmEzLTk2OTAtZGMyYmE4NDYwZjUwABdodHRwOi8vMTcyLjE5LjAuNjo4MDgzLwAAAAAAAAADAAAAAAAAAAAAAAAA",
    			"assignment": "AAEAAAA2dGhpc19pc19jbGllbnRfaWQtNDFmOGRhODQtMjIyMC00ZmEzLTk2OTAtZGMyYmE4NDYwZjUwABdodHRwOi8vMTcyLjE5LjAuNjo4MDgzLwAAAAAAAAADAAAAAAAAAAAAAAAA"
    		},
    		{
    			"member_id": "this_is_client_id-5cf5be95-717c-42b2-908c-77a456f53ae9",
    			"group_instance_id": null,
    			"client_id": "this_is_client_id",
    			"client_host": "/172.19.0.9",
    			"rebalance_timeout": 60000,
    			"session_timeout": 10000,
    			"subscription": "AAIAF2h0dHA6Ly8xNzIuMTkuMC45OjgwODMvAAAAAAAAAAP/////",
    			"assignment": "AAEAAAA2dGhpc19pc19jbGllbnRfaWQtNDFmOGRhODQtMjIyMC00ZmEzLTk2OTAtZGMyYmE4NDYwZjUwABdodHRwOi8vMTcyLjE5LjAuNjo4MDgzLwAAAAAAAAADAAAAAAAAAAAAAAAA"
    		},
    		{
    			"member_id": "this_is_client_id-41f8da84-2220-4fa3-9690-dc2ba8460f50",
    			"group_instance_id": null,
    			"client_id": "this_is_client_id",
    			"client_host": "/172.19.0.6",
    			"rebalance_timeout": 60000,
    			"session_timeout": 10000,
    			"subscription": "AAIAF2h0dHA6Ly8xNzIuMTkuMC42OjgwODMvAAAAAAAAAAMAAABpAAEAAAA2dGhpc19pc19jbGllbnRfaWQtNDFmOGRhODQtMjIyMC00ZmEzLTk2OTAtZGMyYmE4NDYwZjUwABdodHRwOi8vMTcyLjE5LjAuNjo4MDgzLwAAAAAAAAADAAAAAAAAAAAAAAAA",
    			"assignment": "AAEAAAA2dGhpc19pc19jbGllbnRfaWQtNDFmOGRhODQtMjIyMC00ZmEzLTk2OTAtZGMyYmE4NDYwZjUwABdodHRwOi8vMTcyLjE5LjAuNjo4MDgzLwAAAAAAAAADAAAAAAAAAAAAAAAA"
    		},
    		{
    			"member_id": "this_is_client_id-82b72367-2f54-4be8-877e-3cb78c7b7e5c",
    			"group_instance_id": null,
    			"client_id": "this_is_client_id",
    			"client_host": "/172.19.0.7",
    			"rebalance_timeout": 60000,
    			"session_timeout": 10000,
    			"subscription": "AAIAF2h0dHA6Ly8xNzIuMTkuMC43OjgwODMvAAAAAAAAAAMAAABpAAEAAAA2dGhpc19pc19jbGllbnRfaWQtNDFmOGRhODQtMjIyMC00ZmEzLTk2OTAtZGMyYmE4NDYwZjUwABdodHRwOi8vMTcyLjE5LjAuNjo4MDgzLwAAAAAAAAADAAAAAAAAAAAAAAAA",
    			"assignment": "AAEAAAA2dGhpc19pc19jbGllbnRfaWQtNDFmOGRhODQtMjIyMC00ZmEzLTk2OTAtZGMyYmE4NDYwZjUwABdodHRwOi8vMTcyLjE5LjAuNjo4MDgzLwAAAAAAAAADAAAAAAAAAAAAAAAA"
    		}
    	]
    }

     

     

    Kafka-related commands.

    Starting a Kafka cluster (single broker).

    docker network create kafka-net
    
    docker run -d --name zookeeper --hostname zookeeper --net kafka-net \
      -e ZOOKEEPER_CLIENT_PORT=2181 \
      confluentinc/cp-zookeeper:7.6.4
    
    docker run -d --name kafka1 --hostname kafka1 --net kafka-net \
      -e KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181 \
      -e KAFKA_CFG_BROKER_ID=1 \
      -e ALLOW_PLAINTEXT_LISTENER=yes \
      -e KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR=1 \
      -e KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,INNER://:9091,OUTER://:19092 \
      -e KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka1:9092,INNER://kafka1:9091,OUTER://localhost:19092 \
      -e KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,INNER:PLAINTEXT,OUTER:PLAINTEXT \
      -e KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT \
      -p 19092:19092 \
      bitnami/kafka:3.1.2
      
    docker run -d --name kafka-ui --net kafka-net \
      -e DYNAMIC_CONFIG_ENABLED='true' \
      -e KAFKA_CLUSTERS_0_NAME=cluster \
      -e KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka1:9092 \
      -p 8080:8080 \
      provectuslabs/kafka-ui:v0.7.2

     

    Below is the same setup scaled out to three brokers, with the offsets topic replication factor raised to 3.

    docker network create kafka-net
    
    docker run -d --name zookeeper --hostname zookeeper --net kafka-net \
      -e ZOOKEEPER_CLIENT_PORT=2181 \
      confluentinc/cp-zookeeper:7.6.4
    
    docker run -d --name kafka1 --hostname kafka1 --net kafka-net \
      -e KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181 \
      -e KAFKA_CFG_BROKER_ID=1 \
      -e ALLOW_PLAINTEXT_LISTENER=yes \
      -e KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR=3 \
      -e KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,INNER://:9091,OUTER://:19092 \
      -e KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka1:9092,INNER://kafka1:9091,OUTER://localhost:19092 \
      -e KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,INNER:PLAINTEXT,OUTER:PLAINTEXT \
      -e KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT \
      -p 19092:19092 \
      bitnami/kafka:3.1.2
    
    docker run -d --name kafka2 --hostname kafka2 --net kafka-net \
      -e KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181 \
      -e KAFKA_CFG_BROKER_ID=2 \
      -e ALLOW_PLAINTEXT_LISTENER=yes \
      -e KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR=3 \
      -e KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,INNER://:9091,OUTER://:29092 \
      -e KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka2:9092,INNER://kafka2:9091,OUTER://localhost:29092 \
      -e KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,INNER:PLAINTEXT,OUTER:PLAINTEXT \
      -e KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT \
      -p 29092:29092 \
      bitnami/kafka:3.1.2
    
    docker run -d --name kafka3 --hostname kafka3 --net kafka-net \
      -e KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181 \
      -e KAFKA_CFG_BROKER_ID=3 \
      -e ALLOW_PLAINTEXT_LISTENER=yes \
      -e KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR=3 \
      -e KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,INNER://:9091,OUTER://:39092 \
      -e KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka3:9092,INNER://kafka3:9091,OUTER://localhost:39092 \
      -e KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,INNER:PLAINTEXT,OUTER:PLAINTEXT \
      -e KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT \
      -p 39092:39092 \
      bitnami/kafka:3.1.2
      
    docker run -d --name kafka-ui --net kafka-net \
      -e DYNAMIC_CONFIG_ENABLED='true' \
      -e KAFKA_CLUSTERS_0_NAME=cluster \
      -e KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka1:9092,kafka2:9092,kafka3:9092 \
      -p 8080:8080 \
      provectuslabs/kafka-ui:v0.7.2

     

    The tcpdump command.

    // Command for capturing FindCoordinator API packets.
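    // Why tcp[37]: with the nop,nop,timestamps options the TCP header is 32 bytes,
    // so the Kafka payload starts at offset 32; bytes 32-35 are the length prefix
    // and bytes 36-37 are the API key, hence tcp[37] = 0x0a matches API key 10.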
    tcpdump -i eth0 '(dst port 9091 and (tcp[37] = 0x0a))' -X

     

    Starting a Kafka Connect Worker.

    cat <<EOF> /tmp/connect.properties
    
    bootstrap.servers=kafka1:9091,kafka2:9091
    group.id=connect
    client.id=this_is_client_id
    rest.port=8083
    
    offset.storage.topic=test-connect-offsets
    config.storage.topic=test-connect-configs
    status.storage.topic=test-connect-status
    
    offset.storage.replication.factor=1
    config.storage.replication.factor=1
    status.storage.replication.factor=1
    
    plugin.path=/tmp/
    tasks.max=1
    
    key.converter=org.apache.kafka.connect.json.JsonConverter
    value.converter=org.apache.kafka.connect.json.JsonConverter
    key.converter.schemas.enable=false
    value.converter.schemas.enable=false
    
    EOF
    
    docker run -d --rm --net kafka-net -v /tmp/connect.properties:/tmp/connect.properties \
    bitnami/kafka:3.1.2 connect-distributed.sh /tmp/connect.properties
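
    Once the Worker is running, its REST interface doubles as a quick health check. A sketch, assuming the Worker container received the address 172.19.0.6 as in the records above (any HTTP client on the kafka-net network will do):

    # GET /connectors returns [] while no connectors are deployed.
    docker run --rm --net kafka-net curlimages/curl:8.8.0 \
      http://172.19.0.6:8083/connectors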

     
