ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • [Kafka] Transaction 과 Commit / Abort Maker 알아보기
    Kafka/Kafka Producer 2024. 6. 29. 14:46
    반응형

    - 목차

     

    들어가며.

    카프카 프로듀서의 트랜잭션이 활성화되면 프로듀서는 Commit / Abort Marker 에 해당하는 레코드를 생성할 수 있습니다.

    이는 Kafka Clients 관점에선 CommitTransaction 이나 Abort Transaction 함수로써 제공되는 기능인데요.

    Kafka Protocol API 관점에서는 EndTxn 와 같은 API 가 제공되고, Commit/Abort Transaction 함수 내부적으로 EndTxn 가 호출됩니다.

    이번 글에서는 Commit 과 Abort Marker 가 어떻게 생성되고, Log Segment File 에서 어떻게 저장되는지에 대해서 알아보도록 하겠습니다.

     

    Transactional Producer 실행하기.

    프로듀서의 Transaction 을 활성화하는 방법에 대해서 설명해보겠습니다. 

     

    enable.idempotence

    Producer 의 설정 중에서 enable.idempotence 를 활성화시킵니다.

    enable.idempotence 는 기본적으로 True 로써 설정되어 있습니다.

    따라서 기본적으로 카프카 프로듀서는 멱등성있게 데이터를 생성할 수 있는 구조입니다.

     

    https://westlife0615.tistory.com/933

     

    [Kafka] Producer 와 Idempotence 알아보기 ( InitProducerId, Epoch, Sequence Number )

    - 목차 들어가며.Kafka Producer 는 Acks 모드로 동작을 하게 되면 데이터의 중복 생성의 위험이 존재합니다.카프카에서 중복 데이터 생성이 발생하는 원인은 at least once 방식을 사용하는 카프카의 데

    westlife0615.tistory.com

     

    transactional.id

    카프카 프로듀서가 트랜잭션을 활성화시키기 위해서 transactional.id 라는 식별값이 필요합니다.

    프로듀서의 transactional.id 는 하나의 프로듀서의 고유한 값으로 설정되어야 합니다.

    그리고 프로듀서는 내부적으로 initTransactions 함수를 호출하게 되면, transactional.id 를 Bootstrap Server 로 등록된 브로커에게 전달합니다.

     

    내부적으로는 FindCoordinator 라는 API 를 Bootstrap Server 인 브로커에게 요청하게 되구요.

    브로커는 프로듀서가 전달한 transactional.id 를 분석해서 "너의 트랜잭션을 처리할 브로커는 broker-XX 이 좋겠구나" 라고 판단 후,

    특정 브로커를 Transaction Coordinator 로 지정합니다.

     

    Transaction Coordinator 에 대해서 간단히 설정드리면,

    하나의 Kafka Producer 는 자신의 트랜잭션을 전담해주는 하나의 Broker 를 배정받습니다.

    그리고 여러 개의 브로커들 중에서 Transaction Coordinator 를 배정받는 원리는 transactional.id 를 Hashing 또는 Modulo 연산을 적용해 하나의 Broker 를 선택하는 방식입니다.

     

     

    https://westlife0615.tistory.com/945

     

    [Kafka] FindCoordinator 와 Transaction Coordinator 알아보기

    - 목차 들어가며.카프카 프로듀서는 트랜잭션을 활성화하면 이 프로듀서를 전담하는 Transaction Coordinator 가 선출됩니다. Transaction Coordinator 는 여러 브로커들 중에서 하나의 브로커가 Transaction Coo

    westlife0615.tistory.com

     

     

    __transaction_state Topic.

    카프카 프로듀서가 트랜잭션을 사용하기 위해서는 "__transaction_state" 라는 내장 토픽이 필요합니다.

    이는 kafka-topics.sh --create 이러한 방식으로 사용자가 직접 생성하는 토픽이 아닙니다.

    카프카 시스템에서 자동으로 생성해줍니다.

     

    카프카의 구조상, "__transaction_state" 토픽은 Lazy 한 방식으로 생성됩니다.

    그래서 카프카 클러스터를 실행한 즉시 "__transaction_state" 토픽이 관찰되지는 않습니다.

    Kafka Producer 가 트랜잭션 모드로 실행되는 그 시점에 "__transaction_state" 이 생성됩니다.

     

    "__transaction_state" 토픽은 Transaction Metadata 라는 정보를 저장합니다.

    어떤 transactional.id 가 어떤 트랜잭션 동작을 시도하는지에 대한 기록을 합니다.

     

    예를 들어 아래와 같은 정보가 저장되는데요. 

    this_is_my_transactional_id::TransactionMetadata(
        transactionalId=this_is_my_transactional_id, 
        producerId=0, 
        producerEpoch=11, 
        txnTimeoutMs=60000, 
        state=CompleteAbort, 
        pendingState=None, 
        topicPartitions=Set(), 
        txnStartTimestamp=1610109333275, 
        txnLastUpdateTimestamp=1610109342955
    )

     

    1. transactional.id 가 "this_is_my_transactional_id" 로 설정된 Producer 가 있다.
    2. producer id 가 0 인 것으로보아 카프카 클러스터에서 최초로 실행된 Producer 이다.
    3. producer epoch 가 11 이기 때문에 Producer 는 11번 재실행되었다.
    4. state 가 CompleteAbort 이기 때문에 가장 마지막의 요청은 AbortTransaction 이겠군. 

    등과 같은 정보를 확인할 수 있습니다.

     

     

    Commit / Abort Marker 알아보기.

    일반적으로 Kafka Transactional Producer 는 아래와 같은 형식의 코드가 작성됩니다.

    주목할 점은 init_transactions, begin_transaction, commit_transaction, abort_transaction 입니다.

     

    from confluent_kafka import Producer
    
    producer_conf = {
        'bootstrap.servers': 'localhost:19092',
        'enable.idempotence': "true",
        'client.id': 'this_is_my_client_id',
        'transactional.id': 'this_is_my_transactional_id'
    }
    
    producer = Producer(producer_conf)
    producer.init_transactions()
    producer.begin_transaction()
    producer.produce("test", key=b'1', value=f"this is a message".encode())
    producer.flush()
    producer.commit_transaction()
    # or 
    # producer.abort_transaction()

     

    그리고 이번 글의 주제인 commit 과 abort Marker 는 commit_transaction 과 abort_transaction 함수와 관련이 깊습니다.

     

    Commit Transaction 은 Commit Marker 를 생성.

    commit transaction 을 호출하면 카프카의 Topic/Partition 의 Log Segment FIle 에 Commit Marker 라는 레코드가 생성됩니다.

    직접 눈으로 확인해보겠습니다.

    혹시 Kafka 를 실행시켜야 한다면 현 페이지의 가장 하단에 Kafka 를 실행할 수 있는 Docker Command 를 작성해두었으니 참고 바랍니다.

     

    아래의 스크립트는 Transaction 내에서 "test" 라는 Topic 에 1개의 Record 를 추가하고, commit 을 적용하는 예시입니다.

    from confluent_kafka import Producer
    
    producer_conf = {
        'bootstrap.servers': 'localhost:19092',
        'enable.idempotence': "true",
        'client.id': 'this_is_my_client_id',
        'transactional.id': 'this_is_my_transactional_id'
    }
    
    producer = Producer(producer_conf)
    producer.init_transactions()
    producer.begin_transaction()
    producer.produce("test", key=b'1', value=f"this is a message".encode())
    producer.flush()
    producer.commit_transaction()

     

    위 스크립트의 실행 후에 생성된 Topic 의 레코드 목록을 확인해봅니다.

    kafka-dump-log.sh \
        --files /bitnami/kafka/data/test-0/00000000000000000000.log \
        --print-data-log \
        --deep-iteration

     

    kafka-dump-log.sh 을 통해서 특정 Log Segment FIle 을 해독할 수 있습니다.

    그리고 아래와 같이 2개의 Record 가 존재합니다.

    1번째 레코드는 Produce 과정을 통해서 생성된 데이터이구요.

    2번째 레코드는 Commit Marker 입니다.  (endTxnMarker: COMMIT)

     

    Dumping /bitnami/kafka/data/test-0/00000000000000000000.log
    Starting offset: 0
    
    baseOffset: 0 lastOffset: 0 count: 1 baseSequence: 0 lastSequence: 0 producerId: 0 producerEpoch: 0 partitionLeaderEpoch: 0 isTransactional: true isControl: false position: 0 CreateTime: 1610171195030 size: 86 magic: 2 compresscodec: none crc: 1022704055 isvalid: true
    | offset: 0 CreateTime: 1610171195030 keySize: 1 valueSize: 17 sequence: 0 headerKeys: [] key: 1 payload: this is a message
    
    baseOffset: 1 lastOffset: 1 count: 1 baseSequence: -1 lastSequence: -1 producerId: 0 producerEpoch: 0 partitionLeaderEpoch: 0 isTransactional: true isControl: true position: 86 CreateTime: 1610171196315 size: 78 magic: 2 compresscodec: none crc: 2573039889 isvalid: true
    | offset: 1 CreateTime: 1610171196315 keySize: 4 valueSize: 6 sequence: -1 headerKeys: [] endTxnMarker: COMMIT coordinatorEpoch: 0

     

     

    Abort Transaction 은 Abort Marker 를 생성.

    여기에서 추가적으로 Abort Marker 를 추가해봅니다.

     

    from confluent_kafka import Producer
    
    producer_conf = {
        'bootstrap.servers': 'localhost:19092',
        'enable.idempotence': "true",
        'client.id': 'this_is_my_client_id',
        'transactional.id': 'this_is_my_transactional_id'
    }
    
    producer = Producer(producer_conf)
    producer.init_transactions()
    producer.begin_transaction()
    producer.produce("test", key=b'1', value=f"this is a message".encode())
    producer.flush()
    producer.abort_transaction()

     

    위 스크립트가 실행되면 아래와 같이 2개의 레코드가 추가됩니다.

    세번째 레코드는 새롭게 추가된 데이터 레코드이구요.

    마지막 레코드는 Abort Marker 가 추가되었습니다. (endTxnMarker: ABORT)

     

    baseOffset: 0 lastOffset: 0 count: 1 baseSequence: 0 lastSequence: 0 producerId: 0 producerEpoch: 0 partitionLeaderEpoch: 0 isTransactional: true isControl: false position: 0 CreateTime: 1610171195030 size: 86 magic: 2 compresscodec: none crc: 1022704055 isvalid: true
    | offset: 0 CreateTime: 1610171195030 keySize: 1 valueSize: 17 sequence: 0 headerKeys: [] key: 1 payload: this is a message
    
    baseOffset: 1 lastOffset: 1 count: 1 baseSequence: -1 lastSequence: -1 producerId: 0 producerEpoch: 0 partitionLeaderEpoch: 0 isTransactional: true isControl: true position: 86 CreateTime: 1610171196315 size: 78 magic: 2 compresscodec: none crc: 2573039889 isvalid: true
    | offset: 1 CreateTime: 1610171196315 keySize: 4 valueSize: 6 sequence: -1 headerKeys: [] endTxnMarker: COMMIT coordinatorEpoch: 0
    
    
    baseOffset: 2 lastOffset: 2 count: 1 baseSequence: 0 lastSequence: 0 producerId: 0 producerEpoch: 1 partitionLeaderEpoch: 0 isTransactional: true isControl: false position: 164 CreateTime: 1610171480550 size: 86 magic: 2 compresscodec: none crc: 1496301709 isvalid: true
    | offset: 2 CreateTime: 1610171480550 keySize: 1 valueSize: 17 sequence: 0 headerKeys: [] key: 1 payload: this is a message
    
    baseOffset: 3 lastOffset: 3 count: 1 baseSequence: -1 lastSequence: -1 producerId: 0 producerEpoch: 1 partitionLeaderEpoch: 0 isTransactional: true isControl: true position: 250 CreateTime: 1610171481552 size: 78 magic: 2 compresscodec: none crc: 699889751 isvalid: true
    | offset: 3 CreateTime: 1610171481552 keySize: 4 valueSize: 6 sequence: -1 headerKeys: [] endTxnMarker: ABORT coordinatorEpoch: 0

     

    __transaction_state Topic 확인해보기.

    __transaction_state Topic 은 kafka-console-consumer.sh 를 통해서 확인합니다.

    kafka-console-consumer.sh \
    	--bootstrap-server kafka1:9092 \
    	--topic __transaction_state \
    	--from-beginning \
    	--formatter "kafka.coordinator.transaction.TransactionLog\$TransactionLogMessageFormatter"

     

    아래와 같은 출력 결과가 확인됩니다.

     

    this_is_my_transactional_id::TransactionMetadata(
    	transactionalId=this_is_my_transactional_id, 
    	producerId=0, 
    	producerEpoch=0, 
    	txnTimeoutMs=60000, 
    	state=Empty, 
    	pendingState=None, 
    	topicPartitions=Set(), 
    	txnStartTimestamp=-1, 
    	txnLastUpdateTimestamp=1610171194970
    	)
    
    this_is_my_transactional_id::TransactionMetadata(
    	transactionalId=this_is_my_transactional_id, 
    	producerId=0, 
    	producerEpoch=0, 
    	txnTimeoutMs=60000, 
    	state=Ongoing, 
    	pendingState=None, 
    	topicPartitions=Set(test-0), 
    	txnStartTimestamp=1610171196233, 
    	txnLastUpdateTimestamp=1610171196233
    	)
    
    this_is_my_transactional_id::TransactionMetadata(
    	transactionalId=this_is_my_transactional_id, 
    	producerId=0, 
    	producerEpoch=0, 
    	txnTimeoutMs=60000, 
    	state=PrepareCommit, 
    	pendingState=None, 
    	topicPartitions=Set(test-0), 
    	txnStartTimestamp=1610171196233, 
    	txnLastUpdateTimestamp=1610171196283
    	)
    
    this_is_my_transactional_id::TransactionMetadata(
    	transactionalId=this_is_my_transactional_id, 
    	producerId=0, 
    	producerEpoch=0, 
    	txnTimeoutMs=60000, 
    	state=CompleteCommit, 
    	pendingState=None, 
    	topicPartitions=Set(), 
    	txnStartTimestamp=1610171196233, 
    	txnLastUpdateTimestamp=1610171196286
    	)
    
    this_is_my_transactional_id::TransactionMetadata(
    	transactionalId=this_is_my_transactional_id, 
    	producerId=0, 
    	producerEpoch=1, 
    	txnTimeoutMs=60000, 
    	state=Empty, 
    	pendingState=None, 
    	topicPartitions=Set(), 
    	txnStartTimestamp=-1, 
    	txnLastUpdateTimestamp=1610171480544
    	)
    
    this_is_my_transactional_id::TransactionMetadata(
    	transactionalId=this_is_my_transactional_id, 
    	producerId=0, 
    	producerEpoch=1, 
    	txnTimeoutMs=60000, 
    	state=Ongoing, 
    	pendingState=None, 
    	topicPartitions=Set(test-0), 
    	txnStartTimestamp=1610171481520, 
    	txnLastUpdateTimestamp=1610171481520
    	)
    
    this_is_my_transactional_id::TransactionMetadata(
    	transactionalId=this_is_my_transactional_id, 
    	producerId=0, 
    	producerEpoch=1, 
    	txnTimeoutMs=60000, 
    	state=PrepareAbort, 
    	pendingState=None, 
    	topicPartitions=Set(test-0), 
    	txnStartTimestamp=1610171481520, 
    	txnLastUpdateTimestamp=1610171481537
    	)
    
    this_is_my_transactional_id::TransactionMetadata(
    	transactionalId=this_is_my_transactional_id, 
    	producerId=0, 
    	producerEpoch=1, 
    	txnTimeoutMs=60000, 
    	state=CompleteAbort, 
    	pendingState=None, 
    	topicPartitions=Set(), 
    	txnStartTimestamp=1610171481520, 
    	txnLastUpdateTimestamp=1610171481538
    	)

     

    유의해서 살펴보아야 할 몇가지 포인트에 대해서 설명드리도록 하겠습니다.

     

    1. transactional.id

    총 Transaction Metadata 의 갯수는 8개입니다.

    그리고 이 모든 transactiona.id 는 동일합니다. 

     

    2. producer id

    producer id 또한 동일한 transactional.id 를 사용하였기 때문에 동일한 값을 가집니다.

    다만 생성된 카프카 클러스터에서 최초로 연결된 Producer 이기 때문에 producer id 를 0 으로 지정받았습니다.

     

    3. producer epoch

    저는 commit marker 실험과 abort marker 를 생성하는 실험을 두차례 나누어서 진행하였습니다.

    따라서 위 결과를 확인해보시면 처음 4개의 Transaction Metadata 는 Epoch 가 0 입니다.

    그리고 마지막 4개의 Transaction Metadata 는 Epoch 가 1 로 지정되어 있습니다.

     

    4. state

    State 는 현재 이 transactional.id 를 가지는 프로듀서가 어떤 트랜잭션 상황에 있는지를 알려줍니다.

    당연히 현재는 CompleteAbort 상태입니다.

    왜냐면 저희 마지막 행동이 abort_transaction 함수의 호출이였기 때문입니다.

     

    Empty, Ongoing, PrepareCommit, PrepareAbort, CompleteCommit, CompleteAbort 등의 상태가 존재합니다.

    다른 글에서 이 상태들에 대한 심도있는 이야기를 하려고 합니다.

     

    5. topicPartitions

    트랜잭션에 관여한 Partition 의 목록들을 저장합니다.

    그러니깐 Transaction 범위 내에서 여러 partition 들에게 레코드가 추가될 수 있겠죠 ??

    이렇게 트랜잭션 동안에 레코드가 추가된 Partition 들이 topicPartitions 에 기록됩니다.

    그리고 제일 마지막에 Commit / Abort Marker 를 추가할 Topic-Partition 정보를 이 Transaction Metadata 를 통해서 확인합니다.

     

     

     

    카프카 실행을 위한 Docker 명령어.

    docker network create kafka-net
    
    docker run -d --name zookeeper --hostname zookeeper --net kafka-net \
      -e ZOOKEEPER_CLIENT_PORT=2181 \
      confluentinc/cp-zookeeper:7.6.4
    
    docker run -d --name kafka1 --hostname kafka1 --net kafka-net \
      -e KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181 \
      -e KAFKA_CFG_BROKER_ID=1 \
      -e ALLOW_PLAINTEXT_LISTENER=yes \
      -e KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR=1 \
      -e KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,OUTER://:19092 \
      -e KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka1:9092,OUTER://localhost:19092 \
      -e KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,OUTER:PLAINTEXT \
      -e KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT \
      -p 19092:19092 \
      bitnami/kafka:3.1.2
    
    docker run -d --name kafka-ui --net kafka-net \
      -e DYNAMIC_CONFIG_ENABLED='true' \
      -e KAFKA_CLUSTERS_0_NAME=cluster \
      -e KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka1:9092,kafka2:9092,kafka3:9092 \
      -p 8080:8080 \
      provectuslabs/kafka-ui:v0.7.2

     

     

    반응형
Designed by Tistory.