ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • Kafka Repartition 알아보기.
    Kafka 2023. 12. 22. 07:49
    728x90
    반응형

    - 목차

     

    소개.

    이번 글에서는 카프카 토픽이 어떻게 repartition 되는지에 대해서 알아보고자 합니다. 

     

    Repartition 은 토픽의 파티션을 여러 브로커로 분배하는 행위를 의미합니다. 

    토픽을 생성하게 되면 파티션의 갯수와 Replication-Factor 의 값을 설정합니다. 

    그리고 생성된 파티션들은 여러 브로커들로 분배되게 됩니다. 

    만약 브로커가 3개 그리고 토픽의 파티션이 3개라면 각각의 파티션들은 각 브로커에서 관리됩니다. 

    아래와 같이 말이죠. 

    broker1 - partition1
    broker2 - partition2
    broker3 - partition3

     

     

    하지만 브로커가 추가되거나, 제거되는 경우 그리고 파티션이 늘거나 줄어드는 경우에는 

    파티션의 재분배가 필요해집니다. 

     

    Kafka Cluster 생성하기.

    docker 를 사용해서 카프카 클러스터를 간단하게 생성해보겠습니다. 

     

    < kafka docker compose yaml >

    먼저 kafka docker compose yaml 파일을 생성합니다. 

    cat <<EOF> /tmp/kafka-docker-compose.yaml
    version: '2'
    services:
      kafdrop:
        image: obsidiandynamics/kafdrop:4.0.1
        container_name: kafdrop
        restart: "no"
        ports:
          - "9000:9000"
        environment:
          KAFKA_BROKERCONNECT: "kafka1:9092,kafka2:9092,kafka3:9092"
        depends_on:
          - "kafka1"
          - "kafka2"
          - "kafka3"      
        networks:
          - kafka
      zookeeper:
        image: confluentinc/cp-zookeeper:7.4.3
        container_name: zookeeper
        environment:
          ZOOKEEPER_SERVER_ID: 1
          ZOOKEEPER_CLIENT_PORT: 2181
          ZOOKEEPER_TICK_TIME: 2000
          ZOOKEEPER_INIT_LIMIT: 5
          ZOOKEEPER_SYNC_LIMIT: 2
        ports:
          - "22181:2181"
        networks:
          - kafka      
      kafka1:
        image: confluentinc/cp-kafka:7.4.3
        container_name: kafka1
        depends_on:
          - zookeeper
        ports:
          - "29091:9092"
        environment:
          KAFKA_BROKER_ID: 1
          KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
          KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka1:9092,PLAINTEXT_HOST://localhost:29091
          KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
          KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
          KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
          KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
          KAFKA_MESSAGE_MAX_BYTES: 10000000
          KAFKA_SOCKET_REQUEST_MAX_BYTES: 100001200
          KAFKA_SOCKET_RECEIVE_BUFFER_BYTES: 10000000
          KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3
          KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 3
        networks:
          - kafka
      kafka2:
        image: confluentinc/cp-kafka:7.4.3
        container_name: kafka2 
        depends_on:
          - zookeeper
        ports:
          - "29092:9092"
        environment:
          KAFKA_BROKER_ID: 2
          KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
          KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka2:9092,PLAINTEXT_HOST://localhost:29092
          KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
          KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
          KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
          KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
          KAFKA_MESSAGE_MAX_BYTES: 10000000
          KAFKA_SOCKET_REQUEST_MAX_BYTES: 100001200
          KAFKA_SOCKET_RECEIVE_BUFFER_BYTES: 10000000
          KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3
          KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 3  
        networks:
          - kafka      
      kafka3:
        image: confluentinc/cp-kafka:7.4.3
        container_name: kafka3
        depends_on:
          - zookeeper
        ports:
          - "29093:9092"
        environment:
          KAFKA_BROKER_ID: 3
          KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
          KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka3:9092,PLAINTEXT_HOST://localhost:29093
          KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
          KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
          KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
          KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
          KAFKA_MESSAGE_MAX_BYTES: 10000000
          KAFKA_SOCKET_REQUEST_MAX_BYTES: 100001200
          KAFKA_SOCKET_RECEIVE_BUFFER_BYTES: 10000000
          KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3
          KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 3  
        networks:
          - kafka      
    networks:
      kafka:
        driver: bridge
    EOF

     

    < kafka docker compose up command >

    kafka-docker-compose.yaml 을 실행하여 카프카 컨테이너들을 실행합니다. 

    docker-compose -f /tmp/kafka-docker-compose.yaml --project-name kafka up -d

     

    < Topic 생성하기 >

    위 docker-compose 가 실행된다면 9000 포트를 통해 kafdrop 웹UI 로 접속할 수 있습니다. 

    (http://localhost:9000)

    그리고 messages 라는 이름의 Topic 을 하나 생성합니다. 

    partition 은 3개, replication-factor 도 3 으로 설정하였습니다. 

     

     

    Repartition.

    카프카 클러스터와 토픽이 생성되었다면 Repartition 을 수행해보도록 하겠습니다. 

    우선 현재 "messages" 토픽의 파티션들과 분배된 상태를 확인해보겠습니다.

    아래 이미지처럼 messages 토픽은 0, 1, 2 에 해당하는 3개의 파티션이 존재합니다. 

    그리고 Replica Nodes 열을 보면 "1,2,3" 이라는 값을 확인할 수 있습니다. 

    이 뜻은 모든 파티션이 모든 브로커에 존재한다는 의미인데요.

    Replication-Factor 값이 3 이고 브로커의 갯수 또한 3 이기 때문에 모든 브로커에 모든 파티션이 존재합니다. 

    그리고 Leader Node 의 값은 해당하는 파티션의 리더 브로커를 의미합니다. 

    그래서 "messages" 토픽의 각 파티션에 쓰여지는 Write 요청은 해당하는 리더 브로커가 책임지게 됩니다. 

     

     

    저는 Kafka Broker 를 하나 추가한 이후에 repartition 을 수행해보도록 하겠습니다. 

     

    < Kafka Broker 추가 >

    id 가 4 인 새로운 브로커를 추가합니다. 

    docker run -d --name kafka4 -p 29094:9092 \
    -e KAFKA_BROKER_ID=4 \
    -e KAFKA_ZOOKEEPER_CONNECT='zookeeper:2181' \
    -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka4:9092,PLAINTEXT_HOST://localhost:29094 \
    -e KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT \
    -e KAFKA_INTER_BROKER_LISTENER_NAME=PLAINTEXT \
    -e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=3 \
    -e KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS=0 \
    -e KAFKA_MESSAGE_MAX_BYTES=10000000 \
    -e KAFKA_SOCKET_REQUEST_MAX_BYTES=100001200 \
    -e KAFKA_SOCKET_RECEIVE_BUFFER_BYTES=10000000 \
    -e KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=3 \
    -e KAFKA_TRANSACTION_STATE_LOG_MIN_ISR=3 \
    --network kafka_kafka confluentinc/cp-kafka:7.4.3

     

    새로운 브로커가 추가된 이후에 kafdrop 을 확인해보면 아래와 같이 4번 브로커가 추가되었음을 확인할 수 있습니다. 

     

    그리고 repartition 을 한번 수행해보도록 하겠습니다. 

    cat <<EOF> /tmp/repartition.json
    {
      "version": 1,
      "partitions": [
        {"topic": "messages", "partition": 0, "replicas": [1, 2, 4]},
        {"topic": "messages", "partition": 1, "replicas": [2, 4, 1]},
        {"topic": "messages", "partition": 2, "replicas": [3, 4, 2]}
      ]
    }
    EOF
    
    kafka-reassign-partitions --bootstrap-server localhost:9092 \
    --reassignment-json-file /tmp/repartition.json --execute

     

    Current partition replica assignment
    
    {"version":1,"partitions":[{"topic":"messages","partition":0,"replicas":[3,1,2],"log_dirs":["any","any","any"]},{"topic":"messages","partition":1,"replicas":[1,2,3],"log_dirs":["any","any","any"]},{"topic":"messages","partition":2,"replicas":[2,3,1],"log_dirs":["any","any","any"]}]}
    
    Save this to use as the --reassignment-json-file option during rollback
    Successfully started partition reassignments for messages-0,messages-1,messages-2

     

     

    kafka-reassign-partitions 수행을 마치면 아래와 같이 새로운 브로커인 4번 브로커로 파티션들이 분배됨을 알 수 있습니다. 

     

    그리고 messags-0, messages-1 에 해당하는 로그 파일도 생성됩니다. 

    ls -lh /var/lib/kafka/data/
    total 28K
    -rw-r--r-- 1 appuser appuser    4 Dec 25 02:56 cleaner-offset-checkpoint
    -rw-r--r-- 1 appuser appuser    4 Dec 24 02:57 log-start-offset-checkpoint
    drwxr-xr-x 2 appuser appuser 4.0K Dec 25 02:56 messages-0
    drwxr-xr-x 2 appuser appuser 4.0K Dec 24 02:56 messages-1
    -rw-r--r-- 1 appuser appuser   88 Dec 24 23:49 meta.properties
    -rw-r--r-- 1 appuser appuser   30 Dec 25 02:57 recovery-point-offset-checkpoint
    -rw-r--r-- 1 appuser appuser   30 Dec 25 02:58 replication-offset-checkpoint

     

     

     

    브로커가 줄어든 상태에서의 Repartitioning.

    4번 브로커를 종료시켜보도록 하겠습니다.

    docker stop kafka4

     

    4번 브로커가 종료되어 In-sync Replica Nodes 에서 4번 브로커는 제외됩니다. 

    참고로 In-sync Replica Nodes 는 Write 과 Replication 에 관여하는 브로커들을 의미합니다. 

    이 상황에서 다시 Repartitioning 을 수행해보도록 하겠습니다. 

    cat <<EOF> /tmp/repartition.json
    {
      "version": 1,
      "partitions": [
        {"topic": "messages", "partition": 0, "replicas": [1, 2, 3]},
        {"topic": "messages", "partition": 1, "replicas": [2, 3, 1]},
        {"topic": "messages", "partition": 2, "replicas": [3, 1, 2]}
      ]
    }
    EOF
    
    kafka-reassign-partitions --bootstrap-server localhost:9092 \
    --reassignment-json-file /tmp/repartition.json --execute

     

    kafka-reassign-partitions 명령어가 수행되면 아래 이미지처럼 1,2,3 번 브로커로 파티션들이 재분배됩니다. 

     

     

    그리고 1번 브로커의 Log Segment 를 조회해보면 아래와 같이 0,1,2 파티션과 관련된 파일들이 존재하게 됩니다. 

    ls -lh /var/lib/kafka/data/
    total 32K
    -rw-r--r-- 1 appuser appuser    4 Dec 24 02:56 cleaner-offset-checkpoint
    -rw-r--r-- 1 appuser appuser    4 Dec 25 03:06 log-start-offset-checkpoint
    drwxr-xr-x 2 appuser appuser 4.0K Dec 25 03:04 messages-0
    drwxr-xr-x 2 appuser appuser 4.0K Dec 24 03:00 messages-1
    drwxr-xr-x 2 appuser appuser 4.0K Dec 24 03:04 messages-2
    -rw-r--r-- 1 appuser appuser   88 Dec 25 23:49 meta.properties
    -rw-r--r-- 1 appuser appuser   43 Dec 25 03:06 recovery-point-offset-checkpoint
    -rw-r--r-- 1 appuser appuser   43 Dec 25 03:07 replication-offset-checkpoint

     

    반응형
Designed by Tistory.