-
[Kafka] listeners, advertised.listeners 알아보기Kafka 2024. 1. 1. 11:23728x90반응형
- 목차
함께 보면 좋은 자료.
https://westlife0615.tistory.com/474
listeners.
Kafka 의 server.properties 설정을 통해서 listeners 를 등록할 수 있습니다.
listeners 는 "INTERNAL://:9092,EXTERNAL://:29092,EXTERNAL_SASL://:39092" 와 같이
(Protocol + IP + Port) 를 나열하는 문자열입니다.
listeners 설정의 가장 중요한 것은 listeners 에 추가한 Port 들은 Kafka Broker 프로세스가 Binding 하고 Listening 하게 됩니다.
즉, Socket -> Bind -> Listen 과 같은 System Call 을 통해서 TCP Socket 이 생성됩니다.
예를 들어, 저는 아래와 같이 server.properties 를 구현하였습니다.
KAFKA_LISTENERS 값으로 9092, 29092, 29192, 29292, 29392, 29492 Port 를 설정하였구요.
kafka2: image: confluentinc/cp-kafka:7.4.3 container_name: kafka2 depends_on: - zookeeper ports: - "29092:29092" environment: KAFKA_BROKER_ID: 2 KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181' KAFKA_LISTENERS: INTERNAL://:9092,EXTERNAL://:29092,EXTERNAL1://:29192,EXTERNAL2://:29292,EXTERNAL3://:29392,EXTERNAL4://:29492 KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka2:9092,EXTERNAL://localhost:29092,EXTERNAL1://kafka2:29192,EXTERNAL2://kafka2:29292,EXTERNAL3://kafka2:29392,EXTERNAL4://kafka2:29492 KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,EXTERNAL1:PLAINTEXT,EXTERNAL2:PLAINTEXT,EXTERNAL3:PLAINTEXT,EXTERNAL4:PLAINTEXT KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3 KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0 KAFKA_MESSAGE_MAX_BYTES: 10000000 KAFKA_SOCKET_REQUEST_MAX_BYTES: 100001200 KAFKA_SOCKET_RECEIVE_BUFFER_BYTES: 10000000 KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3 KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 3 networks: - kafka
위와 같이 브로커가 구성된다면 브로커는 9092, 29092, 29192, 29292, 29392, 29492 Port 들을 Binding 하게 됩니다.
아래와 같이 각 Port 를 리스닝하고 있습니다.
nc -zv localhost 9092 Ncat: Version 7.70 ( https://nmap.org/ncat ) Ncat: Connected to 127.0.0.1:9092. Ncat: 0 bytes sent, 0 bytes received in 0.01 seconds. sh-4.4$ nc -zv localhost 29092 Ncat: Version 7.70 ( https://nmap.org/ncat ) Ncat: Connected to 127.0.0.1:29092. Ncat: 0 bytes sent, 0 bytes received in 0.01 seconds. sh-4.4$ nc -zv localhost 29192 Ncat: Version 7.70 ( https://nmap.org/ncat ) Ncat: Connected to 127.0.0.1:29192. Ncat: 0 bytes sent, 0 bytes received in 0.01 seconds. sh-4.4$ nc -zv localhost 29292 Ncat: Version 7.70 ( https://nmap.org/ncat ) Ncat: Connected to 127.0.0.1:29292. Ncat: 0 bytes sent, 0 bytes received in 0.01 seconds. sh-4.4$ nc -zv localhost 29392 Ncat: Version 7.70 ( https://nmap.org/ncat ) Ncat: Connected to 127.0.0.1:29392. Ncat: 0 bytes sent, 0 bytes received in 0.00 seconds. sh-4.4$ nc -zv localhost 29492 Ncat: Version 7.70 ( https://nmap.org/ncat ) Ncat: Connected to 127.0.0.1:29492. Ncat: 0 bytes sent, 0 bytes received in 0.01 seconds.
advertised.listeners
advertised.listener 는 브로커가 Listening 하는 Port 를 외부 세계로 연결하는 역할을 합니다.
카프카는 클러스터 환경이기 때문에 Private Network 내부에서 동작하게 됩니다.
이러한 클러스터 내부의 네트워크를 외부로 연결시켜야하는데요.
그 역할을 수행하는 것이 advertised.listeners 설정입니다.
저희는 이미 listener 설정을 통해서 브로커의 Network Port 를 활성화시켰습니다.
그 Port 정보를 토대로 advertised.listeners 를 활용하여 다른 서버들과 소통할 수 있습니다.
개별 브로커가 소통하는 서버는 크게 다음과 같습니다.
- 다른 브로커
- 메시지 프로듀서 ( Producer ), 컨슈머 ( Consumer )
그래서 보통 다른 브로커들과 소통하기 위한 Protocol 와 Client 인 Producer & Consumer 와 소통하는 2개의 Protocol 을 만들곤 합니다.
한번 테스트를 통해서 살펴보도록 하겠습니다.
아래 docker compose 를 실행시키면 Kafka Cluster 가 생성됩니다.
3개의 브로커로 구성되구요.
INTERNAL 과 EXTERNAL 프로토콜을 가집니다.
INTERNAL 은 다른 브로커들과 소통하기 위한 Protocol 이자 Port 이구요. 9092 Port 를 사용합니다.
EXTERNAL 은 Producer 또는 Consumer 들과 소통하기 위한 Protocol 이자 Port 이구요.
각 브로커는 29091 또는 29092 또는 29093 Port 를 사용합니다.
그리고 저희 Local Machine 의 Domain 을 westlife.com 으로 강제로 변경하였습니다.
/etc/hosts 파일에 127.0.0.1 westlife.com 를 추가하였습니다.
< Kafka Docker Compose Yaml >
cat <<EOF> /tmp/kafka-docker-compose.yaml version: '2' services: kafdrop: image: obsidiandynamics/kafdrop:4.0.1 container_name: kafdrop restart: "no" ports: - "9000:9000" environment: KAFKA_BROKERCONNECT: "kafka1:9092,kafka2:9092,kafka3:9092" depends_on: - "kafka1" - "kafka2" - "kafka3" networks: - kafka zookeeper: image: confluentinc/cp-zookeeper:7.4.3 container_name: zookeeper environment: ZOOKEEPER_SERVER_ID: 1 ZOOKEEPER_CLIENT_PORT: 2181 ZOOKEEPER_TICK_TIME: 2000 ZOOKEEPER_INIT_LIMIT: 5 ZOOKEEPER_SYNC_LIMIT: 2 ports: - "22181:22181" networks: - kafka kafka1: image: confluentinc/cp-kafka:7.4.3 container_name: kafka1 depends_on: - zookeeper ports: - "29091:29091" environment: KAFKA_BROKER_ID: 1 KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181' KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka1:9092,EXTERNAL://westlife.com:29091 KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3 KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0 KAFKA_MESSAGE_MAX_BYTES: 10000000 KAFKA_SOCKET_REQUEST_MAX_BYTES: 100001200 KAFKA_SOCKET_RECEIVE_BUFFER_BYTES: 10000000 KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3 KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 3 networks: - kafka kafka2: image: confluentinc/cp-kafka:7.4.3 container_name: kafka2 depends_on: - zookeeper ports: - "29092:29092" environment: KAFKA_BROKER_ID: 2 KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181' KAFKA_LISTENERS: INTERNAL://:9092,EXTERNAL://:29092 KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka2:9092,EXTERNAL://westlife.com:29092 KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3 KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0 KAFKA_MESSAGE_MAX_BYTES: 10000000 KAFKA_SOCKET_REQUEST_MAX_BYTES: 100001200 KAFKA_SOCKET_RECEIVE_BUFFER_BYTES: 10000000 KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3 KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 3 networks: - kafka kafka3: image: confluentinc/cp-kafka:7.4.3 container_name: kafka3 depends_on: - zookeeper ports: - "29093:29093" environment: KAFKA_BROKER_ID: 3 KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181' KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka3:9092,EXTERNAL://westlife.com:29093 KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3 KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0 KAFKA_MESSAGE_MAX_BYTES: 10000000 KAFKA_SOCKET_REQUEST_MAX_BYTES: 100001200 KAFKA_SOCKET_RECEIVE_BUFFER_BYTES: 10000000 KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3 KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 3 networks: - kafka networks: kafka: driver: bridge EOF
그리고 아래의 Kafka Producer 를 실행하게 되면 정상적으로 Broker 와 연결이 가능합니다.
advertised.listeners 로 오픈한 westlife.com:29091, westlife.com:29092, westlife.com:29093 을 통해서
Producer 는 Broker 와 통신이 가능해집니다.
from kafka import KafkaProducer import time, random, string if __name__ == '__main__': bootstrap_servers = ['westlife.com:29091', 'westlife.com:29092', 'westlife.com:29093'] producer = KafkaProducer( bootstrap_servers=bootstrap_servers, client_id='test_producer', value_serializer=lambda d: d.encode('utf-8'), ) topic = 'new_topic' while True: random_value = ''.join(random.choice(string.ascii_letters) for _ in range(10)) future = producer.send(topic, random_value) record_metadata = future.get() print(record_metadata) # producer.flush() print("#### flush") time.sleep(1)
반응형'Kafka' 카테고리의 다른 글
[Kafka] Partition Ownership 알아보기 (0) 2024.01.05 [Kafka] Log Index File 알아보기 (0) 2024.01.03 [Kafka] Partition Leader Election 알아보기 (파티션 리더 선출) (0) 2023.12.31 [Kafka Streams] Config 알아보기 (0) 2023.12.29 Kafka Log Segment 알아보기 (0) 2023.12.25