ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • [Kafka] Static Membership 과 Partition Assignment 관계 알아보기 ( group.instance.id )
    Kafka/Kafka Consumer 2024. 6. 30. 14:57
    반응형

    - 목차

     

    들어가며.

    Kafka Consumer 들은 group.id 를 기반으로 하나의 Consumer Group 을 형성합니다.

    그리고 각각의 Consumer 들은 자신의 고유한 member id 를 가지게 되는데요.

    이 member id 는 Consumer 가 Group Coordinator 에게 JoinGroup API 를 요청함으로써 생성됩니다.

    즉, Consumer 는 Group Coordinator 가 생성해주는 랜덤한 식별값을 전달받아 자신의 member id 로써 사용합니다.

    이 과정에서 Leader Consumer 가 선정되고, 나머지 Consumer 들은 Follower Consumer 가 됩니다.

    ( 가장 먼저 JoinGroup API 를 요청한 Consumer 가 Leader Consumer 가 되는 구조입니다. )

     

    Leader Consumer 는 중요한 역할을 한가지 부여받습니다.

    이 중요한 역할은 Consumer 와 Partition 를 할당하는 작업입니다.

    Leader Consumer 는 Group Coordinator 로부터 모든 Member 등의 목록을 전달받게 됩니다.

    그리고 Leader Consumer 는 이미 Topic 과 Partition 의 정보를 가지고 있는데요. ( Metadata API 를 통해서 )

    Leader Consumer 는 보유하고 있는 Partition 과 Member 들의 정보를 기반으로 Partition Assignment 를 수행합니다.

     

    만약에 member-1, member-2, member-3 과 partition-1, partition-2, partition-3 이 있다고 가정할 때에,

    Range, Sticky, Roundrobin 등의 할당 방식을 기반으로 Member 와 Partition 을 Assigning 하게 됩니다.

    이것이 일반적인 동적 방식의 Partition Assignment 입니다.

     

    하지만 이러한 방식은 문제점이 존재합니다.

    문제점은 바로 리밸런싱으로 인한 파티션의 재할당될 때에 Consumer 가 기존에  처리하던 Partition 을 재할당받을 확률이 매우 적다는 점입니다.

    일반적으로 Consumer 가 추가/제거되는 경우에 리밸런싱이 발생하는데,

    Rebalance In Progress 상태에서 모든 Consumer 는 Group Coordinator 로부터 새로운 member id 를 부여받게 되는데요.

    이때마다 새롭게 Partition Assigning 이 되기 때문에 기본에 Caching 된 레코드들을 사용할 수 없게 되거나 데이터의 중복 처리의 위험 또한 존재합니다.

     

    이어지는 내용에서 Static Membership 에 대한 설명과 이러한 방식의 장점을 알아보도록 하겠습니다.

     

    Partition Assignment 는 어떻게 수행되는가 ?

    아래 그림은 JoinGroup API 의 Response 의 정보를 표현하는 이미지입니다.

    "blue-ninjas" 라는 Consumer Group 이 존재하고, 이 안에 3개의 Consumer 들이 존재합니다.

    그 중에서 consumer 1 이 Group Leader 로 선택됩니다.

    모든 Consumer 는 Group Coordinator 로부터 JoinGroup API Response 를 전달받게 되는데요.

    Group Leader 만이 모든 Member 의 정보를 전달받습니다.

    ( 아래 그림에서는 Topic-Partition 의 정보를 함께 전달받도록 되어 있지만, 사실상 Member ID 목록만을 전달받습니다. )

    출처 : https://www.confluent.io/ko-kr/blog/apache-kafka-data-access-semantics-consumers-and-membership/

     

     

    그리고 모든 Consumer 들은 Metadata API 를 통해서 Topic 과 Partition 목록의 정보를 보유하고 있습니다.

    이렇게 Leader Consumer 는 Partition 목록과 Member 목록과 파티션 할당 방식 (Range, RoundRobin, Sticky) 에 대한 정보를 가집니다.

    따라서 파티션 할당 정책에 맞게 파티션과 멤버를 매칭시키게 됩니다.

     

    이렇게 매칭된 파티션 할당 정보를 SyncGroup API 를 통해서 브로커에게 전달하게 됩니다.

     

    출처 : https://www.confluent.io/ko-kr/blog/apache-kafka-data-access-semantics-consumers-and-membership/

     

     

    여기서 중요한 포인트는 Member ID 가 Group Coordinator 가 생성하는 난수라는 점입니다.

    따라서 Leader Consumer 가 Partition Assignment 를 수행할 때에 난수를 기반으로 매칭을 하다보니 매번 그 결과가 동일하지 않습니다.

    Client ID 가 A 인 Consumer 라고 할지언정, 매번 리밸런싱이 발생할 때마다 Client ID A 인 Consumer 의 Member ID 는 새로운 값이 됩니다.

     

    Static Membership 이란 ?

    Kafka Consumer 실행 시에 group.instance.id 라는 설정값을 추가합니다.

    이는 하나의 Consumer Group 에 소속된 Consumer 는 고유한 group.instance.id 를 가져야 합니다.

    즉, 중복된 group.instance.id 가 하나의 Consumer Group 에 존재해선 안됩니다.

    ( 만약 여러 Consumer 들이 중복된 group.instance.id 를 사용한다면 효율적인 Partition Assignment 가 수행되지 않습니다. )

     

    아래의 이미지를 간단히 설명드리면,

    group.id 가 1 이라는 Consumer Group 이 존재하고, 총 3개의 Consumer 들이 존재합니다.

    각 Consumer 의 group.instance.id 는 1, 2, 3 으로 표시되어 있죠 ?

    이렇게 group.instance.id 는 Consumer 의 고유한 식별값으로 사용되어야 합니다.

    출처 : https://developer.confluent.io/courses/architecture/consumer-group-protocol/

     

     

    Member Id 는 동적인 값이지만, group.instance.id 는 정적인 값이다.

    Consumer 가 JoinGroup API 를 통해서 Member Id 를 Group Coordinator 로부터 전달받는데요.

    이 값은 매번 동적인 값을 가집니다.

    아래의 정보는 kafka-consumer-groups.sh 명령어의 실행 결과입니다.

    이 명령어를 통해서 Consumer 의 Member ID 를 확인할 수 있는데요.

    Consuemr 의 생성과 제거를 반복하면서 Consumer 의 Member Id 가 고정적이지 않음을 확인하기 위해 아래의 정보를 기록해보았습니다.

     

    GROUP             TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                            HOST            CLIENT-ID
    my-consumer-group test-topic      1          219             228             9               this_is_client_id-e643b96d-60b5-45c3-92ba-5cbcca3ba0a5 /172.19.0.6     this_is_client_id
    my-consumer-group test-topic      0          211             221             10              this_is_client_id-db90c372-bd4a-4490-b921-56b720da0a67 /172.19.0.5     this_is_client_id
    GROUP             TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                            HOST            CLIENT-ID
    my-consumer-group test-topic      1          219             228             9               this_is_client_id-4cbcf596-7ef9-40e2-a5f5-0e2c777ba74a /172.19.0.5     this_is_client_id
    my-consumer-group test-topic      0          211             221             10              this_is_client_id-23623d35-598c-4ba1-a4f3-6de0dff5a73a /172.19.0.6     this_is_client_id

     

    이처럼 Consumer 의 Member Id 는 매번 새로운 값이 설정됩니다.

    따라서 Leader Consumer 가 Member Id 와 Partition 정보를 기반으로 Partition Assignment 를 수행할 때마다 매번 다른 파티션 할당이 발생할 수 밖에 없습니다.

    왜냐하면 member id 를 기반으로 파티션 할당이 수행되는데, 리더 브로커는 새로운 난수인 Member Id 가 이전 Epoch 에서 사용된 Consumer 인지 아닌지 여부를 알 수 없습니다.

     

    group.instance.id 를 사용하면 어떤 장점이 있는가 ?

    Consumer 가 group.instance.id 를 가지게 되면, Leader Consumer 가 Partition Assignment 를 수행할 때에 Member Id 가 아닌 group.instance.id 를 기준으로 파티션 할당을 수행합니다.

    그러니깐 리밸런싱이 발생한 후에 Leader Consumer 는 Member 와 Partition 의 할당을 집행해야합니다.

    기존 방식에서는 랜덤값인 member id 와 Partition 을 연결짓다보니 매번 새로운 Member-Partition Pair 가 생성될 확률이 높습니다.

    반면 고정적인 group.instance.id 와 Partition 의 Pair 를 만들때에는 기존의 연결 방식과 동일할 확률이 매우 높습니다.

     

    예를 들어, Range 기반의 할당방식을 사용한다고 가정합니다.

    그리고 알파벳 순서로 정렬된 group.instance.id 와 Partition Index 를 기준으로 파티션을 할당할 때에

    group.instance.id 가 고정적이기 때문에 이전 Epoch 와 동일한 방식으로 파티션 할당이 될 확률이 매우 높습니다.

     

    그렇게 때문에 기존의 Consumer 가 처리하던 Partition 을 리밸런싱 이후에도 동일하게 처리하기 때문에 캐싱된 레코드들을 유지할 수 있고, 새롭게 레코드들을 Fetch 하지 않아도 됩니다.

     

     

     

     

    카프카 클러스터 실행.

    docker network create kafka-net
    
    docker run -d --name zookeeper --hostname zookeeper --net kafka-net \
      -e ZOOKEEPER_CLIENT_PORT=2181 \
      confluentinc/cp-zookeeper:7.6.4
    
    docker run -d --name kafka1 --hostname kafka1 --net kafka-net \
      -e KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181 \
      -e KAFKA_CFG_BROKER_ID=1 \
      -e ALLOW_PLAINTEXT_LISTENER=yes \
      -e KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR=2 \
      -e KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,INNER://:9091,OUTER://:19092 \
      -e KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka1:9092,INNER://kafka1:9091,OUTER://localhost:19092 \
      -e KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,INNER:PLAINTEXT,OUTER:PLAINTEXT \
      -e KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT \
      -p 19092:19092 \
      bitnami/kafka:3.1.2
    
    docker run -d --name kafka2 --hostname kafka2 --net kafka-net \
      -e KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181 \
      -e KAFKA_CFG_BROKER_ID=2 \
      -e ALLOW_PLAINTEXT_LISTENER=yes \
      -e KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR=2 \
      -e KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,INNER://:9091,OUTER://:29092 \
      -e KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka2:9092,INNER://kafka2:9091,OUTER://localhost:29092 \
      -e KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,INNER:PLAINTEXT,OUTER:PLAINTEXT \
      -e KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT \
      -p 29092:29092 \
      bitnami/kafka:3.1.2

     

    kafka-topics.sh --bootstrap-server kafka1:9092 \
    	--topic test-topic \
    	--create \
    	--replication-factor 2 \
    	--partitions 2

     

    kafka-topics.sh --bootstrap-server kafka1:9092 \
      --topic test-topic \
      --delete

     

     

    반응형
Designed by Tistory.