ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • [Kafka] max.block.ms 알아보기
    Kafka 2024. 2. 18. 07:04
    728x90
    반응형

     

    - 목차

     

    들어가며.

    이번 글에서는 Kafka Producer 설정인 max.block.ms 에 대해서 알아보겠습니다.

    max.block.ms 는 Kafka Producer 가 레코드를 생성하는 과정에서 대기하는 최대 시간을 설정하는 값인데요.

    이를 이해하기 위해서 몇가지 배경지식이 필요합니다.

     

    Accumulator & Batch.

    출처 : https://blog.developer.adobe.com/exploring-kafka-producers-internals-37411b647d0f

     

    먼저 카프카 프로듀서는 데이터를 생성할 때에 레코드를 하나씩 하나씩 생성하지 않습니다.

    카프카 프로듀서가 브로커에게 one by one 형식으로 레코드를 전송하지 않는 구조이구요.

    대신 레코드들을 하나의 Batch 에 모은 후에 Batch 를 브로커에게 전달하게 됩니다.

    "batch.size" 라는 카프카 프로듀서의 설정은 하나의 Batch 에 얼마 만큼의 레코드를 저장할지에 대한 설정입니다.

     

    그리고 "Record Accumulator"Batch 처리를 담당하는 카프카 프로듀서의 구성요소입니다.

    Accumulator 가 레코드들을 배치 단위로 모으고, 적절한 시기에 브로커로 전달합니다.

    브로커로 전달되는 적절한 시기는 하나의 배치의 크기가 batch.size 이상의 되는 시점을 뜻합니다.

     

    Buffer Pool.

    카프카 프로듀서는 내부적으로 Buffer Pool 을 가집니다.

    하나의 버퍼에 인코딩된 key, value, timestamp 등의 레코드의 정보를 저장할 수 있구요.

    이 데이터들이 쌓여서 하나의 배치를 이루고 배치가 가득 차게되면 브로커로 전송됩니다.

     

    아래의 예시들은 여러 Library 에서 제공되는 Buffer Pool 의 사용되는 예시이구요.

    free 라는 이름의 변수에 BufferPool 객체가 할당되어 사용됩니다.

    // org.apache.kafka.clients.producer.internals.RecordAccumulator.class
    buffer = this.free.allocate(size, maxTimeToBlock);
    
    // org.apache.kafka.clients.producer.internals.Sender.class
    this.accumulator.deallocate(batch);
    // kafka-python record_accumulator.py
    buf = self._free.allocate(size, max_time_to_block_ms)
    
    // kafka-python sender.py
    self._accumulator.deallocate(batch)

     

     

    버퍼풀이라는 이름처럼 버퍼는 제한된 갯수만큼 존재합니다.

    이 버퍼는 ByteBuffer 형식의 자료구조이구요.

    인코딩된 레코드가 ByteBuffer 형식으로 하나의 배치에 추가됩니다.

    그리고 이 배치는 batch.size 이상이 되면 Flush 되죠.

     

    이 과정에서 버퍼 풀에서 버퍼를 획득하는 과정에서 버퍼를 획득하는데에 걸리는 최대 시간이 max.block.ms 입니다.

    max.block.ms 이상이 소요된다면 예외가 발생합니다.

     

     

    max.block.ms

    max.block.ms 는 카프카 프로듀서가 하나의 Buffer 를 획득하기까지 대기하는 최대 시간입니다.

    만약 대기 시간이 max.block.ms 를 초과하게 된다면 Exception 이 발생합니다.

    예를 들어, java 의 kafka-clients 모듈에선 "BufferExhaustedException" 예외가 발생하죠.

    throw new BufferExhaustedException("Failed to allocate memory within the configured max blocking time " + maxTimeToBlockMs + " ms.");

     

    하나의 KafkaRecord 를 KafkaProducer.send( KafkaRecord ) 와 같은 형식으로 send 하게 된다면,

    KafkaRecord 는 하나의 버퍼에 추가됩니다.

    그리고 여러 버퍼가 모여서 하나의 Batch 를 이루고,

    Batch 가 batch.size 제한에 도달하면 브로커로 전송됩니다.

    하지만 버퍼는 무한히 생성할 수는 없구요.

    memory.buffer 설정의 값만큼 buffer 를 생성할 수 있습니다.

     

    내부적으로 버퍼의 생성과 반환이 교차적으로 이루어집니다.

    데이터의 생성 속도가 너무 빨라서 버퍼의 반환이 이루어지지 않으면 새로운 버퍼를 생성하기까지 max.block.ms 만큼 대기합니다.

    그리고 대기시간이 max.block.ms 를 초과하게 된다면 예외가 발생합니다.

     

    BufferExhaustedException 재현해보기.

    max.block.ms 와 관련된 여러가지 문제상황들을 재현해보겠습니다.

     

    Record 의 크기가 매우 큰 경우.

    하나의 Record 가 1MB 인 데이터를 생성해보겠습니다.

    추가적인 설정들은 아래와 같습니다.

    max.block.ms 는 0.1초로 설정하여 매우 짧은 시간을 주었습니다.

    그리고 최대 요청 용량은 50MB, batch.size 는 2MB, buffer.memory 는 50MB 로 설정하였습니다.

    이 설정들을 해석해보면,

    하나의 요청의 최대 용량은 넉넉히 50MB 로 설정하였고,

    batch.size 가 2MB 이므로 1MB 의 Record 를 넉넉히 수용할 수 있도록 구성하였습니다.

    그리고 buffer.memory 값을 50MB 로 넉넉히 설정하였습니다.

    Properties properties = new Properties();
    properties.setProperty(ProducerConfig.MAX_BLOCK_MS_CONFIG, "100");
    properties.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, "" + 1024 * 1024 * 2); // 2MB
    properties.setProperty(ProducerConfig.BUFFER_MEMORY_CONFIG, "" + 1024 * 1024 * 50); // 50MB
    properties.setProperty(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, "" + 1024 * 1024 * 50); // 50MB

     

    그리고 아래의 코드를 실행합니다.

    public static void main(String[] args) {
    
        Properties properties = new Properties();
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:29091,localhost:29092,localhost:29093");
        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.setProperty(ProducerConfig.MAX_BLOCK_MS_CONFIG, "100");
        properties.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, "" + 1024 * 1024 * 2); // 2MB
        properties.setProperty(ProducerConfig.BUFFER_MEMORY_CONFIG, "" + 1024 * 1024 * 50); // 50MB
        properties.setProperty(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, "" + 1024 * 1024 * 50); // 50MB
        String topic = "test-topic";
    
        String data = "";
        StringBuilder appender = new StringBuilder();
        while (appender.length() < 1024 * 1024 * 1) appender = appender.append("1");
        // data 1MB
    
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(properties);) {
    
            data = appender.toString();
            for (int i = 1; i <= 1000; i++) {
                ProducerRecord<String, String> record = new ProducerRecord<>(topic, null, data);
                producer.send(record, new Callback() {
                    @Override
                    public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                        if (e != null) System.out.println(e);
                    }
                });
            }
        }
    }

     

    아래와 같이 max.block.ms 의 시간이 너무 짧기 때문에 예외가 발생합니다.

    org.apache.kafka.clients.producer.BufferExhaustedException: Failed to allocate memory within the configured max blocking time 100 ms.

     

    Record 의 크기가 작은 경우.

    반면 Record 의 크기가 작은 경우에는 max.block.ms 의 영향을 거의 받지 않습니다.

    하나의 Record 를 1KB 정도로 설정하게 된다면 max.block.ms 가 0.1 초 만큼 작더라도 데이터 생성에 무리가 없습니다.

    그 이유는 버퍼를 생성하고 반환하는 사이의 시간이 매우 짧기 때문입니다.

     

    public static void main(String[] args) {
    
        Properties properties = new Properties();
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:29091,localhost:29092,localhost:29093");
        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.setProperty(ProducerConfig.MAX_BLOCK_MS_CONFIG, "100");
        properties.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, "" + 1024 * 1024 * 2); // 2MB
        properties.setProperty(ProducerConfig.BUFFER_MEMORY_CONFIG, "" + 1024 * 1024 * 50); // 50MB
        properties.setProperty(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, "" + 1024 * 1024 * 50); // 50MB
        String topic = "test_topic";
    
        String data = "";
        StringBuilder appender = new StringBuilder();
        while (appender.length() < 1024 * 1) appender = appender.append("1");
        // data 1KB
    
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(properties);) {
    
            data = appender.toString();
            for (int i = 1; i <= 100000; i++) {
                ProducerRecord<String, String> record = new ProducerRecord<>(topic, null, data);
                producer.send(record, new Callback() {
                    @Override
                    public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                        if (e != null) System.out.println(e);
                    }
                });
            }
        }
    }

     

     

    마치며.

    지금까지 설명한 과정들을 요약해보겠습니다.

    max.block.ms 는 Kafka Clients 내부에서 사용하는 버퍼를 획득하는데에 걸리는 최대 시간입니다.

    버퍼를 획득하는데에 max.block.ms 이상의 시간이 소요되면 예외가 발생합니다.

    문제가 발생할 수 있는 경우는 크게 두가지입니다.

     

    1. Record 의 크기가 너무 큰 경우.

    2. max.block.ms 가 너무 작은 경우.

     

    따라서 Record 하나의 크기가 너무 큰 경우에는 max.block.ms 을 기본값보다 늘리거나 Record 의 크기를 줄일 수 있는 방안을 생각해야합니다.

     

    반응형
Designed by Tistory.