ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • [Kafka] FindCoordinator 와 Transaction Coordinator 알아보기
    Kafka/Kafka Producer 2024. 6. 29. 19:50
    반응형

     

    - 목차

     

    들어가며.

    카프카 프로듀서는 트랜잭션을 활성화하면 이 프로듀서를 전담하는 Transaction Coordinator 가 선출됩니다. 

    Transaction Coordinator 는 여러 브로커들 중에서 하나의 브로커가 Transaction Coordinator 로 선택됩니다. 

    그래서 Transaction 과 관련된 여러가지 요청들, 예를 들어 AddPartitionsToTxn, EndTxn 등이 Transaction Coordinator 에게 전송됩니다.

    특히 이번 글의 주제인 FindCoordinator 요청은 Producer 가 자신을 전담하는 Transaction Coordinator 를 조회하는 API 요청인데요.

    이번 글에서 FindCoordinator API 와 Transaction Coordinator 에 대해서 알아보려고 합니다.

     

    Transactional Producer.

    카프카 프로듀서의 트랜잭션을 활성화하는 방법은 아래와 같습니다.

    1. enable.idempotence 를 True 로 활성화시키고,
    2. transactional.id 에 식별값을 설정합니다.

    Producer 의 enable.idempotence 와 transactional.id 설정을 통해서 Producer 는 트랜잭션을 활성화시킬 수 있는 자격을 가집니다.

     

    enable.idempotence

    enable.idempotence 는 카프카 프로듀서의 멱등성 모드를 활성화시키는 옵션입니다.

    카프카에서의 멱등성이란 Producer 의 retries 로 인해서 레코드 생성 요청이 중복적으로 발생하더라도 데이터가 중복적으로 생성되는 것을 방지합니다.

    데이터베이스에서 Primary Key, Unique Index 등과 같이 유일함을 보장하는 제약 사항처럼 카프카에서도 중복을 방지하는 장치가 제공됩니다.

    세부적으로는 Producer Id, Epoch, Sequence Number 등을 사용해서 카프카 레코드의 유일성을 보장합니다.

    자세한 설명은 아래의 웹 링크를 참고해주시기 바랍니다.

     

    https://westlife0615.tistory.com/933

     

    [Kafka] Producer 와 Idempotence 알아보기 ( InitProducerId, Epoch, Sequence Number )

    - 목차 들어가며.Kafka Producer 는 Acks 모드로 동작을 하게 되면 데이터의 중복 생성의 위험이 존재합니다.카프카에서 중복 데이터 생성이 발생하는 원인은 at least once 방식을 사용하는 카프카의 데

    westlife0615.tistory.com

     

     

    transactional.id

    transactional.id 는 카프카 프로듀서의 식별값입니다.

    카프카 프로듀서는 client.id, Producer ID, transactional.id 등 여러가지 식별값을 가집니다.

    모든 id 설정은 모두 공통적으로 프로듀서를 식별하기 위해서 사용되지만 그 의미가 다릅니다.

    transactional.id 는 오직 트랜잭션을 위해서 사용되는 정보입니다.

    이어지는 내용에서 transactional.id 의 자세한 쓰임새를 알아보도록 하겠습니다.

     

    FindCoordinator.

    FindCoordinator API 는 카프카 프로듀서가 브로커에게 자신의 트랜잭션을 전담하는 Transaction Coordinator 를 조회하는 기능을 수행합니다.

    일반적으로 Producer 는 bootstrap.servers 로 설정된 브로커의 advertised.listeners 주소를 가지고 실행됩니다.

    그리고 KafkaProducer 는 트랜잭션 실행을 위해서 initTransactions 함수를 호출합니다.

     

    from confluent_kafka import Producer
    
    producer_conf = {
        'bootstrap.servers': 'localhost:19092',
        'enable.idempotence': "true",
        'client.id': 'this_is_my_client_id',
        'transactional.id': 'this_is_my_transactional_id'
    }
    
    producer = Producer(producer_conf)
    producer.init_transactions()

     

    이때 Producer 의 init_transactions() 함수가 호출되면, 내부적으로 FindCoordinator API 가 bootstrap.servers 에 설정된 브로커 중 하나에게 전송됩니다.

     

    아래의 자료는 tcpdump 를 통해서 TCP Packet 의 기록을 발췌한 내용입니다.

    저는 bootstrap.servers 설정은 일부로 localhost:19092 인 브로커로 고정하였습니다.

    그래서 아래와 같이 "172.19.0.1:62736" 에 해당하는 Producer "kafka1.19092" 인 브로커에게 API 요청을 전송합니다.

    아래는 2개의 TCP Packet 이 관찰됩니다.

     

    첫번째는 Producer 가 Broker 에게 전송한 TCP Packet 입니다.

    이는 0x0034 ~ 0x0037 까지인 "0000 003c" 는 요청의 크기를 나타냅니다.

    즉, Payload 의 크기가 60bytes 임을 의미합니다.

    그리고 가장 중요한 0x0038 ~ 0x0039 까지의 "000a" 가 바로 API Key 를 의미하는데요.

    "000a" 인 10 번의 API  가 바로 FindCoordinator 요청입니다.

     

    그리고 두번째 Packet 은 Kafka Broker 가 Producer 에게 보내는 응답입니다.

    이는 Transaction Coordinator 의 메타데이터를 반환합니다.

    주목할 부분은 "6c6f 6361 6c68 6f73 7400 0098 b4" 인데요.

    이 정보는 localhost:39092 를 의미하며, 98b4 를 십진수로 변환하면 39092 를 뜻합니다.

     

    IP 172.19.0.1.62736 > kafka1.19092: Flags [P.], seq 311:375, ack 700, win 512, options [nop,nop,TS val 2701319600 ecr 1893796122], length 64
    	0x0000:  4500 0074 ea4e 4000 4006 f80a ac13 0001  E..t.N@.@.......
    	0x0010:  ac13 0003 f510 4a94 dcba 3569 636c 788f  ......J...5iclx.
    	0x0020:  8018 0200 5891 0000 0101 080a a102 ddb0  ....X...........
    	0x0030:  70e1 091a 0000 003c 000a 0002 0000 0006  p......<........
    	0x0040:  0014 7468 6973 5f69 735f 6d79 5f63 6c69  ..this_is_my_cli
    	0x0050:  656e 745f 6964 001b 7468 6973 5f69 735f  ent_id..this_is_
    	0x0060:  6d79 5f74 7261 6e73 6163 7469 6f6e 616c  my_transactional
    	0x0070:  5f69 6401                                _id.
        
    IP kafka1.19092 > 172.19.0.1.62736: Flags [P.], seq 700:739, ack 375, win 50494, options [nop,nop,TS val 1893796631 ecr 2701319600], length 39
    	0x0000:  4500 005b 5630 4000 4006 8c42 ac13 0003  E..[V0@.@..B....
    	0x0010:  ac13 0001 4a94 f510 636c 788f dcba 35a9  ....J...clx...5.
    	0x0020:  8018 c53e 5878 0000 0101 080a 70e1 0b17  ...>Xx......p...
    	0x0030:  a102 ddb0 0000 0023 0000 0006 0000 0000  .......#........
    	0x0040:  0000 0004 4e4f 4e45 0000 0003 0009 6c6f  ....NONE......lo
    	0x0050:  6361 6c68 6f73 7400 0098 b4              calhost....

     

    이러한 방식으로 FindCoordinator 는 아무 브로커에게 요청되며, 브로커는 Transaction Coordinator 의 주소 정보를 반환해줍니다.

     

    transactional.id

    Transaction Coordinator 를 선택하는 기준은 transactional.id 입니다.

    transactional.id 와 hashing 또는 Modulo 연산을 통해서 여러 브로커들 중에서 Transaction Coordinator 을 선택합니다.

    어떤 Hash 함수 또는 Modulo 기능을 활용하는지는 알 수 없지만,

    중요한건 transactional.id 를 기준으로 Transaction Coordinator 를 선별하며 이는 항상 동일한 Broker 를 지칭하게 됩니다.

     

     

    __transaction_state Topic.

    카프카에서 Transaction Mode 를 사용하려면 반드시 "__transaction_state" 토픽이 필요합니다.

    하지만 __transaction_state 토픽은 기본적으로 생성되어 있지 않으며, Transaction 을 활성화시킨 최초의 Producer 가 실행될 때에 생성됩니다.

    즉, 카프카는 여러가지 효율적인 관점에서 이렇게 Lazy 한 방식으로 "__transaction_state Topic" 이 생성되는 방식을 채택할 것으로 보입니다.

     

    정확히 "__transaction_state Topic" 이 생성되는 시점은 FindCoordinator API 가 브로커로 전달되는 시점입니다.

    Producer 가 FindCoordinator 요청을 브로커에게 전송한 후에 브로커는 자신의 "__transaction_state Topic" 이 존재하는지 아닌지 여부를 판단합니다.

    그리고 "__transaction_state Topic" 가 존재하지 않다면 이 시점에 생성합니다.

    그래서 최초로 Transactional Producer 가 실행되면 약간의 시간적인 지연이 발생합니다.

    왜냐하면 "__transaction_state Topic" 을 생성해야하기 때문이죠.

     

    아래의 명령어를 통해서 생성된 __transaction_state 토픽이 생성됨을 확인할 수 있습니다.

     

    kafka-topics.sh --bootstrap-server kafka2:9092 --list
    >> __transaction_state

     

    Transaction Coordinator 에서의 API 요청들.

    프로듀서는 Bootstrap Server 인 브로커에게 FindCoordinator API 를 요청하고, 그 응답으로 Transaction Coordinator 의 주소 정보를 획득합니다.

    그리고 프로듀서는 Transaction Coordinator 와 통신을 시작합니다.

     

    아래의 TCP Packet 들은 Producer 와 Transaction Coordinator 사이의 통신 기록입니다.

    첫번째 요청은 Kafka Producer 가 Transaction Coordinator 에게 요청한 TCP Packet 입니다.

    0x00304 에 해당하는 0016 는 십진수로 22번에 해당하는데요.

    API Key 22 번은 InitProducerId 라는 API 요청입니다.

    이 요청은 브로커에서 자신을 위한 Producer ID 와 Epoch 를 요청하는 내용입니다.

     

    그리고 두번째 Packet 은 InitProducerId 요청의 응답에 해당합니다.

    이는 해독이 좀 어렵긴한데, 아마도 0번의 Producer ID 와 0번의 Producer Epoch 를 응답으로 제공했을 가능성이 높습니다.

     

    이러한 방식으로 Producer 는 Transaction Coordinator 로부터 트랜잭션을 수행할 준비를 하게 됩니다.

     

    IP 172.19.0.1.55198 > kafka3.39092: Flags [P.], seq 119:197, ack 440, win 512, options [nop,nop,TS val 2701319132 ecr 3801264823], length 78
    	0x0000:  4500 0082 ae23 4000 4006 3426 ac13 0001  E....#@.@.4&....
    	0x0010:  ac13 0005 d79e 98b4 7e8a 2965 ebbc 8c29  ........~.)e...)
    	0x0020:  8018 0200 58a1 0000 0101 080a a102 dbdc  ....X...........
    	0x0030:  e292 b2b7 0000 004a 0016 0004 0000 0003  .......J........
    	0x0040:  0014 7468 6973 5f69 735f 6d79 5f63 6c69  ..this_is_my_cli
    	0x0050:  656e 745f 6964 001c 7468 6973 5f69 735f  ent_id..this_is_
    	0x0060:  6d79 5f74 7261 6e73 6163 7469 6f6e 616c  my_transactional
    	0x0070:  5f69 6400 00ea 60ff ffff ffff ffff ffff  _id...`.........
    	0x0080:  ff00                                     ..
    
    
    
    IP kafka3.39092 > 172.19.0.1.55198: Flags [P.], seq 440:543, ack 197, win 50583, options [nop,nop,TS val 3801264832 ecr 2701319132], length 103
    	0x0000:  4500 009b d52f 4000 4006 0d01 ac13 0005  E..../@.@.......
    	0x0010:  ac13 0001 98b4 d79e ebbc 8c29 7e8a 29b3  ...........)~.).
    	0x0020:  8018 c597 58ba 0000 0101 080a e292 b2c0  ....X...........
    	0x0030:  a102 dbdc 0000 0063 0000 0002 0000 0000  .......c........
    	0x0040:  0004 0000 0002 0a6c 6f63 616c 686f 7374  .......localhost
    	0x0050:  0000 71a4 0000 0000 0003 0a6c 6f63 616c  ..q........local
    	0x0060:  686f 7374 0000 98b4 0000 0000 0001 0a6c  host...........l
    	0x0070:  6f63 616c 686f 7374 0000 4a94 0000 176e  ocalhost..J....n
    	0x0080:  4d58 7552 7256 4d51 5071 3939 6162 5650  MXuRrVMQPq99abVP
    	0x0090:  3058 7846 7700 0000 0101 00              0XxFw......

     

    __transaction_state 의 Metadata 확인하기.

    아래의 명령을 통해서 현재의 __transaction_state 를 확인할 수 있습니다.

    kafka-console-consumer.sh --bootstrap-server kafka1:9092 --topic __transaction_state  --from-beginning --formatter "kafka.coordinator.transaction.TransactionLog\$TransactionLogMessageFormatter"

     

    아래와 같은 TransactionMetadata 가 확인되는데요.

    위의 FindCoordinator 와 InitProducerId 요청의 결과입니다.

    새로운 글에서 TransactionMetadata 에 대한 상세한 설명을 할 기회를 가져보도록 하겠습니다.

     

    this_is_my_transactional_id::TransactionMetadata(
        transactionalId=this_is_my_transactional_id, 
        producerId=0, 
        producerEpoch=0, 
        txnTimeoutMs=60000, 
        state=Empty, 
        pendingState=None, 
        topicPartitions=Set(), 
        txnStartTimestamp=-1, 
        txnLastUpdateTimestamp=1610129162736
    )

     

     

     

     

     

     

    카프카 실행을 위한 Docker 명령어.

    docker network create kafka-net
    
    docker run -d --name zookeeper --hostname zookeeper --net kafka-net \
      -e ZOOKEEPER_CLIENT_PORT=2181 \
      confluentinc/cp-zookeeper:7.6.4
    
    docker run -d --name kafka1 --hostname kafka1 --net kafka-net \
      -e KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181 \
      -e KAFKA_CFG_BROKER_ID=1 \
      -e ALLOW_PLAINTEXT_LISTENER=yes \
      -e KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR=3 \
      -e KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,OUTER://:19092 \
      -e KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka1:9092,OUTER://localhost:19092 \
      -e KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,OUTER:PLAINTEXT \
      -e KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT \
      -p 19092:19092 \
      bitnami/kafka:3.1.2
    
    docker run -d --name kafka2 --hostname kafka2 --net kafka-net \
      -e KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181 \
      -e KAFKA_CFG_BROKER_ID=2 \
      -e ALLOW_PLAINTEXT_LISTENER=yes \
      -e KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR=3 \
      -e KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,OUTER://:29092 \
      -e KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka2:9092,OUTER://localhost:29092 \
      -e KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,OUTER:PLAINTEXT \
      -e KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT \
      -p 29092:29092 \
      bitnami/kafka:3.1.2  
    
    docker run -d --name kafka3 --hostname kafka3 --net kafka-net \
      -e KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181 \
      -e KAFKA_CFG_BROKER_ID=3 \
      -e ALLOW_PLAINTEXT_LISTENER=yes \
      -e KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR=3 \
      -e KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,OUTER://:39092 \
      -e KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka3:9092,OUTER://localhost:39092 \
      -e KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,OUTER:PLAINTEXT \
      -e KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT \
      -p 39092:39092 \
      bitnami/kafka:3.1.2
    
    docker run -d --name kafka-ui --net kafka-net \
      -e DYNAMIC_CONFIG_ENABLED='true' \
      -e KAFKA_CLUSTERS_0_NAME=cluster \
      -e KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka1:9092,kafka2:9092,kafka3:9092 \
      -p 8080:8080 \
      provectuslabs/kafka-ui:v0.7.2

     

     

    반응형
Designed by Tistory.