ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • [Kafka] Producer 의 EndTxn API 알아보기 ( Commit or Abort , Transaction Coordinator )
    Kafka/Kafka Producer 2024. 7. 4. 07:16
    반응형

    - 목차

     

    EndTxn API 란 ?

    EndTxn API 는 Transactional Producer 가 트랜잭션을 종료하기 위해서 브로커에게 전달하는 API 입니다.

    카프카 라이브러리에서는 commit_transaction or abort_transaction 와 같은 추상화된 함수로 표현되곤 합니다.

    이러한 함수가 실행되면 내부적으로 EndTxn API 의 Request/Response 가 동작하게 됩니다.

     

    EndTxn API 의 형식은 아래와 같습니다.

    EndTxn Request (Version: 1) => transactional_id producer_id producer_epoch committed 
      transactional_id => STRING
      producer_id => INT64
      producer_epoch => INT16
      committed => BOOLEAN

     

    transactional_id 와 producer_id, producer_epoch 는 현재 자신의 트랜잭션의 식별값 또는 메타정보를 의미합니다.

    이는 트랜잭션 시작 시에 프로듀서가 브로커로부터 전달받은 값들입니다. ( InitProducerid 를 통해서 )

    그리고 committed 의 위치에 Commit 또는 Aborted 여부를 전달합니다.

     

    아래의 TCP Packet 은 EndTxn API Request 의 데이터인데요.

    0x0038 과 0x0039 위치의 001a 정보가 26의 16진수 표현입니다. 

    그리고 EndTxn API 의 Key 가 26이므로 아래와 같은 형식으로 End Transaction 이 요청됩니다.

     

    • transactional_id : this_is_my_transactional_id
      • 0x42 ~ 0x55
    • producer_id : 0
      • 0x73 ~ 0x7a
    • producer_epoch: 2
      • 0x7b ~ 0x7c
    • committed : 1
      • 0x7d
    23:11:09.555987 IP 172.19.0.6.56010 > kafka1.9091: Flags [P.], seq 1745703631:1745703705, ack 109238315, win 501, options [nop,nop,TS val 3842498311 ecr 1802147001], length 74
    	0x0000:  4500 007e 8ca8 4000 4006 55a2 ac13 0006  E..~..@.@.U.....
    	0x0010:  ac13 0003 daca 2383 680d 52cf 0682 d82b  ......#.h.R....+
    	0x0020:  8018 01f5 58a0 0000 0101 080a e507 df07  ....X...........
    	0x0030:  6b6a 94b9 0000 0046 001a 0001 0000 0005  kj.....F........
    	0x0040:  0014 7468 6973 5f69 735f 6d79 5f63 6c69  ..this_is_my_cli
    	0x0050:  656e 745f 6964 001b 7468 6973 5f69 735f  ent_id..this_is_
    	0x0060:  6d79 5f74 7261 6e73 6163 7469 6f6e 616c  my_transactional
    	0x0070:  5f69 6400 0000 0000 0000 0000 0201       _id...........

     

     

     

    EndTxn API 는 Transaction Coordinator 에게 전달되는가 ?

    EndTxn API 는 Transaction Coordinator 에게 요청됩니다.

    그리고 EndTxn API 요청을 전달받은 Transaction Coordinator 는 __transaction_state 토픽에 트랙잭션 상태를 저장합니다.

    EndTxn API 를 요청한 transactional.id 를 기준으로 Committed 또는 Aborted 여부를 __transaction_state 토픽에 기록하게 됩니다.

    그리고 각 Partition 의 Leader Broker 에게 Commit 또는 Abort Marker 를 생성할 수 있도록 Marker 생성을 요청합니다.

     

    Transaction Coordinator 는 다른 Broker 들에게 WriteTxnMarkers Request 를 요청합니다.

    WriteTxnMarkers API 는 Commit 또는 Abort Marker 를 다른 브로커에게 생성하도록 요청합니다.

     

     

     

    아래와 같은 형식으로 Producer 는 Transaction Coordinator 에게 EndTxn 을 요청하게 되고,

    Transaction Coordinator 는 다른 Broker 에게 WriteTxnMarkers 를 요청합니다.

    // Producer 가 Transaction Coordinator 에게 EndTxn 을 요청함.
    23:25:51.654619 IP friendly_nash.kafka-net.50334 > kafka1.9092: Flags [P.], seq 2120630149:2120630223, ack 3221377303, win 501, options [nop,nop,TS val 3843380432 ecr 1803029118], length 74
    	0x0000:  4500 007e 42a2 4000 4006 9fa8 ac13 0006  E..~B.@.@.......
    	0x0010:  ac13 0003 c49e 2384 7e66 3f85 c002 5117  ......#.~f?...Q.
    	0x0020:  8018 01f5 58a0 0000 0101 080a e515 54d0  ....X.........T.
    	0x0030:  6b78 0a7e 0000 0046 001a 0001 0000 0005  kx.~...F........
    	0x0040:  0014 7468 6973 5f69 735f 6d79 5f63 6c69  ..this_is_my_cli
    	0x0050:  656e 745f 6964 001b 7468 6973 5f69 735f  ent_id..this_is_
    	0x0060:  6d79 5f74 7261 6e73 6163 7469 6f6e 616c  my_transactional
    	0x0070:  5f69 6400 0000 0000 0000 0000 0701       _id...........
    
    // Transaction Coordinator 가 다른 브로커에게 WriteTxnMarkers 을 요청함. 
    23:25:51.658823 IP kafka1.42984 > kafka2.kafka-net.9092: Flags [P.], seq 4216826102:4216826173, ack 1541120937, win 51015, options [nop,nop,TS val 1451462624 ecr 864208375], length 71
    	0x0000:  4500 007b a547 4000 4006 3d07 ac13 0003  E..{.G@.@.=.....
    	0x0010:  ac13 0005 a7e8 2384 fb57 a8f6 5bdb a3a9  ......#..W..[...
    	0x0020:  8018 c747 589c 0000 0101 080a 5683 8fe0  ...GX.......V...
    	0x0030:  3382 c5f7 0000 0043 001b 0001 0000 0007  3......C........
    	0x0040:  001a 6272 6f6b 6572 2d31 2d74 786e 2d6d  ..broker-1-txn-m
    	0x0050:  6172 6b65 722d 7365 6e64 6572 0002 0000  arker-sender....
    	0x0060:  0000 0000 0000 0007 0102 0574 6573 7402  ...........test.
    	0x0070:  0000 0001 0000 0000 0000 00              ...........

     

     

    카프카 관련 명령어들.

     

    카프카 클러스터 실행하기.

    docker network create kafka-net
    
    docker run -d --name zookeeper --hostname zookeeper --net kafka-net \
      -e ZOOKEEPER_CLIENT_PORT=2181 \
      confluentinc/cp-zookeeper:7.6.4
    
    docker run -d --name kafka1 --hostname kafka1 --net kafka-net \
      -e KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181 \
      -e KAFKA_CFG_BROKER_ID=1 \
      -e ALLOW_PLAINTEXT_LISTENER=yes \
      -e KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR=2 \
      -e KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,INNER://:9091,OUTER://:19092 \
      -e KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka1:9092,INNER://kafka1:9091,OUTER://localhost:19092 \
      -e KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,INNER:PLAINTEXT,OUTER:PLAINTEXT \
      -e KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT \
      -p 19092:19092 \
      bitnami/kafka:3.1.2
      
    docker run -d --name kafka2 --hostname kafka2 --net kafka-net \
      -e KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181 \
      -e KAFKA_CFG_BROKER_ID=2 \
      -e ALLOW_PLAINTEXT_LISTENER=yes \
      -e KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR=2 \
      -e KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,INNER://:9091,OUTER://:29092 \
      -e KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka2:9092,INNER://kafka2:9091,OUTER://localhost:29092 \
      -e KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,INNER:PLAINTEXT,OUTER:PLAINTEXT \
      -e KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT \
      -p 29092:29092 \
      bitnami/kafka:3.1.2
      
    docker run -d --name kafka-ui --net kafka-net \
      -e DYNAMIC_CONFIG_ENABLED='true' \
      -e KAFKA_CLUSTERS_0_NAME=cluster \
      -e KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka1:9092,kafka2:9092 \
      -p 8080:8080 \
      provectuslabs/kafka-ui:v0.7.2

     

     

    Topic 생성.

    # Topic Create
    kafka-topics.sh --bootstrap-server kafka1:9092 \
    	--topic test \
    	--create \
    	--replication-factor 2 \
    	--partitions 2
        
    # Topic Describe    
    kafka-topics.sh --bootstrap-server kafka1:9092 --topic test --describe
    kafka-topics.sh --bootstrap-server kafka1:9092 --topic __transaction_state --describe
    
    
    # Topic Delete
    kafka-topics.sh --bootstrap-server kafka1:9092 --topic test --delete

     

    Producer 실행.

    <<EOF> /tmp/code.py
    from confluent_kafka import Producer
    
    producer_conf = {
        'bootstrap.servers': 'kafka1:9091',
        'enable.idempotence': "true",
        'client.id': 'this_is_my_client_id',
        'transactional.id': 'this_is_my_transactional_id'
    }
    
    producer = Producer(producer_conf)
    producer.init_transactions()
    producer.begin_transaction()
    producer.produce("test", key=b'1', value=f"this is a message".encode())
    producer.flush()
    producer.commit_transaction()
    # or 
    # producer.abort_transaction()
    
    EOF
    
    docker run -d --rm --network kafka-net -v /tmp/code.py:/tmp/code.py python:3.9-bullseye \
    	sh -c 'pip install confluent-kafka && python /tmp/code.py'

     

     

    EndTxn TCP Packet 탐색.

    tcpdump -i eth0 '(dst port 9091 and (tcp[37] = 0x1a))' -X
    tcpdump -i eth0 '(dst port 9092 and (tcp[37] = 0x1a or tcp[37] = 0x1b))' -X

     

     

     

     

     

     

     

    반응형
Designed by Tistory.