-
[Kafka] Rebalance 가 발생하는 경우들 알아보기 ( Rebalance Scenario )Kafka 2024. 2. 4. 15:08728x90반응형
- 목차
들어가며.
이번 글에서는 Rebalance 가 발생하는 여러가지 경우들에 대해서 자세히 알아보려고 합니다.
사용하게 될 Consumer 예제 코드는 아래와 같습니다.
kafka-clients 2.8.1 모듈을 사용하였구요.
CustomRebalanceListener 추가하여 Rebalance 가 발생하는 상황을 출력하도록 Consumer 프로그램을 작성하였습니다.
package com.westlife.consumers; import org.apache.kafka.clients.consumer.*; import org.apache.kafka.common.*; import org.apache.kafka.common.serialization.*; import java.time.*; import java.util.*; public class TestRebalance { public static void main (String[] args) { String consumerUniqId = args.length > 0 ? args[0] : "0"; Properties properties = new Properties(); properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:29091,localhost:29092,localhost:29093"); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test-topic-consumer-group-1"); properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties); List<String> topics = List.of("test-topic"); ConsumerRebalanceListener rebalanceListener = new ConsumerRebalanceListener() { @Override public void onPartitionsRevoked(Collection<TopicPartition> partitions) { System.out.println(String.format("### %s onPartitionsRevoked server : %s, partition : %s", Instant.now().toString(), consumerUniqId, partitions)); } @Override public void onPartitionsAssigned(Collection<TopicPartition> partitions) { System.out.println(String.format("### %s onPartitionsAssigned server : %s, partition : %s", Instant.now().toString(), consumerUniqId, partitions)); } @Override public void onPartitionsLost(Collection<TopicPartition> partitions) { System.out.println(String.format("### %s onPartitionsLost server : %s, partition : %s", Instant.now().toString(), consumerUniqId, partitions)); } }; consumer.subscribe(topics, rebalanceListener); while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000 * 60)); records.forEach(stringStringConsumerRecord -> System.out.println(stringStringConsumerRecord)); } } }
Consumer 가 새롭게 추가되는 경우.
'test-topic" 이라는 Topic 은 3개의 Partition 을 가집니다.
그리고 Consumer 1 과 Consumer 2 인 2개의 Consumer 를 생성해보도록 하겠습니다.
먼저 Consumer 1 실행해보겠습니다.
Consumer 1 생성.
Consumer 1 이 생성되면 아래와 같이 onPartitionsAssigned Callback 이 호출됩니다.
onPartitionsAssigned Callback 은 Consumer 와 Partition 의 Ownership 이 결정되었을 때에 호출됩니다.
아래의 경우는 Consumer 1 이 모든 Partition 들을 Consume 하게 됩니다.
### 2021-05-08T18:21:09.883676Z onPartitionsAssigned server : 1, partition : [test-topic-2, test-topic-0, test-topic-1]
Consumer 2 추가.
이제 새로운 Consumer 2 를 추가해보겠습니다.
Consumer 2 는 아래와 같이 "test-topic" 토픽의 Partition 2 를 Consume 하게 됩니다.
### 2021-05-08T18:23:16.984657Z onPartitionsAssigned server : 2, partition : [test-topic-2]
Consumer 1 상태
이 때에 Consumer 1 의 상태는 어떻게 되었을까요 ?
Consumer 1 은 onPartitionsRevoked 가 호출되어 Partition Ownership 이 종료되었음을 알게되고,
새롭게 0 번 & 1 번 Partition 을 할당받습니다.
### 2021-05-08T18:23:16.954813Z onPartitionsRevoked server : 1, partition : [test-topic-2, test-topic-0, test-topic-1] ### 2021-05-08T18:23:16.979479Z onPartitionsAssigned server : 1, partition : [test-topic-0, test-topic-1]
Consumer 3 추가.
세번째 Consumer 를 추가해보겠습니다.
아래의 출력과 같이 Consumer 3 이 실행되었고, onPartitionsAssigned Callback 을 통해서
Consumer 3 은 2번 Partition 의 Ownership 을 획득하였습니다.
### 2021-05-08T18:25:56.112791Z onPartitionsAssigned server : 3, partition : [test-topic-2]
Consumer 1 상태
이때의 Consumer 1 의 상태는 아래와 같습니다.
Rebalancing 이 발생하여 1번 Partition 을 포기하고, 0번 Partition 의 Ownership 만을 가지게 됩니다.
### 2021-05-08T18:25:56.066991Z onPartitionsRevoked server : 1, partition : [test-topic-0, test-topic-1] ### 2021-05-08T18:25:56.106946Z onPartitionsAssigned server : 1, partition : [test-topic-0]
Consumer 2 상태
Consumer 2 의 상태는 아래와 같습니다.
Consumer 2 는 2번 Partition 을 포기하고, 1번 Partition 의 Ownership 을 가지게 됩니다.
### 2021-05-08T18:25:56.058298Z onPartitionsRevoked server : 2, partition : [test-topic-2] ### 2021-05-08T18:25:56.106975Z onPartitionsAssigned server : 2, partition : [test-topic-1]
그림으로 표현하면 아래와 같습니다.
Consumer 1 만이 존재할 때에는 모든 Partition 들은 Consumer 1 이 소비합니다.
그리고 Consumer 가 하나씩 추가될 때마다 Rebalance 가 발생하여 Partition - Consumer 의 관계가 변경됩니다.
Idle Consumer 추가.
이제 3개의 Partition 과 3개의 Consumer 들은 1 대 1 의 관계를 가지게 되었습니다.
앞으로 추가되는 Consumer 는 소비할 Partition 이 없는 Idle Consumer 입니다.
하지만 이러한 상황에서도 Rebalance 를 발생합니다.
Consumer 4 를 추가해보겠습니다.
Consumer 4 가 실행되었고, 1번 Partition 이 할당되었네요.
### 2021-05-08T18:33:20.455051Z onPartitionsAssigned server : 4, partition : [test-topic-1]
Consumer 1 상태
Consumer 1 은 기존의 Partition 0 이 할당됩니다.
### 2021-05-08T18:33:20.306580Z onPartitionsRevoked server : 1, partition : [test-topic-0] ### 2021-05-08T18:33:20.450642Z onPartitionsAssigned server : 1, partition : [test-topic-0]
Consumer 2 상태
Consumer 2 은 더 이상 Partition 이 할당되지 않아서 Idle 상태가 됩니다.
### 2021-05-08T18:33:20.421687Z onPartitionsRevoked server : 3, partition : [test-topic-2] ### 2021-05-08T18:33:20.450705Z onPartitionsAssigned server : 3, partition : []
Consumer 3 상태
Consumer 3 은 Partition 2 가 할당됩니다.
### 2021-05-08T18:33:20.321848Z onPartitionsRevoked server : 2, partition : [test-topic-1] ### 2021-05-08T18:33:20.450623Z onPartitionsAssigned server : 2, partition : [test-topic-2]
그림으로 표현하면 아래와 같은 상태가 됩니다.
Consumer 가 제거되는 경우.
이제는 반대로 Consumer 를 Consumer Group 에서 제거해보겠습니다.
Consumer 가 종료되면 더 이상 Heartbeat 를 Group Coordinator 에게 전달하지 않습니다.
그리고 session.timeout.ms 가 지나게 되면 Rebalance 이 수행됩니다.
Consumer 1 제거.
Consumer 1 이 종료됩니다.
더 이상의 Heartbeat 는 Group Coordinator 에게 전달되지 않습니다.
session.timeout.ms 가 지난 후의 다른 Consumer 들의 상태를 살펴보겠습니다.
Consumer 2 상태
Consumer 2 의 상태입니다.
새로운 Partition 1 이 할당됩니다.
### 2021-05-08T18:49:47.886506Z onPartitionsRevoked server : 2, partition : [test-topic-2] ### 2021-05-08T18:49:48.058877Z onPartitionsAssigned server : 2, partition : [test-topic-1]
Consumer 3 상태
Consumer 3 의 상태는 아래와 같습니다.
어떠한 Partition 도 할당되지 않았던 Idle 상태의 Consumer 였지만, 새롭게 2 번 Partition 이 할당됩니다.
### 2021-05-08T18:49:47.929044Z onPartitionsRevoked server : 3, partition : [] ### 2021-05-08T18:49:48.058843Z onPartitionsAssigned server : 3, partition : [test-topic-2]
Consumer 4 상태
Consumer 4의 상태는 아래와 같습니다.
1번 Partition 에서 0번 Partition 으로 Ownership 이 변경됩니다.
### 2021-05-08T18:49:47.952973Z onPartitionsRevoked server : 4, partition : [test-topic-1] ### 2021-05-08T18:49:48.063027Z onPartitionsAssigned server : 4, partition : [test-topic-0]
Rebalance 의 결과를 그림으로 표현하면 아래와 같습니다.
Consumer Restart.
실제 운영환경에서 Consumer 는 하나의 Server 로써 동작하게되고, 대개 Auto-Healing 기능이 있습니다.
그래서 자동적으로 Restart 하게 되죠.
예를 들어, Kafka Consumer 가 Kubernetes 의 Pod 로써 동작하게 된다면, 종료된 Pod 는 Replica 정책에 의해서 되살아나게 됩니다.
이 상황을 Kafka Rebalance 관점에서 바라보면 1회의 Consumer 삭제와 1회의 Consumer 생성으로 이해할 수 있습니다.
즉, 2번의 Rebalance 가 발생합니다.
한번 예를 들어보겠습니다.
Consumer 2 Restart.
Consumer 2 를 종료시키고, session.timeout.ms 이후에 다시 재시작하는 방식으로 테스트를 진행합니다.
19:00:36 경에 Consumer 2 는 종료됩니다. 그리고 19:00:42 에 재시작하였습니다.
consumer 3 상태
Consumer 2 이 종료된 19:00:36 경에 Rebalance 가 발생합니다.
그리고 Consumer 2 가 Restart 된 시점인 19:00:42 에 새로운 Rebalance 가 발생합니다.
### 2021-05-08T19:00:36.245535Z onPartitionsRevoked server : 3, partition : [test-topic-2] ### 2021-05-08T19:00:36.297571Z onPartitionsAssigned server : 3, partition : [test-topic-2] ### 2021-05-08T19:00:42.282472Z onPartitionsRevoked server : 3, partition : [test-topic-2] ### 2021-05-08T19:00:42.300203Z onPartitionsAssigned server : 3, partition : [test-topic-2]
Consumer 4 상태
Consumer 4 또한 마찬가지입니다.
Consumer 2 이 종료된 19:00:36 와 Consumer 2 가 Restart 된 시점인 19:00:42 에 2회의 Rebalance 가 발생합니다.
### 2021-05-08T19:00:36.271910Z onPartitionsRevoked server : 4, partition : [test-topic-0] ### 2021-05-08T19:00:36.297468Z onPartitionsAssigned server : 4, partition : [test-topic-0, test-topic-1] ### 2021-05-08T19:00:42.281686Z onPartitionsRevoked server : 4, partition : [test-topic-0, test-topic-1] ### 2021-05-08T19:00:42.299942Z onPartitionsAssigned server : 4, partition : [test-topic-1]
consumer 2 상태
19:00:42 경에 재시작이 되면서 0 번 Partition 이 할당됩니다.
### 2021-05-08T19:00:42.305057Z onPartitionsAssigned server : 2, partition : [test-topic-0]
2회의 Rebalance 상황은 아래의 그림처럼 표현됩니다.
Partition 이 변경되는 경우.
Topic 의 Partition 의 수가 증가하는 경우에 Rebalance 가 발생합니다.
Partition 갯수 증가.
Topic 의 Partition 의 수가 변경되는 경우에도 Rebalance 가 발생합니다.
이 또한 Partition 과 Consumer 의 Unbalanced 를 초래하기에 Consumer 의 추가/제거 와 유사한 이유입니다.
아래 명령어를 통해서 Partition 을 3개에서 4개로 증가하였습니다.
kafka-topics --bootstrap-server localhost:9092 --alter --topic test-topic --partitions 4
아래의 출력들은 3번 Partition 이 추가된 이후의 Consumer 들의 상태입니다.
Consumer 2 상태
### 2021-05-08T19:19:57.840206Z onPartitionsRevoked server : 2, partition : [test-topic-0] ### 2021-05-08T19:19:57.904700Z onPartitionsAssigned server : 2, partition : [test-topic-0, test-topic-1]
Consumer 3 상태
### 2021-05-08T19:19:55.357187Z onPartitionsRevoked server : 3, partition : [test-topic-2] ### 2021-05-08T19:19:57.904209Z onPartitionsAssigned server : 3, partition : [test-topic-3]
Consumer 4 상태
### 2021-05-08T19:19:57.800569Z onPartitionsRevoked server : 4, partition : [test-topic-1] ### 2021-05-08T19:19:57.905115Z onPartitionsAssigned server : 4, partition : [test-topic-2]
Reassignment Partition.
제 실험에 의하면 Partition 의 Reassignment 가 적용되는 경우에는 Rebalance 가 발생하지 않습니다.
아래의 명령어를 통해서 Repartition 을 수행하였고, 강제로 각 Partition 의 Leader Broker 를 변경하였습니다.
cat <<EOF> /tmp/repartition.json { "version": 1, "partitions": [ {"topic": "test-topic", "partition": 0, "replicas": [2, 3, 1]}, {"topic": "test-topic", "partition": 1, "replicas": [3, 1, 2]}, {"topic": "test-topic", "partition": 2, "replicas": [1, 2, 3]}, {"topic": "test-topic", "partition": 3, "replicas": [3, 1, 2]} ] } EOF kafka-reassign-partitions --bootstrap-server localhost:9092 \ --reassignment-json-file /tmp/repartition.json --execute
먼저 Repartition 을 적용하기 이전의 상태입니다.
### 2021-05-08T19:30:26.564725Z onPartitionsAssigned server : 2, partition : [test-topic-3] ### 2021-05-08T19:30:26.569458Z onPartitionsAssigned server : 1, partition : [test-topic-2] ### 2021-05-08T19:30:26.569692Z onPartitionsAssigned server : 3, partition : [test-topic-0, test-topic-1]
그리고 Repartitioning 이 수행되어도 Rebalance 는 되지 않았습니다.
max.poll.interval.ms 의 초과.
Kafka Consumer 의 Polling 주기가 max.poll.interval.ms 를 초과하게 되면 Rebalance 가 발생합니다.
실제 코드와 함께 한번 예시를 들어보겠습니다.
max.poll.interval.ms 를 10초로 설정하고, 각 Polling 사이에 30초의 Delay 를 추가하여 Polling Interval 을 초과하도록 설정하였습니다.
properties.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 10 * 1000); while (true) { System.out.println(String.format("### %s start Polling", Instant.now().toString())); ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000 * 1)); System.out.println(String.format("### %s start sleep 30 secs", Instant.now().toString())); Thread.sleep(30 * 1000); records.forEach(stringStringConsumerRecord -> System.out.println(stringStringConsumerRecord)); }
그리고 Consumer 0 과 Consumer 1 을 두어 4개의 Partition 들을 처리합니다.
그림으로 표현하면 아래와 같습니다.
Consumer 0 상태
Consumer 0 은 의도적으로 max.poll.interval.ms 를 초과하도록 Polling 주기를 설정하였습니다.
아래와 같이 30초를 주기로 onPartitionsLost 와 onPartitionsAssigned 가 발생합니다.
### 2021-05-08T20:00:52.474904Z onPartitionsLost server : 0, partition : [test-topic-0, test-topic-1] ### 2021-05-08T20:00:53.474658Z start sleep 30 secs ### 2021-05-08T20:01:23.479044Z start Polling ### 2021-05-08T20:01:24.481462Z start sleep 30 secs ### 2021-05-08T20:01:54.483830Z start Polling ### 2021-05-08T20:01:54.663388Z onPartitionsAssigned server : 0, partition : [test-topic-2, test-topic-3] ### 2021-05-08T20:01:55.485335Z start sleep 30 secs ### 2021-05-08T20:02:25.493237Z start Polling ### 2021-05-08T20:02:25.500444Z onPartitionsLost server : 0, partition : [test-topic-2, test-topic-3] ### 2021-05-08T20:02:26.503201Z start sleep 30 secs ### 2021-05-08T20:02:56.507846Z start Polling ### 2021-05-08T20:02:57.510367Z start sleep 30 secs
ConsumerRebalanceListener 의 Callback 들은 Poll 함수의 내부에서 호출됩니다.
그래서 Polling 의 주기가 길어질수록 Callback 의 호출 또한 더딥니다.
마치며.
Rebalance 를 유발하는 다양한 케이스에 대해서 알아보았습니다.
다음 글에서는 Rebalance 시에 Data Loss 나 Duplication 을 방지하는 방법에 대해서 알아보도록 하겠습니다.
감사합니다.
반응형'Kafka' 카테고리의 다른 글
[Kafka-Streams] Json 기반 Custom Serdes 구현하기 (0) 2024.02.17 [Kafka] Transaction Coordinator 알아보기 (0) 2024.02.07 [Kafka] Kafka Rebalance Protocol 알아보기 ( JoinGroup, LeaveGroup ) (0) 2024.02.04 [Kafka] API Version 알아보기 ( Protocol ) (0) 2024.01.31 [Kafka Producer] Data Loss 는 언제 발생할까 ? (0) 2024.01.21