-
[Kafka] Producer 의 EndTxn API 알아보기 ( Commit or Abort , Transaction Coordinator )Kafka/Kafka Producer 2024. 7. 4. 07:16반응형
- 목차
EndTxn API 란 ?
EndTxn API 는 Transactional Producer 가 트랜잭션을 종료하기 위해서 브로커에게 전달하는 API 입니다.
카프카 라이브러리에서는 commit_transaction or abort_transaction 와 같은 추상화된 함수로 표현되곤 합니다.
이러한 함수가 실행되면 내부적으로 EndTxn API 의 Request/Response 가 동작하게 됩니다.
EndTxn API 의 형식은 아래와 같습니다.
EndTxn Request (Version: 1) => transactional_id producer_id producer_epoch committed transactional_id => STRING producer_id => INT64 producer_epoch => INT16 committed => BOOLEAN
transactional_id 와 producer_id, producer_epoch 는 현재 자신의 트랜잭션의 식별값 또는 메타정보를 의미합니다.
이는 트랜잭션 시작 시에 프로듀서가 브로커로부터 전달받은 값들입니다. ( InitProducerid 를 통해서 )
그리고 committed 의 위치에 Commit 또는 Aborted 여부를 전달합니다.
아래의 TCP Packet 은 EndTxn API Request 의 데이터인데요.
0x0038 과 0x0039 위치의 001a 정보가 26의 16진수 표현입니다.
그리고 EndTxn API 의 Key 가 26이므로 아래와 같은 형식으로 End Transaction 이 요청됩니다.
- transactional_id : this_is_my_transactional_id
- 0x42 ~ 0x55
- producer_id : 0
- 0x73 ~ 0x7a
- producer_epoch: 2
- 0x7b ~ 0x7c
- committed : 1
- 0x7d
23:11:09.555987 IP 172.19.0.6.56010 > kafka1.9091: Flags [P.], seq 1745703631:1745703705, ack 109238315, win 501, options [nop,nop,TS val 3842498311 ecr 1802147001], length 74 0x0000: 4500 007e 8ca8 4000 4006 55a2 ac13 0006 E..~..@.@.U..... 0x0010: ac13 0003 daca 2383 680d 52cf 0682 d82b ......#.h.R....+ 0x0020: 8018 01f5 58a0 0000 0101 080a e507 df07 ....X........... 0x0030: 6b6a 94b9 0000 0046 001a 0001 0000 0005 kj.....F........ 0x0040: 0014 7468 6973 5f69 735f 6d79 5f63 6c69 ..this_is_my_cli 0x0050: 656e 745f 6964 001b 7468 6973 5f69 735f ent_id..this_is_ 0x0060: 6d79 5f74 7261 6e73 6163 7469 6f6e 616c my_transactional 0x0070: 5f69 6400 0000 0000 0000 0000 0201 _id...........
EndTxn API 는 Transaction Coordinator 에게 전달되는가 ?
EndTxn API 는 Transaction Coordinator 에게 요청됩니다.
그리고 EndTxn API 요청을 전달받은 Transaction Coordinator 는 __transaction_state 토픽에 트랙잭션 상태를 저장합니다.
EndTxn API 를 요청한 transactional.id 를 기준으로 Committed 또는 Aborted 여부를 __transaction_state 토픽에 기록하게 됩니다.
그리고 각 Partition 의 Leader Broker 에게 Commit 또는 Abort Marker 를 생성할 수 있도록 Marker 생성을 요청합니다.
Transaction Coordinator 는 다른 Broker 들에게 WriteTxnMarkers Request 를 요청합니다.
WriteTxnMarkers API 는 Commit 또는 Abort Marker 를 다른 브로커에게 생성하도록 요청합니다.
아래와 같은 형식으로 Producer 는 Transaction Coordinator 에게 EndTxn 을 요청하게 되고,
Transaction Coordinator 는 다른 Broker 에게 WriteTxnMarkers 를 요청합니다.
// Producer 가 Transaction Coordinator 에게 EndTxn 을 요청함. 23:25:51.654619 IP friendly_nash.kafka-net.50334 > kafka1.9092: Flags [P.], seq 2120630149:2120630223, ack 3221377303, win 501, options [nop,nop,TS val 3843380432 ecr 1803029118], length 74 0x0000: 4500 007e 42a2 4000 4006 9fa8 ac13 0006 E..~B.@.@....... 0x0010: ac13 0003 c49e 2384 7e66 3f85 c002 5117 ......#.~f?...Q. 0x0020: 8018 01f5 58a0 0000 0101 080a e515 54d0 ....X.........T. 0x0030: 6b78 0a7e 0000 0046 001a 0001 0000 0005 kx.~...F........ 0x0040: 0014 7468 6973 5f69 735f 6d79 5f63 6c69 ..this_is_my_cli 0x0050: 656e 745f 6964 001b 7468 6973 5f69 735f ent_id..this_is_ 0x0060: 6d79 5f74 7261 6e73 6163 7469 6f6e 616c my_transactional 0x0070: 5f69 6400 0000 0000 0000 0000 0701 _id........... // Transaction Coordinator 가 다른 브로커에게 WriteTxnMarkers 을 요청함. 23:25:51.658823 IP kafka1.42984 > kafka2.kafka-net.9092: Flags [P.], seq 4216826102:4216826173, ack 1541120937, win 51015, options [nop,nop,TS val 1451462624 ecr 864208375], length 71 0x0000: 4500 007b a547 4000 4006 3d07 ac13 0003 E..{.G@.@.=..... 0x0010: ac13 0005 a7e8 2384 fb57 a8f6 5bdb a3a9 ......#..W..[... 0x0020: 8018 c747 589c 0000 0101 080a 5683 8fe0 ...GX.......V... 0x0030: 3382 c5f7 0000 0043 001b 0001 0000 0007 3......C........ 0x0040: 001a 6272 6f6b 6572 2d31 2d74 786e 2d6d ..broker-1-txn-m 0x0050: 6172 6b65 722d 7365 6e64 6572 0002 0000 arker-sender.... 0x0060: 0000 0000 0000 0007 0102 0574 6573 7402 ...........test. 0x0070: 0000 0001 0000 0000 0000 00 ...........
카프카 관련 명령어들.
카프카 클러스터 실행하기.
docker network create kafka-net docker run -d --name zookeeper --hostname zookeeper --net kafka-net \ -e ZOOKEEPER_CLIENT_PORT=2181 \ confluentinc/cp-zookeeper:7.6.4 docker run -d --name kafka1 --hostname kafka1 --net kafka-net \ -e KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181 \ -e KAFKA_CFG_BROKER_ID=1 \ -e ALLOW_PLAINTEXT_LISTENER=yes \ -e KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR=2 \ -e KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,INNER://:9091,OUTER://:19092 \ -e KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka1:9092,INNER://kafka1:9091,OUTER://localhost:19092 \ -e KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,INNER:PLAINTEXT,OUTER:PLAINTEXT \ -e KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT \ -p 19092:19092 \ bitnami/kafka:3.1.2 docker run -d --name kafka2 --hostname kafka2 --net kafka-net \ -e KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181 \ -e KAFKA_CFG_BROKER_ID=2 \ -e ALLOW_PLAINTEXT_LISTENER=yes \ -e KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR=2 \ -e KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,INNER://:9091,OUTER://:29092 \ -e KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka2:9092,INNER://kafka2:9091,OUTER://localhost:29092 \ -e KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,INNER:PLAINTEXT,OUTER:PLAINTEXT \ -e KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT \ -p 29092:29092 \ bitnami/kafka:3.1.2 docker run -d --name kafka-ui --net kafka-net \ -e DYNAMIC_CONFIG_ENABLED='true' \ -e KAFKA_CLUSTERS_0_NAME=cluster \ -e KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka1:9092,kafka2:9092 \ -p 8080:8080 \ provectuslabs/kafka-ui:v0.7.2
Topic 생성.
# Topic Create kafka-topics.sh --bootstrap-server kafka1:9092 \ --topic test \ --create \ --replication-factor 2 \ --partitions 2 # Topic Describe kafka-topics.sh --bootstrap-server kafka1:9092 --topic test --describe kafka-topics.sh --bootstrap-server kafka1:9092 --topic __transaction_state --describe # Topic Delete kafka-topics.sh --bootstrap-server kafka1:9092 --topic test --delete
Producer 실행.
<<EOF> /tmp/code.py from confluent_kafka import Producer producer_conf = { 'bootstrap.servers': 'kafka1:9091', 'enable.idempotence': "true", 'client.id': 'this_is_my_client_id', 'transactional.id': 'this_is_my_transactional_id' } producer = Producer(producer_conf) producer.init_transactions() producer.begin_transaction() producer.produce("test", key=b'1', value=f"this is a message".encode()) producer.flush() producer.commit_transaction() # or # producer.abort_transaction() EOF docker run -d --rm --network kafka-net -v /tmp/code.py:/tmp/code.py python:3.9-bullseye \ sh -c 'pip install confluent-kafka && python /tmp/code.py'
EndTxn TCP Packet 탐색.
tcpdump -i eth0 '(dst port 9091 and (tcp[37] = 0x1a))' -X
tcpdump -i eth0 '(dst port 9092 and (tcp[37] = 0x1a or tcp[37] = 0x1b))' -X
반응형'Kafka > Kafka Producer' 카테고리의 다른 글
[Kafka] FindCoordinator 와 Transaction Coordinator 알아보기 (0) 2024.06.29 [Kafka] Transaction 과 Commit / Abort Maker 알아보기 (0) 2024.06.29 [Kafka] Producer 와 Idempotence 알아보기 ( InitProducerId, Epoch, Sequence Number ) (0) 2024.06.25 [Kafka] request.timeout.ms 와 데이터 중복 생성 알아보기 (0) 2024.06.25 [Kafka] Pull 방식의 복제와 Producer 속도 관계 알아보기 ( replica.fetch.wait.max.ms ) (0) 2024.06.25 - transactional_id : this_is_my_transactional_id