ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • [Kafka] Adaptive Partitioning 코드로 살펴보는 내부 동작 원리 ( BuiltInPartitioner )
    Kafka 2024. 6. 9. 08:16
    728x90
    반응형

     

    - 목차

     

    들어가며.

    Kafka의 Adaptive Partitioning 은 Kafka 프로듀서가 데이터를 효율적으로 분배하기 위해 동적으로 파티션 할당 전략을 조정하는 기법입니다.

    3.x.x 대 버전의 Kafka Client Module 에서는 기본적으로 채택하고 있는 방식이며,

    이는 Hash Partitioning 과 Sticky Partitioning 을 동시에 제공합니다.

     

    Adaptive Partitioning  의 구체적인 전략은 아래와 같습니다.

    1. Record 에 Key 가 존재하지 않을 시에는 Sticky Partitioning 기법을 사용합니다.

    2. Record 에 Key 가 존재한다면 Hash Partitioning 기법은 제공합니다.

     

    이를 이미지로 표현하면 아래와 같습니다.

    먼저 Key 가 존재하지 않는 Record 들은 자체적인 Hash Partitioning 에 의해서 적절한 Partition 으로 분배됩니다.

     

     

    Sticky Partitioning 은 Hash Partitioning 처럼 Deterministic 하게 동작하지 않습니다.

    현재 Kafka Client 의 Buffer 의 상황에 맞게 특정 Partition 이 결정되는 구조입니다.

     

     

     

    Adaptive Partitioning 은 위처럼 2가지 Partitioning 전략을 사용하게 됩니다.

    그 기준은 Record 가 Key 를 가지느냐 아니냐에 따라서 결정됩니다.

     

    이어지는 글에서 좀 더 상세한 Adaptive Partitioning 에 대해서 다루어보도록 하겠습니다.

     

     

    Kafka Clients 모듈에서 Adaptive Partitioning 동작 원리 살펴보기.

    Kafka Clients 모듈에서 Adaptive Partitioning의 동작 방식을 분석하고자 합니다.
    본 문서는 Java Kafka Clients 모듈인 org.apache.kafka:kafka-clients:3.5.1 버전을 기준으로 작성되었습니다.

    (https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients/3.5.1)

     

     

    1. KafkaProducer.java

     

    KafkaProducer의 KafkaProducer 생성자에서는 Partitioner를 생성하는 로직을 확인할 수 있습니다.
    해당 생성자에서는 아래와 같이 this.partitioner 필드를 초기화합니다.

     

    this.partitioner = config.getConfiguredInstance(
    	ProducerConfig.PARTITIONER_CLASS_CONFIG,
    	Partitioner.class,
    	Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId));

     

    this.partitioner는 KafkaProducer에서 기본적으로 사용할 Partitioner를 결정하는 역할을 합니다.
    이를 통해 메시지가 특정 파티션으로 라우팅되는 방식을 정의합니다.

     

    Partitioner는 크게 두 가지 유형으로 나뉩니다:

    1. 내장된(Built-in) Partitioner를 사용할 것인지,
    2. 사용자가 직접 구현한 Custom Partitioner를 사용할 것인지 결정할 수 있습니다.

    Kafka Producer는 기본적으로 Built-in Partitioner를 사용합니다.
    이는 ProducerConfig.PARTITIONER_CLASS_CONFIG 의 기본값이 null 로 설정되어 있기 때문입니다.

    따라서, ProducerConfig.PARTITIONER_CLASS_CONFIG 값을 명시적으로 설정하지 않을 경우, this.partitioner 필드는 null로 초기화됩니다.

     

     

    그리고 enableAdaptivePartitioning 이라는 필드가 존재합니다.

    이는 Adaptive Partitioning 을 사용할지 말지 결정하는 Boolean Type 의 변수입니다.

    boolean enableAdaptivePartitioning = 
        partitioner == null &&
        config.getBoolean(ProducerConfig.PARTITIONER_ADPATIVE_PARTITIONING_ENABLE_CONFIG);

     

    위에서 설명하였듯이, 기본적으로 카프카 프로듀서는 Adaptive Partitioning 을 사용하게 됩니다.

    이는 this.partitioner == null 인 조건과 PARTITIONER_ADAPTIVE_PARTITIONING_ENABLE_CONFIG 가 True 인 조건으로 표현됩니다.

    참고로 PARTITIONER_ADAPTIVE_PARTITIONING_ENABLE_CONFIG 는 그 기본값이 True 입니다.

     

    그래서 KafkaProducer 객체는 기본적으로 this.partitioner 가 null 이고 , enableAdaptivePartitioning 필드가 True 인 상태로 초기화됩니다.

    ( this.partitioner == null 의 의미는 사용자가 정의한 커스텀 파티셔너를 사용하지 않는다는 의미입니다. )

     

     

    2. RecordAccumulator.java

    KafkaRecordAccumulatorKafka Producer 에서 데이터를 브로커로 전송하기 전에 메시지 배치를 관리하고 최적화하는 역할을 하는 내부 구성 요소입니다.

    많이들 들어보셨겠지만, Kafka Producer 는 Record 를 브로커에게 one by one 으로 전송하지 않습니다.

    Batch 단위로 레코드들을 묶어서 한번에 전송하게 되죠.

    이와 관련된 Kafka Producer 설정으로 batch.size 와 max.block.ms 같은 설정들이 존재합니다.

     

    아래의 코드는 RecordAccumulator 에 존재하는 Partition 을 설정하는 조건문 코드입니다.

    아래의 코드에서 if-else 구문이 보이실텐데요.

    partition == RecordMetadata.UNKNOWN_PARTITION 인 상황은 Sticky Partitioning 이 적용되는 상황입니다.

    즉, Key 가 존재하지 않는 Record 에 대해서 partition 은 RecordMetadata.UNKNOWN_PARTITION 으로 설정됩니다.

    반면, Record 에 Key 가 존재하여 Hash Partitioning 이 수행되는 상황에서는 partition 에 0 이상의 정수가 할당됩니다.

    이 정수는 특정 파티션의 식별값을 의미합니다.

     

    final BuiltInPartitioner.StickyPartitionInfo partitionInfo;
    final int effectivePartition;
    if (partition == RecordMetadata.UNKNOWN_PARTITION) {
        partitionInfo = topicInfo.builtInPartitioner.peekCurrentPartitionInfo(cluster);
        effectivePartition = partitionInfo.partition();
    } else {
        partitionInfo = null;
        effectivePartition = partition;
    }

     

     

    정리하자면, RecordAccumulator 의 기능을 통해서 Record 를 Accumulator 로 적재시킬 수 있습니다.

    그리고 Record Accumulator 는 Record 를 Batch 단위로 적재시킵니다.

    이 과정에서 Key 가 존재하지 않는 Record 는 partition 이 존재하지 않아 peekCurrentPartitionInfo 함수의 도움으로 적절한 Partition 을 찾습니다.

    Key 가 존재하는 Record 는 Key 를 활용한 Hashing 과정을 통해 Partition 을 선택합니다.

     

    아래의 이미지는 Sticky Partition 이 Accumulator 에서 Partition 이 선택되는 과정을 도식화하였습니다.

    Key 가 없는 Record 는 Accumulator 내부에서 활성화되어 있는 Sticky Partition 의 Batch 로 Append 되게 됩니다.

    만약 Partition 2 Batch 가 가득차서 더이상 Record 를 첨가할 수 없게 된다면, 다른 Partiiton 의 Batch 로 변경됩니다.

     

    2.2 partitionChanged.

    RecordAccumulator 의 partitionChanged 함수는 Sticky Partitioning 의 파티션 변경 여부를 판단하는 함수입니다.

    ( 아래의 topicInfo.builtInPartitioner.isPartitionChanged 함수는 아래의 BuiltInPartitioner.java 의 설명부로 대체하겠습니다. )

     

    이 함수는 Sticky Partitioning 과 관련이 있으며, 메시지가 특정 파티션에 고정되었는지 또는 변경되었는지 확인하고 적절히 처리합니다.

    즉, 현재 어떤 Kafka Producer 가 어떤 Partition 으로 고정되어있는지 ( Sticky ) 한지 판단합니다.

     

    private boolean partitionChanged(String topic,
                                     TopicInfo topicInfo,
                                     BuiltInPartitioner.StickyPartitionInfo partitionInfo,
                                     Deque<ProducerBatch> deque, long nowMs,
                                     Cluster cluster) {
        if (topicInfo.builtInPartitioner.isPartitionChanged(partitionInfo)) {
            log.trace("Partition {} for topic {} switched by a concurrent append, retrying",
                    partitionInfo.partition(), topic);
            return true;
        }
    
        // We might have disabled partition switch if the queue had incomplete batches.
        // Check if all batches are full now and switch .
        if (allBatchesFull(deque)) {
            topicInfo.builtInPartitioner.updatePartitionInfo(partitionInfo, 0, cluster, true);
            if (topicInfo.builtInPartitioner.isPartitionChanged(partitionInfo)) {
                log.trace("Completed previously disabled switch for topic {} partition {}, retrying",
                        topic, partitionInfo.partition());
                return true;
            }
        }
    
        return false;
    }

     

    topicInfo

    • TopicInfo 는 각 Topic 별 Batch 들을 저장하고 있습니다.
    • 만약에 Topic B (파티션 3개) 의 TopicInfo 는 아래와 같이 총 3개의 Batch 를 포함하는 Deque 로 구성됩니다.
      • B - 1 : Deque<Batch>
      • B - 2 : Deque<Batch>
      • B - 3 : Deque<Batch>

     

    • 그래서 B 토픽의 2번째 파티션가 어떤 Batch 들을 가지고 있는지 살펴보기 위해서 topicInfo.get(2) 와 같은 형식으로 조회할 수 있습니다.
        private static class TopicInfo {
            public final ConcurrentMap<Integer /*partition*/, Deque<ProducerBatch>> batches = new CopyOnWriteMap<>();
            public final BuiltInPartitioner builtInPartitioner;
    
            public TopicInfo(LogContext logContext, String topic, int stickyBatchSize) {
                builtInPartitioner = new BuiltInPartitioner(logContext, topic, stickyBatchSize);
            }
        }

     

     

    allBatchesFull

    • allBatchesFull 함수는 Batch 가 가득차는지 여부를 판단하는 함수입니다.
    • 카프카 프로듀서는 batch.size 설정 크기만큼의 Batch 를 제공합니다. (기본값 16kb)
    • record 가 batch.size 크기만큼 Batch 를 가득 채우게 되면 allBatchesFull 함수는 True 를 반환합니다.
    • Sticky Partitioning 의 상황에서 특정 Partition 의 Batch 가 가득차게 되면, Sticky Partition 이 변경되게 됩니다.

     

    3. BuiltInPartitioner.java

    Kafka 의 BuiltInPartitioner 클래스는 Adaptive Partitioning 기능을 구현한 내장 파티셔닝 모듈입니다.

    이 클래스는 Hash PartitioningSticky Partitioning 두 가지 방식을 지원하며, Adaptive Partitioning의 핵심 역할을 수행합니다.

     

    BuiltInPartitioner 는 isPartitionChanged 메소드를 제공하며,

    이 메서드는 현재 Sticky Partitioning 에서 사용 중인 파티션이 변경되었는지 여부를 반환합니다.

    ( 다시 말해, Hash Partitioning 이 사용 중인 경우에는 isPartitionChanged 함수는 사용되지 않으며, 오직 Sticky Partitioning 에서만 사용됩니다. )

    아래는 isPartitionChanged 메서드의 구현입니다.

    boolean isPartitionChanged(StickyPartitionInfo partitionInfo) {
        // partitionInfo may be null if the caller didn't use built-in partitioner.
        return partitionInfo != null && stickyPartitionInfo.get() != partitionInfo;
        // Hash Partitioning 에서는 partitionInfo == null
        // Sticky Partitioning 에서는 partitionInfo != null
    }

     

    partitionInfo :

    • isPartitionChanged 함수의 인자입니다.
    • 현재의 Append 시도에서 사용할 파티션 정보를 나타냅니다.
    • 만약 RecordKey 를 가지고 있어서 Hash Partitioning 이 사용된다면, partitionInfo 는 null 로 설정되어 있습니다.

    아래의 코드에 의해서 partitionInfo 는 null 로 설정됩니다.

    정확히는 조건문의 else 블록의 코드와 관련이 있습니다.

    Hash Partitioning 에서는 partition 의 지정된 Partition 이 할당되기 때문입니다.

     

    if (partition == RecordMetadata.UNKNOWN_PARTITION) {
        partitionInfo = topicInfo.builtInPartitioner.peekCurrentPartitionInfo(cluster);
        effectivePartition = partitionInfo.partition();
    } else {
        partitionInfo = null;
        effectivePartition = partition;
    }

     

     

    stickyPartitionInfo :

    • 현재 Sticky Partitioning 에서 사용 중인 파티션 정보를 나타냅니다.
    • 즉, 직전 Append 시도에서 사용한 Sticky Partition 의 정보를 의미합니다.

     

    private final AtomicReference<StickyPartitionInfo> stickyPartitionInfo = 
    	new AtomicReference<>();

     

    Output :

    • 직전까지 사용한 Sticky Partition 과 이번 시도에서 사용할 Sticky Partition 이 서로 다를 경우에, True 를 반환합니다.
    • 직전까지 사용한 Sticky Partition 과 이번 시도에서 사용할 Sticky Partition 이 서로 같은 경우에, False 를 반환합니다.

     

    Adaptive Partitioning 은 Hash Partitioning 과 Sticky Partitioning 두가지 방식을 제공한다고 말씀드렸습니다.

    isPartitionChanged 함수는 Sticky Partitioning 에 한해서 사용되는 함수입니다.

    그래서 Key 가 존재하는 Record 에 한하여 isPartitionChanged 함수는 동작하지 않게 되구요.

    Key 가 존재하지 않은 Record 는 어떻게 Sticky Partitioning 을 적용할지 결정하는데 사용됩니다.

     

     

     

    반응형
Designed by Tistory.