-
[Kafka] Zookeeper 는 Broker 를 어떻게 관리할까 ?Kafka 2023. 12. 22. 07:49728x90반응형
- 목차
함께 보면 좋은 글.
https://westlife0615.tistory.com/484
https://westlife0615.tistory.com/485
소개.
카프카 브로커는 자신의 고유한 아이디를 가집니다.
브로커가 생성될 때에 KAFKA_BROKER_ID 라는 환경변수를 설정함으로써 브로커의 고유한 아이디가 결정이 되는데요.
주로 1, 2, 3 과 같은 방식으로 값을 설정합니다.
이렇게 카프카 클러스터 내부에서 브로커들은 자신의 ID 로 식별됩니다.
그리고 카프카 클러스터는 ZooKeeper 에 의해서 관리됩니다.
생성된 브로커는 자신의 정보와 상태를 ZooKeeper 에게 전달합니다.
그리고 ZooKeeper 는 ZNode 라는 저장소에 브로커의 정보를 저장합니다.
Broker-1 이라는 브로커가 생성된 경우, ZooKeeper 는 1 번 브로커 정보는 ZNode 에 저장합니다.
Broker-2, Broker-3 이 차례로 생성될 때마다 ZooKeeper 는 이들의 정보를 저장합니다.
만약 Broker-1 이 중지되는 경우에는 ZooKeeper 는 Broker-1 이 비정상 상태에 빠졌다고 판단하게 됩니다.
그리고 Broker-1 에 해당하는 ZNode 를 제거하게 되는데요.
이 과정에서 Broker-2 와 Broker-3 은 Broker-1 이 비정상 상태에 빠졌다는 사실을 알게 되며,
이 상황에 알맞은 대처를 하게 됩니다.
위 내용에서 설명한 내용들은
- Ephemeral ZNode
- Watch Mechanism
- Heartbeat
- Leader Election
등의 내용을 포괄합니다.
이번 글에서 ZooKeeper 가 Broker 들을 어떻게 관리하는지에 대해 상세히 알아보려고 합니다.
카프카 클러스터 실행하기.
직접 눈으로 살펴보는 것이 가장 좋은 공부이기 때문에 실습을 위해서 간단한 카프카 클러스터를 실행시켜보겠습니다.
docker-compose 를 통해서 카프카 클러스터를 실행하겠습니다.
< kafka docker compose yaml >
먼저 kafka docker compose yaml 파일을 생성합니다.
cat <<EOF> /tmp/kafka-docker-compose.yaml version: '2' services: kafdrop: image: obsidiandynamics/kafdrop:4.0.1 container_name: kafdrop restart: "no" ports: - "9000:9000" environment: KAFKA_BROKERCONNECT: "kafka1:9092,kafka2:9092,kafka3:9092" depends_on: - "kafka1" - "kafka2" - "kafka3" networks: - kafka zookeeper: image: confluentinc/cp-zookeeper:7.4.3 container_name: zookeeper environment: ZOOKEEPER_SERVER_ID: 1 ZOOKEEPER_CLIENT_PORT: 2181 ZOOKEEPER_TICK_TIME: 2000 ZOOKEEPER_INIT_LIMIT: 5 ZOOKEEPER_SYNC_LIMIT: 2 ports: - "22181:2181" networks: - kafka kafka1: image: confluentinc/cp-kafka:7.4.3 container_name: kafka1 depends_on: - zookeeper ports: - "29091:9092" environment: KAFKA_BROKER_ID: 1 KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181' KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka1:9092,PLAINTEXT_HOST://localhost:29091 KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3 KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0 KAFKA_MESSAGE_MAX_BYTES: 10000000 KAFKA_SOCKET_REQUEST_MAX_BYTES: 100001200 KAFKA_SOCKET_RECEIVE_BUFFER_BYTES: 10000000 KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3 KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 3 networks: - kafka kafka2: image: confluentinc/cp-kafka:7.4.3 container_name: kafka2 depends_on: - zookeeper ports: - "29092:9092" environment: KAFKA_BROKER_ID: 2 KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181' KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka2:9092,PLAINTEXT_HOST://localhost:29092 KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3 KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0 KAFKA_MESSAGE_MAX_BYTES: 10000000 KAFKA_SOCKET_REQUEST_MAX_BYTES: 100001200 KAFKA_SOCKET_RECEIVE_BUFFER_BYTES: 10000000 KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3 KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 3 networks: - kafka kafka3: image: confluentinc/cp-kafka:7.4.3 container_name: kafka3 depends_on: - zookeeper ports: - "29093:9092" environment: KAFKA_BROKER_ID: 3 KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181' KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka3:9092,PLAINTEXT_HOST://localhost:29093 KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3 KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0 KAFKA_MESSAGE_MAX_BYTES: 10000000 KAFKA_SOCKET_REQUEST_MAX_BYTES: 100001200 KAFKA_SOCKET_RECEIVE_BUFFER_BYTES: 10000000 KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3 KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 3 networks: - kafka networks: kafka: driver: bridge EOF
< kafka docker compose up command >
kafka-docker-compose.yaml 을 실행하여 카프카 컨테이너들을 실행합니다.
docker-compose -f /tmp/kafka-docker-compose.yaml --project-name kafka up -d
Broker Znode 살펴보기.
ZooKeeper 는 카프카 브로커에 해당하는 ZNode 를 관리합니다.
아래 명령어는 zookeeper 의 모든 ZNode 를 조회하는 명령어입니다.
많은 ZNode 목록들이 조회되는데요.
이들 중에서 "/brokers/ids" 에 해당하는 ZNode 만을 살펴볼 예정입니다.
docker exec -it zookeeper zookeeper-shell localhost:2181 ls -R /
/ /admin /brokers /cluster /config /consumers /controller /controller_epoch /feature /isr_change_notification /latest_producer_id_block /log_dir_event_notification /zookeeper /admin/delete_topics /brokers/ids /brokers/seqid /brokers/topics /brokers/ids/1 /brokers/ids/2 /brokers/ids/3 /cluster/id /config/brokers /config/changes /config/clients /config/ips /config/topics /config/users /zookeeper/config /zookeeper/quota
"/brokers/ids" ZNode 는 생성된 브로커들의 상태를 나타냅니다.
host, port 등의 정보 등 각 브로커의 정보를 포함합니다.
docker exec -it zookeeper zookeeper-shell localhost:2181 get /brokers/ids/1 docker exec -it zookeeper zookeeper-shell localhost:2181 get /brokers/ids/2 docker exec -it zookeeper zookeeper-shell localhost:2181 get /brokers/ids/3
{ "features": {}, "listener_security_protocol_map": { "PLAINTEXT": "PLAINTEXT", "PLAINTEXT_HOST": "PLAINTEXT" }, "endpoints": [ "PLAINTEXT://kafka1:9092", "PLAINTEXT_HOST://localhost:29091" ], "jmx_port": -1, "port": 9092, "host": "kafka1", "version": 5, "timestamp": "1703672155381" } { "features": {}, "listener_security_protocol_map": { "PLAINTEXT": "PLAINTEXT", "PLAINTEXT_HOST": "PLAINTEXT" }, "endpoints": [ "PLAINTEXT://kafka2:9092", "PLAINTEXT_HOST://localhost:29092" ], "jmx_port": -1, "port": 9092, "host": "kafka2", "version": 5, "timestamp": "1703672155660" } { "features": {}, "listener_security_protocol_map": { "PLAINTEXT": "PLAINTEXT", "PLAINTEXT_HOST": "PLAINTEXT" }, "endpoints": [ "PLAINTEXT://kafka3:9092", "PLAINTEXT_HOST://localhost:29093" ], "jmx_port": -1, "port": 9092, "host": "kafka3", "version": 5, "timestamp": "1703672155710" }
Ephemeral ZNode.
ZooKeeper 는 두가지 ZNode 종류를 가집니다.
Persistent ZNode 와 Ephemeral ZNode 인데요.
이들의 차이점은 데이터의 저장 기간과 관련있습니다.
이름처럼 Persistent ZNode 는 영구적으로 데이터를 저장하구요.
Ephemeral ZNode 는 임시적으로 데이터가 저장됩니다.
클라이언트와 ZooKeeper 사이의 세션이 유지되는 기간 동안만 ZNode 의 Persistency 가 유지됩니다.
카프카 브로커와 ZooKeeper 는 Ephemeral ZNode 방식으로 데이터를 관리합니다.
이 말인즉슨, 브로커가 종료되거나 비정상 상태에 빠지게 되면 Broker - ZooKeeper 사이의 세션이 종료됩니다.
그리고 ZNode 는 지워지게 되죠.
백문이 불여일견입니다. 실제로 한번 확인해보도록 하겠습니다.
kafka1 브로커는 종료시켜보겠습니다.
docker stop kafka1
그리고 "/brokers/ids" ZNode 를 한번 확인해보겠습니다.
docker exec -it zookeeper zookeeper-shell localhost:2181 ls -R /brokers/ids
Connecting to localhost:2181 WATCHER:: WatchedEvent state:SyncConnected type:None path:null /brokers/ids /brokers/ids/2 /brokers/ids/3
위 결과처럼 1번에 해당하는 ZNode 는 사라지게 됩니다.
Watch Mechanism.
카프카 브로커는 주키퍼의 클라이언트입니다.
그리고 주키퍼 클라이언트는 Watch Mechanism 이라는 방식으로 ZNode 의 상태를 확인할 수 있는데요.
만약 ZNode 의 상태가 변경된다면 주키퍼는 클라이언트에게 ZNode 의 상태변경을 알려줍니다.
그리고 클라이언트는 그 Notification 에 대응하는 처리를 수행할 수 있습니다.
카프카 브로커는 주키퍼에 브로커 상태정보를 Ephemeral ZNode 로써 저장합니다.
그리고 브로커와 주키퍼의 세션이 종료되어 Ephemeral ZNode 가 삭제된다면,
Watch Mechanism 에 의해서 다른 모든 브로커들은 특정 브로커가 비정상 상태임을 확인할 수 있습니다.
예를 들어,
Broker1 이 비정상 종료되어 Broker1 - ZooKeeper 의 관계가 끊어졌다고 가정하겠습니다.
그럼 Broker1 - ZooKeeper 사이의 ZNode 가 없어지게 되고,
이에 대한 알림은 Broker2, Broker3 에게 전달됩니다.
Broker2 와 Broker3 는 Leader Election, Repartition 등의 대응을 수행할 수 있게 됩니다.
반응형'Kafka' 카테고리의 다른 글
[Kafka Streams] Config 알아보기 (0) 2023.12.29 Kafka Log Segment 알아보기 (1) 2023.12.25 Kafka Repartition 알아보기. (0) 2023.12.22 Docker 로 Kafka Cluster 구축해보기. (0) 2023.12.16 Kafka Ksql 구현하기 (0) 2023.12.11