ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • [Kafka] advertised.listeners 와 Socket Acceptor Listener Thread 알아보기
    Kafka 2024. 6. 22. 07:50
    반응형

    - 목차

     

    들어가며.

    Kafka Broker 는 advertised.listeners 와 같은 설정값이 존재합니다.

    advertised.listeners 는 카프카 브로커가 외부 클라이언트에게 자신의 IP 혹은 Domain 그리고 Port 를 노출하여 연결될 수 있는 주소를 제공합니다.

    그리고 이러한 advertised.listeners 를 통해서 클라이언트는 브로커와 연결될 수 있습니다.

    예를 들어 아래와 같은 Docker Command 를 통해서 Kafka 를 실행할 때에 실행된 브로커는 kafka:9092 라는 주소를 외부로 노출합니다.

     

    docker network create kafka-net
    
    docker run -d --name zookeeper --hostname zookeeper --net kafka-net \
      -e ZOOKEEPER_CLIENT_PORT=2181 \
      confluentinc/cp-zookeeper:7.8.1
    
    docker run -d --name kafka --hostname kafka --net kafka-net \
      -e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 \
      -e KAFKA_BROKER_ID=1 \
      -e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 \
      -e KAFKA_LISTENERS=PLAINTEXT://:9092 \
      -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092 \
      confluentinc/cp-kafka:7.8.1

     

    그리고 외부의 클라이언트들은 아래와 같은 명령어를 통해서 kafka:9092 브로커에 접근할 수 있습니다.

    ( --bootstrap-server kafka:9092 )

    docker run -it --rm --net kafka-net confluentinc/cp-kafka:7.8.1 \
    	kafka-topics --create --bootstrap-server kafka:9092 \
    	--replication-factor 1 --partitions 1 --topic test_topic
        
    // WARNING: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide. To avoid issues it is best to use either, but not both.
    // Created topic test_topic.

     

     

    advertised.listeners 와 함께 실행되는 Kafka Broker 는 내부적으로 TCP Socket 을 생성합니다.

    그리고 TCP Socket 은 필수적으로 Accept, Read, Write 등과 같은 시스템 콜을 호출하는 별도의 Thread 가 생성되게 됩니다.

    이번 글에서 advertised.listeners 와 이와 관련하여 생성되는 Socket Acceptor Listener Thread 에 대해서 알아보려고 합니다.

     

    advertised.listeners 와 Socket 의 관계.

    카프카 브로커는 기본적으로 advertised.listeners 을 요구합니다.

    아래와 같은 형식으로 Kafka Broker 도커 컨테이너를 실행할 수 있습니다.

    그리고 kafka:9092 를 advertised.listeners 로 설정하였습니다.

     

    docker network create kafka-net
    
    docker run -d --name zookeeper --hostname zookeeper --net kafka-net \
      -e ZOOKEEPER_CLIENT_PORT=2181 \
      confluentinc/cp-zookeeper:7.8.1
    
    docker run -d --name kafka --hostname kafka --net kafka-net \
      -e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 \
      -e KAFKA_BROKER_ID=1 \
      -e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 \
      -e KAFKA_LISTENERS=PLAINTEXT://:9092 \
      -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092 \
      confluentinc/cp-kafka:7.8.1

     

    ip addr show 명령어를 통해서 Kafka Broker 가 실행되는 환경에서의 Network Interface 와 IP Address 를 확인할 수 있습니다.

    ( Docker 환경이기 때문에 eth0 Network Interface 는 Virtual Ethernet 형식을 취하고 있습니다. )

    ip addr show
    1: lo: <LOOPBACK,UP,LOWER_UP> mtu 65536 qdisc noqueue state UNKNOWN group default qlen 1000
        link/loopback 00:00:00:00:00:00 brd 00:00:00:00:00:00
        inet 127.0.0.1/8 scope host lo
           valid_lft forever preferred_lft forever
        inet6 ::1/128 scope host
           valid_lft forever preferred_lft forever
    1537: eth0@if1538: <BROADCAST,MULTICAST,UP,LOWER_UP> mtu 1500 qdisc noqueue state UP group default
        link/ether 02:42:ac:13:00:03 brd ff:ff:ff:ff:ff:ff link-netnsid 0
        inet 172.19.0.3/16 brd 172.19.255.255 scope global eth0
           valid_lft forever preferred_lft forever

     

    ip addr show 로 확인되는 IP 는 172.19.0.3 입니다.

    그리고 /etc/hosts 파일에 작성된 Domain 이름 또한 확인 가능합니다.

    아래와 같이 IP 172.19.0.3 는 "kafka" Domain 명으로 설정되어 있습니다.

     

    cat /etc/hosts
    127.0.0.1	localhost
    ::1	localhost ip6-localhost ip6-loopback
    fe00::0	ip6-localnet
    ff00::0	ip6-mcastprefix
    ff02::1	ip6-allnodes
    ff02::2	ip6-allrouters
    172.19.0.3	kafka

     

    그리고 생성된 Socket 을 확인해보겠습니다.

    lsof -i :9092 명령어를 통해서 9092 Port 로 바인딩된 Socket 을 확인할 수 있습니다.

    lsof -i :9092
    COMMAND PID    USER   FD   TYPE  DEVICE SIZE/OFF NODE NAME
    java      1 appuser  154u  IPv6 1527280      0t0  TCP *:XmlIpcRegSvc (LISTEN)
    java      1 appuser  155u  IPv6 1538198      0t0  TCP kafka:XmlIpcRegSvc->client.kafka-net:38414 (ESTABLISHED)

     

    위 출력 결과는 1개의 Listen Socket 과 1개의 Established Socket 이 확인됩니다.

    Listen Socket 은 카프카 브로커가 Client 로부터 TCP Connection 요청을 리스닝하는 Socket 입니다.

    그리고 Established Socket 은 제가 사전에 만들어 준 Client 와 연결된 Socket 입니다.

    ( 참고로 XmlIpcRegSvc 는 9092 Port 의 별칭으로 사용되고 있습니다. )

     

    즉, advertised.listeners 에 명시한 Host 와 Port 를 기준으로 Kafka 내부에서 Socket 이 생성되게 됩니다.

     

    jstack 실행 중인 Thread 확인하기.

    jstack 명령어를 통해서 실행 중인 Thread 를 확인할 수 있습니다.

    일반적인 TCP Socket 프로그래밍에서 클라이언트의 커넥션 요청을 리스닝하는 쓰레드와 요청을 처리하는 쓰레드는 분리되어 있습니다. 

    jstack 기능을 통해서 현재 실행 중인 Java Process 의 Thread 목록을 확인할 수 있는데요.

    아래와 같이 "data-plane-kafka-socket-acceptor-ListenerName" 와 같은 이름의 Thread 가 확인됩니다.

    이 Thread 는 클라이언트의 Connection 요청을 리스닝하는 역할을 담당합니다.

     

    jstack 1 | grep data-plane-kafka-socket
    "data-plane-kafka-socket-acceptor-ListenerName(PLAINTEXT)-PLAINTEXT-9092" 
    #35 prio=5 os_prio=0 cpu=888.84ms elapsed=2277.96s tid=0x0000ffffa138c820 nid=0x9c runnable  [0x0000ffff6600e000]

     

    Socket Acceptor Thread 는 클라이언트의 연결 요청을 처리하여 Socket 을 Established 상태로 만듭니다. 

    그리고 이후의 데이터 요청/응답에 해당하는 데이터 교환은 Network Thread 에게 넘겨집니다.

     

    jstack 1 | grep data-plane-kafka-network-thread
    "data-plane-kafka-network-thread-1-ListenerName(PLAINTEXT)-PLAINTEXT-0" #36 prio=5 os_prio=0 cpu=1483.47ms elapsed=2728.82s tid=0x0000ffffa1323e50 nid=0x99 runnable  [0x0000ffff6660b000]
    "data-plane-kafka-network-thread-1-ListenerName(PLAINTEXT)-PLAINTEXT-1" #37 prio=5 os_prio=0 cpu=1487.45ms elapsed=2728.82s tid=0x0000ffffa137a140 nid=0x9a runnable  [0x0000ffff6640c000]
    "data-plane-kafka-network-thread-1-ListenerName(PLAINTEXT)-PLAINTEXT-2" #38 prio=5 os_prio=0 cpu=1357.36ms elapsed=2728.82s tid=0x0000ffffa137b2b0 nid=0x9b runnable  [0x0000ffff6620d000]

     

    기본적으로 Network Thread 는 3개의 Thread 가 생성됩니다.

    이는 연결된 Socket 의 File Descriptor 를 전달받아서 Client 의 요청을 처리합니다. 

    Kafka Broker 설정 중에서 num.network.threads 가 이 Thread 들의 갯수를 결정합니다.

     

    마지막으로 실질적인 Kafka Record 들을 Log File 에 저장하는 IO Thread 가 존재하는데,

    이 Thread 들의 이름은 Request Handler 라고 부릅니다.

     

    jstack 1 | grep data-plane-kafka-request-handler
    "data-plane-kafka-request-handler-0" #60 daemon prio=5 os_prio=0 cpu=1084.69ms elapsed=2873.57s tid=0x0000ffffa12e8860 nid=0x8f waiting on condition  [0x0000ffff67a01000]
    "data-plane-kafka-request-handler-1" #61 daemon prio=5 os_prio=0 cpu=1087.53ms elapsed=2873.57s tid=0x0000ffffa12eba10 nid=0x90 waiting on condition  [0x0000ffff67802000]
    "data-plane-kafka-request-handler-2" #62 daemon prio=5 os_prio=0 cpu=1065.40ms elapsed=2873.57s tid=0x0000ffffa12ec760 nid=0x91 waiting on condition  [0x0000ffff67603000]
    "data-plane-kafka-request-handler-3" #63 daemon prio=5 os_prio=0 cpu=1147.43ms elapsed=2873.57s tid=0x0000ffffa12ed6d0 nid=0x92 waiting on condition  [0x0000ffff67404000]
    "data-plane-kafka-request-handler-4" #64 daemon prio=5 os_prio=0 cpu=1148.68ms elapsed=2873.57s tid=0x0000ffffa12ee700 nid=0x93 waiting on condition  [0x0000ffff67205000]
    "data-plane-kafka-request-handler-5" #65 daemon prio=5 os_prio=0 cpu=1068.30ms elapsed=2873.57s tid=0x0000ffffa12ef900 nid=0x94 waiting on condition  [0x0000ffff67006000]
    "data-plane-kafka-request-handler-6" #66 daemon prio=5 os_prio=0 cpu=1101.81ms elapsed=2873.57s tid=0x0000ffffa12f0b00 nid=0x95 waiting on condition  [0x0000ffff66e07000]
    "data-plane-kafka-request-handler-7" #67 daemon prio=5 os_prio=0 cpu=1144.45ms elapsed=2873.57s tid=0x0000ffffa12f1b00 nid=0x96 waiting on condition  [0x0000ffff66c08000]

     

    이들의 갯수는 num.io.threads 로 설정됩니다.

     

    여러 개의 주소가 advertised.listeners 로 설정되는 경우.

    만약 여러개의 주소 정보가 advertised.listeners 로 설정된다면,

    Kafka Broker 는 그 갯수만큼 Socket 을 생성하고, Socket Acceptor Thread 를 만듭니다.

    아래의 예시는 Kafka Broker 가 총 3개의 advertised.listeners 주소를 가지는 예시입니다.

    "kafka-sockets" 이라는 이름의 Kafka Broker 도커 컨테이너들 생성하였구요.

    "kafka-sockets" 는 아래의 3가지 주소를 가집니다.

    • 192.168.100.10:9092
    • 192.168.101.10:9093
    • 192.168.102.10:9094

     

    docker network create kafka-net
    
    docker run -d --name zookeeper --hostname zookeeper --net kafka-net \
      -e ZOOKEEPER_CLIENT_PORT=2181 \
      confluentinc/cp-zookeeper:7.8.1
    
    docker network create --subnet=192.168.100.0/24 net1
    docker network create --subnet=192.168.101.0/24 net2
    docker network create --subnet=192.168.102.0/24 net3
    
    docker create --name kafka-sockets --hostname kafka-sockets --net kafka-net \
      -e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 \
      -e KAFKA_BROKER_ID=2 \
      -e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 \
      -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092,PLAINTEXT_NET2://0.0.0.0:9093,PLAINTEXT_NET3://0.0.0.0:9094 \
      -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.100.10:9092,PLAINTEXT_NET2://192.168.101.10:9093,PLAINTEXT_NET3://192.168.102.10:9094 \
      -e KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,PLAINTEXT_NET2:PLAINTEXT,PLAINTEXT_NET3:PLAINTEXT \
      confluentinc/cp-kafka:7.8.1
    
    docker network connect net1 --ip 192.168.100.10 kafka-sockets
    docker network connect net2 --ip 192.168.101.10 kafka-sockets
    docker network connect net3 --ip 192.168.102.10 kafka-sockets
    
    docker start kafka-sockets

     

    ip addr show 명령을 통해서 아래와 같이 3개의 IP 를 가지는 것을 확인할 수 있구요.

    # ip addr show
    1547: eth1@if1548: <BROADCAST,MULTICAST,UP,LOWER_UP> mtu 1500 qdisc noqueue state UP group default
        link/ether 02:42:c0:a8:64:0a brd ff:ff:ff:ff:ff:ff link-netnsid 0
        inet 192.168.100.10/24 brd 192.168.100.255 scope global eth1
           valid_lft forever preferred_lft forever
    1549: eth2@if1550: <BROADCAST,MULTICAST,UP,LOWER_UP> mtu 1500 qdisc noqueue state UP group default
        link/ether 02:42:c0:a8:65:0a brd ff:ff:ff:ff:ff:ff link-netnsid 0
        inet 192.168.101.10/24 brd 192.168.101.255 scope global eth2
           valid_lft forever preferred_lft forever
    1551: eth3@if1552: <BROADCAST,MULTICAST,UP,LOWER_UP> mtu 1500 qdisc noqueue state UP group default
        link/ether 02:42:c0:a8:66:0a brd ff:ff:ff:ff:ff:ff link-netnsid 0
        inet 192.168.102.10/24 brd 192.168.102.255 scope global eth3
           valid_lft forever preferred_lft forever

     

    그리고 lsof 명령어를 통해서 9092, 9093, 9094 Port 를 리스닝하는 3개의 Socket 이 확인됩니다.

    [root@kafka-sockets appuser]# lsof -i :9092
    COMMAND PID    USER   FD   TYPE  DEVICE SIZE/OFF NODE NAME
    java      1 appuser  168u  IPv6 1557976      0t0  TCP *:XmlIpcRegSvc (LISTEN)
    java      1 appuser  172u  IPv6 1553320      0t0  TCP kafka-sockets:34156->kafka-sockets:XmlIpcRegSvc (ESTABLISHED)
    java      1 appuser  173u  IPv6 1560599      0t0  TCP kafka-sockets:XmlIpcRegSvc->kafka-sockets:34156 (ESTABLISHED)
    [root@kafka-sockets appuser]# lsof -i :9093
    COMMAND PID    USER   FD   TYPE  DEVICE SIZE/OFF NODE NAME
    java      1 appuser  169u  IPv6 1557977      0t0  TCP *:copycat (LISTEN)
    [root@kafka-sockets appuser]# lsof -i :9094
    COMMAND PID    USER   FD   TYPE  DEVICE SIZE/OFF NODE NAME
    java      1 appuser  167u  IPv6 1557975      0t0  TCP *:9094 (LISTEN)

     

    마지막으로 jstack 을 통해서 생성된 3개의 Socket Acceptor Thread 도 확인할 수 있습니다.

     

    jstack 1 | grep data-plane-kafka-socket-acceptor
    "data-plane-kafka-socket-acceptor-ListenerName(PLAINTEXT_NET3)-PLAINTEXT-9094" #43 prio=5 os_prio=0 cpu=114.52ms elapsed=341.82s tid=0x0000ffff9d3d3460 nid=0x9a runnable  [0x0000ffff60e17000]
    "data-plane-kafka-socket-acceptor-ListenerName(PLAINTEXT)-PLAINTEXT-9092" #35 prio=5 os_prio=0 cpu=121.04ms elapsed=341.81s tid=0x0000ffff9d3d7be0 nid=0x9e runnable  [0x0000ffff6061b000]
    "data-plane-kafka-socket-acceptor-ListenerName(PLAINTEXT_NET2)-PLAINTEXT-9093" #39 prio=5 os_prio=0 cpu=100.69ms elapsed=341.81s tid=0x0000ffff9d3dcd20 nid=0xa2 runnable  [0x0000ffff57dff000]

     

     

    반응형
Designed by Tistory.