-
[Kafka] BufferPool 알아보기 (org.apache.kafka:kafka-clients)Kafka 2024. 6. 3. 06:08728x90반응형
- 목차
들어가며.
Kafka Producer 는 BufferPool 이라는 구성 요소를 통해 데이터를 효과적으로 관리합니다.
BufferPool 은 이름에서 알 수 있듯이, Kafka Producer 가 사용할 버퍼(Buffer) 를 제공하는 핵심적인 역할을 수행합니다.
본 글에서는 Java의 org.apache.kafka:kafka-clients 모듈을 기반으로 BufferPool 의 동작 원리를 상세히 설명할 예정입니다.
Java 로 구현된 Kafka Producer 는 Java NIO 의 ByteBuffer 를 활용하여 데이터를 처리합니다.
이를 통해 Kafka 의 BufferPool 은 ByteBuffer 를 효율적으로 재활용하며 클라이언트(Producer)에게 필요한 버퍼를 제공합니다.
즉, Java NIO 의 ByteBuffer 와 시너지를 발휘하는 방식으로 동작하게 됩니다.
이어지는 내용에서 관련 역할들을 심도있게 다루어보도록 하겠습니다.
본 내용은 org.apache.kafka:kafka-clients:3.5.1 모듈의
org.apache.kafka.clients.producer.internals.BufferPool 클래스에 대한 이야기를 중점적으로 다룹니다.
BufferPool 이란 ?
Kafka Producer 는 Java NIO 의 ByteBuffer를 관리하여 데이터 처리 및 전송을 효율적으로 수행합니다.
이를 위해 BufferPool 클래스는 내부적으로 Deque<ByteBuffer> free라는 필드를 사용합니다.
public class BufferPool { private final Deque<ByteBuffer> free; public ByteBuffer allocate(int size, long maxTimeToBlockMs); public void deallocate(ByteBuffer buffer, int size); // 생략 .. }
free 라는 이름의 이 자료구조는 BufferPool 이 관리하는 ByteBuffer 의 풀(pool) 을 나타냅니다.
이를 통해 Kafka 는 필요한 ByteBuffer 를 효율적으로 할당하거나 반환할 수 있습니다.
BufferPool 은 allocate 와 deallocate 메서드를 사용하여 free 자료구조 내의 ByteBuffer를 제공하거나 해제하는 역할을 수행합니다.
이러한 설계는 메모리 재활용과 성능 최적화를 목표로 하며, ByteBuffer 관리의 핵심적인 부분을 담당합니다.
왜 ByteBuffer 를 사용할까 ?
Kafka 는 Java NIO(New I/O) 의 Channel과 Buffer 를 활용하여 IO 성능을 극대화합니다.
Java NIO 는 File IO 와 Network IO 를 포함한 다양한 데이터 처리 작업에서 효율적인 비동기 IO 동작을 지원하며,
Kafka 의 요구에 적합한 성능과 확장성을 제공합니다.
Kafka 는 Java 와 Scala 로 구성된 환경에서 운영되며,
특히 TCP 기반의 네트워크 통신을 처리하는 Kafka Client 와 Broker 는 Java NIO의 SocketChannel 을 적극적으로 사용합니다.
이러한 구조는 네트워크 통신 과정에서 불필요한 데이터 복사를 줄이고, 빠르고 효율적인 데이터 전송을 가능하게 합니다.
Kafka Client(Producer) 는 Broker 로 데이터를 전송하기 전에 메시지를 Java NIO 의 ByteBuffer에 직렬화하여 저장합니다.
이렇게 직렬화된 데이터는 batch.size 와 linger.ms 설정에 따라 적절한 전송 타이밍을 결정한 후,
ByteBuffer 에 기록된 데이터를 그대로 전송합니다.
이 방식은 데이터를 한 번만 기록하고 필요 시 전송하기 때문에 불필요한 복사를 줄이고 성능을 최적화합니다.
결과적으로 ByteBuffer 는 Kafka 의 고성능 메시징 처리에서 중요한 역할을 합니다.
특히, Kafka Producer는 Record 를 개별적으로 (Batch 단위가 아닌) Broker에 전송하지 않습니다.
대신, 네트워크 통신은 Batch 단위로 수행되며, 이 과정에서 BufferPool이 제공하는 ByteBuffer에 Record들을 순차적으로 저장합니다.
allocate 함수 살펴보기.
BufferPool 의 allocate 함수에 대해서 살펴보겠습니다.
allocate 함수의 시그니처는 아래와 같습니다.
ByteBuffer allocate(int size, long maxTimeToBlockMs)
allocate 함수는 2개의 Input 을 필요로 합니다.
- int size 는 일반적으로 Kafka Producer 의 설정값인 batch.size 에 해당합니다.
- long maxTimeToBlockMs 두번째 인자인 maxTimeToBlockMs 역시 Kafka Producer 설정인 max.block.ms 에 해당합니다.
그리고 allocate 함수는 ByteBuffer 를 반환합니다.
단순히 시그니처만 볼 때에는 ByteBuffer 를 생성하여 반환하는 함수로 보입니다.
이제 내부를 확인해볼까요 ?
아래 부분은 allocate 함수의 시작 부분입니다.
size 는 batch.size 에 해당하는 Kafka Producer 설정이며, 이는 16KB 라는 Default Value 를 가집니다.
그리고 this.totalMemory 는 buffer.memory 에 해당하는 Kafka Producer 의 설정입니다.
buffer.memory 의 Default Value 는 32MB 입니다.
그래서 이론적으로는 최대 2048 개의 Buffer 를 만들 수 있습니다.
if (size > this.totalMemory) throw new IllegalArgumentException("Attempt to allocate " + size + " bytes, but there is a hard limit of " + this.totalMemory + " on memory allocations.");
ReentrantLock Mutex.
또한 BufferPool.allocate 함수의 중요한 점은 ReentrantLock 을 통해서 Mutex 동기화를 제공합니다.
사실 allocate 와 deallocate 는 Mutual Exclusive 동기화 방식을 사용하여 버퍼의 제공과 해제를 동기화시켜야 합니다.
흔히 Producer - Consumer 관계의 동기화를 구현하는 이치와 동일하다고 생각하시면 됩니다.
그래서 allocate 함수는 아래와 같이 동기화 로직이 구현되어 있습니다.
allocate 함수는 한 시점의 1개의 Thread 만이 ByteBuffer 를 획득할 수 있도록 구성되어 있습니다.
this.lock = new ReentrantLock(); ByteBuffer allocate (int size, long maxTimeToBlockMs) { this.lock.lock(); // 생락... finally { this.lock.unlock() } }
근데 사실상 KafkaProducer.send (Record) 와 같은 코드는 Main Thread 에서 싱글 쓰레드 방식으로 동작하는 경우가 대부분입니다.
따라서 BufferPool 의 allocate 함수 또한 Main Thread 에서 동작하는게 일반적이지만 이러한 방식으로 MultiThreading 동기화가 적용되어 있습니다.
Buffer Pool 에 Buffer 가 존재할 때의 ByteBuffer 제공.
아래 코드는 Deque<ByteBuffer> 에 저장된 ( 또는 풀링된 ) ByteBuffer 를 제공하는 로직입니다.
poolableSize 는 일반적으로 batch.size 설정 값으로, 16KB로 설정되는 경우가 많습니다.
사용자가 요청한 Buffer 의 크기(size)가 풀링 중인 poolableSize 와 동일한 경우, BufferPool은 Deque에서 관리 중인 버퍼를 꺼내어 제공합니다.
if (size == poolableSize && !this.free.isEmpty()) return this.free.pollFirst();
1. 크거나 작은 버퍼를 요구하거나
2. 버퍼풀의 버퍼가 비어있는 경우에는
풀링 중인 버퍼를 제공하지 않습니다.
아래의 코드는 RecordAccumulator.java 의 코드입니다.
free 라는 이름의 BufferPool 의 allocate 함수를 호출하는 예시이며,
일반적으로 batch.size 또는 그 이상의 값이 allocate 의 첫번째 size 인자로 사용됩니다.
int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers)); buffer = free.allocate(size, maxTimeToBlock);
Buffer Pool 이 비어있을 때 ( Empty ), ByteBuffer 제공.
BufferPool 이 비어있는 상황에서 BufferPool 에게 Buffer 를 요청한다면,
BufferPool 은 새로운 Buffer 를 생성해야 합니다.
아래의 allocate 코드는 Empty Buffer Pool 상황에서의 동작을 설명합니다.
public ByteBuffer allocate(int size, long maxTimeToBlockMs) throws InterruptedException { // 생략 .. int freeListSize = freeSize() * this.poolableSize; if (this.nonPooledAvailableMemory + freeListSize >= size) { freeUp(size); this.nonPooledAvailableMemory -= size; } // 생략 .. if (buffer == null) return safeAllocateByteBuffer(size); // 생략 .. } private ByteBuffer safeAllocateByteBuffer(int size) { boolean error = true; try { ByteBuffer buffer = allocateByteBuffer(size); error = false; return buffer; } finally { if (error) { this.lock.lock(); try { this.nonPooledAvailableMemory += size; if (!this.waiters.isEmpty()) this.waiters.peekFirst().signal(); } finally { this.lock.unlock(); } } } }
this.nonPooledAvailableMemory 는 버퍼풀의 여유 메모리 값을 의미합니다.
buffer.memory 의 기본 설정값인 32MB 가 this.nonPooledAvailableMemory 의 최대값이 됩니다.
그리고 batch.size 의 기본 설정값인 16KB 만큼 버퍼가 생성된다면 this.nonPooledAvailableMemory 가 값이 줄어들게 됩니다.
그래서 nonPooledAvailableMemory 은 버퍼풀에 제공될 수 있는 여유 메모리, freeListSize 는 버퍼풀이 풀링 중인 버퍼의 총 사이즈 합이 됩니다.
좀 정리를 하자면, 아래 그림처럼 표현할 수 있을 것 같은데요.
nonPooledAvailableMemory 의 여유 공간만큼 BufferPool 은 버퍼을 만들어 버퍼를 풀링할 수 있습니다.
그리고 풀링 중인 버퍼를 사용자에게 제공하죠.
다시 코드로 돌아가서 "this.nonPooledAvailableMemory + freeListSize > size" 의미는 사용자가 요구하는 버퍼 사이즈인 size 보다 버퍼풀의 메모리 상황이 여유롭다라고 해석하시면 됩니다.
따라서 이러한 경우에는 ( BufferPool 이 비어있고 & 메모리 상황이 여유롭다면 ) 새로운 ByteBuffer 를 생성합니다.
Buffer Pool 의 메모리 상황이 여유롭지 않을 때.
BufferPool 이 특히 신경을 기울인 부분은 메모리가 부족한 상황에서도 사용자에게 버퍼를 제공해야 하는 경우입니다.
Kafka 의 BufferPool 은 메모리가 부족한 상황에서도 사용자에게 버퍼를 제공하기 위해 ReentrantLock과 Condition 동기화 도구를 사용합니다.
다음은 allocate 함수의 복잡한 코드를 간소화한 코드입니다.
public ByteBuffer allocate(int size, long maxTimeToBlockMs) throws InterruptedException { // 생략 .. Condition moreMemory = this.lock.newCondition(); this.waiters.addLast(moreMemory); // 생략 .. waitingTimeElapsed = !moreMemory.await(remainingTimeToBlockNs, TimeUnit.NANOSECONDS); // 생략 .. buffer = this.free.pollFirst(); // 생략 .. }
BufferPool 이 사용자에게 버퍼를 제공할 수 없는 상황이 되면, deallocate 함수가 실행되어 버퍼가 반환될 때까지 기다려야 합니다.
BufferPool 은 deallocate 를 통해 반환된 버퍼만을 다시 제공할 수 있기 때문입니다.
이 동작과 관련된 Kafka Producer 설정이 바로 max.block.ms 입니다.
max.block.ms 는 BufferPool 의 메모리 상황이 여유롭지 않을 때, 사용자가 버퍼를 획득하기까지 대기할 수 있는 최대 시간을 지정합니다.allocate 함수는 다음과 같은 방식으로 동작합니다:
- moreMemory 라는 Condition 객체를 생성하고, 이를 waiters 대기열에 추가합니다.
- moreMemory.await 를 호출하여, 남은 시간(remainingTimeToBlockNs) 동안 메모리가 반환되기를 기다립니다.
- 대기 중인 Condition은 deallocate 함수 내부에서 호출된 moreMemory.signal 에 의해 깨어납니다.
아래는 deallocate 함수의 코드입니다:
public void deallocate(ByteBuffer buffer, int size) { lock.lock(); try { if (size == this.poolableSize && size == buffer.capacity()) { buffer.clear(); this.free.add(buffer); } else { this.nonPooledAvailableMemory += size; } Condition moreMem = this.waiters.peekFirst(); if (moreMem != null) moreMem.signal(); } finally { lock.unlock(); } }
이러한 방식으로 allocate 함수는 당장 버퍼를 제공할 수 있는 상태에서 deallocate 와 협력하여 동작하게 됩니다.
deallocate 함수 살펴보기.
deallocate 함수는 Kafka 의 BufferPool에서 사용된 ByteBuffer를 반환하고, 대기 중인 요청에 신호를 보내는 역할을 합니다.
이는 BufferPool 이 제한된 메모리 자원을 효율적으로 관리하고 재사용성을 극대화하기 위해 설계된 중요한 메커니즘입니다.
함수의 정의는 아래와 같습니다.
public void deallocate(ByteBuffer buffer, int size) { lock.lock(); try { if (size == this.poolableSize && size == buffer.capacity()) { buffer.clear(); this.free.add(buffer); } else { this.nonPooledAvailableMemory += size; } Condition moreMem = this.waiters.peekFirst(); if (moreMem != null) moreMem.signal(); } finally { lock.unlock(); } }
- 버퍼 반환:
사용이 완료된 ByteBuffer 를 BufferPool로 반환하여 메모리를 재활용할 수 있도록 합니다.
- 메모리 상태 업데이트:
- 재사용 가능한 버퍼는 BufferPool의 Deque(free)에 저장됩니다.
- 풀링되지 않은 메모리는 nonPooledAvailableMemory 변수에 추가됩니다.
- 대기 중인 요청 처리:
BufferPool에서 메모리를 기다리고 있는 스레드가 있다면, Condition.signal을 호출하여 대기 상태를 해제하고 버퍼 할당을 진행할 수 있도록 합니다.
반응형'Kafka' 카테고리의 다른 글
[Kafka] Adaptive Partitioning 코드로 살펴보는 내부 동작 원리 ( BuiltInPartitioner ) (0) 2024.06.09 [Kafka] Timeindex 알아보기 (0) 2024.06.01 [Kafka] Partition 알아보기 (0) 2024.05.12 [Kafka] max.block.ms 알아보기 (0) 2024.02.18 [Kafka-Connect] Debezium MySQL Connector 구현하기 (0) 2024.02.18