ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • [Kafka] ProducerInterceptors 알아보기
    Kafka 2024. 1. 5. 06:12
    728x90
    반응형

    - 목차

     

    소개.

    ProducerInterceptor 는 Kafka Producer 의 한가지 기능입니다.

    ProducerInterceptor 의 Fully Qualified Name 은 org.apache.kafka.clients.producer.ProducerInterceptor 이며,

    org.apache.kafka.clients.producer 패키지에 속합니다.

    ProducerInterceptor 의 주된 기능은 카프카 메시지가 생성되는 Lifecycle 에서 Callback 을 제공합니다.

    Callback Function 은 onSend, onAcknowledgement 두가지입니다.

     

    onSend 는 메시지가 생성되는 과정에서의 Callback 이구요.

    onAcknowledgement 는 메시지의 생성 이후 브로커의 결과를 받는 Callback 입니다.

    이어지는 글에서 ProducerInterceptors 에 대해서 자세히 알아보려고 합니다.

     

    간단하게 구현해보기.

    ProducerInterceptor 를 적용하는 방법은 크게 아래와 같습니다.

     

    1. Custom ProducerInterceptor 구현하기

    2. KafkaProducer 와 연결하기

     

    1. Custom ProducerInterceptor 구현하기

    아래 예시는 ProducerInterceptor 클래스를 구현한 LoggingProducerInterceptor 를 만들어 보았습니다.

    LoggingProducerInterceptor 클래스는 onSend, onAcknowledgement 에 대한 로그를 남기는 역할을 수행합니다.

    package com.westlife.kafka.components.interceptor;
    
    import org.apache.kafka.clients.producer.ProducerInterceptor;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;
    
    import java.util.Map;
    
    public class LoggingProducerInterceptor implements ProducerInterceptor<String, String> {
      @Override
      public ProducerRecord onSend(ProducerRecord record) {
        System.out.println(String.format(
                "onSend #### topic : %s, partition : %s, key : %s, headers : %s, value : %s, timestamp : %s",
                record.topic(), record.partition(), record.key(), record.headers(), record.value(), record.timestamp()));
        return record;
      }
    
      @Override
      public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        System.out.println(String.format(
                "onAcknowledgement #### topic : %s, partition : %s, offset : %s, serializedKeySize : %s, serializedValueSize : %s",
                metadata.topic(), metadata.partition(), metadata.offset(), metadata.serializedKeySize(), metadata.serializedValueSize()));
      }
    
      @Override
      public void close() {
    
      }
    
      @Override
      public void configure(Map<String, ?> configs) {
    
      }
    }

     

    2. KafkaProducer 와 연결하기 

    Custom ProducerInterceptor 와 KafkaProducer 를 연결하는 방식은 아래와 같습니다.

    Producer 의 설정 중 "interceptor.classes" 의 값으로 Custom ProducerInterceptor 의 Fully Qualified Name 을 등록하며 됩니다.

    package com.westlife.kafka;
    
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    
    import java.util.Properties;
    
    public class ProducerTest {
      public static void main (String[] args) {
        Properties properties = new Properties();
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                "localhost:29092,localhost:29092,localhost:29092");
        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        properties.setProperty(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
                "com.westlife.kafka.components.interceptor.LoggingProducerInterceptor");
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
    
        String topic = "test_topic";
        String key = null;
        String value = "value";
        long counter = 0;
        while (counter < 100000) {
          ProducerRecord<String, String> record = new ProducerRecord(topic, key, value);
          producer.send(record);
          counter++;
        }
      }
    }

     

    위 코드의 실행 결과는 아래와 같습니다.

    저희가 의도한대로 onSend, onAcknowledgement Callback 이 실행됩니다.

    onSend #### topic : test_topic, partition : null, key : null, headers : RecordHeaders(headers = [], isReadOnly = false), value : value, timestamp : null
    onAcknowledgement #### topic : test_topic, partition : 0, offset : 92960, serializedKeySize : -1, serializedValueSize : 5
    onSend #### topic : test_topic, partition : null, key : null, headers : RecordHeaders(headers = [], isReadOnly = false), value : value, timestamp : null
    onAcknowledgement #### topic : test_topic, partition : 0, offset : 92961, serializedKeySize : -1, serializedValueSize : 5
    onSend #### topic : test_topic, partition : null, key : null, headers : RecordHeaders(headers = [], isReadOnly = false), value : value, timestamp : null
    onAcknowledgement #### topic : test_topic, partition : 0, offset : 92962, serializedKeySize : -1, serializedValueSize : 5

     

    여러 Interceptor 등록하기.

    ProducerInterceptor 는 하나만 등록할 수 있는 것은 아닙니다.

    여러 ProducerInterceptor 를 등록할 수 있고, 순서 또한 지정할 수 있습니다.

    아래와 같이 "," Comma 를 기준으로 여러 ProducerInterceptor 클리스를 지정하는 방식이구요.

    등록된 순서대로 Interceptor 의 실행 순서가 보장됩니다.

    properties.setProperty(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
                "com.westlife.kafka.components.interceptor.LoggingProducerInterceptor," +
                        "com.westlife.kafka.components.interceptor.HeaderProducerInterceptor");

     

     

     

    반응형
Designed by Tistory.