ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • [Kafka] BufferPool 알아보기 (org.apache.kafka:kafka-clients)
    Kafka 2024. 6. 3. 06:08
    728x90
    반응형

     

    - 목차

     

    들어가며.

    Kafka ProducerBufferPool 이라는 구성 요소를 통해 데이터를 효과적으로 관리합니다.

    BufferPool 은 이름에서 알 수 있듯이, Kafka Producer 가 사용할 버퍼(Buffer) 를 제공하는 핵심적인 역할을 수행합니다.

    본 글에서는 Java의 org.apache.kafka:kafka-clients 모듈을 기반으로 BufferPool 의 동작 원리를 상세히 설명할 예정입니다.

    Java 로 구현된 Kafka ProducerJava NIO ByteBuffer 를 활용하여 데이터를 처리합니다.

    이를 통해 Kafka 의 BufferPool 은 ByteBuffer 를 효율적으로 재활용하며 클라이언트(Producer)에게 필요한 버퍼를 제공합니다.

    즉, Java NIO 의 ByteBuffer 와 시너지를 발휘하는 방식으로 동작하게 됩니다.

    이어지는 내용에서 관련 역할들을 심도있게 다루어보도록 하겠습니다.

     

    본 내용은 org.apache.kafka:kafka-clients:3.5.1 모듈의

    org.apache.kafka.clients.producer.internals.BufferPool 클래스에 대한 이야기를 중점적으로 다룹니다.

     

    BufferPool 이란 ?

    Kafka Producer 는 Java NIO 의 ByteBuffer를 관리하여 데이터 처리 및 전송을 효율적으로 수행합니다.

    이를 위해 BufferPool 클래스는 내부적으로 Deque<ByteBuffer> free라는 필드를 사용합니다.

    public class BufferPool {
        private final Deque<ByteBuffer> free;
        public ByteBuffer allocate(int size, long maxTimeToBlockMs);
        public void deallocate(ByteBuffer buffer, int size);
        // 생략 ..
    }

     

    free 라는 이름의 이 자료구조는 BufferPool 이 관리하는 ByteBuffer 의 풀(pool) 을 나타냅니다.

    이를 통해 Kafka 는 필요한 ByteBuffer 를 효율적으로 할당하거나 반환할 수 있습니다.

     

    BufferPoolallocatedeallocate 메서드를 사용하여 free 자료구조 내의 ByteBuffer를 제공하거나 해제하는 역할을 수행합니다.

    이러한 설계는 메모리 재활용과 성능 최적화를 목표로 하며, ByteBuffer 관리의 핵심적인 부분을 담당합니다.

     

    왜 ByteBuffer 를 사용할까 ?

    KafkaJava NIO(New I/O)ChannelBuffer 를 활용하여 IO 성능을 극대화합니다.

    Java NIOFile IO Network IO 를 포함한 다양한 데이터 처리 작업에서 효율적인 비동기 IO 동작을 지원하며,

    Kafka 의 요구에 적합한 성능과 확장성을 제공합니다.

    Kafka 는 Java 와 Scala 로 구성된 환경에서 운영되며,

    특히 TCP 기반의 네트워크 통신을 처리하는 Kafka ClientBrokerJava NIO의 SocketChannel 을 적극적으로 사용합니다.

    이러한 구조는 네트워크 통신 과정에서 불필요한 데이터 복사를 줄이고, 빠르고 효율적인 데이터 전송을 가능하게 합니다.

     

    Kafka Client(Producer)Broker 로 데이터를 전송하기 전에 메시지를 Java NIO 의 ByteBuffer에 직렬화하여 저장합니다.

    이렇게 직렬화된 데이터는 batch.sizelinger.ms 설정에 따라 적절한 전송 타이밍을 결정한 후,

    ByteBuffer 에 기록된 데이터를 그대로 전송합니다.

    이 방식은 데이터를 한 번만 기록하고 필요 시 전송하기 때문에 불필요한 복사를 줄이고 성능을 최적화합니다.

    결과적으로 ByteBuffer 는 Kafka 의 고성능 메시징 처리에서 중요한 역할을 합니다.

     

    특히, Kafka Producer는 Record 를 개별적으로 (Batch 단위가 아닌) Broker에 전송하지 않습니다.

    대신, 네트워크 통신은 Batch 단위로 수행되며, 이 과정에서 BufferPool이 제공하는 ByteBuffer에 Record들을 순차적으로 저장합니다.

     

    allocate 함수 살펴보기.

    BufferPool 의 allocate 함수에 대해서 살펴보겠습니다.

    allocate 함수의 시그니처는 아래와 같습니다.

    ByteBuffer allocate(int size, long maxTimeToBlockMs)

     

    allocate 함수는 2개의 Input 을 필요로 합니다.

    • int size 는 일반적으로 Kafka Producer 의 설정값인 batch.size 에 해당합니다.
    • long maxTimeToBlockMs 두번째 인자인 maxTimeToBlockMs 역시 Kafka Producer 설정인 max.block.ms 에 해당합니다.

     

    그리고 allocate 함수는 ByteBuffer 를 반환합니다.

    단순히 시그니처만 볼 때에는 ByteBuffer 를 생성하여 반환하는 함수로 보입니다.

     

    이제 내부를 확인해볼까요 ?

    아래 부분은 allocate 함수의 시작 부분입니다.

    sizebatch.size 에 해당하는 Kafka Producer 설정이며, 이는 16KB 라는 Default Value 를 가집니다.

    그리고 this.totalMemorybuffer.memory 에 해당하는 Kafka Producer 의 설정입니다.

    buffer.memory 의 Default Value 는 32MB 입니다.

    그래서 이론적으로는 최대 2048 개의 Buffer 를 만들 수 있습니다.

    if (size > this.totalMemory)
        throw new IllegalArgumentException("Attempt to allocate " + size
                                           + " bytes, but there is a hard limit of "
                                           + this.totalMemory
                                           + " on memory allocations.");

     

     

    ReentrantLock Mutex.

    또한 BufferPool.allocate 함수의 중요한 점은 ReentrantLock 을 통해서 Mutex 동기화를 제공합니다.

    사실 allocate 와 deallocate 는 Mutual Exclusive 동기화 방식을 사용하여 버퍼의 제공과 해제를 동기화시켜야 합니다.

    흔히 Producer - Consumer 관계의 동기화를 구현하는 이치와 동일하다고 생각하시면 됩니다.

     

    그래서 allocate 함수는 아래와 같이 동기화 로직이 구현되어 있습니다.

    allocate 함수는 한 시점의 1개의 Thread 만이 ByteBuffer 를 획득할 수 있도록 구성되어 있습니다.

    this.lock = new ReentrantLock();
    
    ByteBuffer allocate (int size, long maxTimeToBlockMs) {
    	
        this.lock.lock();
        
        // 생락...
        
        finally {
        	this.lock.unlock()
        }
    }

     

    근데 사실상 KafkaProducer.send (Record) 와 같은 코드는 Main Thread 에서 싱글 쓰레드 방식으로 동작하는 경우가 대부분입니다.

    따라서 BufferPool 의 allocate 함수 또한 Main Thread 에서 동작하는게 일반적이지만 이러한 방식으로 MultiThreading 동기화가 적용되어 있습니다.

     

    Buffer Pool 에 Buffer 가 존재할 때의 ByteBuffer 제공.

    아래 코드는 Deque<ByteBuffer> 에 저장된 ( 또는 풀링된 ) ByteBuffer 를 제공하는 로직입니다.

    poolableSize 는 일반적으로 batch.size 설정 값으로, 16KB로 설정되는 경우가 많습니다.

    사용자가 요청한 Buffer 의 크기(size)가 풀링 중인 poolableSize 와 동일한 경우, BufferPool은 Deque에서 관리 중인 버퍼를 꺼내어 제공합니다.

    if (size == poolableSize && !this.free.isEmpty())
        return this.free.pollFirst();

     

    1. 크거나 작은 버퍼를 요구하거나

    2. 버퍼풀의 버퍼가 비어있는 경우에는

    풀링 중인 버퍼를 제공하지 않습니다.

     

    아래의 코드는 RecordAccumulator.java 의 코드입니다.

    free 라는 이름의 BufferPool 의 allocate 함수를 호출하는 예시이며,

    일반적으로 batch.size 또는 그 이상의 값이 allocate 의 첫번째 size 인자로 사용됩니다.

    int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers));
    buffer = free.allocate(size, maxTimeToBlock);

     

     

    Buffer Pool 이 비어있을 때 ( Empty ), ByteBuffer 제공.

    BufferPool 이 비어있는 상황에서 BufferPool 에게 Buffer 를 요청한다면,

    BufferPool 은 새로운 Buffer 를 생성해야 합니다.

    아래의 allocate 코드는 Empty Buffer Pool 상황에서의 동작을 설명합니다.

    public ByteBuffer allocate(int size, long maxTimeToBlockMs) throws InterruptedException {
    	
        // 생략 .. 
        
        int freeListSize = freeSize() * this.poolableSize;
        if (this.nonPooledAvailableMemory + freeListSize >= size) {
            freeUp(size);
            this.nonPooledAvailableMemory -= size;
        }
    
        // 생략 .. 
    
        if (buffer == null)
            return safeAllocateByteBuffer(size);
    
        // 생략 .. 
        
    }
    
    private ByteBuffer safeAllocateByteBuffer(int size) {
        boolean error = true;
        try {
            ByteBuffer buffer = allocateByteBuffer(size);
            error = false;
            return buffer;
        } finally {
            if (error) {
                this.lock.lock();
                try {
                    this.nonPooledAvailableMemory += size;
                    if (!this.waiters.isEmpty())
                        this.waiters.peekFirst().signal();
                } finally {
                    this.lock.unlock();
                }
            }
        }
    }

     

    this.nonPooledAvailableMemory버퍼풀의 여유 메모리 값을 의미합니다.

    buffer.memory 의 기본 설정값인 32MBthis.nonPooledAvailableMemory최대값이 됩니다.

    그리고 batch.size 의 기본 설정값인 16KB 만큼 버퍼가 생성된다면 this.nonPooledAvailableMemory 가 값이 줄어들게 됩니다.

    그래서 nonPooledAvailableMemory 은 버퍼풀에 제공될 수 있는 여유 메모리, freeListSize 는 버퍼풀이 풀링 중인 버퍼의 총 사이즈 합이 됩니다.

     

    좀 정리를 하자면, 아래 그림처럼 표현할 수 있을 것 같은데요.

    nonPooledAvailableMemory 의 여유 공간만큼 BufferPool 은 버퍼을 만들어 버퍼를 풀링할 수 있습니다.

    그리고 풀링 중인 버퍼를 사용자에게 제공하죠.

     

     

    다시 코드로 돌아가서 "this.nonPooledAvailableMemory + freeListSize > size" 의미는 사용자가 요구하는 버퍼 사이즈인 size 보다 버퍼풀의 메모리 상황이 여유롭다라고 해석하시면 됩니다.

    따라서 이러한 경우에는 ( BufferPool 이 비어있고 & 메모리 상황이 여유롭다면 ) 새로운 ByteBuffer 를 생성합니다.

     

     

    Buffer Pool 의 메모리 상황이 여유롭지 않을 때.

    BufferPool 이 특히 신경을 기울인 부분은 메모리가 부족한 상황에서도 사용자에게 버퍼를 제공해야 하는 경우입니다.

    Kafka 의 BufferPool 은 메모리가 부족한 상황에서도 사용자에게 버퍼를 제공하기 위해 ReentrantLockCondition 동기화 도구를 사용합니다.

     

    다음은 allocate 함수의 복잡한 코드를 간소화한 코드입니다.

    public ByteBuffer allocate(int size, long maxTimeToBlockMs) throws InterruptedException {
    
        // 생략 .. 
    
        Condition moreMemory = this.lock.newCondition();
        this.waiters.addLast(moreMemory);
    
        // 생략 .. 
    
        waitingTimeElapsed = !moreMemory.await(remainingTimeToBlockNs, TimeUnit.NANOSECONDS);
    
        // 생략 .. 
    
        buffer = this.free.pollFirst();
    
        // 생략 .. 
    
    }

     

     

    BufferPool 이 사용자에게 버퍼를 제공할 수 없는 상황이 되면, deallocate 함수가 실행되어 버퍼가 반환될 때까지 기다려야 합니다.

    BufferPooldeallocate 를 통해 반환된 버퍼만을 다시 제공할 수 있기 때문입니다.

    이 동작과 관련된 Kafka Producer 설정이 바로 max.block.ms 입니다.
    max.block.msBufferPool 의 메모리 상황이 여유롭지 않을 때, 사용자가 버퍼를 획득하기까지 대기할 수 있는 최대 시간을 지정합니다.

    allocate 함수는 다음과 같은 방식으로 동작합니다:

    1. moreMemory 라는 Condition 객체를 생성하고, 이를 waiters 대기열에 추가합니다.
    2. moreMemory.await 를 호출하여, 남은 시간(remainingTimeToBlockNs) 동안 메모리가 반환되기를 기다립니다.
    3. 대기 중인 Condition은 deallocate 함수 내부에서 호출된 moreMemory.signal 에 의해 깨어납니다.

    아래는 deallocate 함수의 코드입니다:

        public void deallocate(ByteBuffer buffer, int size) {
            lock.lock();
            try {
                if (size == this.poolableSize && size == buffer.capacity()) {
                    buffer.clear();
                    this.free.add(buffer);
                } else {
                    this.nonPooledAvailableMemory += size;
                }
                Condition moreMem = this.waiters.peekFirst();
                if (moreMem != null)
                    moreMem.signal();
            } finally {
                lock.unlock();
            }
        }

     

     

    이러한 방식으로 allocate 함수는 당장 버퍼를 제공할 수 있는 상태에서 deallocate 와 협력하여 동작하게 됩니다.

     

     

    deallocate 함수 살펴보기.

    deallocate 함수는 Kafka 의 BufferPool에서 사용된 ByteBuffer를 반환하고, 대기 중인 요청에 신호를 보내는 역할을 합니다.

    이는 BufferPool 이 제한된 메모리 자원을 효율적으로 관리하고 재사용성을 극대화하기 위해 설계된 중요한 메커니즘입니다.

     

    함수의 정의는 아래와 같습니다.

     

    public void deallocate(ByteBuffer buffer, int size) {
        lock.lock();
        try {
            if (size == this.poolableSize && size == buffer.capacity()) {
                buffer.clear();
                this.free.add(buffer);
            } else {
                this.nonPooledAvailableMemory += size;
            }
            Condition moreMem = this.waiters.peekFirst();
            if (moreMem != null)
                moreMem.signal();
        } finally {
            lock.unlock();
        }
    }

     

     

    • 버퍼 반환:
      사용이 완료된 ByteBuffer 를 BufferPool로 반환하여 메모리를 재활용할 수 있도록 합니다.

     

    • 메모리 상태 업데이트:
      • 재사용 가능한 버퍼는 BufferPool의 Deque(free)에 저장됩니다.
      • 풀링되지 않은 메모리는 nonPooledAvailableMemory 변수에 추가됩니다.

     

    • 대기 중인 요청 처리:
      BufferPool에서 메모리를 기다리고 있는 스레드가 있다면, Condition.signal을 호출하여 대기 상태를 해제하고 버퍼 할당을 진행할 수 있도록 합니다.

     

     

    반응형
Designed by Tistory.