ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • [Kafka] Consumer 의 Fetch Request 와 max_wait_ms 관계 알아보기 ( fetch.wait.max.ms )
    Kafka/Kafka Consumer 2024. 6. 30. 14:56
    반응형

    - 목차

     

    들어가며.

    카프카 컨슈머는 브로커에게 데이터를 요청할 수 있습니다.

    일반적인 카프카 클라이언트 모듈에서 Poll 과 같은 함수로 이러한 데이터 요청 기능이 추상화되어 있습니다.

    Poll 함수 내부적으로는 Fetch API 가 사용되구요.

    Fetch API 의 요청과 응답의 형식은 아래와 같습니다.

     

    Fetch Request (Version: 1) => replica_id max_wait_ms min_bytes [topics] 
      replica_id => INT32
      max_wait_ms => INT32
      min_bytes => INT32
      topics => topic [partitions] 
        topic => STRING
        partitions => partition fetch_offset partition_max_bytes 
          partition => INT32
          fetch_offset => INT64
          partition_max_bytes => INT32
          
    Fetch Response (Version: 0) => [responses] 
      responses => topic [partitions] 
        topic => STRING
        partitions => partition_index error_code high_watermark records 
          partition_index => INT32
          error_code => INT16
          high_watermark => INT64
          records => RECORDS

     

    저희가 주목할 사항은 Fetch Request 의 포맷 중에서 이번 글의 주제인 max wait ms 와 min bytes 입니다. 

     

    max.wait.ms 란 ?

    max.wait.ms 는 컨슈머가 브로커로부터 데이터를 전달받는 최대 시간입니다.

    즉, 컨슈머는 max.wait.ms 시간 내에 Fetch Response 를 전달받도록 보장되는 시간입니다.

    약간의 시간적 오차는 있을 수 있겠지만, 1회의 Poll 요청 / 1회의 Fetch Request 는 max.wait.ms 시간 이내에 응답이 됩니다.

    max.wait.ms 의 기본값은 500ms 라고 하네요.

     

    fetch.min.bytes 란 ?

    카프카 컨슈머의 데이터를 폴링하는 과정에서 사용되는 또 다른 설정인 fetch.min.bytes 는 max.wait.ms 와 함께 사용됩니다.

    fetch.min.bytes 는 컨슈머가 브로커에게 요청하는 최소한의 데이터 양입니다.

    fetch.min.bytes 의 기본값은 16kb 로 컨슈머는 1회의 요청에서 최소 16kb 의 데이터를 획득하기를 기대합니다.

     

    여기서 max.wait.ms 와 fetch.min.bytes 가 함께 조합되어서 사용됩니다.

    만약 Topic 내부에 데이터가 없거나 fetch.min.bytes 보다 적은 상태라면 컨슈머는 항상 max.wait.ms 시간 후에 브로커의 응답을 받습니다.

    하지만 Topic 의 데이터가 fetch.min.bytes 의 양보다 많은 상황이라면 max.wait.ms 만큼 브로커의 응답을 기다리지 않습니다.

     

    Request Handler Thread 란 ?

    Kafka Broker 는 클라이언트의 컨슈머의 Fetch Request 를 처리하기 위해서 내부적으로 여러 쓰레드 구조를 가집니다.

    아래의 이미지는 Kafka Broker 의 클라이언트의 요청 처리를 위한 쓰레드 구조인데요.

    아래의 IO Threads 가 Request Handler Thread 를 의미합니다.

     

    출처 : https://www.confluent.io/blog/kafka-producer-internals-handling-producer-request/

     

    카프카 브로커의 네트워크 및 쓰레드 구조를 간단히 설명하겠습니다.

    먼저 클라이언트의 Connection 요청을 수락하기 위한 Acceptor Listener Thread 가 존재합니다.

    이는 jstack 이나 ps 명령어를 통해서 아래와 같은 이름으로 확인되구요.

    제가 설정한 Broker 의 19092 Port 와 Protocol 이 Suffix 로 붙습니다.

    data-plane-kafka-socket-acceptor-ListenerName(PLAINTEXT)-PLAINTEXT-9092

     

     

    이렇게 생성된 Acceptor Listener Thread 는 클라이언트의 요청을 받아서 3 ways handshake 과정을 수행하고 실질적인 Connection 을 생성합니다.

     

    그리고 이렇게 생성된 Thread 는 Network Threads 에서 처리합니다.

    이 Network Threads 는 num.network.threads 설정을 통해서 그 갯수를 지정할 수 있습니다.

     

    https://westlife0615.tistory.com/923

     

    [Kafka] advertised.listeners 와 Socket Acceptor Listener Thread 알아보기

    - 목차 들어가며.Kafka Broker 는 advertised.listeners 와 같은 설정값이 존재합니다.advertised.listeners 는 카프카 브로커가 외부 클라이언트에게 자신의 IP 혹은 Domain 그리고 Port 를 노출하여 연결될 수 있

    westlife0615.tistory.com

     

    아래와 같이 여러 개의 Network Thread 들을 확인할 수 있습니다. 

    root@kafka1:/# jstack 1 | grep network-thread
    
    "data-plane-kafka-network-thread-1-ListenerName(PLAINTEXT)-PLAINTEXT-0" #61 prio=5 os_prio=0 cpu=13442.62ms elapsed=55177.85s tid=0x00007ffffac15800 nid=0x259 runnable  [0x00007fffc86b2000]
    "data-plane-kafka-network-thread-1-ListenerName(PLAINTEXT)-PLAINTEXT-1" #62 prio=5 os_prio=0 cpu=48488.11ms elapsed=55177.85s tid=0x00007ffffac2c800 nid=0x25a runnable  [0x00007fffc85b1000]
    "data-plane-kafka-network-thread-1-ListenerName(PLAINTEXT)-PLAINTEXT-2" #63 prio=5 os_prio=0 cpu=43328.34ms elapsed=55177.85s tid=0x00007ffffac2e000 nid=0x25c runnable  [0x00007fffc3ffe000]

     

    그리고 Network Thread 는 내부적인 Queue 를 통해서 IO Thread 에게 데이터 처리 요청을 넘깁니다.

    IO Thread, Worker Thread, Request Handler Thread 등의 여러 이름이 있는거 같구요.

    실제 Thread 의 이름은 Request Handler 로 표시됩니다.

     

    root@kafka1:/# jstack 1 | grep request-handler
    "data-plane-kafka-request-handler-0" #51 daemon prio=5 os_prio=0 cpu=9828.68ms elapsed=55332.69s tid=0x00007ffffac04800 nid=0x250 waiting on condition  [0x00007fffc90df000]
    "data-plane-kafka-request-handler-1" #52 daemon prio=5 os_prio=0 cpu=9644.36ms elapsed=55332.69s tid=0x00007ffffac06800 nid=0x251 waiting on condition  [0x00007fffc8fde000]
    "data-plane-kafka-request-handler-2" #53 daemon prio=5 os_prio=0 cpu=9806.85ms elapsed=55332.69s tid=0x00007ffffac08800 nid=0x252 waiting on condition  [0x00007fffc8edd000]
    "data-plane-kafka-request-handler-3" #54 daemon prio=5 os_prio=0 cpu=9965.04ms elapsed=55332.69s tid=0x00007ffffac0a000 nid=0x253 waiting on condition  [0x00007fffc8ddc000]
    "data-plane-kafka-request-handler-4" #55 daemon prio=5 os_prio=0 cpu=10699.56ms elapsed=55332.69s tid=0x00007ffffac0c000 nid=0x254 waiting on condition  [0x00007fffc8cdb000]
    "data-plane-kafka-request-handler-5" #56 daemon prio=5 os_prio=0 cpu=9302.87ms elapsed=55332.69s tid=0x00007ffffac0e000 nid=0x255 waiting on condition  [0x00007fffc8bda000]
    "data-plane-kafka-request-handler-6" #57 daemon prio=5 os_prio=0 cpu=9338.56ms elapsed=55332.69s tid=0x00007ffffac10000 nid=0x256 waiting on condition  [0x00007fffc8ad9000]
    "data-plane-kafka-request-handler-7" #58 daemon prio=5 os_prio=0 cpu=9922.94ms elapsed=55332.69s tid=0x00007ffffac12800 nid=0x257 waiting on condition  [0x00007fffc89d8000]

     

     

    즉, 위 상황을 요약해보면 클라이언트가 Fetch Request 를 통해서 브로커에게 데이터를 요청하게 되면,

    브로커는 내부적으로 Request Handler Thread 가 실질적인 데이터 처리를 수행합니다.

    Topic/Partition 의 Log Segment File 를 읽어들여서 Client 에게 제공할 수 있습니다.

    이 과정에서 max.wait.ms 와 fetch.min.bytes 를 고려한 데이터 처리를 수행합니다.

     

     

    TCP 관점에서 max.wait.ms 살펴보기

    아래의 2개의 Packet 들은 각각 Fetch Request 와 Response 의 Packet 입니다.

    첫번째 packet 의 0x0038, 0x0039 번 바이트들이 0001 로 표시되는데, 0001 이 바로 Fetch API 의 API Key 입니다.

    그리고 0x003e, 0x003f 에 해당하는 000a 가 Correlation ID 에 해당합니다.

     

    두번째 응답 패킷은 첫번째 Packet 의 응답 패킷입니다.

    요청 Packet 의 Seq 와 응답 Packet 의 Ack 가 동일함을 확인할 수 있습니다.

    둘의 Correlation ID 도 일치합니다.

    응답 Packet 의 0x003a, 0x003b 의 000a 가 응답 패킷의 Correlation ID 에 해당합니다.

    21:47:30.677496 IP 172.19.0.1.61362 > kafka1.19092: Flags [P.], seq 760:872, ack 1142, win 512, options [nop,nop,TS val 2791324763 ecr 1983801789], length 112
    	0x0000:  4500 00a4 e218 4000 4006 0011 ac13 0001  E.....@.@.......
    	0x0010:  ac13 0003 efb2 4a94 8181 0114 d359 6879  ......J......Yhy
    	0x0020:  8018 0200 58c1 0000 0101 080a a660 3c5b  ....X........`<[
    	0x0030:  763e 69bd 0000 006c 0001 000d 0000 000a  v>i....l........
    	0x0040:  0011 7468 6973 5f69 735f 6d79 5f63 6c69  ..this_is_my_cli
    	0x0050:  656e 7400 ffff ffff 0000 2710 0000 0001  ent.......'.....
    	0x0060:  0320 0000 0100 0000 00ff ffff ff02 a5b6  ................
    	0x0070:  e97f d812 4015 8519 da1f 397e 58e5 0200  ....@.....9~X...
    	0x0080:  0000 0100 0000 0000 0000 0000 0000 00ff  ................
    	0x0090:  ffff ffff ffff ffff ffff ff00 1000 0000  ................
    	0x00a0:  0001 0100                                ....
        
        
        
    21:47:40.680957 IP kafka1.19092 > 172.19.0.1.61362: Flags [P.], seq 1142:1218, ack 872, win 50470, options [nop,nop,TS val 1983811796 ecr 2791324763], length 76
    	0x0000:  4500 0080 e6e8 4000 4006 fb64 ac13 0003  E.....@.@..d....
    	0x0010:  ac13 0001 4a94 efb2 d359 6879 8181 0184  ....J....Yhy....
    	0x0020:  8018 c526 589d 0000 0101 080a 763e 90d4  ...&X.......v>..
    	0x0030:  a660 3c5b 0000 0048 0000 000a 0000 0000  .`<[...H........
    	0x0040:  0000 0000 0000 0002 a5b6 e97f d812 4015  ..............@.
    	0x0050:  8519 da1f 397e 58e5 0200 0000 0100 0000  ....9~X.........
    	0x0060:  0000 0000 0000 0000 0000 0000 0000 0000  ................
    	0x0070:  0000 0000 0000 0001 ffff ffff 0100 0000  ................

     

    비어있는 Topic 을 대상으로 Fetch 를 시도하였고, fetch.wait.max.ms 는 10 초로 설정하였습니다.

    위 패킷의 상세한 내용들보다 요청과 응답의 시간 간격이 10초가 발생합니다.

     

    from confluent_kafka import Consumer, KafkaError
    
    config = {
        'client.id': 'this_is_my_client',
        'bootstrap.servers': 'localhost:19092',
        'group.id': 'my-consumer-group',
        'auto.offset.reset': 'earliest',
        'enable.auto.commit': False,
        'fetch.wait.max.ms': 10000
    }
    
    consumer = Consumer(config)
    consumer.subscribe(['test-topic'])
    msg = consumer.poll()

     

     

     

    비어있는 Topic 에 Polling.

    만약 max wait ms 가 10초인 상황에서 비어있는 Topic 에 Polling 요청을 하게 되면 아래와 같이 10초를 주기로 요청을 반복합니다.

    21:56:28.575803 IP 172.19.0.1.61362 > kafka1.19092: Flags [P.], seq 2172721511:2172721623, ack 3545856776, win 512, options [nop,nop,TS val 2791862668 ecr 1984339695], length 112
    	0x0000:  4500 00a4 e254 4000 4006 ffd4 ac13 0001  E....T@.@.......
    	0x0010:  ac13 0003 efb2 4a94 8181 1967 d359 7b08  ......J....g.Y{.
    	0x0020:  8018 0200 58c1 0000 0101 080a a668 718c  ....X........hq.
    	0x0030:  7646 9eef 0000 006c 0001 000d 0000 0043  vF.....l.......C
    	0x0040:  0011 7468 6973 5f69 735f 6d79 5f63 6c69  ..this_is_my_cli
    	0x0050:  656e 7400 ffff ffff 0000 2710 0000 0001  ent.......'.....
    	0x0060:  0320 0000 0100 0000 00ff ffff ff02 a5b6  ................
    	0x0070:  e97f d812 4015 8519 da1f 397e 58e5 0200  ....@.....9~X...
    	0x0080:  0000 0100 0000 0000 0000 0000 0000 01ff  ................
    	0x0090:  ffff ffff ffff ffff ffff ff00 1000 0000  ................
    	0x00a0:  0001 0100                                ....
    21:56:38.579599 IP 172.19.0.1.61362 > kafka1.19092: Flags [P.], seq 112:224, ack 77, win 512, options [nop,nop,TS val 2791872672 ecr 1984349700], length 112
    	0x0000:  4500 00a4 e255 4000 4006 ffd3 ac13 0001  E....U@.@.......
    	0x0010:  ac13 0003 efb2 4a94 8181 19d7 d359 7b54  ......J......Y{T
    	0x0020:  8018 0200 58c1 0000 0101 080a a668 98a0  ....X........h..
    	0x0030:  7646 c604 0000 006c 0001 000d 0000 0044  vF.....l.......D
    	0x0040:  0011 7468 6973 5f69 735f 6d79 5f63 6c69  ..this_is_my_cli
    	0x0050:  656e 7400 ffff ffff 0000 2710 0000 0001  ent.......'.....
    	0x0060:  0320 0000 0100 0000 00ff ffff ff02 a5b6  ................
    	0x0070:  e97f d812 4015 8519 da1f 397e 58e5 0200  ....@.....9~X...
    	0x0080:  0000 0100 0000 0000 0000 0000 0000 01ff  ................
    	0x0090:  ffff ffff ffff ffff ffff ff00 1000 0000  ................
    	0x00a0:  0001 0100                                ....
    21:56:48.586913 IP 172.19.0.1.61362 > kafka1.19092: Flags [P.], seq 224:336, ack 153, win 512, options [nop,nop,TS val 2791882680 ecr 1984359706], length 112
    	0x0000:  4500 00a4 e256 4000 4006 ffd2 ac13 0001  E....V@.@.......
    	0x0010:  ac13 0003 efb2 4a94 8181 1a47 d359 7ba0  ......J....G.Y{.
    	0x0020:  8018 0200 58c1 0000 0101 080a a668 bfb8  ....X........h..
    	0x0030:  7646 ed1a 0000 006c 0001 000d 0000 0045  vF.....l.......E
    	0x0040:  0011 7468 6973 5f69 735f 6d79 5f63 6c69  ..this_is_my_cli
    	0x0050:  656e 7400 ffff ffff 0000 2710 0000 0001  ent.......'.....
    	0x0060:  0320 0000 0100 0000 00ff ffff ff02 a5b6  ................
    	0x0070:  e97f d812 4015 8519 da1f 397e 58e5 0200  ....@.....9~X...
    	0x0080:  0000 0100 0000 0000 0000 0000 0000 01ff  ................
    	0x0090:  ffff ffff ffff ffff ffff ff00 1000 0000  ................
    	0x00a0:  0001 0100                                ....
    21:56:58.592980 IP 172.19.0.1.61362 > kafka1.19092: Flags [P.], seq 336:448, ack 229, win 512, options [nop,nop,TS val 2791892686 ecr 1984369711], length 112
    	0x0000:  4500 00a4 e257 4000 4006 ffd1 ac13 0001  E....W@.@.......
    	0x0010:  ac13 0003 efb2 4a94 8181 1ab7 d359 7bec  ......J......Y{.
    	0x0020:  8018 0200 58c1 0000 0101 080a a668 e6ce  ....X........h..
    	0x0030:  7647 142f 0000 006c 0001 000d 0000 0046  vG./...l.......F
    	0x0040:  0011 7468 6973 5f69 735f 6d79 5f63 6c69  ..this_is_my_cli
    	0x0050:  656e 7400 ffff ffff 0000 2710 0000 0001  ent.......'.....
    	0x0060:  0320 0000 0100 0000 00ff ffff ff02 a5b6  ................
    	0x0070:  e97f d812 4015 8519 da1f 397e 58e5 0200  ....@.....9~X...
    	0x0080:  0000 0100 0000 0000 0000 0000 0000 01ff  ................
    	0x0090:  ffff ffff ffff ffff ffff ff00 1000 0000  ................
    	0x00a0:  0001 0100

     

     

    데이터가 충분한 Topic 에 Polling.

    만약 데이터가 충분한 상황에서 Polling 요청이 이루어지면 아래와 같이 Request-Response 의 경과 시간이 짧아지고 요청 주기도 짧아지게 됩니다.

     

    22:07:40.724444 IP 172.19.0.1.61362 > kafka1.19092: Flags [P.], seq 336:448, ack 536, win 512, options [nop,nop,TS val 2792534825 ecr 1985011854], length 112
    	0x0000:  4500 00a4 e29d 4000 4006 ff8b ac13 0001  E.....@.@.......
    	0x0010:  ac13 0003 efb2 4a94 8181 3767 d359 9174  ......J...7g.Y.t
    	0x0020:  8018 0200 58c1 0000 0101 080a a672 b329  ....X........r.)
    	0x0030:  7650 e08e 0000 006c 0001 000d 0000 0088  vP.....l........
    	0x0040:  0011 7468 6973 5f69 735f 6d79 5f63 6c69  ..this_is_my_cli
    	0x0050:  656e 7400 ffff ffff 0000 2710 0000 0001  ent.......'.....
    	0x0060:  0320 0000 0100 0000 00ff ffff ff02 a5b6  ................
    	0x0070:  e97f d812 4015 8519 da1f 397e 58e5 0200  ....@.....9~X...
    	0x0080:  0000 0100 0000 0000 0000 0000 0000 08ff  ................
    	0x0090:  ffff ffff ffff ffff ffff ff00 1000 0000  ................
    	0x00a0:  0001 0100                                ....
    22:07:43.054687 IP 172.19.0.1.61362 > kafka1.19092: Flags [P.], seq 448:560, ack 745, win 512, options [nop,nop,TS val 2792537156 ecr 1985014183], length 112
    	0x0000:  4500 00a4 e29f 4000 4006 ff89 ac13 0001  E.....@.@.......
    	0x0010:  ac13 0003 efb2 4a94 8181 37d7 d359 9245  ......J...7..Y.E
    	0x0020:  8018 0200 58c1 0000 0101 080a a672 bc44  ....X........r.D
    	0x0030:  7650 e9a7 0000 006c 0001 000d 0000 0089  vP.....l........
    	0x0040:  0011 7468 6973 5f69 735f 6d79 5f63 6c69  ..this_is_my_cli
    	0x0050:  656e 7400 ffff ffff 0000 2710 0000 0001  ent.......'.....
    	0x0060:  0320 0000 0100 0000 00ff ffff ff02 a5b6  ................
    	0x0070:  e97f d812 4015 8519 da1f 397e 58e5 0200  ....@.....9~X...
    	0x0080:  0000 0100 0000 0000 0000 0000 0000 0cff  ................
    	0x0090:  ffff ffff ffff ffff ffff ff00 1000 0000  ................
    	0x00a0:  0001 0100                                ....
    22:07:45.319852 IP 172.19.0.1.61362 > kafka1.19092: Flags [P.], seq 560:672, ack 954, win 512, options [nop,nop,TS val 2792539421 ecr 1985016449], length 112
    	0x0000:  4500 00a4 e2a1 4000 4006 ff87 ac13 0001  E.....@.@.......
    	0x0010:  ac13 0003 efb2 4a94 8181 3847 d359 9316  ......J...8G.Y..
    	0x0020:  8018 0200 58c1 0000 0101 080a a672 c51d  ....X........r..
    	0x0030:  7650 f281 0000 006c 0001 000d 0000 008a  vP.....l........
    	0x0040:  0011 7468 6973 5f69 735f 6d79 5f63 6c69  ..this_is_my_cli
    	0x0050:  656e 7400 ffff ffff 0000 2710 0000 0001  ent.......'.....
    	0x0060:  0320 0000 0100 0000 00ff ffff ff02 a5b6  ................
    	0x0070:  e97f d812 4015 8519 da1f 397e 58e5 0200  ....@.....9~X...
    	0x0080:  0000 0100 0000 0000 0000 0000 0000 10ff  ................
    	0x0090:  ffff ffff ffff ffff ffff ff00 1000 0000  ................
    	0x00a0:  0001 0100                                ....
    22:07:47.484937 IP 172.19.0.1.61362 > kafka1.19092: Flags [P.], seq 672:784, ack 1108, win 512, options [nop,nop,TS val 2792541586 ecr 1985018613], length 112
    	0x0000:  4500 00a4 e2a3 4000 4006 ff85 ac13 0001  E.....@.@.......
    	0x0010:  ac13 0003 efb2 4a94 8181 38b7 d359 93b0  ......J...8..Y..
    	0x0020:  8018 0200 58c1 0000 0101 080a a672 cd92  ....X........r..
    	0x0030:  7650 faf5 0000 006c 0001 000d 0000 008b  vP.....l........
    	0x0040:  0011 7468 6973 5f69 735f 6d79 5f63 6c69  ..this_is_my_cli
    	0x0050:  656e 7400 ffff ffff 0000 2710 0000 0001  ent.......'.....
    	0x0060:  0320 0000 0100 0000 00ff ffff ff02 a5b6  ................
    	0x0070:  e97f d812 4015 8519 da1f 397e 58e5 0200  ....@.....9~X...
    	0x0080:  0000 0100 0000 0000 0000 0000 0000 11ff  ................
    	0x0090:  ffff ffff ffff ffff ffff ff00 1000 0000  ................
    	0x00a0:  0001 0100                                ....

     

    반응형
Designed by Tistory.