-
[Kafka] Static Membership 과 Partition Assignment 관계 알아보기 ( group.instance.id )Kafka/Kafka Consumer 2024. 6. 30. 14:57반응형
- 목차
들어가며.
Kafka Consumer 들은 group.id 를 기반으로 하나의 Consumer Group 을 형성합니다.
그리고 각각의 Consumer 들은 자신의 고유한 member id 를 가지게 되는데요.
이 member id 는 Consumer 가 Group Coordinator 에게 JoinGroup API 를 요청함으로써 생성됩니다.
즉, Consumer 는 Group Coordinator 가 생성해주는 랜덤한 식별값을 전달받아 자신의 member id 로써 사용합니다.
이 과정에서 Leader Consumer 가 선정되고, 나머지 Consumer 들은 Follower Consumer 가 됩니다.
( 가장 먼저 JoinGroup API 를 요청한 Consumer 가 Leader Consumer 가 되는 구조입니다. )
Leader Consumer 는 중요한 역할을 한가지 부여받습니다.
이 중요한 역할은 Consumer 와 Partition 를 할당하는 작업입니다.
Leader Consumer 는 Group Coordinator 로부터 모든 Member 등의 목록을 전달받게 됩니다.
그리고 Leader Consumer 는 이미 Topic 과 Partition 의 정보를 가지고 있는데요. ( Metadata API 를 통해서 )
Leader Consumer 는 보유하고 있는 Partition 과 Member 들의 정보를 기반으로 Partition Assignment 를 수행합니다.
만약에 member-1, member-2, member-3 과 partition-1, partition-2, partition-3 이 있다고 가정할 때에,
Range, Sticky, Roundrobin 등의 할당 방식을 기반으로 Member 와 Partition 을 Assigning 하게 됩니다.
이것이 일반적인 동적 방식의 Partition Assignment 입니다.
하지만 이러한 방식은 문제점이 존재합니다.
문제점은 바로 리밸런싱으로 인한 파티션의 재할당될 때에 Consumer 가 기존에 처리하던 Partition 을 재할당받을 확률이 매우 적다는 점입니다.
일반적으로 Consumer 가 추가/제거되는 경우에 리밸런싱이 발생하는데,
Rebalance In Progress 상태에서 모든 Consumer 는 Group Coordinator 로부터 새로운 member id 를 부여받게 되는데요.
이때마다 새롭게 Partition Assigning 이 되기 때문에 기본에 Caching 된 레코드들을 사용할 수 없게 되거나 데이터의 중복 처리의 위험 또한 존재합니다.
이어지는 내용에서 Static Membership 에 대한 설명과 이러한 방식의 장점을 알아보도록 하겠습니다.
Partition Assignment 는 어떻게 수행되는가 ?
아래 그림은 JoinGroup API 의 Response 의 정보를 표현하는 이미지입니다.
"blue-ninjas" 라는 Consumer Group 이 존재하고, 이 안에 3개의 Consumer 들이 존재합니다.
그 중에서 consumer 1 이 Group Leader 로 선택됩니다.
모든 Consumer 는 Group Coordinator 로부터 JoinGroup API Response 를 전달받게 되는데요.
Group Leader 만이 모든 Member 의 정보를 전달받습니다.
( 아래 그림에서는 Topic-Partition 의 정보를 함께 전달받도록 되어 있지만, 사실상 Member ID 목록만을 전달받습니다. )
출처 : https://www.confluent.io/ko-kr/blog/apache-kafka-data-access-semantics-consumers-and-membership/ 그리고 모든 Consumer 들은 Metadata API 를 통해서 Topic 과 Partition 목록의 정보를 보유하고 있습니다.
이렇게 Leader Consumer 는 Partition 목록과 Member 목록과 파티션 할당 방식 (Range, RoundRobin, Sticky) 에 대한 정보를 가집니다.
따라서 파티션 할당 정책에 맞게 파티션과 멤버를 매칭시키게 됩니다.
이렇게 매칭된 파티션 할당 정보를 SyncGroup API 를 통해서 브로커에게 전달하게 됩니다.
출처 : https://www.confluent.io/ko-kr/blog/apache-kafka-data-access-semantics-consumers-and-membership/ 여기서 중요한 포인트는 Member ID 가 Group Coordinator 가 생성하는 난수라는 점입니다.
따라서 Leader Consumer 가 Partition Assignment 를 수행할 때에 난수를 기반으로 매칭을 하다보니 매번 그 결과가 동일하지 않습니다.
Client ID 가 A 인 Consumer 라고 할지언정, 매번 리밸런싱이 발생할 때마다 Client ID A 인 Consumer 의 Member ID 는 새로운 값이 됩니다.
Static Membership 이란 ?
Kafka Consumer 실행 시에 group.instance.id 라는 설정값을 추가합니다.
이는 하나의 Consumer Group 에 소속된 Consumer 는 고유한 group.instance.id 를 가져야 합니다.
즉, 중복된 group.instance.id 가 하나의 Consumer Group 에 존재해선 안됩니다.
( 만약 여러 Consumer 들이 중복된 group.instance.id 를 사용한다면 효율적인 Partition Assignment 가 수행되지 않습니다. )
아래의 이미지를 간단히 설명드리면,
group.id 가 1 이라는 Consumer Group 이 존재하고, 총 3개의 Consumer 들이 존재합니다.
각 Consumer 의 group.instance.id 는 1, 2, 3 으로 표시되어 있죠 ?
이렇게 group.instance.id 는 Consumer 의 고유한 식별값으로 사용되어야 합니다.
출처 : https://developer.confluent.io/courses/architecture/consumer-group-protocol/ Member Id 는 동적인 값이지만, group.instance.id 는 정적인 값이다.
Consumer 가 JoinGroup API 를 통해서 Member Id 를 Group Coordinator 로부터 전달받는데요.
이 값은 매번 동적인 값을 가집니다.
아래의 정보는 kafka-consumer-groups.sh 명령어의 실행 결과입니다.
이 명령어를 통해서 Consumer 의 Member ID 를 확인할 수 있는데요.
Consuemr 의 생성과 제거를 반복하면서 Consumer 의 Member Id 가 고정적이지 않음을 확인하기 위해 아래의 정보를 기록해보았습니다.
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID my-consumer-group test-topic 1 219 228 9 this_is_client_id-e643b96d-60b5-45c3-92ba-5cbcca3ba0a5 /172.19.0.6 this_is_client_id my-consumer-group test-topic 0 211 221 10 this_is_client_id-db90c372-bd4a-4490-b921-56b720da0a67 /172.19.0.5 this_is_client_id
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID my-consumer-group test-topic 1 219 228 9 this_is_client_id-4cbcf596-7ef9-40e2-a5f5-0e2c777ba74a /172.19.0.5 this_is_client_id my-consumer-group test-topic 0 211 221 10 this_is_client_id-23623d35-598c-4ba1-a4f3-6de0dff5a73a /172.19.0.6 this_is_client_id
이처럼 Consumer 의 Member Id 는 매번 새로운 값이 설정됩니다.
따라서 Leader Consumer 가 Member Id 와 Partition 정보를 기반으로 Partition Assignment 를 수행할 때마다 매번 다른 파티션 할당이 발생할 수 밖에 없습니다.
왜냐하면 member id 를 기반으로 파티션 할당이 수행되는데, 리더 브로커는 새로운 난수인 Member Id 가 이전 Epoch 에서 사용된 Consumer 인지 아닌지 여부를 알 수 없습니다.
group.instance.id 를 사용하면 어떤 장점이 있는가 ?
Consumer 가 group.instance.id 를 가지게 되면, Leader Consumer 가 Partition Assignment 를 수행할 때에 Member Id 가 아닌 group.instance.id 를 기준으로 파티션 할당을 수행합니다.
그러니깐 리밸런싱이 발생한 후에 Leader Consumer 는 Member 와 Partition 의 할당을 집행해야합니다.
기존 방식에서는 랜덤값인 member id 와 Partition 을 연결짓다보니 매번 새로운 Member-Partition Pair 가 생성될 확률이 높습니다.
반면 고정적인 group.instance.id 와 Partition 의 Pair 를 만들때에는 기존의 연결 방식과 동일할 확률이 매우 높습니다.
예를 들어, Range 기반의 할당방식을 사용한다고 가정합니다.
그리고 알파벳 순서로 정렬된 group.instance.id 와 Partition Index 를 기준으로 파티션을 할당할 때에
group.instance.id 가 고정적이기 때문에 이전 Epoch 와 동일한 방식으로 파티션 할당이 될 확률이 매우 높습니다.
그렇게 때문에 기존의 Consumer 가 처리하던 Partition 을 리밸런싱 이후에도 동일하게 처리하기 때문에 캐싱된 레코드들을 유지할 수 있고, 새롭게 레코드들을 Fetch 하지 않아도 됩니다.
카프카 클러스터 실행.
docker network create kafka-net docker run -d --name zookeeper --hostname zookeeper --net kafka-net \ -e ZOOKEEPER_CLIENT_PORT=2181 \ confluentinc/cp-zookeeper:7.6.4 docker run -d --name kafka1 --hostname kafka1 --net kafka-net \ -e KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181 \ -e KAFKA_CFG_BROKER_ID=1 \ -e ALLOW_PLAINTEXT_LISTENER=yes \ -e KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR=2 \ -e KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,INNER://:9091,OUTER://:19092 \ -e KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka1:9092,INNER://kafka1:9091,OUTER://localhost:19092 \ -e KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,INNER:PLAINTEXT,OUTER:PLAINTEXT \ -e KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT \ -p 19092:19092 \ bitnami/kafka:3.1.2 docker run -d --name kafka2 --hostname kafka2 --net kafka-net \ -e KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181 \ -e KAFKA_CFG_BROKER_ID=2 \ -e ALLOW_PLAINTEXT_LISTENER=yes \ -e KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR=2 \ -e KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,INNER://:9091,OUTER://:29092 \ -e KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka2:9092,INNER://kafka2:9091,OUTER://localhost:29092 \ -e KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,INNER:PLAINTEXT,OUTER:PLAINTEXT \ -e KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT \ -p 29092:29092 \ bitnami/kafka:3.1.2
kafka-topics.sh --bootstrap-server kafka1:9092 \ --topic test-topic \ --create \ --replication-factor 2 \ --partitions 2
kafka-topics.sh --bootstrap-server kafka1:9092 \ --topic test-topic \ --delete
반응형'Kafka > Kafka Consumer' 카테고리의 다른 글
[Kafka] __consumer_offsets Topic 알아보기 (0) 2024.07.01 [Kafka] Consumer 의 max.poll.records 와 Offset Commit 알아보기 (0) 2024.06.30 [Kafka] Consumer가 추가되면 Rebalance는 어떻게 동작할까? (0) 2024.06.30 [Kafka] Consumer Heartbeat 의 내부 동작 원리 (0) 2024.06.30 [Kafka] Consumer 의 Fetch Request 와 max_wait_ms 관계 알아보기 ( fetch.wait.max.ms ) (0) 2024.06.30