-
[Kafka] max.block.ms 알아보기Kafka 2024. 2. 18. 07:04728x90반응형
- 목차
들어가며.
이번 글에서는 Kafka Producer 설정인 max.block.ms 에 대해서 알아보겠습니다.
max.block.ms 는 Kafka Producer 가 레코드를 생성하는 과정에서 대기하는 최대 시간을 설정하는 값인데요.
이를 이해하기 위해서 몇가지 배경지식이 필요합니다.
Accumulator & Batch.
먼저 카프카 프로듀서는 데이터를 생성할 때에 레코드를 하나씩 하나씩 생성하지 않습니다.
카프카 프로듀서가 브로커에게 one by one 형식으로 레코드를 전송하지 않는 구조이구요.
대신 레코드들을 하나의 Batch 에 모은 후에 Batch 를 브로커에게 전달하게 됩니다.
"batch.size" 라는 카프카 프로듀서의 설정은 하나의 Batch 에 얼마 만큼의 레코드를 저장할지에 대한 설정입니다.
그리고 "Record Accumulator" 는 Batch 처리를 담당하는 카프카 프로듀서의 구성요소입니다.
Accumulator 가 레코드들을 배치 단위로 모으고, 적절한 시기에 브로커로 전달합니다.
브로커로 전달되는 적절한 시기는 하나의 배치의 크기가 batch.size 이상의 되는 시점을 뜻합니다.
Buffer Pool.
카프카 프로듀서는 내부적으로 Buffer Pool 을 가집니다.
하나의 버퍼에 인코딩된 key, value, timestamp 등의 레코드의 정보를 저장할 수 있구요.
이 데이터들이 쌓여서 하나의 배치를 이루고 배치가 가득 차게되면 브로커로 전송됩니다.
아래의 예시들은 여러 Library 에서 제공되는 Buffer Pool 의 사용되는 예시이구요.
free 라는 이름의 변수에 BufferPool 객체가 할당되어 사용됩니다.
// org.apache.kafka.clients.producer.internals.RecordAccumulator.class buffer = this.free.allocate(size, maxTimeToBlock); // org.apache.kafka.clients.producer.internals.Sender.class this.accumulator.deallocate(batch);
// kafka-python record_accumulator.py buf = self._free.allocate(size, max_time_to_block_ms) // kafka-python sender.py self._accumulator.deallocate(batch)
버퍼풀이라는 이름처럼 버퍼는 제한된 갯수만큼 존재합니다.
이 버퍼는 ByteBuffer 형식의 자료구조이구요.
인코딩된 레코드가 ByteBuffer 형식으로 하나의 배치에 추가됩니다.
그리고 이 배치는 batch.size 이상이 되면 Flush 되죠.
이 과정에서 버퍼 풀에서 버퍼를 획득하는 과정에서 버퍼를 획득하는데에 걸리는 최대 시간이 max.block.ms 입니다.
max.block.ms 이상이 소요된다면 예외가 발생합니다.
max.block.ms
max.block.ms 는 카프카 프로듀서가 하나의 Buffer 를 획득하기까지 대기하는 최대 시간입니다.
만약 대기 시간이 max.block.ms 를 초과하게 된다면 Exception 이 발생합니다.
예를 들어, java 의 kafka-clients 모듈에선 "BufferExhaustedException" 예외가 발생하죠.
throw new BufferExhaustedException("Failed to allocate memory within the configured max blocking time " + maxTimeToBlockMs + " ms.");
하나의 KafkaRecord 를 KafkaProducer.send( KafkaRecord ) 와 같은 형식으로 send 하게 된다면,
KafkaRecord 는 하나의 버퍼에 추가됩니다.
그리고 여러 버퍼가 모여서 하나의 Batch 를 이루고,
Batch 가 batch.size 제한에 도달하면 브로커로 전송됩니다.
하지만 버퍼는 무한히 생성할 수는 없구요.
memory.buffer 설정의 값만큼 buffer 를 생성할 수 있습니다.
내부적으로 버퍼의 생성과 반환이 교차적으로 이루어집니다.
데이터의 생성 속도가 너무 빨라서 버퍼의 반환이 이루어지지 않으면 새로운 버퍼를 생성하기까지 max.block.ms 만큼 대기합니다.
그리고 대기시간이 max.block.ms 를 초과하게 된다면 예외가 발생합니다.
BufferExhaustedException 재현해보기.
max.block.ms 와 관련된 여러가지 문제상황들을 재현해보겠습니다.
Record 의 크기가 매우 큰 경우.
하나의 Record 가 1MB 인 데이터를 생성해보겠습니다.
추가적인 설정들은 아래와 같습니다.
max.block.ms 는 0.1초로 설정하여 매우 짧은 시간을 주었습니다.
그리고 최대 요청 용량은 50MB, batch.size 는 2MB, buffer.memory 는 50MB 로 설정하였습니다.
이 설정들을 해석해보면,
하나의 요청의 최대 용량은 넉넉히 50MB 로 설정하였고,
batch.size 가 2MB 이므로 1MB 의 Record 를 넉넉히 수용할 수 있도록 구성하였습니다.
그리고 buffer.memory 값을 50MB 로 넉넉히 설정하였습니다.
Properties properties = new Properties(); properties.setProperty(ProducerConfig.MAX_BLOCK_MS_CONFIG, "100"); properties.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, "" + 1024 * 1024 * 2); // 2MB properties.setProperty(ProducerConfig.BUFFER_MEMORY_CONFIG, "" + 1024 * 1024 * 50); // 50MB properties.setProperty(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, "" + 1024 * 1024 * 50); // 50MB
그리고 아래의 코드를 실행합니다.
public static void main(String[] args) { Properties properties = new Properties(); properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:29091,localhost:29092,localhost:29093"); properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); properties.setProperty(ProducerConfig.MAX_BLOCK_MS_CONFIG, "100"); properties.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, "" + 1024 * 1024 * 2); // 2MB properties.setProperty(ProducerConfig.BUFFER_MEMORY_CONFIG, "" + 1024 * 1024 * 50); // 50MB properties.setProperty(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, "" + 1024 * 1024 * 50); // 50MB String topic = "test-topic"; String data = ""; StringBuilder appender = new StringBuilder(); while (appender.length() < 1024 * 1024 * 1) appender = appender.append("1"); // data 1MB try (KafkaProducer<String, String> producer = new KafkaProducer<>(properties);) { data = appender.toString(); for (int i = 1; i <= 1000; i++) { ProducerRecord<String, String> record = new ProducerRecord<>(topic, null, data); producer.send(record, new Callback() { @Override public void onCompletion(RecordMetadata recordMetadata, Exception e) { if (e != null) System.out.println(e); } }); } } }
아래와 같이 max.block.ms 의 시간이 너무 짧기 때문에 예외가 발생합니다.
org.apache.kafka.clients.producer.BufferExhaustedException: Failed to allocate memory within the configured max blocking time 100 ms.
Record 의 크기가 작은 경우.
반면 Record 의 크기가 작은 경우에는 max.block.ms 의 영향을 거의 받지 않습니다.
하나의 Record 를 1KB 정도로 설정하게 된다면 max.block.ms 가 0.1 초 만큼 작더라도 데이터 생성에 무리가 없습니다.
그 이유는 버퍼를 생성하고 반환하는 사이의 시간이 매우 짧기 때문입니다.
public static void main(String[] args) { Properties properties = new Properties(); properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:29091,localhost:29092,localhost:29093"); properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); properties.setProperty(ProducerConfig.MAX_BLOCK_MS_CONFIG, "100"); properties.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, "" + 1024 * 1024 * 2); // 2MB properties.setProperty(ProducerConfig.BUFFER_MEMORY_CONFIG, "" + 1024 * 1024 * 50); // 50MB properties.setProperty(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, "" + 1024 * 1024 * 50); // 50MB String topic = "test_topic"; String data = ""; StringBuilder appender = new StringBuilder(); while (appender.length() < 1024 * 1) appender = appender.append("1"); // data 1KB try (KafkaProducer<String, String> producer = new KafkaProducer<>(properties);) { data = appender.toString(); for (int i = 1; i <= 100000; i++) { ProducerRecord<String, String> record = new ProducerRecord<>(topic, null, data); producer.send(record, new Callback() { @Override public void onCompletion(RecordMetadata recordMetadata, Exception e) { if (e != null) System.out.println(e); } }); } } }
마치며.
지금까지 설명한 과정들을 요약해보겠습니다.
max.block.ms 는 Kafka Clients 내부에서 사용하는 버퍼를 획득하는데에 걸리는 최대 시간입니다.
버퍼를 획득하는데에 max.block.ms 이상의 시간이 소요되면 예외가 발생합니다.
문제가 발생할 수 있는 경우는 크게 두가지입니다.
1. Record 의 크기가 너무 큰 경우.
2. max.block.ms 가 너무 작은 경우.
따라서 Record 하나의 크기가 너무 큰 경우에는 max.block.ms 을 기본값보다 늘리거나 Record 의 크기를 줄일 수 있는 방안을 생각해야합니다.
반응형'Kafka' 카테고리의 다른 글
[Kafka] Timeindex 알아보기 (0) 2024.06.01 [Kafka] Partition 알아보기 (0) 2024.05.12 [Kafka-Connect] Debezium MySQL Connector 구현하기 (0) 2024.02.18 [Kafka-Streams] Json 기반 Custom Serdes 구현하기 (0) 2024.02.17 [Kafka] Transaction Coordinator 알아보기 (0) 2024.02.07