  • Building a Kafka Cluster with Docker.
    Kafka 2023. 12. 16. 00:39

     

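    Introduction.

    I often need to test Kafka-related work in a local environment. 

    This post walks through using docker-compose to build a three-broker Kafka cluster backed by a single ZooKeeper node, with Kafdrop as a web UI. 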

     

     

    docker-compose.yaml.

    Below is the docker-compose.yaml file used to run the Kafka cluster. 

    The cat command writes it to /tmp/kafka-docker-compose.yaml. 

    cat <<'EOF' > /tmp/kafka-docker-compose.yaml
    version: '2'
    services:
      kafdrop:
        image: obsidiandynamics/kafdrop:4.0.1
        container_name: kafdrop
        restart: "always"
        ports:
          - "9000:9000"
        environment:
          KAFKA_BROKERCONNECT: "kafka1:9092,kafka2:9092,kafka3:9092"
        depends_on:
          - "kafka1"
          - "kafka2"
          - "kafka3"      
        networks:
          - kafka
      zookeeper:
        image: confluentinc/cp-zookeeper:7.4.3
        container_name: zookeeper
        restart: "always"    
        environment:
          ZOOKEEPER_SERVER_ID: 1
          ZOOKEEPER_CLIENT_PORT: 2181
          ZOOKEEPER_TICK_TIME: 2000
          ZOOKEEPER_INIT_LIMIT: 5
          ZOOKEEPER_SYNC_LIMIT: 2
        ports:
          - "22181:2181"  # host port 22181 -> container port 2181 (ZOOKEEPER_CLIENT_PORT)
        networks:
          - kafka      
      kafka1:
        image: confluentinc/cp-kafka:7.4.3
        container_name: kafka1
        restart: "always"    
        depends_on:
          - zookeeper
        ports:
          - "29091:29091"
          - "29191:29191"      
          - "29291:29291"            
        environment:
          KAFKA_BROKER_ID: 1
          KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
          KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka1:9092,EXTERNAL://localhost:29091,EXTERNALDOCKER://host.docker.internal:29191,EXTERNALSERVER://192.168.45.32:29291
          KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,EXTERNALDOCKER:PLAINTEXT,EXTERNALSERVER:PLAINTEXT
          KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
          KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
          KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
          KAFKA_MESSAGE_MAX_BYTES: 10000000
          KAFKA_SOCKET_REQUEST_MAX_BYTES: 100001200
          KAFKA_SOCKET_RECEIVE_BUFFER_BYTES: 10000000
          KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3
          KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 3
        networks:
          - kafka
      kafka2:
        image: confluentinc/cp-kafka:7.4.3
        container_name: kafka2 
        restart: "always"    
        depends_on:
          - zookeeper
        ports:
          - "29092:29092"
          - "29192:29192"   
          - "29292:29292"                        
        environment:
          KAFKA_BROKER_ID: 2
          KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
          KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka2:9092,EXTERNAL://localhost:29092,EXTERNALDOCKER://host.docker.internal:29192,EXTERNALSERVER://192.168.45.32:29292
          KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,EXTERNALDOCKER:PLAINTEXT,EXTERNALSERVER:PLAINTEXT
          KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
          KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
          KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
          KAFKA_MESSAGE_MAX_BYTES: 10000000
          KAFKA_SOCKET_REQUEST_MAX_BYTES: 100001200
          KAFKA_SOCKET_RECEIVE_BUFFER_BYTES: 10000000
          KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3
          KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 3  
        networks:
          - kafka      
      kafka3:
        image: confluentinc/cp-kafka:7.4.3
        container_name: kafka3
        restart: "always"    
        depends_on:
          - zookeeper
        ports:
          - "29093:29093"
          - "29193:29193"      
          - "29293:29293"                  
        environment:
          KAFKA_BROKER_ID: 3
          KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
          KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka3:9092,EXTERNAL://localhost:29093,EXTERNALDOCKER://host.docker.internal:29193,EXTERNALSERVER://192.168.45.32:29293
          KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,EXTERNALDOCKER:PLAINTEXT,EXTERNALSERVER:PLAINTEXT
          KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
          KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
          KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
          KAFKA_MESSAGE_MAX_BYTES: 10000000
          KAFKA_SOCKET_REQUEST_MAX_BYTES: 100001200
          KAFKA_SOCKET_RECEIVE_BUFFER_BYTES: 10000000
          KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3
          KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 3  
        networks:
          - kafka      
    networks:
      kafka:
        driver: bridge
    EOF
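
    Each broker advertises four listeners, and a client must connect through the one whose address it can actually reach. As I read the file, INTERNAL is for containers on the kafka network (the brokers themselves and Kafdrop), EXTERNAL is for clients on the Docker host, EXTERNALDOCKER is for containers outside that network that go through host.docker.internal, and EXTERNALSERVER is for other machines on the LAN. The address 192.168.45.32 is presumably the IP of the machine used for this post, so replace it with your own host's IP. The sketch below is a small illustration, not part of the original setup: list_listeners is a hypothetical helper that splits a KAFKA_ADVERTISED_LISTENERS value into name, host, and port.

```shell
# list_listeners is a hypothetical helper (not part of Kafka or Docker).
# It prints one "NAME HOST PORT" line per entry of a
# KAFKA_ADVERTISED_LISTENERS value.
list_listeners() {
  # Split the comma-separated list into positional parameters.
  old_ifs=$IFS
  IFS=,
  set -- $1
  IFS=$old_ifs
  for entry in "$@"; do
    name=${entry%%://*}     # text before "://"
    address=${entry#*://}   # text after "://"
    host=${address%:*}      # drop the trailing ":port"
    port=${address##*:}     # keep only the port
    printf '%s %s %s\n' "$name" "$host" "$port"
  done
}

# The value configured for kafka1 in the compose file.
list_listeners "INTERNAL://kafka1:9092,EXTERNAL://localhost:29091,EXTERNALDOCKER://host.docker.internal:29191,EXTERNALSERVER://192.168.45.32:29291"
```

    A client running on the host would therefore use localhost:29091,localhost:29092,localhost:29093 as its bootstrap servers, while Kafdrop uses kafka1:9092,kafka2:9092,kafka3:9092.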

     

     

    Running the Kafka docker-compose file.

    docker-compose -f /tmp/kafka-docker-compose.yaml --project-name kafka up -d
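
    The up -d command returns as soon as the containers are started, which can be before the brokers accept connections. One way to wait is to retry a cheap broker command until it succeeds. The sketch below assumes nothing beyond a POSIX shell; retry is a hypothetical helper, and the attempt count and delay in the usage comment are arbitrary values.

```shell
# retry ATTEMPTS DELAY_SECONDS COMMAND [ARGS...]
# Hypothetical helper: runs COMMAND until it exits 0 or ATTEMPTS runs out.
retry() {
  attempts=$1
  delay=$2
  shift 2
  n=1
  while [ "$n" -le "$attempts" ]; do
    # Discard the command's output; only its exit status matters here.
    if "$@" >/dev/null 2>&1; then
      return 0
    fi
    # Sleep only when another attempt will follow.
    if [ "$n" -lt "$attempts" ]; then
      sleep "$delay"
    fi
    n=$((n + 1))
  done
  return 1
}

# Usage (needs the cluster from this post to be starting up):
#   retry 30 2 docker exec kafka1 \
#     kafka-broker-api-versions --bootstrap-server localhost:9092 \
#     && echo "kafka1 is ready"
```

    The same call can be repeated for kafka2 and kafka3 before creating topics.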

     

     

     

    Checking Kafdrop.

    Kafdrop serves its web UI on port 9000.

     

    < http://localhost:9000 >

     

     

    Creating a topic.

    You can create a Kafka topic at http://localhost:9000/topic/create. 

     

     

    Assuming the topic was named new_topic, its details are available at http://localhost:9000/topic/new_topic.

     

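    Alternatively, you can create a topic from the CLI. Port 9092 is not published to the host, so the command runs inside the kafka1 container.

    docker exec -it kafka1 \
    kafka-topics --bootstrap-server localhost:9092 \
    --create --topic test-topic \
    --partitions 3 --replication-factor 3 \
    --config min.insync.replicas=2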
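
    The combination of --replication-factor 3 and min.insync.replicas=2 determines how many brokers can be down while producers using acks=all can still write: the replication factor minus min.insync.replicas. The arithmetic is sketched below; tolerated_failures is a hypothetical helper written for this illustration.

```shell
# tolerated_failures REPLICATION_FACTOR MIN_INSYNC_REPLICAS
# Hypothetical helper: prints how many replicas may be unavailable while
# acks=all writes are still accepted.
tolerated_failures() {
  rf=$1
  min_isr=$2
  if [ "$min_isr" -gt "$rf" ]; then
    echo "min.insync.replicas ($min_isr) exceeds replication factor ($rf)" >&2
    return 1
  fi
  echo $((rf - min_isr))
}

tolerated_failures 3 2   # test-topic as created above: prints 1
tolerated_failures 3 3   # prints 0
```

    The second call matches the compose file's KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3 with KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 3, which suggests that transactional writes would stall if any one broker is down. For a local test cluster that may be acceptable.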

     

    Producing messages.

    The command below opens a console for producing messages. 

    docker exec -it kafka1 \
    kafka-console-producer --bootstrap-server localhost:9092 --topic new_topic

     

    Each line you type into that console is sent as one message. 

    {"name": "Andy", "age": 26, "city": "Seoul"}
    {"name": "Bruce", "age": 26, "city": "Busan"}
    {"name": "Chris", "age": 26, "city": "Daegu"}
    {"name": "Daniel", "age": 26, "city": "Yongin"}
    {"name": "Emma", "age": 26, "city": "Pohang"}

     

     

     

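    Producing messages in bulk.

    # Run inside a broker container (for example after: docker exec -it kafka1 bash),
    # where the kafka1, kafka2, and kafka3 hostnames resolve.
    # Write 100,001 JSON lines, {"num":0} through {"num":100000}, to test.json.
    for num in {0..100000}; do echo "{\"num\":${num}}"; done > test.json
    # Send the whole file once through each broker.
    cat test.json | kafka-console-producer --bootstrap-server kafka1:9092 --topic test
    cat test.json | kafka-console-producer --bootstrap-server kafka2:9092 --topic test
    cat test.json | kafka-console-producer --bootstrap-server kafka3:9092 --topic test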
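
    A shell loop runs one echo per message, which is slow for large counts, and unquoted double quotes are consumed by the shell before echo sees them. As an alternative sketch, seq and awk can emit the same lines in a single pipeline with the JSON quoting kept intact. gen_messages is a hypothetical helper name; it assumes seq and awk are available where you run it.

```shell
# gen_messages FIRST LAST
# Hypothetical helper: prints one JSON object per line, {"num":FIRST} .. {"num":LAST}.
gen_messages() {
  # seq prints the integers FIRST..LAST, one per line;
  # awk wraps each integer in a JSON object.
  seq "$1" "$2" | awk '{ printf "{\"num\":%d}\n", $1 }'
}

# Small demonstration; prints three lines.
gen_messages 0 2

# Usage for the full file:
#   gen_messages 0 100000 > test.json
```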

     

    Consuming messages.

    The command below opens a console that reads messages from the topic as consumer group group1. 

    docker exec -it kafka1 kafka-console-consumer \
    --bootstrap-server localhost:9092 --topic new_topic --group group1 --from-beginning
    {"name": "Andy", "age": 26, "city": "Seoul"}
    {"name": "Bruce", "age": 26, "city": "Busan"}
    {"name": "Chris", "age": 26, "city": "Daegu"}
    {"name": "Daniel", "age": 26, "city": "Yongin"}
    {"name": "Emma", "age": 26, "city": "Pohang"}

     

     

    Creating a standalone Kafka container.

    The command below creates a single Kafka container. 

    It depends on ZooKeeper being reachable at zookeeper:2181. 

    If the ZooKeeper container is already running on the same Docker network, you can use the command as is. 

     

    docker run -d --name kafka4 -p 29094:29094 \
    -e KAFKA_BROKER_ID=4 \
    -e KAFKA_ZOOKEEPER_CONNECT='zookeeper:2181' \
    -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka4:9092,PLAINTEXT_HOST://localhost:29094 \
    -e KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT \
    -e KAFKA_INTER_BROKER_LISTENER_NAME=PLAINTEXT \
    -e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=3 \
    -e KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS=0 \
    -e KAFKA_MESSAGE_MAX_BYTES=10000000 \
    -e KAFKA_SOCKET_REQUEST_MAX_BYTES=100001200 \
    -e KAFKA_SOCKET_RECEIVE_BUFFER_BYTES=10000000 \
    -e KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=3 \
    -e KAFKA_TRANSACTION_STATE_LOG_MIN_ISR=3 \
    --network kafka_kafka confluentinc/cp-kafka:7.4.3

     

     

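    Exploring the Kafka CLI.

    Listing consumer groups.

    docker exec -it kafka1 \
    kafka-consumer-groups --bootstrap-server localhost:9092 --list

     

    Deleting a consumer group. A group can be deleted only when it has no active members.

    docker exec -it kafka1 \
    kafka-consumer-groups --bootstrap-server localhost:9092 --delete --group test-group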
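
    Before deleting a group it can be useful to see how far behind it is. kafka-consumer-groups also has a --describe mode that prints one row per partition, including a LAG column. The sketch below sums that column; total_lag is a hypothetical helper, and it locates the column by its header name rather than assuming a fixed position.

```shell
# total_lag
# Hypothetical helper: reads `kafka-consumer-groups --describe` output on
# stdin and prints the sum of the LAG column.
total_lag() {
  awk '
    # Until the header row is seen, look for the column named LAG.
    lag_col == 0 {
      for (i = 1; i <= NF; i++) if ($i == "LAG") lag_col = i
      next
    }
    # Data rows: add numeric lag values; "-" (no committed offset) is skipped.
    $lag_col ~ /^[0-9]+$/ { sum += $lag_col }
    END { print sum + 0 }
  '
}

# Usage (needs the cluster from this post and an existing group):
#   docker exec kafka1 kafka-consumer-groups \
#     --bootstrap-server localhost:9092 --describe --group group1 | total_lag
```

    A result of 0 means the group has consumed everything currently in its topics.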

     

     

     

     

    Running a standalone ZooKeeper container.

    A single node has no ensemble peers, so no ZOOKEEPER_SERVERS list is set; that variable is only needed for a multi-node ensemble.

    docker run -d --name zookeeper \
    -e ZOOKEEPER_CLIENT_PORT=2181 \
    -e ZOOKEEPER_PEER_PORT=2888 \
    -e ZOOKEEPER_SERVER_PORT=3888 \
    -e ZOOKEEPER_SERVER_HOST=zookeeper \
    -e ZOOKEEPER_TICK_TIME=2000 \
    -e ZOOKEEPER_INIT_LIMIT=5 \
    -e ZOOKEEPER_SYNC_LIMIT=2 \
    -e ZOOKEEPER_SERVER_ID=1 \
    -p 2181:2181 confluentinc/cp-zookeeper:7.4.3

     

     

    Inspecting znodes.

    Listing all znodes.

    docker exec -it zookeeper zookeeper-shell localhost:2181 ls -R /

    Listing the topics under /brokers/topics.

    docker exec -it zookeeper zookeeper-shell localhost:2181 ls /brokers/topics
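
    In a ZooKeeper-based cluster each running broker registers a znode under /brokers/ids, so listing that path is a quick way to confirm that all three brokers joined. The sketch below counts the ids in the shell's output; count_broker_ids is a hypothetical helper, and it assumes the result is printed as a bracketed list such as [1, 2, 3] on its own line.

```shell
# count_broker_ids
# Hypothetical helper: reads `zookeeper-shell ... ls /brokers/ids` output on
# stdin and prints how many broker ids the bracketed result line contains.
count_broker_ids() {
  awk '
    # Match only a line made up of a bracketed id list, e.g. "[1, 2, 3]" or "[]".
    /^\[[0-9, ]*\]$/ {
      gsub(/[^0-9,]/, "")                     # keep only digits and commas
      n = ($0 == "") ? 0 : split($0, ids, ",")
    }
    END { print n + 0 }
  '
}

# Usage (needs the cluster from this post):
#   docker exec zookeeper zookeeper-shell localhost:2181 ls /brokers/ids \
#     | count_broker_ids
```

    With the compose file from this post, the expected count is 3 once every broker has started.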

     

     
