-
[Kafka] advertised.listeners 와 Socket Acceptor Listener Thread 알아보기Kafka 2024. 6. 22. 07:50반응형
- 목차
들어가며.
Kafka Broker 는 advertised.listeners 와 같은 설정값이 존재합니다.
advertised.listeners 는 카프카 브로커가 외부 클라이언트에게 자신의 IP 혹은 Domain 그리고 Port 를 노출하여 연결될 수 있는 주소를 제공합니다.
그리고 이러한 advertised.listeners 를 통해서 클라이언트는 브로커와 연결될 수 있습니다.
예를 들어 아래와 같은 Docker Command 를 통해서 Kafka 를 실행할 때에 실행된 브로커는 kafka:9092 라는 주소를 외부로 노출합니다.
docker network create kafka-net docker run -d --name zookeeper --hostname zookeeper --net kafka-net \ -e ZOOKEEPER_CLIENT_PORT=2181 \ confluentinc/cp-zookeeper:7.8.1 docker run -d --name kafka --hostname kafka --net kafka-net \ -e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 \ -e KAFKA_BROKER_ID=1 \ -e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 \ -e KAFKA_LISTENERS=PLAINTEXT://:9092 \ -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092 \ confluentinc/cp-kafka:7.8.1
그리고 외부의 클라이언트들은 아래와 같은 명령어를 통해서 kafka:9092 브로커에 접근할 수 있습니다.
( --bootstrap-server kafka:9092 )
docker run -it --rm --net kafka-net confluentinc/cp-kafka:7.8.1 \ kafka-topics --create --bootstrap-server kafka:9092 \ --replication-factor 1 --partitions 1 --topic test_topic // WARNING: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide. To avoid issues it is best to use either, but not both. // Created topic test_topic.
advertised.listeners 와 함께 실행되는 Kafka Broker 는 내부적으로 TCP Socket 을 생성합니다.
그리고 TCP Socket 은 필수적으로 Accept, Read, Write 등과 같은 시스템 콜을 호출하는 별도의 Thread 가 생성되게 됩니다.
이번 글에서 advertised.listeners 와 이와 관련하여 생성되는 Socket Acceptor Listener Thread 에 대해서 알아보려고 합니다.
advertised.listeners 와 Socket 의 관계.
카프카 브로커는 기본적으로 advertised.listeners 을 요구합니다.
아래와 같은 형식으로 Kafka Broker 도커 컨테이너를 실행할 수 있습니다.
그리고 kafka:9092 를 advertised.listeners 로 설정하였습니다.
docker network create kafka-net docker run -d --name zookeeper --hostname zookeeper --net kafka-net \ -e ZOOKEEPER_CLIENT_PORT=2181 \ confluentinc/cp-zookeeper:7.8.1 docker run -d --name kafka --hostname kafka --net kafka-net \ -e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 \ -e KAFKA_BROKER_ID=1 \ -e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 \ -e KAFKA_LISTENERS=PLAINTEXT://:9092 \ -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092 \ confluentinc/cp-kafka:7.8.1
ip addr show 명령어를 통해서 Kafka Broker 가 실행되는 환경에서의 Network Interface 와 IP Address 를 확인할 수 있습니다.
( Docker 환경이기 때문에 eth0 Network Interface 는 Virtual Ethernet 형식을 취하고 있습니다. )
ip addr show
1: lo: <LOOPBACK,UP,LOWER_UP> mtu 65536 qdisc noqueue state UNKNOWN group default qlen 1000 link/loopback 00:00:00:00:00:00 brd 00:00:00:00:00:00 inet 127.0.0.1/8 scope host lo valid_lft forever preferred_lft forever inet6 ::1/128 scope host valid_lft forever preferred_lft forever 1537: eth0@if1538: <BROADCAST,MULTICAST,UP,LOWER_UP> mtu 1500 qdisc noqueue state UP group default link/ether 02:42:ac:13:00:03 brd ff:ff:ff:ff:ff:ff link-netnsid 0 inet 172.19.0.3/16 brd 172.19.255.255 scope global eth0 valid_lft forever preferred_lft forever
ip addr show 로 확인되는 IP 는 172.19.0.3 입니다.
그리고 /etc/hosts 파일에 작성된 Domain 이름 또한 확인 가능합니다.
아래와 같이 IP 172.19.0.3 는 "kafka" Domain 명으로 설정되어 있습니다.
cat /etc/hosts 127.0.0.1 localhost ::1 localhost ip6-localhost ip6-loopback fe00::0 ip6-localnet ff00::0 ip6-mcastprefix ff02::1 ip6-allnodes ff02::2 ip6-allrouters 172.19.0.3 kafka
그리고 생성된 Socket 을 확인해보겠습니다.
lsof -i :9092 명령어를 통해서 9092 Port 로 바인딩된 Socket 을 확인할 수 있습니다.
lsof -i :9092 COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME java 1 appuser 154u IPv6 1527280 0t0 TCP *:XmlIpcRegSvc (LISTEN) java 1 appuser 155u IPv6 1538198 0t0 TCP kafka:XmlIpcRegSvc->client.kafka-net:38414 (ESTABLISHED)
위 출력 결과는 1개의 Listen Socket 과 1개의 Established Socket 이 확인됩니다.
Listen Socket 은 카프카 브로커가 Client 로부터 TCP Connection 요청을 리스닝하는 Socket 입니다.
그리고 Established Socket 은 제가 사전에 만들어 준 Client 와 연결된 Socket 입니다.
( 참고로 XmlIpcRegSvc 는 9092 Port 의 별칭으로 사용되고 있습니다. )
즉, advertised.listeners 에 명시한 Host 와 Port 를 기준으로 Kafka 내부에서 Socket 이 생성되게 됩니다.
jstack 실행 중인 Thread 확인하기.
jstack 명령어를 통해서 실행 중인 Thread 를 확인할 수 있습니다.
일반적인 TCP Socket 프로그래밍에서 클라이언트의 커넥션 요청을 리스닝하는 쓰레드와 요청을 처리하는 쓰레드는 분리되어 있습니다.
jstack 기능을 통해서 현재 실행 중인 Java Process 의 Thread 목록을 확인할 수 있는데요.
아래와 같이 "data-plane-kafka-socket-acceptor-ListenerName" 와 같은 이름의 Thread 가 확인됩니다.
이 Thread 는 클라이언트의 Connection 요청을 리스닝하는 역할을 담당합니다.
jstack 1 | grep data-plane-kafka-socket "data-plane-kafka-socket-acceptor-ListenerName(PLAINTEXT)-PLAINTEXT-9092" #35 prio=5 os_prio=0 cpu=888.84ms elapsed=2277.96s tid=0x0000ffffa138c820 nid=0x9c runnable [0x0000ffff6600e000]
Socket Acceptor Thread 는 클라이언트의 연결 요청을 처리하여 Socket 을 Established 상태로 만듭니다.
그리고 이후의 데이터 요청/응답에 해당하는 데이터 교환은 Network Thread 에게 넘겨집니다.
jstack 1 | grep data-plane-kafka-network-thread "data-plane-kafka-network-thread-1-ListenerName(PLAINTEXT)-PLAINTEXT-0" #36 prio=5 os_prio=0 cpu=1483.47ms elapsed=2728.82s tid=0x0000ffffa1323e50 nid=0x99 runnable [0x0000ffff6660b000] "data-plane-kafka-network-thread-1-ListenerName(PLAINTEXT)-PLAINTEXT-1" #37 prio=5 os_prio=0 cpu=1487.45ms elapsed=2728.82s tid=0x0000ffffa137a140 nid=0x9a runnable [0x0000ffff6640c000] "data-plane-kafka-network-thread-1-ListenerName(PLAINTEXT)-PLAINTEXT-2" #38 prio=5 os_prio=0 cpu=1357.36ms elapsed=2728.82s tid=0x0000ffffa137b2b0 nid=0x9b runnable [0x0000ffff6620d000]
기본적으로 Network Thread 는 3개의 Thread 가 생성됩니다.
이는 연결된 Socket 의 File Descriptor 를 전달받아서 Client 의 요청을 처리합니다.
Kafka Broker 설정 중에서 num.network.threads 가 이 Thread 들의 갯수를 결정합니다.
마지막으로 실질적인 Kafka Record 들을 Log File 에 저장하는 IO Thread 가 존재하는데,
이 Thread 들의 이름은 Request Handler 라고 부릅니다.
jstack 1 | grep data-plane-kafka-request-handler "data-plane-kafka-request-handler-0" #60 daemon prio=5 os_prio=0 cpu=1084.69ms elapsed=2873.57s tid=0x0000ffffa12e8860 nid=0x8f waiting on condition [0x0000ffff67a01000] "data-plane-kafka-request-handler-1" #61 daemon prio=5 os_prio=0 cpu=1087.53ms elapsed=2873.57s tid=0x0000ffffa12eba10 nid=0x90 waiting on condition [0x0000ffff67802000] "data-plane-kafka-request-handler-2" #62 daemon prio=5 os_prio=0 cpu=1065.40ms elapsed=2873.57s tid=0x0000ffffa12ec760 nid=0x91 waiting on condition [0x0000ffff67603000] "data-plane-kafka-request-handler-3" #63 daemon prio=5 os_prio=0 cpu=1147.43ms elapsed=2873.57s tid=0x0000ffffa12ed6d0 nid=0x92 waiting on condition [0x0000ffff67404000] "data-plane-kafka-request-handler-4" #64 daemon prio=5 os_prio=0 cpu=1148.68ms elapsed=2873.57s tid=0x0000ffffa12ee700 nid=0x93 waiting on condition [0x0000ffff67205000] "data-plane-kafka-request-handler-5" #65 daemon prio=5 os_prio=0 cpu=1068.30ms elapsed=2873.57s tid=0x0000ffffa12ef900 nid=0x94 waiting on condition [0x0000ffff67006000] "data-plane-kafka-request-handler-6" #66 daemon prio=5 os_prio=0 cpu=1101.81ms elapsed=2873.57s tid=0x0000ffffa12f0b00 nid=0x95 waiting on condition [0x0000ffff66e07000] "data-plane-kafka-request-handler-7" #67 daemon prio=5 os_prio=0 cpu=1144.45ms elapsed=2873.57s tid=0x0000ffffa12f1b00 nid=0x96 waiting on condition [0x0000ffff66c08000]
이들의 갯수는 num.io.threads 로 설정됩니다.
여러 개의 주소가 advertised.listeners 로 설정되는 경우.
만약 여러개의 주소 정보가 advertised.listeners 로 설정된다면,
Kafka Broker 는 그 갯수만큼 Socket 을 생성하고, Socket Acceptor Thread 를 만듭니다.
아래의 예시는 Kafka Broker 가 총 3개의 advertised.listeners 주소를 가지는 예시입니다.
"kafka-sockets" 이라는 이름의 Kafka Broker 도커 컨테이너들 생성하였구요.
"kafka-sockets" 는 아래의 3가지 주소를 가집니다.
- 192.168.100.10:9092
- 192.168.101.10:9093
- 192.168.102.10:9094
docker network create kafka-net docker run -d --name zookeeper --hostname zookeeper --net kafka-net \ -e ZOOKEEPER_CLIENT_PORT=2181 \ confluentinc/cp-zookeeper:7.8.1 docker network create --subnet=192.168.100.0/24 net1 docker network create --subnet=192.168.101.0/24 net2 docker network create --subnet=192.168.102.0/24 net3 docker create --name kafka-sockets --hostname kafka-sockets --net kafka-net \ -e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 \ -e KAFKA_BROKER_ID=2 \ -e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 \ -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092,PLAINTEXT_NET2://0.0.0.0:9093,PLAINTEXT_NET3://0.0.0.0:9094 \ -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.100.10:9092,PLAINTEXT_NET2://192.168.101.10:9093,PLAINTEXT_NET3://192.168.102.10:9094 \ -e KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,PLAINTEXT_NET2:PLAINTEXT,PLAINTEXT_NET3:PLAINTEXT \ confluentinc/cp-kafka:7.8.1 docker network connect net1 --ip 192.168.100.10 kafka-sockets docker network connect net2 --ip 192.168.101.10 kafka-sockets docker network connect net3 --ip 192.168.102.10 kafka-sockets docker start kafka-sockets
ip addr show 명령을 통해서 아래와 같이 3개의 IP 를 가지는 것을 확인할 수 있구요.
# ip addr show 1547: eth1@if1548: <BROADCAST,MULTICAST,UP,LOWER_UP> mtu 1500 qdisc noqueue state UP group default link/ether 02:42:c0:a8:64:0a brd ff:ff:ff:ff:ff:ff link-netnsid 0 inet 192.168.100.10/24 brd 192.168.100.255 scope global eth1 valid_lft forever preferred_lft forever 1549: eth2@if1550: <BROADCAST,MULTICAST,UP,LOWER_UP> mtu 1500 qdisc noqueue state UP group default link/ether 02:42:c0:a8:65:0a brd ff:ff:ff:ff:ff:ff link-netnsid 0 inet 192.168.101.10/24 brd 192.168.101.255 scope global eth2 valid_lft forever preferred_lft forever 1551: eth3@if1552: <BROADCAST,MULTICAST,UP,LOWER_UP> mtu 1500 qdisc noqueue state UP group default link/ether 02:42:c0:a8:66:0a brd ff:ff:ff:ff:ff:ff link-netnsid 0 inet 192.168.102.10/24 brd 192.168.102.255 scope global eth3 valid_lft forever preferred_lft forever
그리고 lsof 명령어를 통해서 9092, 9093, 9094 Port 를 리스닝하는 3개의 Socket 이 확인됩니다.
[root@kafka-sockets appuser]# lsof -i :9092 COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME java 1 appuser 168u IPv6 1557976 0t0 TCP *:XmlIpcRegSvc (LISTEN) java 1 appuser 172u IPv6 1553320 0t0 TCP kafka-sockets:34156->kafka-sockets:XmlIpcRegSvc (ESTABLISHED) java 1 appuser 173u IPv6 1560599 0t0 TCP kafka-sockets:XmlIpcRegSvc->kafka-sockets:34156 (ESTABLISHED) [root@kafka-sockets appuser]# lsof -i :9093 COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME java 1 appuser 169u IPv6 1557977 0t0 TCP *:copycat (LISTEN) [root@kafka-sockets appuser]# lsof -i :9094 COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME java 1 appuser 167u IPv6 1557975 0t0 TCP *:9094 (LISTEN)
마지막으로 jstack 을 통해서 생성된 3개의 Socket Acceptor Thread 도 확인할 수 있습니다.
jstack 1 | grep data-plane-kafka-socket-acceptor "data-plane-kafka-socket-acceptor-ListenerName(PLAINTEXT_NET3)-PLAINTEXT-9094" #43 prio=5 os_prio=0 cpu=114.52ms elapsed=341.82s tid=0x0000ffff9d3d3460 nid=0x9a runnable [0x0000ffff60e17000] "data-plane-kafka-socket-acceptor-ListenerName(PLAINTEXT)-PLAINTEXT-9092" #35 prio=5 os_prio=0 cpu=121.04ms elapsed=341.81s tid=0x0000ffff9d3d7be0 nid=0x9e runnable [0x0000ffff6061b000] "data-plane-kafka-socket-acceptor-ListenerName(PLAINTEXT_NET2)-PLAINTEXT-9093" #39 prio=5 os_prio=0 cpu=100.69ms elapsed=341.81s tid=0x0000ffff9d3dcd20 nid=0xa2 runnable [0x0000ffff57dff000]
반응형'Kafka' 카테고리의 다른 글
[Kafka] Topic 의 retention.bytes & retention.ms 알아보기 (0) 2024.06.24 [Kafka] num.replica.fetchers 와 ReplicaFetcherThread 알아보기 (0) 2024.06.24 [Kafka] Adaptive Partitioning 코드로 살펴보는 내부 동작 원리 ( BuiltInPartitioner ) (0) 2024.06.09 [Kafka] BufferPool 알아보기 (org.apache.kafka:kafka-clients) (0) 2024.06.03 [Kafka] Timeindex 알아보기 (0) 2024.06.01