ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • [Kafka] Transaction Coordinator 알아보기
    Kafka 2024. 2. 7. 23:02
    728x90
    반응형

    - 목차

     

    들어가며.

    Kafka Consumer 입장에서 Group Coordinator 가 존재하듯이, Kafka Producer 는 Transaction Coordinator 가 존재합니다.

    Kafka Producer 의 Transaction 처리를 위해서 특정 Broker 는 Transaction Coordinator 로써 동작하게 됩니다.

     

    Transaction Coordinator 는 어떻게 결정될까 ?

    먼저 Transactional Producer 에 의해서 Transaction 이 최초로 실행되면,

    Transaction State 를 기록하기 위한 __transaction_state Topic 이 생성됩니다.

    Transaction Coordinator 는 하나의 Broker 입니다.

    아래 이미지는 생성된 __transaction_state Topic 의 이미지이구요.

    __consumer_offsets 토픽과 같이 최초 실행되는 시점에 __transaction_state Topic 이 생성됩니다.

     

    이는 transaction.state.log.num.partitions 설정에 의해서 Partition 의 갯수가 결정되는 Internal Topic 인데요.

    기본값은 50개입니다.

    그래서 50개의 Partition 을 가지는 __transaction_state Topic 이 생성됩니다.

     

    Transactional Producer 가 자신의 transactional.id 를 가지고 트랜잭션을 시작하게되면,

    카프카 브로커에게 자신의 transactional.id 를 전달합니다.

    이 과정에서 transactional.id 에 해당하는 __transaction_state 의 Partition 이 결정되는데요.

    이때에 Modulo Hashing 방식이 사용됩니다.

     

    그리고 결정된 __transaction_state 의 Partition 을 소유한 Leader Broker 가 Transaction Coordinator 가 됩니다.

    이는 Group Coordinator 를 결정하는 방식과 동일합니다.

     

     

    Kafka Transaction Protocol.

    Kafka 는 Transactional Producer 와 Transaction Coordinator 사이에 Transaction 통신을 위한 Protocol 이 존재합니다.

    이를 Transaction Protocol 이라고 부르구요.

    해당 과정은 Transaction API 레벨에서 살펴보도록 하겠습니다.

    initTransactions.

    Transactional Producer 는 레코드를 전송하기 이전에 Transaction 초기화를 시작합니다.

    이를 위한 고수준 API 가 initTransactions 입니다.

    먼저 Transactional Producer 는 반드시 transactional.id 를 설정해야합니다.

    Java Kafka-Clients 모듈에서 보통 아래와 같이 설정하게 됩니다.

    Properties properties = new Properties();
    properties.put("transactional.id", "t-id-1");
    KafkaProducer producer = new KafkaProducer(properties);

     

    initTransactions 과정에서 Transactional Producer 는 Kafka Broker 에게 자신의 transactional.id 를 전달합니다.

    그리고 이러한 Transaction 관련한 요청이 최초의 요청이라면 Kafka Broker 는 __transaction_state 라는 Internal Topic 을 생성합니다.

    그리고 transactional.id 에 대응되는 ( 해싱되는 ) __transaction_state 의 Partition 에 Transaction Metadata 를 생성합니다.

    또한 전달된 transactional.id 는 Kafka Broker 에 의해서 관리됩니다.

    Kafka Broker 는 등록된 transactional.id 를 관리합니다.

     

     

     

    Fencing Zombie.

    initTransactions 는 transactional.id 를 Kafka Broker 에 등록하는 기능 뿐만 아니라 Zombie 상태의 Producer 가 생성하는 Record 들을 차단합니다.

    이를 간단히 테스트해보도록 하겠습니다.

     

    < Docker-Compose >

    cat <<EOF> /tmp/kafka-docker-compose.yaml
    version: '2'
    services:
      kafdrop:
        image: obsidiandynamics/kafdrop:4.0.1
        container_name: kafdrop
        restart: "no"
        ports:
          - "9000:9000"
        environment:
          KAFKA_BROKERCONNECT: "kafka1:9092"
        depends_on:
          - "kafka1" 
        networks:
          - kafka
      zookeeper:
        image: confluentinc/cp-zookeeper:7.4.3
        container_name: zookeeper
        environment:
          ZOOKEEPER_SERVER_ID: 1
          ZOOKEEPER_CLIENT_PORT: 2181
          ZOOKEEPER_TICK_TIME: 2000
          ZOOKEEPER_INIT_LIMIT: 5
          ZOOKEEPER_SYNC_LIMIT: 2
        ports:
          - "22181:22181"
        networks:
          - kafka      
      kafka1:
        image: confluentinc/cp-kafka:7.4.3
        container_name: kafka1
        depends_on:
          - zookeeper
        ports:
          - "29091:29091"
          - "29191:29191"      
        environment:
          KAFKA_BROKER_ID: 1
          KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
          KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka1:9092,EXTERNAL://localhost:29091,EXTERNALDOCKER://host.docker.internal:29191
          KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,EXTERNALDOCKER:PLAINTEXT
          KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
          KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
          KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
          KAFKA_MESSAGE_MAX_BYTES: 10000000
          KAFKA_SOCKET_REQUEST_MAX_BYTES: 100001200
          KAFKA_SOCKET_RECEIVE_BUFFER_BYTES: 10000000
          KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
          KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
          KAFKA_TRANSACTION_STATE_LOG_NUM_PARTITIONS: 1      
        networks:
          - kafka    
    networks:
      kafka:
        driver: bridge
    EOF

     

    < Docker-Compose run >

    docker-compose -f /tmp/kafka-docker-compose.yaml --project-name kafka up -d

     

     

     

    위의 세가지 Docker Run Command 를 실행하게 되면 http://localhost:9000 에서 아래의 Kafdrop 상태를 확인하실 수 있습니다.

     

     

    아래의 테스트 케이스는 동일한 transactional.id 로 두개의 Transactional Producer 를 구성하였습니다.

    그리고 두 테스트를 동시에 진행하게 되면 먼저 실행된 테스트는 실패하게 됩니다.

      @Test
      void test1() {
        test();
      }
    
      @Test
      void test2() {
        test();
      }
    
      void test () {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:29091");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "test-transactional-producer");
        properties.put(ProducerConfig.ACKS_CONFIG, "all");
        String topic = "test-topic";
    
        try {
          KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
          producer.initTransactions();
          producer.beginTransaction();
          for (long i = 0; i < 100L;i++) {
            ProducerRecord<String, String> record = new ProducerRecord<>(topic, null, String.valueOf(i));
            Thread.sleep(100);
            producer.send(record);
            producer.flush();
            System.out.println(i);
          }
          producer.commitTransaction();
          producer.close();
        } catch (Exception e) {
          System.out.println(e.getMessage());
        }
      }

     

    < 에러 출력 >

     

    test-transactional-producer 라는 동일한 transactional.id 를 사용하는 여러 Producer 들이 존재하는 경우,

    아래처럼 Transaction 이 Aborted 됩니다.

    Producer with transactionalId 'test-transactional-producer' and ProducerIdAndEpoch(producerId=1000, epoch=15) 
    attempted to produce with an old epoch

     

    beginTransaction.

    beginTransaction 은 Transaction Scope 을 시작하는 API 입니다.

    그리고 commitTransaction, abortTransaction 과 함께 Transaction 의 Scope 를 결정합니다.

    Transaction Scope 에 대해서 자세히 알아보기 위해서 이어지는 글에서 Transaction Marker 와 Two Phase Commit 에 대해서 알아보도록 하겠습니다.

     

     

    Transaction Marker.

    Transactional Producer 에 의해서 생성되는 카프카 레코드들은 목적지 Topic 의 Partition 으로 저장됩니다.

    여기서 중요한 점은 Transaction 이 Aborted 되어도 Record 들은 Topic 에 저장이 된다는 점입니다.

    예를 들어보겠습니다.

     

    아래의 예시는 Kafka Record 들을 Abort 하는 Transaction 코드입니다.

      void testAbort () {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:29091");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "test-transactional-producer");
        properties.put(ProducerConfig.ACKS_CONFIG, "all");
        String topic = "abort-topic";
    
        try {
          KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
          producer.initTransactions();
          producer.beginTransaction();
          for (long i = 0; i < 1000L;i++) {
            ProducerRecord<String, String> record = new ProducerRecord<>(topic, null, String.valueOf(i));
            producer.send(record);
            producer.flush();
            System.out.println(i);
          }
          producer.abortTransaction();
          producer.close();
        } catch (Exception e) {
          System.out.println(e.getMessage());
        }
      }

     

    위 Abort Transaction 예시 Function 을 실행하게 되면 아래와 같이 abort-topic 에 레코드들이 쌓입니다.

     

     

    Aborted Record 조회하기.

    kafka-console-consumer shell 을 통해서 abort-topic 을 조회해보도록 하겠습니다.

    조회 결과로써 아래처럼 aborted 된 메시지들이 조회됩니다.

    kafka-console-consumer --bootstrap-server localhost:9092 --topic abort-topic \
    --group test --from-beginning
    0
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14

     

    하지만 아래와 같이 read_committed Isolation-level 을 적용하게 되면 aborted 된 레코드들은 조회되지 않습니다.

    kafka-console-consumer --bootstrap-server localhost:9092 --topic abort-topic \
    --group test --from-beginning \
    --isolation-level read_committed

     

    // Empty

     

     

    여기서 사용되는 특수한 Record 가 바로 Transaction Marker 입니다.

    Transaction Marker 는 Transaction Control 의 용도로 사용되는 특수한 Marker 이구요.

    read_committed Isolation Level 로 토픽을 읽게되면 Marker 를 기준으로 Marker 이후의 레코드를 Consume 을 할지 말지를 결정합니다.

     

     

    kafka-dump-log Shell 을 통해서 Transaction Marker 를 확인할 수 있습니다.

    kafka-dump-log --files /var/lib/kafka/data/abort-topic-0/00000000000000000000.log \
    --print-data-log --skip-record-metadata

     

    baseOffset: 996 lastOffset: 996 count: 1 baseSequence: 996 lastSequence: 996 producerId: 0 producerEpoch: 3 partitionLeaderEpoch: 0 isTransactional: true isControl: false deleteHorizonMs: OptionalLong.empty position: 70606 CreateTime: 1707791372315 size: 71 magic: 2 compresscodec: none crc: 3922908830 isvalid: true
     payload: 996
    baseOffset: 997 lastOffset: 997 count: 1 baseSequence: 997 lastSequence: 997 producerId: 0 producerEpoch: 3 partitionLeaderEpoch: 0 isTransactional: true isControl: false deleteHorizonMs: OptionalLong.empty position: 70677 CreateTime: 1707791372316 size: 71 magic: 2 compresscodec: none crc: 2798098045 isvalid: true
     payload: 997
    baseOffset: 998 lastOffset: 998 count: 1 baseSequence: 998 lastSequence: 998 producerId: 0 producerEpoch: 3 partitionLeaderEpoch: 0 isTransactional: true isControl: false deleteHorizonMs: OptionalLong.empty position: 70748 CreateTime: 1707791372316 size: 71 magic: 2 compresscodec: none crc: 3912080259 isvalid: true
     payload: 998
    baseOffset: 999 lastOffset: 999 count: 1 baseSequence: 999 lastSequence: 999 producerId: 0 producerEpoch: 3 partitionLeaderEpoch: 0 isTransactional: true isControl: false deleteHorizonMs: OptionalLong.empty position: 70819 CreateTime: 1707791372317 size: 71 magic: 2 compresscodec: none crc: 2512340082 isvalid: true
     payload: 999
    baseOffset: 1000 lastOffset: 1000 count: 1 baseSequence: -1 lastSequence: -1 producerId: 0 producerEpoch: 3 partitionLeaderEpoch: 0 isTransactional: true isControl: true deleteHorizonMs: OptionalLong.empty position: 70890 CreateTime: 1707791372322 size: 78 magic: 2 compresscodec: none crc: 712950927 isvalid: true
    
    baseOffset: 1001 lastOffset: 1001 count: 1 baseSequence: 0 lastSequence: 0 producerId: 0 producerEpoch: 4 partitionLeaderEpoch: 0 isTransactional: true isControl: false deleteHorizonMs: OptionalLong.empty position: 70968 CreateTime: 1707791378039 size: 69 magic: 2 compresscodec: none crc: 3793188450 isvalid: true
     payload: 0
    baseOffset: 1002 lastOffset: 1002 count: 1 baseSequence: 1 lastSequence: 1 producerId: 0 producerEpoch: 4 partitionLeaderEpoch: 0 isTransactional: true isControl: false deleteHorizonMs: OptionalLong.empty position: 71037 CreateTime: 1707791378063 size: 69 magic: 2 compresscodec: none crc: 2700982723 isvalid: true
     payload: 1

     

     

    위 출력에서 baseOffset 1000 에 해당하는 Record 가 Transaction Marker 로 사용된 Record 입니다.

    이를 풀어보면 아래와 같구요.

    baseOffset: 1000 
    lastOffset: 1000 
    count: 1 
    baseSequence: -1 
    lastSequence: -1 
    producerId: 0 
    producerEpoch: 3 
    partitionLeaderEpoch: 0 
    isTransactional: true 
    isControl: true 
    deleteHorizonMs: OptionalLong.empty 
    position: 70890 
    CreateTime: 1707791372322 
    size: 78 
    magic: 2 
    compresscodec: none 
    crc: 712950927 
    isvalid: true

     

     

    반응형
Designed by Tistory.