ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • [Kafka] Rebalance 가 발생하는 경우들 알아보기 ( Rebalance Scenario )
    Kafka 2024. 2. 4. 15:08
    728x90
    반응형

    - 목차

     

    들어가며.

    이번 글에서는 Rebalance 가 발생하는 여러가지 경우들에 대해서 자세히 알아보려고 합니다.

    사용하게 될 Consumer 예제 코드는 아래와 같습니다.

    kafka-clients 2.8.1 모듈을 사용하였구요.

    CustomRebalanceListener 추가하여 Rebalance 가 발생하는 상황을 출력하도록 Consumer 프로그램을 작성하였습니다.

    package com.westlife.consumers;
    
    import org.apache.kafka.clients.consumer.*;
    import org.apache.kafka.common.*;
    import org.apache.kafka.common.serialization.*;
    import java.time.*;
    import java.util.*;
    
    public class TestRebalance {
      public static void main (String[] args) {
        String consumerUniqId = args.length > 0 ? args[0] : "0";
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:29091,localhost:29092,localhost:29093");
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test-topic-consumer-group-1");
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        List<String> topics = List.of("test-topic");
    
        ConsumerRebalanceListener rebalanceListener = new ConsumerRebalanceListener() {
          @Override
          public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
            System.out.println(String.format("### %s onPartitionsRevoked server : %s, partition : %s", Instant.now().toString(), consumerUniqId, partitions));
          }
    
          @Override
          public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
            System.out.println(String.format("### %s onPartitionsAssigned server : %s, partition : %s", Instant.now().toString(), consumerUniqId, partitions));
          }
    
          @Override
          public void onPartitionsLost(Collection<TopicPartition> partitions) {
            System.out.println(String.format("### %s onPartitionsLost server : %s, partition : %s", Instant.now().toString(), consumerUniqId, partitions));
          }
        };
    
        consumer.subscribe(topics, rebalanceListener);
        while (true) {
          ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000 * 60));
          records.forEach(stringStringConsumerRecord -> System.out.println(stringStringConsumerRecord));
        }
      }
    }

     

    Consumer 가 새롭게 추가되는 경우.

    'test-topic" 이라는 Topic 은 3개의 Partition 을 가집니다.

     

    그리고 Consumer 1 과 Consumer 2 인 2개의 Consumer 를 생성해보도록 하겠습니다.

    먼저 Consumer 1 실행해보겠습니다.

     

    Consumer 1 생성.

    Consumer 1 이 생성되면 아래와 같이 onPartitionsAssigned Callback 이 호출됩니다.

    onPartitionsAssigned Callback 은 Consumer 와 Partition 의 Ownership 이 결정되었을 때에 호출됩니다.

    아래의 경우는 Consumer 1 이 모든 Partition 들을 Consume 하게 됩니다.

    ### 2021-05-08T18:21:09.883676Z onPartitionsAssigned 
    server : 1, partition : [test-topic-2, test-topic-0, test-topic-1]

     

    Consumer 2 추가.

    이제 새로운 Consumer 2 를 추가해보겠습니다.

    Consumer 2 는 아래와 같이 "test-topic" 토픽의 Partition 2 를 Consume 하게 됩니다.

    ### 2021-05-08T18:23:16.984657Z onPartitionsAssigned 
     server : 2, partition : [test-topic-2]

     

    Consumer 1 상태

    이 때에 Consumer 1 의 상태는 어떻게 되었을까요 ?

    Consumer 1 은 onPartitionsRevoked 가 호출되어 Partition Ownership 이 종료되었음을 알게되고,

    새롭게 0 번 & 1 번 Partition 을 할당받습니다.

    ### 2021-05-08T18:23:16.954813Z onPartitionsRevoked 
     server : 1, partition : [test-topic-2, test-topic-0, test-topic-1]
    ### 2021-05-08T18:23:16.979479Z onPartitionsAssigned 
     server : 1, partition : [test-topic-0, test-topic-1]

     

    Consumer 3 추가.

    세번째 Consumer 를 추가해보겠습니다.

    아래의 출력과 같이 Consumer 3 이 실행되었고, onPartitionsAssigned Callback 을 통해서

    Consumer 3 은 2번 Partition 의 Ownership 을 획득하였습니다.

    ### 2021-05-08T18:25:56.112791Z onPartitionsAssigned 
     server : 3, partition : [test-topic-2]

     

    Consumer 1 상태

    이때의 Consumer 1 의 상태는 아래와 같습니다.

    Rebalancing 이 발생하여 1번 Partition 을 포기하고, 0번 Partition 의 Ownership 만을 가지게 됩니다.

    ### 2021-05-08T18:25:56.066991Z onPartitionsRevoked 
     server : 1, partition : [test-topic-0, test-topic-1]
    ### 2021-05-08T18:25:56.106946Z onPartitionsAssigned 
     server : 1, partition : [test-topic-0]

     

    Consumer 2 상태

    Consumer 2 의 상태는 아래와 같습니다.

    Consumer 2 는 2번 Partition 을 포기하고, 1번 Partition 의 Ownership 을 가지게 됩니다.

    ### 2021-05-08T18:25:56.058298Z onPartitionsRevoked 
     server : 2, partition : [test-topic-2]
    ### 2021-05-08T18:25:56.106975Z onPartitionsAssigned 
     server : 2, partition : [test-topic-1]

     

     

    그림으로 표현하면 아래와 같습니다.

    Consumer 1 만이 존재할 때에는 모든 Partition 들은 Consumer 1 이 소비합니다.

    그리고 Consumer 가 하나씩 추가될 때마다 Rebalance 가 발생하여 Partition - Consumer 의 관계가 변경됩니다.

     

     

     

    Idle Consumer 추가.

    이제 3개의 Partition 과 3개의 Consumer 들은 1 대 1 의 관계를 가지게 되었습니다.

    앞으로 추가되는 Consumer 는 소비할 Partition 이 없는 Idle Consumer 입니다.

    하지만 이러한 상황에서도 Rebalance 를 발생합니다.

     

    Consumer 4 를 추가해보겠습니다.

    Consumer 4 가 실행되었고, 1번 Partition 이 할당되었네요.

    ### 2021-05-08T18:33:20.455051Z onPartitionsAssigned 
     server : 4, partition : [test-topic-1]

     

    Consumer 1 상태

    Consumer 1 은 기존의 Partition 0 이 할당됩니다.

    ### 2021-05-08T18:33:20.306580Z onPartitionsRevoked 
     server : 1, partition : [test-topic-0]
    ### 2021-05-08T18:33:20.450642Z onPartitionsAssigned 
     server : 1, partition : [test-topic-0]

     

    Consumer 2 상태

    Consumer 2 은 더 이상 Partition 이 할당되지 않아서 Idle 상태가 됩니다.

    ### 2021-05-08T18:33:20.421687Z onPartitionsRevoked 
     server : 3, partition : [test-topic-2]
    ### 2021-05-08T18:33:20.450705Z onPartitionsAssigned 
     server : 3, partition : []

     

    Consumer 3 상태

    Consumer 3 은 Partition 2 가 할당됩니다.

    ### 2021-05-08T18:33:20.321848Z onPartitionsRevoked 
     server : 2, partition : [test-topic-1]
    ### 2021-05-08T18:33:20.450623Z onPartitionsAssigned 
     server : 2, partition : [test-topic-2]

     

    그림으로 표현하면 아래와 같은 상태가 됩니다.

     

     

    Consumer 가 제거되는 경우.

    이제는 반대로 Consumer 를 Consumer Group 에서 제거해보겠습니다.

    Consumer 가 종료되면 더 이상 Heartbeat 를 Group Coordinator 에게 전달하지 않습니다.

    그리고 session.timeout.ms 가 지나게 되면 Rebalance 이 수행됩니다.

    Consumer 1 제거.

    Consumer 1 이 종료됩니다.

    더 이상의 Heartbeat 는 Group Coordinator 에게 전달되지 않습니다.

    session.timeout.ms 가 지난 후의 다른 Consumer 들의 상태를 살펴보겠습니다.

     

    Consumer 2 상태

    Consumer 2 의 상태입니다.

    새로운 Partition 1 이 할당됩니다.

    ### 2021-05-08T18:49:47.886506Z onPartitionsRevoked 
     server : 2, partition : [test-topic-2]
    ### 2021-05-08T18:49:48.058877Z onPartitionsAssigned 
     server : 2, partition : [test-topic-1]

     

    Consumer 3 상태

    Consumer 3 의 상태는 아래와 같습니다.

    어떠한 Partition 도 할당되지 않았던 Idle 상태의 Consumer 였지만, 새롭게 2 번 Partition 이 할당됩니다.

    ### 2021-05-08T18:49:47.929044Z onPartitionsRevoked 
     server : 3, partition : []
    ### 2021-05-08T18:49:48.058843Z onPartitionsAssigned 
     server : 3, partition : [test-topic-2]

     

    Consumer 4 상태

    Consumer 4의 상태는 아래와 같습니다.

    1번 Partition 에서 0번 Partition 으로 Ownership 이 변경됩니다.

    ### 2021-05-08T18:49:47.952973Z onPartitionsRevoked 
     server : 4, partition : [test-topic-1]
    ### 2021-05-08T18:49:48.063027Z onPartitionsAssigned 
     server : 4, partition : [test-topic-0]

     

     

    Rebalance 의 결과를 그림으로 표현하면 아래와 같습니다.

     

     

     

    Consumer Restart.

    실제 운영환경에서 Consumer 는 하나의 Server 로써 동작하게되고, 대개 Auto-Healing 기능이 있습니다.

    그래서 자동적으로 Restart 하게 되죠.

    예를 들어, Kafka Consumer 가 Kubernetes 의 Pod 로써 동작하게 된다면, 종료된 Pod 는 Replica 정책에 의해서 되살아나게 됩니다.

    이 상황을 Kafka Rebalance 관점에서 바라보면 1회의 Consumer 삭제와 1회의 Consumer 생성으로 이해할 수 있습니다.

    즉, 2번의 Rebalance 가 발생합니다.

     

    한번 예를 들어보겠습니다.

     

    Consumer 2 Restart.

    Consumer 2 를 종료시키고, session.timeout.ms 이후에 다시 재시작하는 방식으로 테스트를 진행합니다.

    19:00:36 경에 Consumer 2 는 종료됩니다. 그리고 19:00:42 에 재시작하였습니다.

     

    consumer 3 상태

    Consumer 2 이 종료된 19:00:36 경에 Rebalance 가 발생합니다.

    그리고 Consumer 2 가 Restart 된 시점인 19:00:42 에 새로운 Rebalance 가 발생합니다.

    ### 2021-05-08T19:00:36.245535Z onPartitionsRevoked 
     server : 3, partition : [test-topic-2]
    ### 2021-05-08T19:00:36.297571Z onPartitionsAssigned 
     server : 3, partition : [test-topic-2]
    ### 2021-05-08T19:00:42.282472Z onPartitionsRevoked 
     server : 3, partition : [test-topic-2]
    ### 2021-05-08T19:00:42.300203Z onPartitionsAssigned 
     server : 3, partition : [test-topic-2]

     

    Consumer 4 상태

    Consumer 4 또한 마찬가지입니다.

    Consumer 2 이 종료된 19:00:36 와 Consumer 2 가 Restart 된 시점인 19:00:42 에 2회의 Rebalance 가 발생합니다.

    ### 2021-05-08T19:00:36.271910Z onPartitionsRevoked 
     server : 4, partition : [test-topic-0]
    ### 2021-05-08T19:00:36.297468Z onPartitionsAssigned 
     server : 4, partition : [test-topic-0, test-topic-1]
    ### 2021-05-08T19:00:42.281686Z onPartitionsRevoked 
     server : 4, partition : [test-topic-0, test-topic-1]
    ### 2021-05-08T19:00:42.299942Z onPartitionsAssigned 
     server : 4, partition : [test-topic-1]

     

    consumer 2 상태

    19:00:42 경에 재시작이 되면서 0 번 Partition 이 할당됩니다.

    ### 2021-05-08T19:00:42.305057Z onPartitionsAssigned 
     server : 2, partition : [test-topic-0]

     

     

    2회의 Rebalance 상황은 아래의 그림처럼 표현됩니다.

     

     

     

    Partition 이 변경되는 경우.

    Topic 의 Partition 의 수가 증가하는 경우에 Rebalance 가 발생합니다.

     

    Partition 갯수 증가.

    Topic 의 Partition 의 수가 변경되는 경우에도 Rebalance 가 발생합니다.

    이 또한 Partition 과 Consumer 의 Unbalanced 를 초래하기에 Consumer 의 추가/제거 와 유사한 이유입니다.

     

    아래 명령어를 통해서 Partition 을 3개에서 4개로 증가하였습니다.

    kafka-topics --bootstrap-server localhost:9092 --alter --topic test-topic --partitions 4

     

    아래의 출력들은 3번 Partition 이 추가된 이후의 Consumer 들의 상태입니다.

     

    Consumer 2 상태

    ### 2021-05-08T19:19:57.840206Z onPartitionsRevoked 
     server : 2, partition : [test-topic-0]
    ### 2021-05-08T19:19:57.904700Z onPartitionsAssigned 
     server : 2, partition : [test-topic-0, test-topic-1]

     

    Consumer 3 상태

    ### 2021-05-08T19:19:55.357187Z onPartitionsRevoked 
     server : 3, partition : [test-topic-2]
    ### 2021-05-08T19:19:57.904209Z onPartitionsAssigned 
     server : 3, partition : [test-topic-3]

     

    Consumer 4 상태

    ### 2021-05-08T19:19:57.800569Z onPartitionsRevoked 
     server : 4, partition : [test-topic-1]
    ### 2021-05-08T19:19:57.905115Z onPartitionsAssigned 
     server : 4, partition : [test-topic-2]

     

     

    Reassignment Partition.

    제 실험에 의하면 Partition 의 Reassignment 가 적용되는 경우에는 Rebalance 가 발생하지 않습니다.

    아래의 명령어를 통해서 Repartition 을 수행하였고,  강제로 각 Partition 의 Leader Broker 를 변경하였습니다.

    cat <<EOF> /tmp/repartition.json
    {
      "version": 1,
      "partitions": [
        {"topic": "test-topic", "partition": 0, "replicas": [2, 3, 1]},
        {"topic": "test-topic", "partition": 1, "replicas": [3, 1, 2]},
        {"topic": "test-topic", "partition": 2, "replicas": [1, 2, 3]},
        {"topic": "test-topic", "partition": 3, "replicas": [3, 1, 2]}    
      ]
    }
    EOF
    
    kafka-reassign-partitions --bootstrap-server localhost:9092 \
    --reassignment-json-file /tmp/repartition.json --execute

     

    먼저 Repartition 을 적용하기 이전의 상태입니다.

    ### 2021-05-08T19:30:26.564725Z onPartitionsAssigned 
     server : 2, partition : [test-topic-3]
    ### 2021-05-08T19:30:26.569458Z onPartitionsAssigned 
     server : 1, partition : [test-topic-2]
    ### 2021-05-08T19:30:26.569692Z onPartitionsAssigned 
     server : 3, partition : [test-topic-0, test-topic-1]

     

    그리고 Repartitioning 이 수행되어도 Rebalance 는 되지 않았습니다.

     

     

    max.poll.interval.ms 의 초과.

    Kafka Consumer 의 Polling 주기가 max.poll.interval.ms 를 초과하게 되면 Rebalance 가 발생합니다.

    실제 코드와 함께 한번 예시를 들어보겠습니다.

    max.poll.interval.ms 를 10초로 설정하고, 각 Polling 사이에 30초의 Delay 를 추가하여 Polling Interval 을 초과하도록 설정하였습니다.

    properties.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 10 * 1000);
    
    while (true) {
      System.out.println(String.format("### %s start Polling", Instant.now().toString()));
      ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000 * 1));
      System.out.println(String.format("### %s start sleep 30 secs", Instant.now().toString()));
      Thread.sleep(30 * 1000);
      records.forEach(stringStringConsumerRecord -> System.out.println(stringStringConsumerRecord));
    }

     

    그리고 Consumer 0 과 Consumer 1 을 두어 4개의 Partition 들을 처리합니다.

    그림으로 표현하면 아래와 같습니다.

     

     

    Consumer 0 상태

    Consumer 0 은 의도적으로 max.poll.interval.ms 를 초과하도록 Polling 주기를 설정하였습니다.

    아래와 같이 30초를 주기로 onPartitionsLost 와 onPartitionsAssigned 가 발생합니다.

    ### 2021-05-08T20:00:52.474904Z onPartitionsLost 
     server : 0, partition : [test-topic-0, test-topic-1]
    ### 2021-05-08T20:00:53.474658Z start sleep 30 secs
    ### 2021-05-08T20:01:23.479044Z start Polling
    ### 2021-05-08T20:01:24.481462Z start sleep 30 secs
    ### 2021-05-08T20:01:54.483830Z start Polling
    ### 2021-05-08T20:01:54.663388Z onPartitionsAssigned 
     server : 0, partition : [test-topic-2, test-topic-3]
    ### 2021-05-08T20:01:55.485335Z start sleep 30 secs
    ### 2021-05-08T20:02:25.493237Z start Polling
    ### 2021-05-08T20:02:25.500444Z onPartitionsLost 
     server : 0, partition : [test-topic-2, test-topic-3]
    ### 2021-05-08T20:02:26.503201Z start sleep 30 secs
    ### 2021-05-08T20:02:56.507846Z start Polling
    ### 2021-05-08T20:02:57.510367Z start sleep 30 secs

     

    ConsumerRebalanceListener 의 Callback 들은 Poll 함수의 내부에서 호출됩니다.

    그래서 Polling 의 주기가 길어질수록 Callback 의 호출 또한 더딥니다.

     

     

    마치며.

    Rebalance 를 유발하는 다양한 케이스에 대해서 알아보았습니다.

    다음 글에서는 Rebalance 시에 Data Loss 나 Duplication 을 방지하는 방법에 대해서 알아보도록 하겠습니다.

    감사합니다.

     

    반응형
Designed by Tistory.