-
Kafka Repartition 알아보기.Kafka 2023. 12. 22. 07:49728x90반응형
- 목차
소개.
이번 글에서는 카프카 토픽이 어떻게 repartition 되는지에 대해서 알아보고자 합니다.
Repartition 은 토픽의 파티션을 여러 브로커로 분배하는 행위를 의미합니다.
토픽을 생성하게 되면 파티션의 갯수와 Replication-Factor 의 값을 설정합니다.
그리고 생성된 파티션들은 여러 브로커들로 분배되게 됩니다.
만약 브로커가 3개 그리고 토픽의 파티션이 3개라면 각각의 파티션들은 각 브로커에서 관리됩니다.
아래와 같이 말이죠.
broker1 - partition1 broker2 - partition2 broker3 - partition3
하지만 브로커가 추가되거나, 제거되는 경우 그리고 파티션이 늘거나 줄어드는 경우에는
파티션의 재분배가 필요해집니다.
Kafka Cluster 생성하기.
docker 를 사용해서 카프카 클러스터를 간단하게 생성해보겠습니다.
< kafka docker compose yaml >
먼저 kafka docker compose yaml 파일을 생성합니다.
cat <<EOF> /tmp/kafka-docker-compose.yaml version: '2' services: kafdrop: image: obsidiandynamics/kafdrop:4.0.1 container_name: kafdrop restart: "no" ports: - "9000:9000" environment: KAFKA_BROKERCONNECT: "kafka1:9092,kafka2:9092,kafka3:9092" depends_on: - "kafka1" - "kafka2" - "kafka3" networks: - kafka zookeeper: image: confluentinc/cp-zookeeper:7.4.3 container_name: zookeeper environment: ZOOKEEPER_SERVER_ID: 1 ZOOKEEPER_CLIENT_PORT: 2181 ZOOKEEPER_TICK_TIME: 2000 ZOOKEEPER_INIT_LIMIT: 5 ZOOKEEPER_SYNC_LIMIT: 2 ports: - "22181:2181" networks: - kafka kafka1: image: confluentinc/cp-kafka:7.4.3 container_name: kafka1 depends_on: - zookeeper ports: - "29091:9092" environment: KAFKA_BROKER_ID: 1 KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181' KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka1:9092,PLAINTEXT_HOST://localhost:29091 KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3 KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0 KAFKA_MESSAGE_MAX_BYTES: 10000000 KAFKA_SOCKET_REQUEST_MAX_BYTES: 100001200 KAFKA_SOCKET_RECEIVE_BUFFER_BYTES: 10000000 KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3 KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 3 networks: - kafka kafka2: image: confluentinc/cp-kafka:7.4.3 container_name: kafka2 depends_on: - zookeeper ports: - "29092:9092" environment: KAFKA_BROKER_ID: 2 KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181' KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka2:9092,PLAINTEXT_HOST://localhost:29092 KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3 KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0 KAFKA_MESSAGE_MAX_BYTES: 10000000 KAFKA_SOCKET_REQUEST_MAX_BYTES: 100001200 KAFKA_SOCKET_RECEIVE_BUFFER_BYTES: 10000000 KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3 KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 3 networks: - kafka kafka3: image: confluentinc/cp-kafka:7.4.3 container_name: kafka3 depends_on: - zookeeper ports: - "29093:9092" environment: KAFKA_BROKER_ID: 3 KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181' KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka3:9092,PLAINTEXT_HOST://localhost:29093 KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3 KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0 KAFKA_MESSAGE_MAX_BYTES: 10000000 KAFKA_SOCKET_REQUEST_MAX_BYTES: 100001200 KAFKA_SOCKET_RECEIVE_BUFFER_BYTES: 10000000 KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3 KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 3 networks: - kafka networks: kafka: driver: bridge EOF
< kafka docker compose up command >
kafka-docker-compose.yaml 을 실행하여 카프카 컨테이너들을 실행합니다.
docker-compose -f /tmp/kafka-docker-compose.yaml --project-name kafka up -d
< Topic 생성하기 >
위 docker-compose 가 실행된다면 9000 포트를 통해 kafdrop 웹UI 로 접속할 수 있습니다.
(http://localhost:9000)
그리고 messages 라는 이름의 Topic 을 하나 생성합니다.
partition 은 3개, replication-factor 도 3 으로 설정하였습니다.
Repartition.
카프카 클러스터와 토픽이 생성되었다면 Repartition 을 수행해보도록 하겠습니다.
우선 현재 "messages" 토픽의 파티션들과 분배된 상태를 확인해보겠습니다.
아래 이미지처럼 messages 토픽은 0, 1, 2 에 해당하는 3개의 파티션이 존재합니다.
그리고 Replica Nodes 열을 보면 "1,2,3" 이라는 값을 확인할 수 있습니다.
이 뜻은 모든 파티션이 모든 브로커에 존재한다는 의미인데요.
Replication-Factor 값이 3 이고 브로커의 갯수 또한 3 이기 때문에 모든 브로커에 모든 파티션이 존재합니다.
그리고 Leader Node 의 값은 해당하는 파티션의 리더 브로커를 의미합니다.
그래서 "messages" 토픽의 각 파티션에 쓰여지는 Write 요청은 해당하는 리더 브로커가 책임지게 됩니다.
저는 Kafka Broker 를 하나 추가한 이후에 repartition 을 수행해보도록 하겠습니다.
< Kafka Broker 추가 >
id 가 4 인 새로운 브로커를 추가합니다.
docker run -d --name kafka4 -p 29094:9092 \ -e KAFKA_BROKER_ID=4 \ -e KAFKA_ZOOKEEPER_CONNECT='zookeeper:2181' \ -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka4:9092,PLAINTEXT_HOST://localhost:29094 \ -e KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT \ -e KAFKA_INTER_BROKER_LISTENER_NAME=PLAINTEXT \ -e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=3 \ -e KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS=0 \ -e KAFKA_MESSAGE_MAX_BYTES=10000000 \ -e KAFKA_SOCKET_REQUEST_MAX_BYTES=100001200 \ -e KAFKA_SOCKET_RECEIVE_BUFFER_BYTES=10000000 \ -e KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=3 \ -e KAFKA_TRANSACTION_STATE_LOG_MIN_ISR=3 \ --network kafka_kafka confluentinc/cp-kafka:7.4.3
새로운 브로커가 추가된 이후에 kafdrop 을 확인해보면 아래와 같이 4번 브로커가 추가되었음을 확인할 수 있습니다.
그리고 repartition 을 한번 수행해보도록 하겠습니다.
cat <<EOF> /tmp/repartition.json { "version": 1, "partitions": [ {"topic": "messages", "partition": 0, "replicas": [1, 2, 4]}, {"topic": "messages", "partition": 1, "replicas": [2, 4, 1]}, {"topic": "messages", "partition": 2, "replicas": [3, 4, 2]} ] } EOF kafka-reassign-partitions --bootstrap-server localhost:9092 \ --reassignment-json-file /tmp/repartition.json --execute
Current partition replica assignment {"version":1,"partitions":[{"topic":"messages","partition":0,"replicas":[3,1,2],"log_dirs":["any","any","any"]},{"topic":"messages","partition":1,"replicas":[1,2,3],"log_dirs":["any","any","any"]},{"topic":"messages","partition":2,"replicas":[2,3,1],"log_dirs":["any","any","any"]}]} Save this to use as the --reassignment-json-file option during rollback Successfully started partition reassignments for messages-0,messages-1,messages-2
kafka-reassign-partitions 수행을 마치면 아래와 같이 새로운 브로커인 4번 브로커로 파티션들이 분배됨을 알 수 있습니다.
그리고 messags-0, messages-1 에 해당하는 로그 파일도 생성됩니다.
ls -lh /var/lib/kafka/data/ total 28K -rw-r--r-- 1 appuser appuser 4 Dec 25 02:56 cleaner-offset-checkpoint -rw-r--r-- 1 appuser appuser 4 Dec 24 02:57 log-start-offset-checkpoint drwxr-xr-x 2 appuser appuser 4.0K Dec 25 02:56 messages-0 drwxr-xr-x 2 appuser appuser 4.0K Dec 24 02:56 messages-1 -rw-r--r-- 1 appuser appuser 88 Dec 24 23:49 meta.properties -rw-r--r-- 1 appuser appuser 30 Dec 25 02:57 recovery-point-offset-checkpoint -rw-r--r-- 1 appuser appuser 30 Dec 25 02:58 replication-offset-checkpoint
브로커가 줄어든 상태에서의 Repartitioning.
4번 브로커를 종료시켜보도록 하겠습니다.
docker stop kafka4
4번 브로커가 종료되어 In-sync Replica Nodes 에서 4번 브로커는 제외됩니다.
참고로 In-sync Replica Nodes 는 Write 과 Replication 에 관여하는 브로커들을 의미합니다.
이 상황에서 다시 Repartitioning 을 수행해보도록 하겠습니다.
cat <<EOF> /tmp/repartition.json { "version": 1, "partitions": [ {"topic": "messages", "partition": 0, "replicas": [1, 2, 3]}, {"topic": "messages", "partition": 1, "replicas": [2, 3, 1]}, {"topic": "messages", "partition": 2, "replicas": [3, 1, 2]} ] } EOF kafka-reassign-partitions --bootstrap-server localhost:9092 \ --reassignment-json-file /tmp/repartition.json --execute
kafka-reassign-partitions 명령어가 수행되면 아래 이미지처럼 1,2,3 번 브로커로 파티션들이 재분배됩니다.
그리고 1번 브로커의 Log Segment 를 조회해보면 아래와 같이 0,1,2 파티션과 관련된 파일들이 존재하게 됩니다.
ls -lh /var/lib/kafka/data/ total 32K -rw-r--r-- 1 appuser appuser 4 Dec 24 02:56 cleaner-offset-checkpoint -rw-r--r-- 1 appuser appuser 4 Dec 25 03:06 log-start-offset-checkpoint drwxr-xr-x 2 appuser appuser 4.0K Dec 25 03:04 messages-0 drwxr-xr-x 2 appuser appuser 4.0K Dec 24 03:00 messages-1 drwxr-xr-x 2 appuser appuser 4.0K Dec 24 03:04 messages-2 -rw-r--r-- 1 appuser appuser 88 Dec 25 23:49 meta.properties -rw-r--r-- 1 appuser appuser 43 Dec 25 03:06 recovery-point-offset-checkpoint -rw-r--r-- 1 appuser appuser 43 Dec 25 03:07 replication-offset-checkpoint
반응형'Kafka' 카테고리의 다른 글
Kafka Log Segment 알아보기 (0) 2023.12.25 [Kafka] Zookeeper 는 Broker 를 어떻게 관리할까 ? (0) 2023.12.22 Docker 로 Kafka Cluster 구축해보기. (0) 2023.12.16 Kafka Ksql 구현하기 (0) 2023.12.11 Kafka-Connect & MySQL 구현하기 (0) 2023.12.09