ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • [Kafka] Zookeeper 는 Broker 를 어떻게 관리할까 ?
    Kafka 2023. 12. 22. 07:49
    728x90
    반응형

    - 목차

     

    함께 보면 좋은 글.

    https://westlife0615.tistory.com/484

     

    Zookeeper Znode 알아보기

    - 목차 소개. Zookeeper 는 Znode 라는 데이터 저장기능이 존재합니다. Znode 는 데이터를 저장할 수 있는 논리적인 개념인데요. Hadoop NameNode 의 Namespace 와 같이 가상의 저장 개념입니다. Znode 는 파일시

    westlife0615.tistory.com

    https://westlife0615.tistory.com/485

     

    Zookeeper Watch Mechanism 알아보기

    - 목차 소개. Zookeeper 의 Watch Mechanism 은 클라이언트와 주키퍼의 양방향 통신을 가능하게 합니다. 클라이언트는 znode 를 생성/수정/삭제/조회를 할 수 있습니다. 주키퍼는 이러한 클라이언트의 read/

    westlife0615.tistory.com

    소개.

    카프카 브로커는 자신의 고유한 아이디를 가집니다. 

    브로커가 생성될 때에 KAFKA_BROKER_ID 라는 환경변수를 설정함으로써 브로커의 고유한 아이디가 결정이 되는데요. 

    주로 1, 2, 3 과 같은 방식으로 값을 설정합니다. 

    이렇게 카프카 클러스터 내부에서 브로커들은 자신의 ID 로 식별됩니다. 

     

    그리고 카프카 클러스터는 ZooKeeper 에 의해서 관리됩니다. 

    생성된 브로커는 자신의 정보와 상태를 ZooKeeper 에게 전달합니다. 

    그리고 ZooKeeper 는 ZNode 라는 저장소에 브로커의 정보를 저장합니다. 

    Broker-1 이라는 브로커가 생성된 경우, ZooKeeper 는 1 번 브로커 정보는 ZNode 에 저장합니다. 

    Broker-2, Broker-3 이 차례로 생성될 때마다 ZooKeeper 는 이들의 정보를 저장합니다. 

     

    만약 Broker-1 이 중지되는 경우에는 ZooKeeper 는 Broker-1 이 비정상 상태에 빠졌다고 판단하게 됩니다. 

    그리고 Broker-1 에 해당하는 ZNode 를 제거하게 되는데요. 

    이 과정에서 Broker-2 와 Broker-3 은 Broker-1 이 비정상 상태에 빠졌다는 사실을 알게 되며, 

    이 상황에 알맞은 대처를 하게 됩니다. 

     

    위 내용에서 설명한 내용들은 

    - Ephemeral ZNode

    - Watch Mechanism

    - Heartbeat

    - Leader Election

    등의 내용을 포괄합니다. 

     

    이번 글에서 ZooKeeper 가 Broker 들을 어떻게 관리하는지에 대해 상세히 알아보려고 합니다. 

     

    카프카 클러스터 실행하기.

    직접 눈으로 살펴보는 것이 가장 좋은 공부이기 때문에 실습을 위해서 간단한 카프카 클러스터를 실행시켜보겠습니다.

    docker-compose 를 통해서 카프카 클러스터를 실행하겠습니다. 

     

    < kafka docker compose yaml >

    먼저 kafka docker compose yaml 파일을 생성합니다. 

    cat <<EOF> /tmp/kafka-docker-compose.yaml
    version: '2'
    services:
      kafdrop:
        image: obsidiandynamics/kafdrop:4.0.1
        container_name: kafdrop
        restart: "no"
        ports:
          - "9000:9000"
        environment:
          KAFKA_BROKERCONNECT: "kafka1:9092,kafka2:9092,kafka3:9092"
        depends_on:
          - "kafka1"
          - "kafka2"
          - "kafka3"      
        networks:
          - kafka
      zookeeper:
        image: confluentinc/cp-zookeeper:7.4.3
        container_name: zookeeper
        environment:
          ZOOKEEPER_SERVER_ID: 1
          ZOOKEEPER_CLIENT_PORT: 2181
          ZOOKEEPER_TICK_TIME: 2000
          ZOOKEEPER_INIT_LIMIT: 5
          ZOOKEEPER_SYNC_LIMIT: 2
        ports:
          - "22181:2181"
        networks:
          - kafka      
      kafka1:
        image: confluentinc/cp-kafka:7.4.3
        container_name: kafka1
        depends_on:
          - zookeeper
        ports:
          - "29091:9092"
        environment:
          KAFKA_BROKER_ID: 1
          KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
          KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka1:9092,PLAINTEXT_HOST://localhost:29091
          KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
          KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
          KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
          KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
          KAFKA_MESSAGE_MAX_BYTES: 10000000
          KAFKA_SOCKET_REQUEST_MAX_BYTES: 100001200
          KAFKA_SOCKET_RECEIVE_BUFFER_BYTES: 10000000
          KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3
          KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 3
        networks:
          - kafka
      kafka2:
        image: confluentinc/cp-kafka:7.4.3
        container_name: kafka2 
        depends_on:
          - zookeeper
        ports:
          - "29092:9092"
        environment:
          KAFKA_BROKER_ID: 2
          KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
          KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka2:9092,PLAINTEXT_HOST://localhost:29092
          KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
          KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
          KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
          KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
          KAFKA_MESSAGE_MAX_BYTES: 10000000
          KAFKA_SOCKET_REQUEST_MAX_BYTES: 100001200
          KAFKA_SOCKET_RECEIVE_BUFFER_BYTES: 10000000
          KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3
          KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 3  
        networks:
          - kafka      
      kafka3:
        image: confluentinc/cp-kafka:7.4.3
        container_name: kafka3
        depends_on:
          - zookeeper
        ports:
          - "29093:9092"
        environment:
          KAFKA_BROKER_ID: 3
          KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
          KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka3:9092,PLAINTEXT_HOST://localhost:29093
          KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
          KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
          KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
          KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
          KAFKA_MESSAGE_MAX_BYTES: 10000000
          KAFKA_SOCKET_REQUEST_MAX_BYTES: 100001200
          KAFKA_SOCKET_RECEIVE_BUFFER_BYTES: 10000000
          KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3
          KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 3  
        networks:
          - kafka      
    networks:
      kafka:
        driver: bridge
    EOF

     

    < kafka docker compose up command >

    kafka-docker-compose.yaml 을 실행하여 카프카 컨테이너들을 실행합니다. 

    docker-compose -f /tmp/kafka-docker-compose.yaml --project-name kafka up -d

     

     

    Broker Znode 살펴보기.

    ZooKeeper 는 카프카 브로커에 해당하는 ZNode 를 관리합니다. 

    아래 명령어는 zookeeper 의 모든 ZNode 를 조회하는 명령어입니다. 

    많은 ZNode 목록들이 조회되는데요. 

    이들 중에서 "/brokers/ids" 에 해당하는 ZNode 만을 살펴볼 예정입니다. 

     

    docker exec -it zookeeper zookeeper-shell localhost:2181 ls -R /
    /
    /admin
    /brokers
    /cluster
    /config
    /consumers
    /controller
    /controller_epoch
    /feature
    /isr_change_notification
    /latest_producer_id_block
    /log_dir_event_notification
    /zookeeper
    /admin/delete_topics
    /brokers/ids
    /brokers/seqid
    /brokers/topics
    /brokers/ids/1
    /brokers/ids/2
    /brokers/ids/3
    /cluster/id
    /config/brokers
    /config/changes
    /config/clients
    /config/ips
    /config/topics
    /config/users
    /zookeeper/config
    /zookeeper/quota

     

     

    "/brokers/ids" ZNode 생성된 브로커들의 상태를 나타냅니다. 

    host, port 등의 정보 등 각 브로커의 정보를 포함합니다. 

    docker exec -it zookeeper zookeeper-shell localhost:2181 get /brokers/ids/1
    docker exec -it zookeeper zookeeper-shell localhost:2181 get /brokers/ids/2
    docker exec -it zookeeper zookeeper-shell localhost:2181 get /brokers/ids/3

     

    {
      "features": {},
      "listener_security_protocol_map": {
        "PLAINTEXT": "PLAINTEXT",
        "PLAINTEXT_HOST": "PLAINTEXT"
      },
      "endpoints": [
        "PLAINTEXT://kafka1:9092",
        "PLAINTEXT_HOST://localhost:29091"
      ],
      "jmx_port": -1,
      "port": 9092,
      "host": "kafka1",
      "version": 5,
      "timestamp": "1703672155381"
    }
    
    {
      "features": {},
      "listener_security_protocol_map": {
        "PLAINTEXT": "PLAINTEXT",
        "PLAINTEXT_HOST": "PLAINTEXT"
      },
      "endpoints": [
        "PLAINTEXT://kafka2:9092",
        "PLAINTEXT_HOST://localhost:29092"
      ],
      "jmx_port": -1,
      "port": 9092,
      "host": "kafka2",
      "version": 5,
      "timestamp": "1703672155660"
    }
    
    {
      "features": {},
      "listener_security_protocol_map": {
        "PLAINTEXT": "PLAINTEXT",
        "PLAINTEXT_HOST": "PLAINTEXT"
      },
      "endpoints": [
        "PLAINTEXT://kafka3:9092",
        "PLAINTEXT_HOST://localhost:29093"
      ],
      "jmx_port": -1,
      "port": 9092,
      "host": "kafka3",
      "version": 5,
      "timestamp": "1703672155710"
    }

     

    Ephemeral ZNode.

    ZooKeeper 는 두가지 ZNode 종류를 가집니다. 

    Persistent ZNode 와 Ephemeral ZNode 인데요. 

    이들의 차이점은 데이터의 저장 기간과 관련있습니다. 

    이름처럼 Persistent ZNode 는 영구적으로 데이터를 저장하구요. 

    Ephemeral ZNode 는 임시적으로 데이터가 저장됩니다. 

    클라이언트와 ZooKeeper 사이의 세션이 유지되는 기간 동안만 ZNode 의 Persistency 가 유지됩니다. 

     

    카프카 브로커와 ZooKeeper 는 Ephemeral ZNode 방식으로 데이터를 관리합니다. 

    이 말인즉슨, 브로커가 종료되거나 비정상 상태에 빠지게 되면 Broker - ZooKeeper 사이의 세션이 종료됩니다. 

    그리고 ZNode 는 지워지게 되죠. 

     

    백문이 불여일견입니다. 실제로 한번 확인해보도록 하겠습니다.

    kafka1 브로커는 종료시켜보겠습니다. 

    docker stop kafka1

     

    그리고 "/brokers/ids" ZNode 를 한번 확인해보겠습니다. 

    docker exec -it zookeeper zookeeper-shell localhost:2181 ls -R /brokers/ids
    Connecting to localhost:2181
    
    WATCHER::
    
    WatchedEvent state:SyncConnected type:None path:null
    /brokers/ids
    /brokers/ids/2
    /brokers/ids/3

     

    위 결과처럼 1번에 해당하는 ZNode 는 사라지게 됩니다. 

     

     

    Watch Mechanism.

    카프카 브로커는 주키퍼의 클라이언트입니다. 

    그리고 주키퍼 클라이언트는 Watch Mechanism 이라는 방식으로 ZNode 의 상태를 확인할 수 있는데요. 

    만약 ZNode 의 상태가 변경된다면 주키퍼는 클라이언트에게 ZNode 의 상태변경을 알려줍니다. 

    그리고 클라이언트는 그 Notification 에 대응하는 처리를 수행할 수 있습니다. 

     

    카프카 브로커는 주키퍼에 브로커 상태정보를 Ephemeral ZNode 로써 저장합니다. 

    그리고 브로커와 주키퍼의 세션이 종료되어 Ephemeral ZNode 가 삭제된다면, 

    Watch Mechanism 에 의해서 다른 모든 브로커들은 특정 브로커가 비정상 상태임을 확인할 수 있습니다. 

    예를 들어, 

    Broker1 이 비정상 종료되어 Broker1 - ZooKeeper 의 관계가 끊어졌다고 가정하겠습니다. 

    그럼 Broker1 - ZooKeeper 사이의 ZNode 가 없어지게 되고, 

    이에 대한 알림은 Broker2, Broker3 에게 전달됩니다. 

    Broker2 와 Broker3 는 Leader Election, Repartition 등의 대응을 수행할 수 있게 됩니다. 

     

    반응형

    'Kafka' 카테고리의 다른 글

    [Kafka Streams] Config 알아보기  (0) 2023.12.29
    Kafka Log Segment 알아보기  (0) 2023.12.25
    Kafka Repartition 알아보기.  (0) 2023.12.22
    Docker 로 Kafka Cluster 구축해보기.  (0) 2023.12.16
    Kafka Ksql 구현하기  (0) 2023.12.11
Designed by Tistory.