-
[Kafka] ProducerInterceptors 알아보기Kafka 2024. 1. 5. 06:12728x90반응형
- 목차
소개.
ProducerInterceptor 는 Kafka Producer 의 한가지 기능입니다.
ProducerInterceptor 의 Fully Qualified Name 은 org.apache.kafka.clients.producer.ProducerInterceptor 이며,
org.apache.kafka.clients.producer 패키지에 속합니다.
ProducerInterceptor 의 주된 기능은 카프카 메시지가 생성되는 Lifecycle 에서 Callback 을 제공합니다.
Callback Function 은 onSend, onAcknowledgement 두가지입니다.
onSend 는 메시지가 생성되는 과정에서의 Callback 이구요.
onAcknowledgement 는 메시지의 생성 이후 브로커의 결과를 받는 Callback 입니다.
이어지는 글에서 ProducerInterceptors 에 대해서 자세히 알아보려고 합니다.
간단하게 구현해보기.
ProducerInterceptor 를 적용하는 방법은 크게 아래와 같습니다.
1. Custom ProducerInterceptor 구현하기
2. KafkaProducer 와 연결하기
1. Custom ProducerInterceptor 구현하기
아래 예시는 ProducerInterceptor 클래스를 구현한 LoggingProducerInterceptor 를 만들어 보았습니다.
LoggingProducerInterceptor 클래스는 onSend, onAcknowledgement 에 대한 로그를 남기는 역할을 수행합니다.
package com.westlife.kafka.components.interceptor; import org.apache.kafka.clients.producer.ProducerInterceptor; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import java.util.Map; public class LoggingProducerInterceptor implements ProducerInterceptor<String, String> { @Override public ProducerRecord onSend(ProducerRecord record) { System.out.println(String.format( "onSend #### topic : %s, partition : %s, key : %s, headers : %s, value : %s, timestamp : %s", record.topic(), record.partition(), record.key(), record.headers(), record.value(), record.timestamp())); return record; } @Override public void onAcknowledgement(RecordMetadata metadata, Exception exception) { System.out.println(String.format( "onAcknowledgement #### topic : %s, partition : %s, offset : %s, serializedKeySize : %s, serializedValueSize : %s", metadata.topic(), metadata.partition(), metadata.offset(), metadata.serializedKeySize(), metadata.serializedValueSize())); } @Override public void close() { } @Override public void configure(Map<String, ?> configs) { } }
2. KafkaProducer 와 연결하기
Custom ProducerInterceptor 와 KafkaProducer 를 연결하는 방식은 아래와 같습니다.
Producer 의 설정 중 "interceptor.classes" 의 값으로 Custom ProducerInterceptor 의 Fully Qualified Name 을 등록하며 됩니다.
package com.westlife.kafka; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import java.util.Properties; public class ProducerTest { public static void main (String[] args) { Properties properties = new Properties(); properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:29092,localhost:29092,localhost:29092"); properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); properties.setProperty(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, "com.westlife.kafka.components.interceptor.LoggingProducerInterceptor"); KafkaProducer<String, String> producer = new KafkaProducer<>(properties); String topic = "test_topic"; String key = null; String value = "value"; long counter = 0; while (counter < 100000) { ProducerRecord<String, String> record = new ProducerRecord(topic, key, value); producer.send(record); counter++; } } }
위 코드의 실행 결과는 아래와 같습니다.
저희가 의도한대로 onSend, onAcknowledgement Callback 이 실행됩니다.
onSend #### topic : test_topic, partition : null, key : null, headers : RecordHeaders(headers = [], isReadOnly = false), value : value, timestamp : null onAcknowledgement #### topic : test_topic, partition : 0, offset : 92960, serializedKeySize : -1, serializedValueSize : 5 onSend #### topic : test_topic, partition : null, key : null, headers : RecordHeaders(headers = [], isReadOnly = false), value : value, timestamp : null onAcknowledgement #### topic : test_topic, partition : 0, offset : 92961, serializedKeySize : -1, serializedValueSize : 5 onSend #### topic : test_topic, partition : null, key : null, headers : RecordHeaders(headers = [], isReadOnly = false), value : value, timestamp : null onAcknowledgement #### topic : test_topic, partition : 0, offset : 92962, serializedKeySize : -1, serializedValueSize : 5
여러 Interceptor 등록하기.
ProducerInterceptor 는 하나만 등록할 수 있는 것은 아닙니다.
여러 ProducerInterceptor 를 등록할 수 있고, 순서 또한 지정할 수 있습니다.
아래와 같이 "," Comma 를 기준으로 여러 ProducerInterceptor 클리스를 지정하는 방식이구요.
등록된 순서대로 Interceptor 의 실행 순서가 보장됩니다.
properties.setProperty(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, "com.westlife.kafka.components.interceptor.LoggingProducerInterceptor," + "com.westlife.kafka.components.interceptor.HeaderProducerInterceptor");
반응형'Kafka' 카테고리의 다른 글
[Kafka-Connect] S3 Sink Connector 따라해보기 1 (0) 2024.01.08 [Kafka-Streams] KStream 알아보기 (0) 2024.01.06 [Kafka] Partition Ownership 알아보기 (0) 2024.01.05 [Kafka] Log Index File 알아보기 (0) 2024.01.03 [Kafka] listeners, advertised.listeners 알아보기 (2) 2024.01.01