[Kafka] Understanding the Connect Worker and the FindCoordinator API (Kafka/Kafka Connect, 2024-07-01)
What is the FindCoordinator API?
FindCoordinator refers to the operation of locating a Group Coordinator or a Transaction Coordinator.
A Coordinator is one particular broker in the cluster, and it exists to support the operation of Kafka Consumers and Producers.
For example, a Kafka Producer's transactional.id maps to a specific broker, which is called the Transaction Coordinator.
The Transaction Coordinator issues the Producer's Producer ID and manages the Producer Epoch.
It also records the state of each transaction in the __transaction_state topic.
A Kafka Consumer likewise interacts with a Coordinator called the Group Coordinator, which is determined by the group.id property.
In this way, FindCoordinator is the API for locating the dedicated broker that supports transaction- or group-related operations.
Kafka Connect Workers also operate as a single group, just like Kafka Consumers.
They therefore need a Group Coordinator, and they use the FindCoordinator API to find it.
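Which broker becomes the Group Coordinator is derived from group.id: the coordinator is the leader of the __consumer_offsets partition selected by hashing the group id. Below is a minimal sketch of that selection, assuming the default of 50 partitions for the offsets topic (offsets.topic.num.partitions); the function names are mine, not Kafka's:

```python
def java_string_hash(s: str) -> int:
    """Java's String.hashCode: h = 31*h + c, with 32-bit signed overflow."""
    h = 0
    for ch in s:
        h = (31 * h + ord(ch)) & 0xFFFFFFFF
    return h - 0x100000000 if h >= 0x80000000 else h

def coordinator_partition(group_id: str, num_partitions: int = 50) -> int:
    # Kafka computes abs(groupId.hashCode) % offsets.topic.num.partitions;
    # masking with 0x7FFFFFFF mirrors how Kafka takes the absolute value.
    return (java_string_hash(group_id) & 0x7FFFFFFF) % num_partitions

print(coordinator_partition("connect"))  # -> 30
```

The broker that leads partition 30 of __consumer_offsets would then be the Group Coordinator for the "connect" group.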
Exploring the FindCoordinator API.
Below is the format of a FindCoordinator API request.
A Connect Worker sends a request in this format to one of the brokers registered in bootstrap.servers to ask for the Group Coordinator's information.
FindCoordinator Request (Version: 4) => key_type [coordinator_keys] TAG_BUFFER
  key_type => INT8
  coordinator_keys => COMPACT_STRING
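To make the layout above concrete, here is a hand-rolled serializer for the v4 request body (an illustration, not the actual client code). key_type 0 requests a group coordinator, and compact arrays/strings carry an unsigned-varint length of N + 1:

```python
def unsigned_varint(n: int) -> bytes:
    """Kafka's unsigned varint: 7 bits per byte, MSB set on continuation bytes."""
    out = bytearray()
    while True:
        b = n & 0x7F
        n >>= 7
        if n:
            out.append(b | 0x80)
        else:
            out.append(b)
            return bytes(out)

def find_coordinator_v4_body(key_type: int, keys: list[str]) -> bytes:
    body = bytearray()
    body += key_type.to_bytes(1, "big")           # key_type: 0 = group, 1 = transaction
    body += unsigned_varint(len(keys) + 1)        # compact array length = N + 1
    for k in keys:
        encoded = k.encode()
        body += unsigned_varint(len(encoded) + 1) # compact string length = N + 1
        body += encoded
    body += unsigned_varint(0)                    # empty tagged-field buffer
    return bytes(body)

# Matches the tail of the captured request packet below: 00 02 08 "connect" 00
print(find_coordinator_v4_body(0, ["connect"]).hex())
```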
Below are the captured request and response packets of the FindCoordinator API.
The bytes at offsets 0x0038-0x0039 of the request packet hold the hex value 000a, which is 10 in decimal.
( The API key of the FindCoordinator API is 10. )
In the response packet you can find 6b61666b6131 followed by 0000 2383: the former is the ASCII encoding of "kafka1", and 0x2383 is 9091 in decimal.
That is, the response carries the Group Coordinator's address, kafka1:9091.
// The Worker sends a FindCoordinator API request to the broker.
13:03:35.653416 IP worker1.kafka-net.60090 > kafka1.9091: Flags [P.], seq 94:137, ack 440, win 31681, options [nop,nop,TS val 826916165 ecr 1414126673], length 43
	0x0000:  4500 005f a0b5 4000 4006 41b5 ac13 0005  E.._..@.@.A.....
	0x0010:  ac13 0003 eaba 2383 3f23 0806 2d88 0ae2  ......#.?#..-...
	0x0020:  8018 7bc1 5880 0000 0101 080a 3149 bd45  ..{.X.......1I.E
	0x0030:  5449 dc51 0000 0027 000a 0004 0000 0000  TI.Q...'........
	0x0040:  0011 7468 6973 5f69 735f 636c 6965 6e74  ..this_is_client
	0x0050:  5f69 6400 0002 0863 6f6e 6e65 6374 00    _id....connect.

// The broker responds to the Worker with the Group Coordinator's information.
13:03:35.657955 IP kafka1.9091 > worker1.kafka-net.60090: Flags [P.], seq 2787:2829, ack 137, win 50613, options [nop,nop,TS val 1414126679 ecr 826916167], length 42
	0x0000:  4500 005e aa2a 4000 4006 3841 ac13 0003  E..^.*@.@.8A....
	0x0010:  ac13 0005 2383 eaba 2d88 140d 3f23 0831  ....#...-...?#.1
	0x0020:  8018 c5b5 587f 0000 0101 080a 5449 dc57  ....X.......TI.W
	0x0030:  3149 bd47 0000 0026 0000 0000 0000 0000  1I.G...&........
	0x0040:  0002 0863 6f6e 6e65 6374 0000 0001 076b  ...connect.....k
	0x0050:  6166 6b61 3100 0023 8300 0001 0000       afka1..#......
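The coordinator's address can be read directly out of the response bytes shown above. A small sketch, with the byte offsets picked by hand from this particular capture:

```python
# Trailing bytes of the FindCoordinator response above:
#   07 6b 61 66 6b 61 31  -> compact string: length byte 7 (= 6 + 1), then "kafka1"
#   00 00 23 83           -> INT32 port: 0x2383 = 9091
host_bytes = bytes.fromhex("6b61666b6131")
port_bytes = bytes.fromhex("00002383")

host = host_bytes.decode("ascii")
port = int.from_bytes(port_bytes, "big")

print(f"Group Coordinator: {host}:{port}")  # Group Coordinator: kafka1:9091
```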
What does a Kafka Connect Worker store in the __consumer_offsets topic?
I started a Worker with group.id set to "connect", using the configuration below.
bootstrap.servers=kafka1:9091,kafka2:9091
group.id=connect
client.id=this_is_client_id
rest.port=8083
offset.storage.topic=test-connect-offsets
config.storage.topic=test-connect-configs
status.storage.topic=test-connect-status
offset.storage.replication.factor=1
config.storage.replication.factor=1
status.storage.replication.factor=1
plugin.path=/tmp/
tasks.max=1
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
Since group.id is "connect", several records keyed by "connect" are written to the __consumer_offsets topic.
Below is the __consumer_offsets record produced after starting a single Worker.
Because only one Worker exists, it is both the group Leader and the sole member.
Key :
{ "group": "connect" }

Value :
{
  "protocol_type": "connect",
  "generation": 5,
  "protocol": "sessioned",
  "leader": "this_is_client_id-41f8da84-2220-4fa3-9690-dc2ba8460f50",
  "current_state_timestamp": 1610703734342,
  "members": [
    {
      "member_id": "this_is_client_id-41f8da84-2220-4fa3-9690-dc2ba8460f50",
      "group_instance_id": null,
      "client_id": "this_is_client_id",
      "client_host": "/172.19.0.6",
      "rebalance_timeout": 60000,
      "session_timeout": 10000,
      "subscription": "AAIAF2h0dHA6Ly8xNzIuMTkuMC42OjgwODMvAAAAAAAAAAL/////",
      "assignment": "AAEAAAA2dGhpc19pc19jbGllbnRfaWQtNDFmOGRhODQtMjIyMC00ZmEzLTk2OTAtZGMyYmE4NDYwZjUwABdodHRwOi8vMTcyLjE5LjAuNjo4MDgzLwAAAAAAAAACAAAAAAAAAAAAAAAA"
    }
  ]
}
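The subscription and assignment fields are base64-encoded blobs of Connect's internal group protocol. The layout assumed below (int16 version, a length-prefixed string holding the Worker's REST URL, then an int64 config offset) is inferred from the bytes of this capture, and the function name is mine:

```python
import base64

def decode_connect_subscription(b64: str) -> tuple[int, str, int]:
    """Decode the leading fields of a Connect Worker's subscription blob.
    Assumed layout: int16 version | int16 url length | url bytes | int64 config offset."""
    raw = base64.b64decode(b64)
    version = int.from_bytes(raw[0:2], "big")
    url_len = int.from_bytes(raw[2:4], "big")
    url = raw[4:4 + url_len].decode("utf-8")
    offset = int.from_bytes(raw[4 + url_len:12 + url_len], "big", signed=True)
    return version, url, offset

sub = "AAIAF2h0dHA6Ly8xNzIuMTkuMC42OjgwODMvAAAAAAAAAAL/////"
print(decode_connect_subscription(sub))  # (2, 'http://172.19.0.6:8083/', 2)
```

The embedded URL is the Worker's REST endpoint (rest.port=8083), which is how the group leader knows where to forward connector requests.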
Below is the __consumer_offsets record after a total of four Workers have been started.
{
  "protocol_type": "connect",
  "generation": 7,
  "protocol": "sessioned",
  "leader": "this_is_client_id-41f8da84-2220-4fa3-9690-dc2ba8460f50",
  "current_state_timestamp": 1610704896695,
  "members": [
    {
      "member_id": "this_is_client_id-e3d196a7-acca-41de-b1c4-6e9de6d1ef03",
      "group_instance_id": null,
      "client_id": "this_is_client_id",
      "client_host": "/172.19.0.8",
      "rebalance_timeout": 60000,
      "session_timeout": 10000,
      "subscription": "AAIAF2h0dHA6Ly8xNzIuMTkuMC44OjgwODMvAAAAAAAAAAMAAABpAAEAAAA2dGhpc19pc19jbGllbnRfaWQtNDFmOGRhODQtMjIyMC00ZmEzLTk2OTAtZGMyYmE4NDYwZjUwABdodHRwOi8vMTcyLjE5LjAuNjo4MDgzLwAAAAAAAAADAAAAAAAAAAAAAAAA",
      "assignment": "AAEAAAA2dGhpc19pc19jbGllbnRfaWQtNDFmOGRhODQtMjIyMC00ZmEzLTk2OTAtZGMyYmE4NDYwZjUwABdodHRwOi8vMTcyLjE5LjAuNjo4MDgzLwAAAAAAAAADAAAAAAAAAAAAAAAA"
    },
    {
      "member_id": "this_is_client_id-5cf5be95-717c-42b2-908c-77a456f53ae9",
      "group_instance_id": null,
      "client_id": "this_is_client_id",
      "client_host": "/172.19.0.9",
      "rebalance_timeout": 60000,
      "session_timeout": 10000,
      "subscription": "AAIAF2h0dHA6Ly8xNzIuMTkuMC45OjgwODMvAAAAAAAAAAP/////",
      "assignment": "AAEAAAA2dGhpc19pc19jbGllbnRfaWQtNDFmOGRhODQtMjIyMC00ZmEzLTk2OTAtZGMyYmE4NDYwZjUwABdodHRwOi8vMTcyLjE5LjAuNjo4MDgzLwAAAAAAAAADAAAAAAAAAAAAAAAA"
    },
    {
      "member_id": "this_is_client_id-41f8da84-2220-4fa3-9690-dc2ba8460f50",
      "group_instance_id": null,
      "client_id": "this_is_client_id",
      "client_host": "/172.19.0.6",
      "rebalance_timeout": 60000,
      "session_timeout": 10000,
      "subscription": "AAIAF2h0dHA6Ly8xNzIuMTkuMC42OjgwODMvAAAAAAAAAAMAAABpAAEAAAA2dGhpc19pc19jbGllbnRfaWQtNDFmOGRhODQtMjIyMC00ZmEzLTk2OTAtZGMyYmE4NDYwZjUwABdodHRwOi8vMTcyLjE5LjAuNjo4MDgzLwAAAAAAAAADAAAAAAAAAAAAAAAA",
      "assignment": "AAEAAAA2dGhpc19pc19jbGllbnRfaWQtNDFmOGRhODQtMjIyMC00ZmEzLTk2OTAtZGMyYmE4NDYwZjUwABdodHRwOi8vMTcyLjE5LjAuNjo4MDgzLwAAAAAAAAADAAAAAAAAAAAAAAAA"
    },
    {
      "member_id": "this_is_client_id-82b72367-2f54-4be8-877e-3cb78c7b7e5c",
      "group_instance_id": null,
      "client_id": "this_is_client_id",
      "client_host": "/172.19.0.7",
      "rebalance_timeout": 60000,
      "session_timeout": 10000,
      "subscription": "AAIAF2h0dHA6Ly8xNzIuMTkuMC43OjgwODMvAAAAAAAAAAMAAABpAAEAAAA2dGhpc19pc19jbGllbnRfaWQtNDFmOGRhODQtMjIyMC00ZmEzLTk2OTAtZGMyYmE4NDYwZjUwABdodHRwOi8vMTcyLjE5LjAuNjo4MDgzLwAAAAAAAAADAAAAAAAAAAAAAAAA",
      "assignment": "AAEAAAA2dGhpc19pc19jbGllbnRfaWQtNDFmOGRhODQtMjIyMC00ZmEzLTk2OTAtZGMyYmE4NDYwZjUwABdodHRwOi8vMTcyLjE5LjAuNjo4MDgzLwAAAAAAAAADAAAAAAAAAAAAAAAA"
    }
  ]
}
Kafka-related commands.
Starting a Kafka cluster.
docker network create kafka-net

docker run -d --name zookeeper --hostname zookeeper --net kafka-net \
  -e ZOOKEEPER_CLIENT_PORT=2181 \
  confluentinc/cp-zookeeper:7.6.4

docker run -d --name kafka1 --hostname kafka1 --net kafka-net \
  -e KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181 \
  -e KAFKA_CFG_BROKER_ID=1 \
  -e ALLOW_PLAINTEXT_LISTENER=yes \
  -e KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR=1 \
  -e KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,INNER://:9091,OUTER://:19092 \
  -e KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka1:9092,INNER://kafka1:9091,OUTER://localhost:19092 \
  -e KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,INNER:PLAINTEXT,OUTER:PLAINTEXT \
  -e KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT \
  -p 19092:19092 \
  bitnami/kafka:3.1.2

docker run -d --name kafka-ui --net kafka-net \
  -e DYNAMIC_CONFIG_ENABLED='true' \
  -e KAFKA_CLUSTERS_0_NAME=cluster \
  -e KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka1:9092 \
  -p 8080:8080 \
  provectuslabs/kafka-ui:v0.7.2
docker network create kafka-net

docker run -d --name zookeeper --hostname zookeeper --net kafka-net \
  -e ZOOKEEPER_CLIENT_PORT=2181 \
  confluentinc/cp-zookeeper:7.6.4

docker run -d --name kafka1 --hostname kafka1 --net kafka-net \
  -e KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181 \
  -e KAFKA_CFG_BROKER_ID=1 \
  -e ALLOW_PLAINTEXT_LISTENER=yes \
  -e KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR=3 \
  -e KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,INNER://:9091,OUTER://:19092 \
  -e KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka1:9092,INNER://kafka1:9091,OUTER://localhost:19092 \
  -e KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,INNER:PLAINTEXT,OUTER:PLAINTEXT \
  -e KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT \
  -p 19092:19092 \
  bitnami/kafka:3.1.2

docker run -d --name kafka2 --hostname kafka2 --net kafka-net \
  -e KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181 \
  -e KAFKA_CFG_BROKER_ID=2 \
  -e ALLOW_PLAINTEXT_LISTENER=yes \
  -e KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR=3 \
  -e KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,INNER://:9091,OUTER://:29092 \
  -e KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka2:9092,INNER://kafka2:9091,OUTER://localhost:29092 \
  -e KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,INNER:PLAINTEXT,OUTER:PLAINTEXT \
  -e KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT \
  -p 29092:29092 \
  bitnami/kafka:3.1.2

docker run -d --name kafka3 --hostname kafka3 --net kafka-net \
  -e KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181 \
  -e KAFKA_CFG_BROKER_ID=3 \
  -e ALLOW_PLAINTEXT_LISTENER=yes \
  -e KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR=3 \
  -e KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,INNER://:9091,OUTER://:39092 \
  -e KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka3:9092,INNER://kafka3:9091,OUTER://localhost:39092 \
  -e KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,INNER:PLAINTEXT,OUTER:PLAINTEXT \
  -e KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT \
  -p 39092:39092 \
  bitnami/kafka:3.1.2

docker run -d --name kafka-ui --net kafka-net \
  -e DYNAMIC_CONFIG_ENABLED='true' \
  -e KAFKA_CLUSTERS_0_NAME=cluster \
  -e KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka1:9092,kafka2:9092,kafka3:9092 \
  -p 8080:8080 \
  provectuslabs/kafka-ui:v0.7.2
tcpdump command.
// Command to capture FindCoordinator API packets.
tcpdump -i eth0 '(dst port 9091 and (tcp[37] = 0x0a))' -X
Starting a Kafka Connect Worker.
cat <<EOF > /tmp/connect.properties
bootstrap.servers=kafka1:9091,kafka2:9091
group.id=connect
client.id=this_is_client_id
rest.port=8083
offset.storage.topic=test-connect-offsets
config.storage.topic=test-connect-configs
status.storage.topic=test-connect-status
offset.storage.replication.factor=1
config.storage.replication.factor=1
status.storage.replication.factor=1
plugin.path=/tmp/
tasks.max=1
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
EOF

docker run -d --rm --net kafka-net -v /tmp/connect.properties:/tmp/connect.properties \
  bitnami/kafka:3.1.2 connect-distributed.sh /tmp/connect.properties
반응형'Kafka > kafka Connect' 카테고리의 다른 글
Debezium Connector 실습 환경 구축하기 (0) 2024.06.19 [Kafka Connect] JdbcSourceConnector Bulk Mode 알아보기 (0) 2024.06.18 [Kafka Connect] JdbcSourceConnector table.whitelist 알아보기 (0) 2024.06.18 [Kafka Connect] JdbcSourceConnector 구현해보기 (1) (0) 2024.06.17 [Kafka-Connect] Debezium MySQL Connector 구현하기 (0) 2024.02.18