ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • [Kafka] listeners, advertised.listeners 알아보기
    Kafka 2024. 1. 1. 11:23
    728x90
    반응형

    - 목차

     

    함께 보면 좋은 자료.

    https://westlife0615.tistory.com/474

     

    Docker 로 Kafka Cluster 구축해보기.

    - 목차 소개. 저는 로컬 환경에서 카프카 관련 테스트를 진행하는 경우가 많이 생기더군요. 그래서 docker-compose 를 활용하여 Kafka, ZooKeeper 클러스터를 구축하는 내용을 작성하려고 합니다. docker-com

    westlife0615.tistory.com

     

    listeners.

    Kafka 의 server.properties 설정을 통해서 listeners 를 등록할 수 있습니다.

    listeners 는 "INTERNAL://:9092,EXTERNAL://:29092,EXTERNAL_SASL://:39092" 와 같이

    (Protocol + IP + Port) 를 나열하는 문자열입니다.

    listeners 설정의 가장 중요한 것은 listeners 에 추가한 Port 들은 Kafka Broker 프로세스가 Binding 하고 Listening 하게 됩니다.

    즉, Socket -> Bind -> Listen 과 같은 System Call 을 통해서 TCP Socket 이 생성됩니다.

     

    예를 들어, 저는 아래와 같이 server.properties 를 구현하였습니다.

    KAFKA_LISTENERS 값으로 9092, 29092, 29192, 29292, 29392, 29492 Port 를 설정하였구요.

    kafka2:
        image: confluentinc/cp-kafka:7.4.3
        container_name: kafka2 
        depends_on:
          - zookeeper
        ports:
          - "29092:29092"
        environment:
          KAFKA_BROKER_ID: 2
          KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
          KAFKA_LISTENERS: INTERNAL://:9092,EXTERNAL://:29092,EXTERNAL1://:29192,EXTERNAL2://:29292,EXTERNAL3://:29392,EXTERNAL4://:29492
          KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka2:9092,EXTERNAL://localhost:29092,EXTERNAL1://kafka2:29192,EXTERNAL2://kafka2:29292,EXTERNAL3://kafka2:29392,EXTERNAL4://kafka2:29492
          KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,EXTERNAL1:PLAINTEXT,EXTERNAL2:PLAINTEXT,EXTERNAL3:PLAINTEXT,EXTERNAL4:PLAINTEXT
          KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
          KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
          KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
          KAFKA_MESSAGE_MAX_BYTES: 10000000
          KAFKA_SOCKET_REQUEST_MAX_BYTES: 100001200
          KAFKA_SOCKET_RECEIVE_BUFFER_BYTES: 10000000
          KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3
          KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 3  
        networks:
          - kafka

     

    위와 같이 브로커가 구성된다면 브로커는 9092, 29092, 29192, 29292, 29392, 29492 Port 들을 Binding 하게 됩니다.

    아래와 같이 각 Port 를 리스닝하고 있습니다.

    nc -zv localhost 9092
    Ncat: Version 7.70 ( https://nmap.org/ncat )
    Ncat: Connected to 127.0.0.1:9092.
    Ncat: 0 bytes sent, 0 bytes received in 0.01 seconds.
    
    sh-4.4$ nc -zv localhost 29092
    Ncat: Version 7.70 ( https://nmap.org/ncat )
    Ncat: Connected to 127.0.0.1:29092.
    Ncat: 0 bytes sent, 0 bytes received in 0.01 seconds.
    
    sh-4.4$ nc -zv localhost 29192
    Ncat: Version 7.70 ( https://nmap.org/ncat )
    Ncat: Connected to 127.0.0.1:29192.
    Ncat: 0 bytes sent, 0 bytes received in 0.01 seconds.
    
    sh-4.4$ nc -zv localhost 29292
    Ncat: Version 7.70 ( https://nmap.org/ncat )
    Ncat: Connected to 127.0.0.1:29292.
    Ncat: 0 bytes sent, 0 bytes received in 0.01 seconds.
    
    sh-4.4$ nc -zv localhost 29392
    Ncat: Version 7.70 ( https://nmap.org/ncat )
    Ncat: Connected to 127.0.0.1:29392.
    Ncat: 0 bytes sent, 0 bytes received in 0.00 seconds.
    
    sh-4.4$ nc -zv localhost 29492
    Ncat: Version 7.70 ( https://nmap.org/ncat )
    Ncat: Connected to 127.0.0.1:29492.
    Ncat: 0 bytes sent, 0 bytes received in 0.01 seconds.

     

    advertised.listeners

    advertised.listener 는 브로커가 Listening 하는 Port 를 외부 세계로 연결하는 역할을 합니다.

    카프카는 클러스터 환경이기 때문에 Private Network 내부에서 동작하게 됩니다.

    이러한 클러스터 내부의 네트워크를 외부로 연결시켜야하는데요.

    그 역할을 수행하는 것이 advertised.listeners 설정입니다.

     

    저희는 이미 listener 설정을 통해서 브로커의 Network Port 를 활성화시켰습니다.

    그 Port 정보를 토대로 advertised.listeners 를 활용하여 다른 서버들과 소통할 수 있습니다.

    개별 브로커가 소통하는 서버는 크게 다음과 같습니다.

     

    - 다른 브로커

    - 메시지 프로듀서 ( Producer ), 컨슈머 ( Consumer )

     

    그래서 보통 다른 브로커들과 소통하기 위한 Protocol 와 Client 인 Producer & Consumer 와 소통하는 2개의 Protocol 을 만들곤 합니다.

    한번 테스트를 통해서 살펴보도록 하겠습니다.

    아래 docker compose 를 실행시키면 Kafka Cluster 가 생성됩니다.

    3개의 브로커로 구성되구요.

    INTERNAL 과 EXTERNAL 프로토콜을 가집니다.

    INTERNAL 은 다른 브로커들과 소통하기 위한 Protocol 이자 Port 이구요. 9092 Port 를 사용합니다.

    EXTERNAL 은 Producer 또는 Consumer 들과 소통하기 위한 Protocol 이자 Port 이구요.

    각 브로커는 29091 또는 29092 또는 29093 Port 를 사용합니다.

     

    그리고 저희 Local Machine 의 Domain 을 westlife.com 으로 강제로 변경하였습니다.

    /etc/hosts 파일에 127.0.0.1 westlife.com 를 추가하였습니다.

     

    < Kafka Docker Compose Yaml >

    cat <<EOF> /tmp/kafka-docker-compose.yaml
    version: '2'
    services:
      kafdrop:
        image: obsidiandynamics/kafdrop:4.0.1
        container_name: kafdrop
        restart: "no"
        ports:
          - "9000:9000"
        environment:
          KAFKA_BROKERCONNECT: "kafka1:9092,kafka2:9092,kafka3:9092"
        depends_on:
          - "kafka1"
          - "kafka2"
          - "kafka3"      
        networks:
          - kafka
      zookeeper:
        image: confluentinc/cp-zookeeper:7.4.3
        container_name: zookeeper
        environment:
          ZOOKEEPER_SERVER_ID: 1
          ZOOKEEPER_CLIENT_PORT: 2181
          ZOOKEEPER_TICK_TIME: 2000
          ZOOKEEPER_INIT_LIMIT: 5
          ZOOKEEPER_SYNC_LIMIT: 2
        ports:
          - "22181:22181"
        networks:
          - kafka      
      kafka1:
        image: confluentinc/cp-kafka:7.4.3
        container_name: kafka1
        depends_on:
          - zookeeper
        ports:
          - "29091:29091"
        environment:
          KAFKA_BROKER_ID: 1
          KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
          KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka1:9092,EXTERNAL://westlife.com:29091
          KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
          KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
          KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
          KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
          KAFKA_MESSAGE_MAX_BYTES: 10000000
          KAFKA_SOCKET_REQUEST_MAX_BYTES: 100001200
          KAFKA_SOCKET_RECEIVE_BUFFER_BYTES: 10000000
          KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3
          KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 3
        networks:
          - kafka
      kafka2:
        image: confluentinc/cp-kafka:7.4.3
        container_name: kafka2 
        depends_on:
          - zookeeper
        ports:
          - "29092:29092"
        environment:
          KAFKA_BROKER_ID: 2
          KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
          KAFKA_LISTENERS: INTERNAL://:9092,EXTERNAL://:29092
          KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka2:9092,EXTERNAL://westlife.com:29092
          KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
          KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
          KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
          KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
          KAFKA_MESSAGE_MAX_BYTES: 10000000
          KAFKA_SOCKET_REQUEST_MAX_BYTES: 100001200
          KAFKA_SOCKET_RECEIVE_BUFFER_BYTES: 10000000
          KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3
          KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 3  
        networks:
          - kafka      
      kafka3:
        image: confluentinc/cp-kafka:7.4.3
        container_name: kafka3
        depends_on:
          - zookeeper
        ports:
          - "29093:29093"
        environment:
          KAFKA_BROKER_ID: 3
          KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
          KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka3:9092,EXTERNAL://westlife.com:29093
          KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
          KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
          KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
          KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
          KAFKA_MESSAGE_MAX_BYTES: 10000000
          KAFKA_SOCKET_REQUEST_MAX_BYTES: 100001200
          KAFKA_SOCKET_RECEIVE_BUFFER_BYTES: 10000000
          KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3
          KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 3  
        networks:
          - kafka      
    networks:
      kafka:
        driver: bridge
    EOF

     

     

    그리고 아래의 Kafka Producer 를 실행하게 되면 정상적으로 Broker 와 연결이 가능합니다.

    advertised.listeners 로 오픈한 westlife.com:29091, westlife.com:29092, westlife.com:29093 을 통해서

    Producer 는 Broker 와 통신이 가능해집니다.

    from kafka import KafkaProducer
    import time, random, string
    
    if __name__ == '__main__':
        bootstrap_servers = ['westlife.com:29091', 'westlife.com:29092', 'westlife.com:29093']
        producer = KafkaProducer(
            bootstrap_servers=bootstrap_servers,
            client_id='test_producer',
            value_serializer=lambda d: d.encode('utf-8'),
    
        )
        topic = 'new_topic'
        while True:
            random_value = ''.join(random.choice(string.ascii_letters) for _ in range(10))
            future = producer.send(topic, random_value)
            record_metadata = future.get()
            print(record_metadata)
            # producer.flush()
            print("#### flush")
            time.sleep(1)

     

     

     

    반응형
Designed by Tistory.