-
Kafka Producer Process (카프카 프로듀서 과정)Kafka 2023. 9. 9. 00:09728x90반응형
소개
kafka producer 가 kafka broker 에게 메시지를 전송하는 과정에 대한 설명을 해보겠습니다.
kafka 는 binary wire protocol 이라는 네트워크 프로토콜을 활용하여 브로커와 통신합니다.
여러가지 언어로 작성된 kafka client library 들 또한 binary wire protocol 을 구현한 결과물입니다.
효율적인 통신을 구현하기 위해서 여러가지 단계를 거치게 되는데 각 단계에 대해서 알아보겠습니다.
ProducerRecord
ProducerRecord 는 kafka producer 와 kafka broker 가 통신하기 위해서 사용되는 데이터의 기본적인 형태입니다.
ProduerRecord 의 형태는 대략 아래와 같은 구조체의 모양을 가지며, topic 부터 value 까지의 데이터로 구성됩니다.{ topic: String, partition: Integer | Null, timestamp: Long, key: String | Null, value: String }
ProducerRecord 는 크게 5가지 정보를 가집니다.
topic 은 메시지가 저장될 토픽을 의미합니다.
partition 과 key 는 토픽의 어떤 파티션에 저장되어야하는지를 결정하는 정보이구요.
timestamp 는 ProducerRecord의 생성시각 정보를 나타냅니다.
value 는 메시지의 실제 정보입니다.
여기서 주목할 점은 partition 과 key 이 Nullable 하다는 사실입니다.
왜냐하면 카프카는 메시지가 저장될 파티션을 클라이언트가 결정하는 방식을 취하기 때문입니다.
partition 에 Integer 값이 주어진다면, 토픽의 파티션이 명시되어 Sticky 한 방식으로 파티션이 결정됩니다.
반면, partition 이 Null 이고 key 에 특정 값이 설정된다면 해당 ProducerRecord 는 key 를 hashing 한 결과에 따라 파티션이 지정됩니다.
그리고 partition 과 key 둘 다 Null 인 경우는 Round-Robin 방식으로 파티션이 결정됩니다.
이렇게 ProducerRecord 가 생성된다면 카프카 통신을 위한 일차적인 데이터 준비는 완료됩니다.
Serializer
Serializer 는 카프카 메시지를 직렬화하는 kafka producer 의 요소입니다.
카프카가 사용하는 네트워크 통신인 binary wire protocol 의 규격을 준수하기 위해서 binary 데이터를 사용하게 되는데요.
몇몇 정의된 Serializer Class 들를 통해 데이터를 Byte Array 로 변환해야합니다.
직렬화되어야되는 데이터의 대상은 ProducerRecord 의 Key 와 Value 입니다.
Serializer 의 종류는 여러가지가 있습니다.- StringSerializer
- ShortSerializer
- IntegerSerializer
- LongSerializer
- DoubleSerializer
- BytesSerializer
StringSerializer 는 일반적인 Character 나 json 과 같은 텍스트 포맷 데이터들은 직렬화할 때에 사용되고
온도나 가격처럼 numeric 한 데이터가 ProducerRecord 의 값이 될땐 나머지 Short, Integer, Long, Double Serializer 가 사용됩니다.
Serializer 는 코드레벨에서 key.serializer, value.serializer 에 값을 할당하는 방식으로 진행됩니다.
java 의 경우에는 Serializer 클래스를 적용하는 경우가 많고,
python, nodejs 같은 경우는 lambda function 을 적용합니다.
// <java> Properties producerProps = new Properties(); producerProps.put("bootstrap.servers", "broker:9092,broker:9093"); producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); producerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
## <pytho> producer = KafkaProducer( bootstrap_servers=['broker1', 'broker2', 'broker3'], client_id='client-1', key_serializer=None, value_serializer=lambda v: json.dumps(v).encode('utf-8') )
// nodejs const responses = await producer.send({ topic: "topic1", messages: [{ key: event.name, value: JSON.stringify({data : ""}) }] })
Partitioner
메시지가 저장될 토픽과 파티션을 결정하는 주체는 broker 가 아닌 producer 입니다.
즉, 카프카 클라이언트도 데이터의 저장에 중대한 영향력을 가집니다.
왜냐하면 데이터의 구체적인 저장소를 결정하는 것이 kafka producer 이기 때문이죠.
위에서 언급했듯이 ProducerRecord 의 partition 과 key 가 저장될 파티션을 결정합니다.
파티셔닝은 3가지 케이스가 존재합니다.
1. partition is not null.
2. key is not null.
3. both partition and key are null.
ProducerRecord 의 partition 이 명시된 경우에 지정된 파티션으로 ProducerRecord 가 저장됩니다.
예를 들어 토픽의 파티션이 3개이고 ProducerRecord 의 partition 이 1 인 경우에 해당 ProducerRecord 는 파티션 1에 저장됩니다.
ProducerRecord 의 partition 이 null 이고 특정 key 가 존재하는 경우에 해당 key 값에 따라 저장될 토픽의 파티션이 결정됩니다.
key 는 직렬화된 값이며, 그 값의 hashing 된 결과가 저장될 토픽의 파티션의 값이 됩니다.
ProducerRecord 의 partition 과 key 모두 null 인 경우는 간단히 Round-Robin 방식으로 토픽의 파티션이 결정됩니다.
Record Accumulator
위 과정에서 카프카 통신을 위한 데이터의 준비는 되었습니다.
하지만 효율적인 통신을 위해서 Compression 과 Batch 가 사용됩니다.
전송되는 데이터의 양을 줄이는 목적에서 Compression 이 필요하구요.
Batch 또한 네트워크 트래픽을 줄이는 목적이 있습니다.
Record Accumulator 는 Batch 를 구현하기 위해서 활용됩니다.
Record Accumulator 는 버퍼들의 집합인데요.
각 토픽 별 그리고 각 파티션 별로 버퍼가 존재합니다.
<출처 : https://www.linkedin.com/>
ProducerRecord 의 topic 과 partition 정보를 기반으로 Record Accumulator 의 버퍼에 ProducerRecord 가 쌓입니다.
토픽이 3개가 존재하며 각 토픽이 3개의 파티션이 있다면
총 9개의 버퍼가 Record Accumulator 에 생성되게 됩니다.
batch 는 batch.size 라는 카프카 설정에 따라 사이즈가 결정됩니다.Sender IO Thread
마지막 단계입니다.
kafka client 는 kafka broker 통신을 하기 위해서 비동기 방식을 취합니다.
kafka broker 로부터의 response 를 대기하지 않기 때문에 레코드를 효과적으로 전달할 수 있습니다.
Sender 단계서 Error Handling 과 Retry 에 대한 구체적인 구현을 할 수 있습니다.
반응형'Kafka' 카테고리의 다른 글
Kafka Replication (메시지 복제) 이해하기 (0) 2023.09.21 kafka __consumer_offsets topic 이해하기 (0) 2023.09.18 Kafka Consumer 개념 (0) 2023.09.13 kafka consumer (0) 2023.03.02 kafka producer (0) 2023.02.23