-
[Kafka] Transaction Coordinator 알아보기Kafka 2024. 2. 7. 23:02728x90반응형
- 목차
들어가며.
Kafka Consumer 입장에서 Group Coordinator 가 존재하듯이, Kafka Producer 는 Transaction Coordinator 가 존재합니다.
Kafka Producer 의 Transaction 처리를 위해서 특정 Broker 는 Transaction Coordinator 로써 동작하게 됩니다.
Transaction Coordinator 는 어떻게 결정될까 ?
먼저 Transactional Producer 에 의해서 Transaction 이 최초로 실행되면,
Transaction State 를 기록하기 위한 __transaction_state Topic 이 생성됩니다.
Transaction Coordinator 는 하나의 Broker 입니다.
아래 이미지는 생성된 __transaction_state Topic 의 이미지이구요.
__consumer_offsets 토픽과 같이 최초 실행되는 시점에 __transaction_state Topic 이 생성됩니다.
이는 transaction.state.log.num.partitions 설정에 의해서 Partition 의 갯수가 결정되는 Internal Topic 인데요.
기본값은 50개입니다.
그래서 50개의 Partition 을 가지는 __transaction_state Topic 이 생성됩니다.
Transactional Producer 가 자신의 transactional.id 를 가지고 트랜잭션을 시작하게되면,
카프카 브로커에게 자신의 transactional.id 를 전달합니다.
이 과정에서 transactional.id 에 해당하는 __transaction_state 의 Partition 이 결정되는데요.
이때에 Modulo Hashing 방식이 사용됩니다.
그리고 결정된 __transaction_state 의 Partition 을 소유한 Leader Broker 가 Transaction Coordinator 가 됩니다.
이는 Group Coordinator 를 결정하는 방식과 동일합니다.
Kafka Transaction Protocol.
Kafka 는 Transactional Producer 와 Transaction Coordinator 사이에 Transaction 통신을 위한 Protocol 이 존재합니다.
이를 Transaction Protocol 이라고 부르구요.
해당 과정은 Transaction API 레벨에서 살펴보도록 하겠습니다.
initTransactions.
Transactional Producer 는 레코드를 전송하기 이전에 Transaction 초기화를 시작합니다.
이를 위한 고수준 API 가 initTransactions 입니다.
먼저 Transactional Producer 는 반드시 transactional.id 를 설정해야합니다.
Java Kafka-Clients 모듈에서 보통 아래와 같이 설정하게 됩니다.
Properties properties = new Properties(); properties.put("transactional.id", "t-id-1"); KafkaProducer producer = new KafkaProducer(properties);
initTransactions 과정에서 Transactional Producer 는 Kafka Broker 에게 자신의 transactional.id 를 전달합니다.
그리고 이러한 Transaction 관련한 요청이 최초의 요청이라면 Kafka Broker 는 __transaction_state 라는 Internal Topic 을 생성합니다.
그리고 transactional.id 에 대응되는 ( 해싱되는 ) __transaction_state 의 Partition 에 Transaction Metadata 를 생성합니다.
또한 전달된 transactional.id 는 Kafka Broker 에 의해서 관리됩니다.
Kafka Broker 는 등록된 transactional.id 를 관리합니다.
Fencing Zombie.
initTransactions 는 transactional.id 를 Kafka Broker 에 등록하는 기능 뿐만 아니라 Zombie 상태의 Producer 가 생성하는 Record 들을 차단합니다.
이를 간단히 테스트해보도록 하겠습니다.
< Docker-Compose >
cat <<EOF> /tmp/kafka-docker-compose.yaml version: '2' services: kafdrop: image: obsidiandynamics/kafdrop:4.0.1 container_name: kafdrop restart: "no" ports: - "9000:9000" environment: KAFKA_BROKERCONNECT: "kafka1:9092" depends_on: - "kafka1" networks: - kafka zookeeper: image: confluentinc/cp-zookeeper:7.4.3 container_name: zookeeper environment: ZOOKEEPER_SERVER_ID: 1 ZOOKEEPER_CLIENT_PORT: 2181 ZOOKEEPER_TICK_TIME: 2000 ZOOKEEPER_INIT_LIMIT: 5 ZOOKEEPER_SYNC_LIMIT: 2 ports: - "22181:22181" networks: - kafka kafka1: image: confluentinc/cp-kafka:7.4.3 container_name: kafka1 depends_on: - zookeeper ports: - "29091:29091" - "29191:29191" environment: KAFKA_BROKER_ID: 1 KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181' KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka1:9092,EXTERNAL://localhost:29091,EXTERNALDOCKER://host.docker.internal:29191 KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,EXTERNALDOCKER:PLAINTEXT KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0 KAFKA_MESSAGE_MAX_BYTES: 10000000 KAFKA_SOCKET_REQUEST_MAX_BYTES: 100001200 KAFKA_SOCKET_RECEIVE_BUFFER_BYTES: 10000000 KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 KAFKA_TRANSACTION_STATE_LOG_NUM_PARTITIONS: 1 networks: - kafka networks: kafka: driver: bridge EOF
< Docker-Compose run >
docker-compose -f /tmp/kafka-docker-compose.yaml --project-name kafka up -d
위의 세가지 Docker Run Command 를 실행하게 되면 http://localhost:9000 에서 아래의 Kafdrop 상태를 확인하실 수 있습니다.
아래의 테스트 케이스는 동일한 transactional.id 로 두개의 Transactional Producer 를 구성하였습니다.
그리고 두 테스트를 동시에 진행하게 되면 먼저 실행된 테스트는 실패하게 됩니다.
@Test void test1() { test(); } @Test void test2() { test(); } void test () { Properties properties = new Properties(); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:29091"); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); properties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "test-transactional-producer"); properties.put(ProducerConfig.ACKS_CONFIG, "all"); String topic = "test-topic"; try { KafkaProducer<String, String> producer = new KafkaProducer<>(properties); producer.initTransactions(); producer.beginTransaction(); for (long i = 0; i < 100L;i++) { ProducerRecord<String, String> record = new ProducerRecord<>(topic, null, String.valueOf(i)); Thread.sleep(100); producer.send(record); producer.flush(); System.out.println(i); } producer.commitTransaction(); producer.close(); } catch (Exception e) { System.out.println(e.getMessage()); } }
< 에러 출력 >
test-transactional-producer 라는 동일한 transactional.id 를 사용하는 여러 Producer 들이 존재하는 경우,
아래처럼 Transaction 이 Aborted 됩니다.
Producer with transactionalId 'test-transactional-producer' and ProducerIdAndEpoch(producerId=1000, epoch=15) attempted to produce with an old epoch
beginTransaction.
beginTransaction 은 Transaction Scope 을 시작하는 API 입니다.
그리고 commitTransaction, abortTransaction 과 함께 Transaction 의 Scope 를 결정합니다.
Transaction Scope 에 대해서 자세히 알아보기 위해서 이어지는 글에서 Transaction Marker 와 Two Phase Commit 에 대해서 알아보도록 하겠습니다.
Transaction Marker.
Transactional Producer 에 의해서 생성되는 카프카 레코드들은 목적지 Topic 의 Partition 으로 저장됩니다.
여기서 중요한 점은 Transaction 이 Aborted 되어도 Record 들은 Topic 에 저장이 된다는 점입니다.
예를 들어보겠습니다.
아래의 예시는 Kafka Record 들을 Abort 하는 Transaction 코드입니다.
void testAbort () { Properties properties = new Properties(); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:29091"); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); properties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "test-transactional-producer"); properties.put(ProducerConfig.ACKS_CONFIG, "all"); String topic = "abort-topic"; try { KafkaProducer<String, String> producer = new KafkaProducer<>(properties); producer.initTransactions(); producer.beginTransaction(); for (long i = 0; i < 1000L;i++) { ProducerRecord<String, String> record = new ProducerRecord<>(topic, null, String.valueOf(i)); producer.send(record); producer.flush(); System.out.println(i); } producer.abortTransaction(); producer.close(); } catch (Exception e) { System.out.println(e.getMessage()); } }
위 Abort Transaction 예시 Function 을 실행하게 되면 아래와 같이 abort-topic 에 레코드들이 쌓입니다.
Aborted Record 조회하기.
kafka-console-consumer shell 을 통해서 abort-topic 을 조회해보도록 하겠습니다.
조회 결과로써 아래처럼 aborted 된 메시지들이 조회됩니다.
kafka-console-consumer --bootstrap-server localhost:9092 --topic abort-topic \ --group test --from-beginning
0 1 2 3 4 5 6 7 8 9 10 11 12 13 14
하지만 아래와 같이 read_committed Isolation-level 을 적용하게 되면 aborted 된 레코드들은 조회되지 않습니다.
kafka-console-consumer --bootstrap-server localhost:9092 --topic abort-topic \ --group test --from-beginning \ --isolation-level read_committed
// Empty
여기서 사용되는 특수한 Record 가 바로 Transaction Marker 입니다.
Transaction Marker 는 Transaction Control 의 용도로 사용되는 특수한 Marker 이구요.
read_committed Isolation Level 로 토픽을 읽게되면 Marker 를 기준으로 Marker 이후의 레코드를 Consume 을 할지 말지를 결정합니다.
kafka-dump-log Shell 을 통해서 Transaction Marker 를 확인할 수 있습니다.
kafka-dump-log --files /var/lib/kafka/data/abort-topic-0/00000000000000000000.log \ --print-data-log --skip-record-metadata
baseOffset: 996 lastOffset: 996 count: 1 baseSequence: 996 lastSequence: 996 producerId: 0 producerEpoch: 3 partitionLeaderEpoch: 0 isTransactional: true isControl: false deleteHorizonMs: OptionalLong.empty position: 70606 CreateTime: 1707791372315 size: 71 magic: 2 compresscodec: none crc: 3922908830 isvalid: true payload: 996 baseOffset: 997 lastOffset: 997 count: 1 baseSequence: 997 lastSequence: 997 producerId: 0 producerEpoch: 3 partitionLeaderEpoch: 0 isTransactional: true isControl: false deleteHorizonMs: OptionalLong.empty position: 70677 CreateTime: 1707791372316 size: 71 magic: 2 compresscodec: none crc: 2798098045 isvalid: true payload: 997 baseOffset: 998 lastOffset: 998 count: 1 baseSequence: 998 lastSequence: 998 producerId: 0 producerEpoch: 3 partitionLeaderEpoch: 0 isTransactional: true isControl: false deleteHorizonMs: OptionalLong.empty position: 70748 CreateTime: 1707791372316 size: 71 magic: 2 compresscodec: none crc: 3912080259 isvalid: true payload: 998 baseOffset: 999 lastOffset: 999 count: 1 baseSequence: 999 lastSequence: 999 producerId: 0 producerEpoch: 3 partitionLeaderEpoch: 0 isTransactional: true isControl: false deleteHorizonMs: OptionalLong.empty position: 70819 CreateTime: 1707791372317 size: 71 magic: 2 compresscodec: none crc: 2512340082 isvalid: true payload: 999 baseOffset: 1000 lastOffset: 1000 count: 1 baseSequence: -1 lastSequence: -1 producerId: 0 producerEpoch: 3 partitionLeaderEpoch: 0 isTransactional: true isControl: true deleteHorizonMs: OptionalLong.empty position: 70890 CreateTime: 1707791372322 size: 78 magic: 2 compresscodec: none crc: 712950927 isvalid: true baseOffset: 1001 lastOffset: 1001 count: 1 baseSequence: 0 lastSequence: 0 producerId: 0 producerEpoch: 4 partitionLeaderEpoch: 0 isTransactional: true isControl: false deleteHorizonMs: OptionalLong.empty position: 70968 CreateTime: 1707791378039 size: 69 magic: 2 compresscodec: none crc: 3793188450 isvalid: true payload: 0 baseOffset: 1002 lastOffset: 1002 count: 1 baseSequence: 1 lastSequence: 1 producerId: 0 producerEpoch: 4 partitionLeaderEpoch: 0 isTransactional: true isControl: false deleteHorizonMs: OptionalLong.empty position: 71037 CreateTime: 1707791378063 size: 69 magic: 2 compresscodec: none crc: 2700982723 isvalid: true payload: 1
위 출력에서 baseOffset 1000 에 해당하는 Record 가 Transaction Marker 로 사용된 Record 입니다.
이를 풀어보면 아래와 같구요.
baseOffset: 1000 lastOffset: 1000 count: 1 baseSequence: -1 lastSequence: -1 producerId: 0 producerEpoch: 3 partitionLeaderEpoch: 0 isTransactional: true isControl: true deleteHorizonMs: OptionalLong.empty position: 70890 CreateTime: 1707791372322 size: 78 magic: 2 compresscodec: none crc: 712950927 isvalid: true
반응형'Kafka' 카테고리의 다른 글
[Kafka-Connect] Debezium MySQL Connector 구현하기 (0) 2024.02.18 [Kafka-Streams] Json 기반 Custom Serdes 구현하기 (0) 2024.02.17 [Kafka] Rebalance 가 발생하는 경우들 알아보기 ( Rebalance Scenario ) (0) 2024.02.04 [Kafka] Kafka Rebalance Protocol 알아보기 ( JoinGroup, LeaveGroup ) (0) 2024.02.04 [Kafka] API Version 알아보기 ( Protocol ) (0) 2024.01.31