-
[Kafka] Transaction 과 Commit / Abort Maker 알아보기Kafka/Kafka Producer 2024. 6. 29. 14:46반응형
- 목차
들어가며.
카프카 프로듀서의 트랜잭션이 활성화되면 프로듀서는 Commit / Abort Marker 에 해당하는 레코드를 생성할 수 있습니다.
이는 Kafka Clients 관점에선 CommitTransaction 이나 Abort Transaction 함수로써 제공되는 기능인데요.
Kafka Protocol API 관점에서는 EndTxn 와 같은 API 가 제공되고, Commit/Abort Transaction 함수 내부적으로 EndTxn 가 호출됩니다.
이번 글에서는 Commit 과 Abort Marker 가 어떻게 생성되고, Log Segment File 에서 어떻게 저장되는지에 대해서 알아보도록 하겠습니다.
Transactional Producer 실행하기.
프로듀서의 Transaction 을 활성화하는 방법에 대해서 설명해보겠습니다.
enable.idempotence
Producer 의 설정 중에서 enable.idempotence 를 활성화시킵니다.
enable.idempotence 는 기본적으로 True 로써 설정되어 있습니다.
따라서 기본적으로 카프카 프로듀서는 멱등성있게 데이터를 생성할 수 있는 구조입니다.
https://westlife0615.tistory.com/933
[Kafka] Producer 와 Idempotence 알아보기 ( InitProducerId, Epoch, Sequence Number )
- 목차 들어가며.Kafka Producer 는 Acks 모드로 동작을 하게 되면 데이터의 중복 생성의 위험이 존재합니다.카프카에서 중복 데이터 생성이 발생하는 원인은 at least once 방식을 사용하는 카프카의 데
westlife0615.tistory.com
transactional.id
카프카 프로듀서가 트랜잭션을 활성화시키기 위해서 transactional.id 라는 식별값이 필요합니다.
프로듀서의 transactional.id 는 하나의 프로듀서의 고유한 값으로 설정되어야 합니다.
그리고 프로듀서는 내부적으로 initTransactions 함수를 호출하게 되면, transactional.id 를 Bootstrap Server 로 등록된 브로커에게 전달합니다.
내부적으로는 FindCoordinator 라는 API 를 Bootstrap Server 인 브로커에게 요청하게 되구요.
브로커는 프로듀서가 전달한 transactional.id 를 분석해서 "너의 트랜잭션을 처리할 브로커는 broker-XX 이 좋겠구나" 라고 판단 후,
특정 브로커를 Transaction Coordinator 로 지정합니다.
Transaction Coordinator 에 대해서 간단히 설정드리면,
하나의 Kafka Producer 는 자신의 트랜잭션을 전담해주는 하나의 Broker 를 배정받습니다.
그리고 여러 개의 브로커들 중에서 Transaction Coordinator 를 배정받는 원리는 transactional.id 를 Hashing 또는 Modulo 연산을 적용해 하나의 Broker 를 선택하는 방식입니다.
https://westlife0615.tistory.com/945
[Kafka] FindCoordinator 와 Transaction Coordinator 알아보기
- 목차 들어가며.카프카 프로듀서는 트랜잭션을 활성화하면 이 프로듀서를 전담하는 Transaction Coordinator 가 선출됩니다. Transaction Coordinator 는 여러 브로커들 중에서 하나의 브로커가 Transaction Coo
westlife0615.tistory.com
__transaction_state Topic.
카프카 프로듀서가 트랜잭션을 사용하기 위해서는 "__transaction_state" 라는 내장 토픽이 필요합니다.
이는 kafka-topics.sh --create 이러한 방식으로 사용자가 직접 생성하는 토픽이 아닙니다.
카프카 시스템에서 자동으로 생성해줍니다.
카프카의 구조상, "__transaction_state" 토픽은 Lazy 한 방식으로 생성됩니다.
그래서 카프카 클러스터를 실행한 즉시 "__transaction_state" 토픽이 관찰되지는 않습니다.
Kafka Producer 가 트랜잭션 모드로 실행되는 그 시점에 "__transaction_state" 이 생성됩니다.
"__transaction_state" 토픽은 Transaction Metadata 라는 정보를 저장합니다.
어떤 transactional.id 가 어떤 트랜잭션 동작을 시도하는지에 대한 기록을 합니다.
예를 들어 아래와 같은 정보가 저장되는데요.
this_is_my_transactional_id::TransactionMetadata( transactionalId=this_is_my_transactional_id, producerId=0, producerEpoch=11, txnTimeoutMs=60000, state=CompleteAbort, pendingState=None, topicPartitions=Set(), txnStartTimestamp=1610109333275, txnLastUpdateTimestamp=1610109342955 )
- transactional.id 가 "this_is_my_transactional_id" 로 설정된 Producer 가 있다.
- producer id 가 0 인 것으로보아 카프카 클러스터에서 최초로 실행된 Producer 이다.
- producer epoch 가 11 이기 때문에 Producer 는 11번 재실행되었다.
- state 가 CompleteAbort 이기 때문에 가장 마지막의 요청은 AbortTransaction 이겠군.
등과 같은 정보를 확인할 수 있습니다.
Commit / Abort Marker 알아보기.
일반적으로 Kafka Transactional Producer 는 아래와 같은 형식의 코드가 작성됩니다.
주목할 점은 init_transactions, begin_transaction, commit_transaction, abort_transaction 입니다.
from confluent_kafka import Producer producer_conf = { 'bootstrap.servers': 'localhost:19092', 'enable.idempotence': "true", 'client.id': 'this_is_my_client_id', 'transactional.id': 'this_is_my_transactional_id' } producer = Producer(producer_conf) producer.init_transactions() producer.begin_transaction() producer.produce("test", key=b'1', value=f"this is a message".encode()) producer.flush() producer.commit_transaction() # or # producer.abort_transaction()
그리고 이번 글의 주제인 commit 과 abort Marker 는 commit_transaction 과 abort_transaction 함수와 관련이 깊습니다.
Commit Transaction 은 Commit Marker 를 생성.
commit transaction 을 호출하면 카프카의 Topic/Partition 의 Log Segment FIle 에 Commit Marker 라는 레코드가 생성됩니다.
직접 눈으로 확인해보겠습니다.
혹시 Kafka 를 실행시켜야 한다면 현 페이지의 가장 하단에 Kafka 를 실행할 수 있는 Docker Command 를 작성해두었으니 참고 바랍니다.
아래의 스크립트는 Transaction 내에서 "test" 라는 Topic 에 1개의 Record 를 추가하고, commit 을 적용하는 예시입니다.
from confluent_kafka import Producer producer_conf = { 'bootstrap.servers': 'localhost:19092', 'enable.idempotence': "true", 'client.id': 'this_is_my_client_id', 'transactional.id': 'this_is_my_transactional_id' } producer = Producer(producer_conf) producer.init_transactions() producer.begin_transaction() producer.produce("test", key=b'1', value=f"this is a message".encode()) producer.flush() producer.commit_transaction()
위 스크립트의 실행 후에 생성된 Topic 의 레코드 목록을 확인해봅니다.
kafka-dump-log.sh \ --files /bitnami/kafka/data/test-0/00000000000000000000.log \ --print-data-log \ --deep-iteration
kafka-dump-log.sh 을 통해서 특정 Log Segment FIle 을 해독할 수 있습니다.
그리고 아래와 같이 2개의 Record 가 존재합니다.
1번째 레코드는 Produce 과정을 통해서 생성된 데이터이구요.
2번째 레코드는 Commit Marker 입니다. (endTxnMarker: COMMIT)
Dumping /bitnami/kafka/data/test-0/00000000000000000000.log Starting offset: 0 baseOffset: 0 lastOffset: 0 count: 1 baseSequence: 0 lastSequence: 0 producerId: 0 producerEpoch: 0 partitionLeaderEpoch: 0 isTransactional: true isControl: false position: 0 CreateTime: 1610171195030 size: 86 magic: 2 compresscodec: none crc: 1022704055 isvalid: true | offset: 0 CreateTime: 1610171195030 keySize: 1 valueSize: 17 sequence: 0 headerKeys: [] key: 1 payload: this is a message baseOffset: 1 lastOffset: 1 count: 1 baseSequence: -1 lastSequence: -1 producerId: 0 producerEpoch: 0 partitionLeaderEpoch: 0 isTransactional: true isControl: true position: 86 CreateTime: 1610171196315 size: 78 magic: 2 compresscodec: none crc: 2573039889 isvalid: true | offset: 1 CreateTime: 1610171196315 keySize: 4 valueSize: 6 sequence: -1 headerKeys: [] endTxnMarker: COMMIT coordinatorEpoch: 0
Abort Transaction 은 Abort Marker 를 생성.
여기에서 추가적으로 Abort Marker 를 추가해봅니다.
from confluent_kafka import Producer producer_conf = { 'bootstrap.servers': 'localhost:19092', 'enable.idempotence': "true", 'client.id': 'this_is_my_client_id', 'transactional.id': 'this_is_my_transactional_id' } producer = Producer(producer_conf) producer.init_transactions() producer.begin_transaction() producer.produce("test", key=b'1', value=f"this is a message".encode()) producer.flush() producer.abort_transaction()
위 스크립트가 실행되면 아래와 같이 2개의 레코드가 추가됩니다.
세번째 레코드는 새롭게 추가된 데이터 레코드이구요.
마지막 레코드는 Abort Marker 가 추가되었습니다. (endTxnMarker: ABORT)
baseOffset: 0 lastOffset: 0 count: 1 baseSequence: 0 lastSequence: 0 producerId: 0 producerEpoch: 0 partitionLeaderEpoch: 0 isTransactional: true isControl: false position: 0 CreateTime: 1610171195030 size: 86 magic: 2 compresscodec: none crc: 1022704055 isvalid: true | offset: 0 CreateTime: 1610171195030 keySize: 1 valueSize: 17 sequence: 0 headerKeys: [] key: 1 payload: this is a message baseOffset: 1 lastOffset: 1 count: 1 baseSequence: -1 lastSequence: -1 producerId: 0 producerEpoch: 0 partitionLeaderEpoch: 0 isTransactional: true isControl: true position: 86 CreateTime: 1610171196315 size: 78 magic: 2 compresscodec: none crc: 2573039889 isvalid: true | offset: 1 CreateTime: 1610171196315 keySize: 4 valueSize: 6 sequence: -1 headerKeys: [] endTxnMarker: COMMIT coordinatorEpoch: 0 baseOffset: 2 lastOffset: 2 count: 1 baseSequence: 0 lastSequence: 0 producerId: 0 producerEpoch: 1 partitionLeaderEpoch: 0 isTransactional: true isControl: false position: 164 CreateTime: 1610171480550 size: 86 magic: 2 compresscodec: none crc: 1496301709 isvalid: true | offset: 2 CreateTime: 1610171480550 keySize: 1 valueSize: 17 sequence: 0 headerKeys: [] key: 1 payload: this is a message baseOffset: 3 lastOffset: 3 count: 1 baseSequence: -1 lastSequence: -1 producerId: 0 producerEpoch: 1 partitionLeaderEpoch: 0 isTransactional: true isControl: true position: 250 CreateTime: 1610171481552 size: 78 magic: 2 compresscodec: none crc: 699889751 isvalid: true | offset: 3 CreateTime: 1610171481552 keySize: 4 valueSize: 6 sequence: -1 headerKeys: [] endTxnMarker: ABORT coordinatorEpoch: 0
__transaction_state Topic 확인해보기.
__transaction_state Topic 은 kafka-console-consumer.sh 를 통해서 확인합니다.
kafka-console-consumer.sh \ --bootstrap-server kafka1:9092 \ --topic __transaction_state \ --from-beginning \ --formatter "kafka.coordinator.transaction.TransactionLog\$TransactionLogMessageFormatter"
아래와 같은 출력 결과가 확인됩니다.
this_is_my_transactional_id::TransactionMetadata( transactionalId=this_is_my_transactional_id, producerId=0, producerEpoch=0, txnTimeoutMs=60000, state=Empty, pendingState=None, topicPartitions=Set(), txnStartTimestamp=-1, txnLastUpdateTimestamp=1610171194970 ) this_is_my_transactional_id::TransactionMetadata( transactionalId=this_is_my_transactional_id, producerId=0, producerEpoch=0, txnTimeoutMs=60000, state=Ongoing, pendingState=None, topicPartitions=Set(test-0), txnStartTimestamp=1610171196233, txnLastUpdateTimestamp=1610171196233 ) this_is_my_transactional_id::TransactionMetadata( transactionalId=this_is_my_transactional_id, producerId=0, producerEpoch=0, txnTimeoutMs=60000, state=PrepareCommit, pendingState=None, topicPartitions=Set(test-0), txnStartTimestamp=1610171196233, txnLastUpdateTimestamp=1610171196283 ) this_is_my_transactional_id::TransactionMetadata( transactionalId=this_is_my_transactional_id, producerId=0, producerEpoch=0, txnTimeoutMs=60000, state=CompleteCommit, pendingState=None, topicPartitions=Set(), txnStartTimestamp=1610171196233, txnLastUpdateTimestamp=1610171196286 ) this_is_my_transactional_id::TransactionMetadata( transactionalId=this_is_my_transactional_id, producerId=0, producerEpoch=1, txnTimeoutMs=60000, state=Empty, pendingState=None, topicPartitions=Set(), txnStartTimestamp=-1, txnLastUpdateTimestamp=1610171480544 ) this_is_my_transactional_id::TransactionMetadata( transactionalId=this_is_my_transactional_id, producerId=0, producerEpoch=1, txnTimeoutMs=60000, state=Ongoing, pendingState=None, topicPartitions=Set(test-0), txnStartTimestamp=1610171481520, txnLastUpdateTimestamp=1610171481520 ) this_is_my_transactional_id::TransactionMetadata( transactionalId=this_is_my_transactional_id, producerId=0, producerEpoch=1, txnTimeoutMs=60000, state=PrepareAbort, pendingState=None, topicPartitions=Set(test-0), txnStartTimestamp=1610171481520, txnLastUpdateTimestamp=1610171481537 ) this_is_my_transactional_id::TransactionMetadata( transactionalId=this_is_my_transactional_id, producerId=0, producerEpoch=1, txnTimeoutMs=60000, state=CompleteAbort, pendingState=None, topicPartitions=Set(), txnStartTimestamp=1610171481520, txnLastUpdateTimestamp=1610171481538 )
유의해서 살펴보아야 할 몇가지 포인트에 대해서 설명드리도록 하겠습니다.
1. transactional.id
총 Transaction Metadata 의 갯수는 8개입니다.
그리고 이 모든 transactiona.id 는 동일합니다.
2. producer id
producer id 또한 동일한 transactional.id 를 사용하였기 때문에 동일한 값을 가집니다.
다만 생성된 카프카 클러스터에서 최초로 연결된 Producer 이기 때문에 producer id 를 0 으로 지정받았습니다.
3. producer epoch
저는 commit marker 실험과 abort marker 를 생성하는 실험을 두차례 나누어서 진행하였습니다.
따라서 위 결과를 확인해보시면 처음 4개의 Transaction Metadata 는 Epoch 가 0 입니다.
그리고 마지막 4개의 Transaction Metadata 는 Epoch 가 1 로 지정되어 있습니다.
4. state
State 는 현재 이 transactional.id 를 가지는 프로듀서가 어떤 트랜잭션 상황에 있는지를 알려줍니다.
당연히 현재는 CompleteAbort 상태입니다.
왜냐면 저희 마지막 행동이 abort_transaction 함수의 호출이였기 때문입니다.
Empty, Ongoing, PrepareCommit, PrepareAbort, CompleteCommit, CompleteAbort 등의 상태가 존재합니다.
다른 글에서 이 상태들에 대한 심도있는 이야기를 하려고 합니다.
5. topicPartitions
트랜잭션에 관여한 Partition 의 목록들을 저장합니다.
그러니깐 Transaction 범위 내에서 여러 partition 들에게 레코드가 추가될 수 있겠죠 ??
이렇게 트랜잭션 동안에 레코드가 추가된 Partition 들이 topicPartitions 에 기록됩니다.
그리고 제일 마지막에 Commit / Abort Marker 를 추가할 Topic-Partition 정보를 이 Transaction Metadata 를 통해서 확인합니다.
카프카 실행을 위한 Docker 명령어.
docker network create kafka-net docker run -d --name zookeeper --hostname zookeeper --net kafka-net \ -e ZOOKEEPER_CLIENT_PORT=2181 \ confluentinc/cp-zookeeper:7.6.4 docker run -d --name kafka1 --hostname kafka1 --net kafka-net \ -e KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181 \ -e KAFKA_CFG_BROKER_ID=1 \ -e ALLOW_PLAINTEXT_LISTENER=yes \ -e KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR=1 \ -e KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,OUTER://:19092 \ -e KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka1:9092,OUTER://localhost:19092 \ -e KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,OUTER:PLAINTEXT \ -e KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT \ -p 19092:19092 \ bitnami/kafka:3.1.2 docker run -d --name kafka-ui --net kafka-net \ -e DYNAMIC_CONFIG_ENABLED='true' \ -e KAFKA_CLUSTERS_0_NAME=cluster \ -e KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka1:9092,kafka2:9092,kafka3:9092 \ -p 8080:8080 \ provectuslabs/kafka-ui:v0.7.2
반응형'Kafka > Kafka Producer' 카테고리의 다른 글