ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • Kafka Producer Process (카프카 프로듀서 과정)
    Kafka 2023. 9. 9. 00:09
    728x90
    반응형

    소개

    kafka producer 가 kafka broker 에게 메시지를 전송하는 과정에 대한 설명을 해보겠습니다.
    kafka 는 binary wire protocol 이라는 네트워크 프로토콜을 활용하여 브로커와 통신합니다.
    여러가지 언어로 작성된 kafka client library 들 또한 binary wire protocol 을 구현한 결과물입니다.
     
    효율적인 통신을 구현하기 위해서 여러가지 단계를 거치게 되는데 각 단계에 대해서 알아보겠습니다.
     

    ProducerRecord

    ProducerRecord 는 kafka producer 와 kafka broker 가 통신하기 위해서 사용되는 데이터의 기본적인 형태입니다.
     
    ProduerRecord 의 형태는 대략 아래와 같은 구조체의 모양을 가지며, topic 부터 value 까지의 데이터로 구성됩니다.

    {
    	topic: String,
    	partition: Integer | Null,
    	timestamp: Long,
    	key: String | Null,
    	value: String
    }

     
    ProducerRecord 는 크게 5가지 정보를 가집니다.
    topic 은 메시지가 저장될 토픽을 의미합니다.
    partition 과 key 는 토픽의 어떤 파티션에 저장되어야하는지를 결정하는 정보이구요.
    timestamp 는 ProducerRecord의 생성시각 정보를 나타냅니다.
    value 는 메시지의 실제 정보입니다.
     
    여기서 주목할 점은 partition 과 key 이 Nullable 하다는 사실입니다.
    왜냐하면 카프카는 메시지가 저장될 파티션을 클라이언트가 결정하는 방식을 취하기 때문입니다.

     
    partition 에 Integer 값이 주어진다면, 토픽의 파티션이 명시되어 Sticky 한 방식으로 파티션이 결정됩니다.
    반면, partition 이 Null 이고 key 에 특정 값이 설정된다면 해당 ProducerRecord 는 key 를 hashing 한 결과에 따라 파티션이 지정됩니다.
    그리고 partition 과 key 둘 다 Null 인 경우는 Round-Robin 방식으로 파티션이 결정됩니다.
     
    이렇게 ProducerRecord 가 생성된다면 카프카 통신을 위한 일차적인 데이터 준비는 완료됩니다.
     

    Serializer

    Serializer 는 카프카 메시지를 직렬화하는 kafka producer 의 요소입니다.
    카프카가 사용하는 네트워크 통신인 binary wire protocol 의 규격을 준수하기 위해서 binary 데이터를 사용하게 되는데요.
    몇몇 정의된 Serializer Class 들를 통해 데이터를 Byte Array 로 변환해야합니다.
     
    직렬화되어야되는 데이터의 대상은 ProducerRecord 의 Key 와 Value 입니다.
     
    Serializer 의 종류는 여러가지가 있습니다.

    • StringSerializer
    • ShortSerializer
    • IntegerSerializer
    • LongSerializer
    • DoubleSerializer
    • BytesSerializer

    StringSerializer 는 일반적인 Character json 과 같은 텍스트 포맷 데이터들은 직렬화할 때에 사용되고
    온도나 가격처럼 numeric 한 데이터가 ProducerRecord 의 값이 될땐 나머지 Short, Integer, Long, Double Serializer 가 사용됩니다.
     
     
    Serializer 는 코드레벨에서 key.serializer, value.serializer 에 값을 할당하는 방식으로 진행됩니다.
    java 의 경우에는 Serializer 클래스를 적용하는 경우가 많고,
    python, nodejs 같은 경우는 lambda function 을 적용합니다.
     

    // <java>
    Properties producerProps = new Properties();
    producerProps.put("bootstrap.servers", "broker:9092,broker:9093");
    producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    producerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

     

    ## <pytho>
    producer = KafkaProducer(
        bootstrap_servers=['broker1', 'broker2', 'broker3'],
        client_id='client-1',
        key_serializer=None,
        value_serializer=lambda v: json.dumps(v).encode('utf-8')
    )

     

    // nodejs
    const responses = await producer.send({
          topic: "topic1",
          messages: [{
            key: event.name,
            value: JSON.stringify({data : ""})
          }]
        })

     

    Partitioner

    메시지가 저장될 토픽과 파티션을 결정하는 주체는 broker 가 아닌 producer 입니다.
    즉, 카프카 클라이언트도 데이터의 저장에 중대한 영향력을 가집니다.
    왜냐하면 데이터의 구체적인 저장소를 결정하는 것이 kafka producer 이기 때문이죠.
     
    위에서 언급했듯이 ProducerRecord 의 partition 과 key 가 저장될 파티션을 결정합니다.
    파티셔닝은 3가지 케이스가 존재합니다.
     
    1. partition is not null.
    2. key is not null.
    3. both partition and key are null.
     
    ProducerRecord 의 partition 이 명시된 경우에 지정된 파티션으로 ProducerRecord 가 저장됩니다.
    예를 들어 토픽의 파티션이 3개이고 ProducerRecord 의 partition 이 1 인 경우에 해당 ProducerRecord 는 파티션 1에 저장됩니다.
     
    ProducerRecord 의 partition 이 null 이고 특정 key 가 존재하는 경우에 해당 key 값에 따라 저장될 토픽의 파티션이 결정됩니다.
    key 는 직렬화된 값이며, 그 값의 hashing 된 결과가 저장될 토픽의 파티션의 값이 됩니다.
     
    ProducerRecord 의 partition 과 key 모두 null 인 경우는 간단히 Round-Robin 방식으로 토픽의 파티션이 결정됩니다.
     

    Record Accumulator

    위 과정에서 카프카 통신을 위한 데이터의 준비는 되었습니다.
    하지만 효율적인 통신을 위해서 Compression 과 Batch 가 사용됩니다.
    전송되는 데이터의 양을 줄이는 목적에서 Compression 이 필요하구요.
    Batch 또한 네트워크 트래픽을 줄이는 목적이 있습니다.
     
    Record Accumulator 는 Batch 를 구현하기 위해서 활용됩니다.
    Record Accumulator 는 버퍼들의 집합인데요.
    각 토픽 별 그리고 각 파티션 별로 버퍼가 존재합니다.
     

    <출처 : https://www.linkedin.com/>
     
    ProducerRecord 의 topic 과 partition 정보를 기반으로 Record Accumulator 의 버퍼에 ProducerRecord 가 쌓입니다.
    토픽이 3개가 존재하며 각 토픽이 3개의 파티션이 있다면
    총 9개의 버퍼가 Record Accumulator 에 생성되게 됩니다.
    batch 는 batch.size 라는 카프카 설정에 따라 사이즈가 결정됩니다.

     

    Sender IO Thread

    마지막 단계입니다.
    kafka client 는 kafka broker 통신을 하기 위해서 비동기 방식을 취합니다.
    kafka broker 로부터의 response 를 대기하지 않기 때문에 레코드를 효과적으로 전달할 수 있습니다.
    Sender 단계서 Error Handling 과 Retry 에 대한 구체적인 구현을 할 수 있습니다.
     

    반응형

    'Kafka' 카테고리의 다른 글

    kafka __consumer_offsets topic 이해하기  (0) 2023.09.18
    Kafka Consumer 개념  (0) 2023.09.13
    kafka consumer  (0) 2023.03.02
    kafka producer  (0) 2023.02.23
    [Kafka] Kafka Cluster 구축을 위한 docker-compose 파일  (0) 2023.02.21
Designed by Tistory.