-
[Kafka] Adaptive Partitioning 코드로 살펴보는 내부 동작 원리 ( BuiltInPartitioner )Kafka 2024. 6. 9. 08:16728x90반응형
- 목차
들어가며.
Kafka의 Adaptive Partitioning 은 Kafka 프로듀서가 데이터를 효율적으로 분배하기 위해 동적으로 파티션 할당 전략을 조정하는 기법입니다.
3.x.x 대 버전의 Kafka Client Module 에서는 기본적으로 채택하고 있는 방식이며,
이는 Hash Partitioning 과 Sticky Partitioning 을 동시에 제공합니다.
Adaptive Partitioning 의 구체적인 전략은 아래와 같습니다.
1. Record 에 Key 가 존재하지 않을 시에는 Sticky Partitioning 기법을 사용합니다.
2. Record 에 Key 가 존재한다면 Hash Partitioning 기법은 제공합니다.
이를 이미지로 표현하면 아래와 같습니다.
먼저 Key 가 존재하지 않는 Record 들은 자체적인 Hash Partitioning 에 의해서 적절한 Partition 으로 분배됩니다.
Sticky Partitioning 은 Hash Partitioning 처럼 Deterministic 하게 동작하지 않습니다.
현재 Kafka Client 의 Buffer 의 상황에 맞게 특정 Partition 이 결정되는 구조입니다.
Adaptive Partitioning 은 위처럼 2가지 Partitioning 전략을 사용하게 됩니다.
그 기준은 Record 가 Key 를 가지느냐 아니냐에 따라서 결정됩니다.
이어지는 글에서 좀 더 상세한 Adaptive Partitioning 에 대해서 다루어보도록 하겠습니다.
Kafka Clients 모듈에서 Adaptive Partitioning 동작 원리 살펴보기.
Kafka Clients 모듈에서 Adaptive Partitioning의 동작 방식을 분석하고자 합니다.
본 문서는 Java Kafka Clients 모듈인 org.apache.kafka:kafka-clients:3.5.1 버전을 기준으로 작성되었습니다.(https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients/3.5.1)
1. KafkaProducer.java
KafkaProducer의 KafkaProducer 생성자에서는 Partitioner를 생성하는 로직을 확인할 수 있습니다.
해당 생성자에서는 아래와 같이 this.partitioner 필드를 초기화합니다.this.partitioner = config.getConfiguredInstance( ProducerConfig.PARTITIONER_CLASS_CONFIG, Partitioner.class, Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId));
this.partitioner는 KafkaProducer에서 기본적으로 사용할 Partitioner를 결정하는 역할을 합니다.
이를 통해 메시지가 특정 파티션으로 라우팅되는 방식을 정의합니다.Partitioner는 크게 두 가지 유형으로 나뉩니다:
- 내장된(Built-in) Partitioner를 사용할 것인지,
- 사용자가 직접 구현한 Custom Partitioner를 사용할 것인지 결정할 수 있습니다.
Kafka Producer는 기본적으로 Built-in Partitioner를 사용합니다.
이는 ProducerConfig.PARTITIONER_CLASS_CONFIG 의 기본값이 null 로 설정되어 있기 때문입니다.따라서, ProducerConfig.PARTITIONER_CLASS_CONFIG 값을 명시적으로 설정하지 않을 경우, this.partitioner 필드는 null로 초기화됩니다.
그리고 enableAdaptivePartitioning 이라는 필드가 존재합니다.
이는 Adaptive Partitioning 을 사용할지 말지 결정하는 Boolean Type 의 변수입니다.
boolean enableAdaptivePartitioning = partitioner == null && config.getBoolean(ProducerConfig.PARTITIONER_ADPATIVE_PARTITIONING_ENABLE_CONFIG);
위에서 설명하였듯이, 기본적으로 카프카 프로듀서는 Adaptive Partitioning 을 사용하게 됩니다.
이는 this.partitioner == null 인 조건과 PARTITIONER_ADAPTIVE_PARTITIONING_ENABLE_CONFIG 가 True 인 조건으로 표현됩니다.
참고로 PARTITIONER_ADAPTIVE_PARTITIONING_ENABLE_CONFIG 는 그 기본값이 True 입니다.
그래서 KafkaProducer 객체는 기본적으로 this.partitioner 가 null 이고 , enableAdaptivePartitioning 필드가 True 인 상태로 초기화됩니다.
( this.partitioner == null 의 의미는 사용자가 정의한 커스텀 파티셔너를 사용하지 않는다는 의미입니다. )
2. RecordAccumulator.java
Kafka 의 RecordAccumulator 는 Kafka Producer 에서 데이터를 브로커로 전송하기 전에 메시지 배치를 관리하고 최적화하는 역할을 하는 내부 구성 요소입니다.
많이들 들어보셨겠지만, Kafka Producer 는 Record 를 브로커에게 one by one 으로 전송하지 않습니다.
Batch 단위로 레코드들을 묶어서 한번에 전송하게 되죠.
이와 관련된 Kafka Producer 설정으로 batch.size 와 max.block.ms 같은 설정들이 존재합니다.
아래의 코드는 RecordAccumulator 에 존재하는 Partition 을 설정하는 조건문 코드입니다.
아래의 코드에서 if-else 구문이 보이실텐데요.
partition == RecordMetadata.UNKNOWN_PARTITION 인 상황은 Sticky Partitioning 이 적용되는 상황입니다.
즉, Key 가 존재하지 않는 Record 에 대해서 partition 은 RecordMetadata.UNKNOWN_PARTITION 으로 설정됩니다.
반면, Record 에 Key 가 존재하여 Hash Partitioning 이 수행되는 상황에서는 partition 에 0 이상의 정수가 할당됩니다.
이 정수는 특정 파티션의 식별값을 의미합니다.
final BuiltInPartitioner.StickyPartitionInfo partitionInfo; final int effectivePartition; if (partition == RecordMetadata.UNKNOWN_PARTITION) { partitionInfo = topicInfo.builtInPartitioner.peekCurrentPartitionInfo(cluster); effectivePartition = partitionInfo.partition(); } else { partitionInfo = null; effectivePartition = partition; }
정리하자면, RecordAccumulator 의 기능을 통해서 Record 를 Accumulator 로 적재시킬 수 있습니다.
그리고 Record Accumulator 는 Record 를 Batch 단위로 적재시킵니다.
이 과정에서 Key 가 존재하지 않는 Record 는 partition 이 존재하지 않아 peekCurrentPartitionInfo 함수의 도움으로 적절한 Partition 을 찾습니다.
Key 가 존재하는 Record 는 Key 를 활용한 Hashing 과정을 통해 Partition 을 선택합니다.
아래의 이미지는 Sticky Partition 이 Accumulator 에서 Partition 이 선택되는 과정을 도식화하였습니다.
Key 가 없는 Record 는 Accumulator 내부에서 활성화되어 있는 Sticky Partition 의 Batch 로 Append 되게 됩니다.
만약 Partition 2 Batch 가 가득차서 더이상 Record 를 첨가할 수 없게 된다면, 다른 Partiiton 의 Batch 로 변경됩니다.
2.2 partitionChanged.
RecordAccumulator 의 partitionChanged 함수는 Sticky Partitioning 의 파티션 변경 여부를 판단하는 함수입니다.
( 아래의 topicInfo.builtInPartitioner.isPartitionChanged 함수는 아래의 BuiltInPartitioner.java 의 설명부로 대체하겠습니다. )
이 함수는 Sticky Partitioning 과 관련이 있으며, 메시지가 특정 파티션에 고정되었는지 또는 변경되었는지 확인하고 적절히 처리합니다.
즉, 현재 어떤 Kafka Producer 가 어떤 Partition 으로 고정되어있는지 ( Sticky ) 한지 판단합니다.
private boolean partitionChanged(String topic, TopicInfo topicInfo, BuiltInPartitioner.StickyPartitionInfo partitionInfo, Deque<ProducerBatch> deque, long nowMs, Cluster cluster) { if (topicInfo.builtInPartitioner.isPartitionChanged(partitionInfo)) { log.trace("Partition {} for topic {} switched by a concurrent append, retrying", partitionInfo.partition(), topic); return true; } // We might have disabled partition switch if the queue had incomplete batches. // Check if all batches are full now and switch . if (allBatchesFull(deque)) { topicInfo.builtInPartitioner.updatePartitionInfo(partitionInfo, 0, cluster, true); if (topicInfo.builtInPartitioner.isPartitionChanged(partitionInfo)) { log.trace("Completed previously disabled switch for topic {} partition {}, retrying", topic, partitionInfo.partition()); return true; } } return false; }
topicInfo
- TopicInfo 는 각 Topic 별 Batch 들을 저장하고 있습니다.
- 만약에 Topic B (파티션 3개) 의 TopicInfo 는 아래와 같이 총 3개의 Batch 를 포함하는 Deque 로 구성됩니다.
- B - 1 : Deque<Batch>
- B - 2 : Deque<Batch>
- B - 3 : Deque<Batch>
- 그래서 B 토픽의 2번째 파티션가 어떤 Batch 들을 가지고 있는지 살펴보기 위해서 topicInfo.get(2) 와 같은 형식으로 조회할 수 있습니다.
private static class TopicInfo { public final ConcurrentMap<Integer /*partition*/, Deque<ProducerBatch>> batches = new CopyOnWriteMap<>(); public final BuiltInPartitioner builtInPartitioner; public TopicInfo(LogContext logContext, String topic, int stickyBatchSize) { builtInPartitioner = new BuiltInPartitioner(logContext, topic, stickyBatchSize); } }
allBatchesFull
- allBatchesFull 함수는 Batch 가 가득차는지 여부를 판단하는 함수입니다.
- 카프카 프로듀서는 batch.size 설정 크기만큼의 Batch 를 제공합니다. (기본값 16kb)
- record 가 batch.size 크기만큼 Batch 를 가득 채우게 되면 allBatchesFull 함수는 True 를 반환합니다.
- Sticky Partitioning 의 상황에서 특정 Partition 의 Batch 가 가득차게 되면, Sticky Partition 이 변경되게 됩니다.
3. BuiltInPartitioner.java
Kafka 의 BuiltInPartitioner 클래스는 Adaptive Partitioning 기능을 구현한 내장 파티셔닝 모듈입니다.
이 클래스는 Hash Partitioning과 Sticky Partitioning 두 가지 방식을 지원하며, Adaptive Partitioning의 핵심 역할을 수행합니다.
BuiltInPartitioner 는 isPartitionChanged 메소드를 제공하며,
이 메서드는 현재 Sticky Partitioning 에서 사용 중인 파티션이 변경되었는지 여부를 반환합니다.
( 다시 말해, Hash Partitioning 이 사용 중인 경우에는 isPartitionChanged 함수는 사용되지 않으며, 오직 Sticky Partitioning 에서만 사용됩니다. )
아래는 isPartitionChanged 메서드의 구현입니다.
boolean isPartitionChanged(StickyPartitionInfo partitionInfo) { // partitionInfo may be null if the caller didn't use built-in partitioner. return partitionInfo != null && stickyPartitionInfo.get() != partitionInfo; // Hash Partitioning 에서는 partitionInfo == null // Sticky Partitioning 에서는 partitionInfo != null }
partitionInfo :
- isPartitionChanged 함수의 인자입니다.
- 현재의 Append 시도에서 사용할 파티션 정보를 나타냅니다.
- 만약 Record 가 Key 를 가지고 있어서 Hash Partitioning 이 사용된다면, partitionInfo 는 null 로 설정되어 있습니다.
아래의 코드에 의해서 partitionInfo 는 null 로 설정됩니다.
정확히는 조건문의 else 블록의 코드와 관련이 있습니다.
Hash Partitioning 에서는 partition 의 지정된 Partition 이 할당되기 때문입니다.
if (partition == RecordMetadata.UNKNOWN_PARTITION) { partitionInfo = topicInfo.builtInPartitioner.peekCurrentPartitionInfo(cluster); effectivePartition = partitionInfo.partition(); } else { partitionInfo = null; effectivePartition = partition; }
stickyPartitionInfo :
- 현재 Sticky Partitioning 에서 사용 중인 파티션 정보를 나타냅니다.
- 즉, 직전 Append 시도에서 사용한 Sticky Partition 의 정보를 의미합니다.
private final AtomicReference<StickyPartitionInfo> stickyPartitionInfo = new AtomicReference<>();
Output :
- 직전까지 사용한 Sticky Partition 과 이번 시도에서 사용할 Sticky Partition 이 서로 다를 경우에, True 를 반환합니다.
- 직전까지 사용한 Sticky Partition 과 이번 시도에서 사용할 Sticky Partition 이 서로 같은 경우에, False 를 반환합니다.
Adaptive Partitioning 은 Hash Partitioning 과 Sticky Partitioning 두가지 방식을 제공한다고 말씀드렸습니다.
isPartitionChanged 함수는 Sticky Partitioning 에 한해서 사용되는 함수입니다.
그래서 Key 가 존재하는 Record 에 한하여 isPartitionChanged 함수는 동작하지 않게 되구요.
Key 가 존재하지 않은 Record 는 어떻게 Sticky Partitioning 을 적용할지 결정하는데 사용됩니다.
반응형'Kafka' 카테고리의 다른 글
[Kafka] BufferPool 알아보기 (org.apache.kafka:kafka-clients) (0) 2024.06.03 [Kafka] Timeindex 알아보기 (0) 2024.06.01 [Kafka] Partition 알아보기 (0) 2024.05.12 [Kafka] max.block.ms 알아보기 (0) 2024.02.18 [Kafka-Connect] Debezium MySQL Connector 구현하기 (0) 2024.02.18