ABOUT ME

와주셔서 감사합니다. 좋은 글을 많이 쓰겠습니다.

Today
Yesterday
Total
  • [Kafka] Producer 와 Idempotence 알아보기 ( InitProducerId, Epoch, Sequence Number )
    Kafka/Kafka Producer 2024. 6. 25. 06:29
    반응형

    - 목차

     

    들어가며.

    Kafka Producer 는 Acks 모드로 동작을 하게 되면 데이터의 중복 생성의 위험이 존재합니다.

    카프카에서 중복 데이터 생성이 발생하는 원인은 at least once 방식을 사용하는 카프카의 데이터 생성 방식 때문입니다.

    프로듀서는 request.timeout.ms 라는 시간 내에 브로커로부터 Ack 응답을 받지 못하면 ProduceRequest 를 재시도하게 됩니다.

    따라서 네트워크적인 문제로 인해서 혹은 브로커의 결함으로 인해서 리더 브로커로부터 Ack 응답을 받지 못하면 ProduceRequest 가 재시도되어 데이터가 중복 생성될 가능성이 있습니다.

    이와 관련된 상세한 내용은 아래의 링크로 대체하도록 하겠습니다.

    이를 참고하면 어떠한 경우에 데이터 중복이 발생하는지 이해하시는데에 도움이 될 것 같네요.

     

    https://westlife0615.tistory.com/932

     

    [Kafka] request.timeout.ms 와 데이터 중복 생성 알아보기

    - 목차 들어가며.카프카 프로듀서는 request.timeout.ms 라는 설정을 가집니다.카프카 프로듀서는 request.timeout.ms 로 지정된 시간 내에 데이터를 생성하지 못하면 Retry 를 시도합니다.여기서 request.timeo

    westlife0615.tistory.com

     

    하지만 이러한 데이터의 중복 이슈를 해결하는 방안으로 enable.idempotence 설정이 존재합니다.

    이는 한국말로 멱등성이라고 표현하는데요.

    데이터의 중복 생성되는 이슈를 방지해줍니다.

    이는 일반적인 데이터베이스에서 사용되는 Primary Key, Clustered Index, Uniqueness Index 등과 유사한 성격을 지닙니다.

    즉, 카프카의 특정 레코드가 유일할 수 있도록 Constraint 를 적용할 수 있습니다.

    이 기능을 구현하기 위해서 카프카는 Producer ID, Sequence Number 등을 활용합니다.

    이어지는 내용에서 카프카 프로듀서의 멱등성에 대한 자세한 이야기를 설명하도록 하겠습니다.

     

    멱등성이란 ?

    우선 간단하게 멱등성이라는 것의 의미를 간단히 살펴보겠습니다.

    멱등성이라는 여러번 시도해도 동일한 결과를 만들어내는 구조를 의미합니다.

    MySQL 에서 동일한 Primary Key 를 가지는 데이터를 여러번 Insert 해도 해당하는 데이터의 유일함을 보장합니다.

    하지만 Primary Key, Unique Index 를 적용하지 않는다면 Insert Query 의 횟수만큼 데이터가 생성되겠죠 ?
    카프카에서도 멱등성을 적용하여 데이터의 중복 생성을 방지할 수 있습니다.

    Producer ID.

    Kafka Producer 는 enable.idempotence 설정을 ON 하게 되면, Producer ID 라는 것을 가지게 됩니다.

    Producer ID 는 Kafka Producer 가 자신을 식별하기 위해서 사용하는 ID 입니다.

    사실 프로듀서는 Client ID, Transactional ID, Producer ID 등 여러가지 ID 를 가지는데요.

    Client ID 는 프로듀서에서 설정하는 닉네임과 유사합니다.

    그리고 Producer ID 는 브로커가 Producer 를 위해서 생성해주는 ID 입니다.

    이 Producer ID 를 기반으로 카프카의 트랜잭션과 멱등성이 적용됩니다.

     

    Producer ID 는 Kafka 의 API 중에서 InitProducerId 라는 요청을 통해서 획득할 수 있습니다.

    그래서 enable.idempotence 가 활성화된 프로듀서는 반드시 InitProducerId 라는 요청을 브로커에게 전송하게 됩니다.

    아래의 이미지와 같이 Instance 0 에 해당하는 Producer 는 Txn Coordinator 라고 불리는 Kafka Broker 에게 Producer ID 를 요청하게 됩니다.

     

    출처 : https://www.responsive.dev/blog/guide-to-kafka-streams-exactly-once-transactions

     

    TCP Packet 관점에서 이를 살펴보면 아래와 같은 2개의 Packet 을 확인할 수 있습니다.

    먼저 "172.19.0.1.58412" 인 Producer kafka1.19092 인 브로커에게 InitProducerId 를 요청합니다.

    InitProducerId 의 API Key 가 22 이며, 이는 16진수로 0016 을 표현됩니다.

    따라서 아래 두 요청은 0x0030 라인에서 0016이 확인됩니다.

    첫번째 Packet 은 InitProducerId Request 그리고 두번째 Packet 은 InitProducerId Response 에 해당합니다.

     

    tcpdump -i eth0 port 19092 -X
    IP 172.19.0.1.58412 > kafka1.19092: Flags [P.], seq 119:170, ack 523, win 512, options [nop,nop,TS val 2663580818 ecr 1856057846], length 51
      0x0000:  4500 0067 28fc 4000 4006 b96a ac13 0001  E..g(.@.@..j....
      0x0010:  ac13 0003 e42c 4a94 4459 a7d0 7589 a8f1  .....,J.DY..u...
      0x0020:  8018 0200 5884 0000 0101 080a 9ec3 0492  ....X...........
      0x0030:  6ea1 31f6 0000 002f 0016 0004 0000 0003  n.1..../........
      0x0040:  0014 7468 6973 5f69 735f 6d79 5f63 6c69  ..this_is_my_cli
      0x0050:  656e 745f 6964 0000 ffff ffff ffff ffff  ent_id..........
      0x0060:  ffff ffff ffff 00                        .......
    
    IP kafka1.19092 > 172.19.0.1.58412: Flags [P.], seq 523:549, ack 170, win 50597, options [nop,nop,TS val 1856057849 ecr 2663580818], length 26
      0x0000:  4500 004e 5743 4000 4006 8b3c ac13 0003  E..NWC@.@..<....
      0x0010:  ac13 0001 4a94 e42c 7589 a8f1 4459 a803  ....J..,u...DY..
      0x0020:  8018 c5a5 586b 0000 0101 080a 6ea1 31f9  ....Xk......n.1.
      0x0030:  9ec3 0492 0000 0016 0000 0003 0000 0000  ................
      0x0040:  0000 0000 0000 0000 0000 1e00 0000       ..............

     

     

    InitProducerId 의 Response 는 아래와 같은 포맷을 취하며 Kafka Producer 는 InitProducerId Request 의 요청을 통해서

    Producer ID 를 가지게 됩니다.

    InitProducerId Response 를 가지게 된 이후로부터 Producer 는 데이터를 생성하는 요청에 Producer ID 를 추가합니다.

    InitProducerId Response (Version: 1) => throttle_time_ms error_code producer_id producer_epoch 
      throttle_time_ms => INT32
      error_code => INT16
      producer_id => INT64
      producer_epoch => INT16

     

    Producer Epoch.

    프로듀서의 데이터 생성에 멱등성을 적용하기 위해서 Producer Epoch 라는 데이터가 함께 사용됩니다.

    이는 기존의 Producer 가 종료되고 새롭게 브로커와 연결될 때에 Producer Epoch 는 1씩 단조증가하게 됩니다.

    즉, Producer 와 Broker 사이에 연결이 갱신된 횟수 ? 또는 InitProducerId 를 요청한 횟수라고 생각하셔도 될 것 같습니다.

    Producer Epoch 또한 InitProducerId 의 응답으로 획득할 수 있습니다.

    즉, Kafka Producer 는 InitProducerId 를 통해서 id 와 epoch 데이터를 획득합니다.

    Kafka Producer 는 데이터를 생성할 때에 Producer ID 와 Producer Epoch 를 함께 전달합니다.

     

     

    Sequence Number.

    Sequence Number 는 프로듀서가 각 레코드에 부여하는 숫자 데이터입니다.

    0부터 시작하여 Producer 레벨에서 생성되는 레코드가 단조증가하는 형식으로 Sequence Number 가 부여됩니다.

    이렇게 Producer ID, Producer Epoch, Sequence Number 를 조합하여 Record 의 Unique 함을 부여합니다.

     

    예를 들어, Producer ID 가 1, Epoch : 1, Sequence Number 가 1 인 레코드는 아무리 Retry 를 시도해도 중복 생성되지 않습니다.

    왜냐하면 이러한 조합을 가지는 레코드는 카프카 상에서 유일해야하는 제약조건을 가지기 때문입니다.

     

    kafka-dump-log.sh 를 통해서 Log Segment File 에 생성된 데이터를 분석해보면

    아래와 같이 Producer ID, Epoch, Sequence Number 를 확인할 수 있습니다.

    아래의 10개의 Record 들은 Producer ID : 31, Producer Epoch: 0, Sequence Number 는 0  ~ 9 까지의 값을 가집니다.

    이렇게 유일성을 보장함으로써 데이터 중복 생성 문제가 해결됩니다.

     

    kafka-dump-log.sh --files 00000000000000000000.log --print-data-log
    baseOffset: 352 lastOffset: 361 count: 10 baseSequence: 0 lastSequence: 9 producerId: 31 producerEpoch: 0 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 33792 CreateTime: 1722012315573 size: 411 magic: 2 compresscodec: none crc: 272712126 isvalid: true
    | offset: 352 CreateTime: 1722012315573 keySize: -1 valueSize: 28 sequence: 0 headerKeys: [] payload: this is duplicated message 0
    | offset: 353 CreateTime: 1722012315573 keySize: -1 valueSize: 28 sequence: 1 headerKeys: [] payload: this is duplicated message 1
    | offset: 354 CreateTime: 1722012315573 keySize: -1 valueSize: 28 sequence: 2 headerKeys: [] payload: this is duplicated message 2
    | offset: 355 CreateTime: 1722012315573 keySize: -1 valueSize: 28 sequence: 3 headerKeys: [] payload: this is duplicated message 3
    | offset: 356 CreateTime: 1722012315573 keySize: -1 valueSize: 28 sequence: 4 headerKeys: [] payload: this is duplicated message 4
    | offset: 357 CreateTime: 1722012315573 keySize: -1 valueSize: 28 sequence: 5 headerKeys: [] payload: this is duplicated message 5
    | offset: 358 CreateTime: 1722012315573 keySize: -1 valueSize: 28 sequence: 6 headerKeys: [] payload: this is duplicated message 6
    | offset: 359 CreateTime: 1722012315573 keySize: -1 valueSize: 28 sequence: 7 headerKeys: [] payload: this is duplicated message 7
    | offset: 360 CreateTime: 1722012315573 keySize: -1 valueSize: 28 sequence: 8 headerKeys: [] payload: this is duplicated message 8
    | offset: 361 CreateTime: 1722012315573 keySize: -1 valueSize: 28 sequence: 9 headerKeys: [] payload: this is duplicated message 9

     

     

    InitProducerId 와 관련된 Kafka API 문서.

     

    https://kafka.apache.org/protocol.html#The_Messages_InitProducerId

     

    Apache Kafka

    Apache Kafka: A Distributed Streaming Platform.

    kafka.apache.org

     

     

    반응형
Designed by Tistory.