-
[Kafka] Consumer 의 Fetch Request 와 max_wait_ms 관계 알아보기 ( fetch.wait.max.ms )Kafka/Kafka Consumer 2024. 6. 30. 14:56반응형
- 목차
들어가며.
카프카 컨슈머는 브로커에게 데이터를 요청할 수 있습니다.
일반적인 카프카 클라이언트 모듈에서 Poll 과 같은 함수로 이러한 데이터 요청 기능이 추상화되어 있습니다.
Poll 함수 내부적으로는 Fetch API 가 사용되구요.
Fetch API 의 요청과 응답의 형식은 아래와 같습니다.
Fetch Request (Version: 1) => replica_id max_wait_ms min_bytes [topics] replica_id => INT32 max_wait_ms => INT32 min_bytes => INT32 topics => topic [partitions] topic => STRING partitions => partition fetch_offset partition_max_bytes partition => INT32 fetch_offset => INT64 partition_max_bytes => INT32 Fetch Response (Version: 0) => [responses] responses => topic [partitions] topic => STRING partitions => partition_index error_code high_watermark records partition_index => INT32 error_code => INT16 high_watermark => INT64 records => RECORDS
저희가 주목할 사항은 Fetch Request 의 포맷 중에서 이번 글의 주제인 max wait ms 와 min bytes 입니다.
max.wait.ms 란 ?
max.wait.ms 는 컨슈머가 브로커로부터 데이터를 전달받는 최대 시간입니다.
즉, 컨슈머는 max.wait.ms 시간 내에 Fetch Response 를 전달받도록 보장되는 시간입니다.
약간의 시간적 오차는 있을 수 있겠지만, 1회의 Poll 요청 / 1회의 Fetch Request 는 max.wait.ms 시간 이내에 응답이 됩니다.
max.wait.ms 의 기본값은 500ms 라고 하네요.
fetch.min.bytes 란 ?
카프카 컨슈머의 데이터를 폴링하는 과정에서 사용되는 또 다른 설정인 fetch.min.bytes 는 max.wait.ms 와 함께 사용됩니다.
fetch.min.bytes 는 컨슈머가 브로커에게 요청하는 최소한의 데이터 양입니다.
fetch.min.bytes 의 기본값은 16kb 로 컨슈머는 1회의 요청에서 최소 16kb 의 데이터를 획득하기를 기대합니다.
여기서 max.wait.ms 와 fetch.min.bytes 가 함께 조합되어서 사용됩니다.
만약 Topic 내부에 데이터가 없거나 fetch.min.bytes 보다 적은 상태라면 컨슈머는 항상 max.wait.ms 시간 후에 브로커의 응답을 받습니다.
하지만 Topic 의 데이터가 fetch.min.bytes 의 양보다 많은 상황이라면 max.wait.ms 만큼 브로커의 응답을 기다리지 않습니다.
Request Handler Thread 란 ?
Kafka Broker 는 클라이언트의 컨슈머의 Fetch Request 를 처리하기 위해서 내부적으로 여러 쓰레드 구조를 가집니다.
아래의 이미지는 Kafka Broker 의 클라이언트의 요청 처리를 위한 쓰레드 구조인데요.
아래의 IO Threads 가 Request Handler Thread 를 의미합니다.
출처 : https://www.confluent.io/blog/kafka-producer-internals-handling-producer-request/ 카프카 브로커의 네트워크 및 쓰레드 구조를 간단히 설명하겠습니다.
먼저 클라이언트의 Connection 요청을 수락하기 위한 Acceptor Listener Thread 가 존재합니다.
이는 jstack 이나 ps 명령어를 통해서 아래와 같은 이름으로 확인되구요.
제가 설정한 Broker 의 19092 Port 와 Protocol 이 Suffix 로 붙습니다.
data-plane-kafka-socket-acceptor-ListenerName(PLAINTEXT)-PLAINTEXT-9092
이렇게 생성된 Acceptor Listener Thread 는 클라이언트의 요청을 받아서 3 ways handshake 과정을 수행하고 실질적인 Connection 을 생성합니다.
그리고 이렇게 생성된 Thread 는 Network Threads 에서 처리합니다.
이 Network Threads 는 num.network.threads 설정을 통해서 그 갯수를 지정할 수 있습니다.
https://westlife0615.tistory.com/923
[Kafka] advertised.listeners 와 Socket Acceptor Listener Thread 알아보기
- 목차 들어가며.Kafka Broker 는 advertised.listeners 와 같은 설정값이 존재합니다.advertised.listeners 는 카프카 브로커가 외부 클라이언트에게 자신의 IP 혹은 Domain 그리고 Port 를 노출하여 연결될 수 있
westlife0615.tistory.com
아래와 같이 여러 개의 Network Thread 들을 확인할 수 있습니다.
root@kafka1:/# jstack 1 | grep network-thread "data-plane-kafka-network-thread-1-ListenerName(PLAINTEXT)-PLAINTEXT-0" #61 prio=5 os_prio=0 cpu=13442.62ms elapsed=55177.85s tid=0x00007ffffac15800 nid=0x259 runnable [0x00007fffc86b2000] "data-plane-kafka-network-thread-1-ListenerName(PLAINTEXT)-PLAINTEXT-1" #62 prio=5 os_prio=0 cpu=48488.11ms elapsed=55177.85s tid=0x00007ffffac2c800 nid=0x25a runnable [0x00007fffc85b1000] "data-plane-kafka-network-thread-1-ListenerName(PLAINTEXT)-PLAINTEXT-2" #63 prio=5 os_prio=0 cpu=43328.34ms elapsed=55177.85s tid=0x00007ffffac2e000 nid=0x25c runnable [0x00007fffc3ffe000]
그리고 Network Thread 는 내부적인 Queue 를 통해서 IO Thread 에게 데이터 처리 요청을 넘깁니다.
IO Thread, Worker Thread, Request Handler Thread 등의 여러 이름이 있는거 같구요.
실제 Thread 의 이름은 Request Handler 로 표시됩니다.
root@kafka1:/# jstack 1 | grep request-handler "data-plane-kafka-request-handler-0" #51 daemon prio=5 os_prio=0 cpu=9828.68ms elapsed=55332.69s tid=0x00007ffffac04800 nid=0x250 waiting on condition [0x00007fffc90df000] "data-plane-kafka-request-handler-1" #52 daemon prio=5 os_prio=0 cpu=9644.36ms elapsed=55332.69s tid=0x00007ffffac06800 nid=0x251 waiting on condition [0x00007fffc8fde000] "data-plane-kafka-request-handler-2" #53 daemon prio=5 os_prio=0 cpu=9806.85ms elapsed=55332.69s tid=0x00007ffffac08800 nid=0x252 waiting on condition [0x00007fffc8edd000] "data-plane-kafka-request-handler-3" #54 daemon prio=5 os_prio=0 cpu=9965.04ms elapsed=55332.69s tid=0x00007ffffac0a000 nid=0x253 waiting on condition [0x00007fffc8ddc000] "data-plane-kafka-request-handler-4" #55 daemon prio=5 os_prio=0 cpu=10699.56ms elapsed=55332.69s tid=0x00007ffffac0c000 nid=0x254 waiting on condition [0x00007fffc8cdb000] "data-plane-kafka-request-handler-5" #56 daemon prio=5 os_prio=0 cpu=9302.87ms elapsed=55332.69s tid=0x00007ffffac0e000 nid=0x255 waiting on condition [0x00007fffc8bda000] "data-plane-kafka-request-handler-6" #57 daemon prio=5 os_prio=0 cpu=9338.56ms elapsed=55332.69s tid=0x00007ffffac10000 nid=0x256 waiting on condition [0x00007fffc8ad9000] "data-plane-kafka-request-handler-7" #58 daemon prio=5 os_prio=0 cpu=9922.94ms elapsed=55332.69s tid=0x00007ffffac12800 nid=0x257 waiting on condition [0x00007fffc89d8000]
즉, 위 상황을 요약해보면 클라이언트가 Fetch Request 를 통해서 브로커에게 데이터를 요청하게 되면,
브로커는 내부적으로 Request Handler Thread 가 실질적인 데이터 처리를 수행합니다.
Topic/Partition 의 Log Segment File 를 읽어들여서 Client 에게 제공할 수 있습니다.
이 과정에서 max.wait.ms 와 fetch.min.bytes 를 고려한 데이터 처리를 수행합니다.
TCP 관점에서 max.wait.ms 살펴보기
아래의 2개의 Packet 들은 각각 Fetch Request 와 Response 의 Packet 입니다.
첫번째 packet 의 0x0038, 0x0039 번 바이트들이 0001 로 표시되는데, 0001 이 바로 Fetch API 의 API Key 입니다.
그리고 0x003e, 0x003f 에 해당하는 000a 가 Correlation ID 에 해당합니다.
두번째 응답 패킷은 첫번째 Packet 의 응답 패킷입니다.
요청 Packet 의 Seq 와 응답 Packet 의 Ack 가 동일함을 확인할 수 있습니다.
둘의 Correlation ID 도 일치합니다.
응답 Packet 의 0x003a, 0x003b 의 000a 가 응답 패킷의 Correlation ID 에 해당합니다.
21:47:30.677496 IP 172.19.0.1.61362 > kafka1.19092: Flags [P.], seq 760:872, ack 1142, win 512, options [nop,nop,TS val 2791324763 ecr 1983801789], length 112 0x0000: 4500 00a4 e218 4000 4006 0011 ac13 0001 E.....@.@....... 0x0010: ac13 0003 efb2 4a94 8181 0114 d359 6879 ......J......Yhy 0x0020: 8018 0200 58c1 0000 0101 080a a660 3c5b ....X........`<[ 0x0030: 763e 69bd 0000 006c 0001 000d 0000 000a v>i....l........ 0x0040: 0011 7468 6973 5f69 735f 6d79 5f63 6c69 ..this_is_my_cli 0x0050: 656e 7400 ffff ffff 0000 2710 0000 0001 ent.......'..... 0x0060: 0320 0000 0100 0000 00ff ffff ff02 a5b6 ................ 0x0070: e97f d812 4015 8519 da1f 397e 58e5 0200 ....@.....9~X... 0x0080: 0000 0100 0000 0000 0000 0000 0000 00ff ................ 0x0090: ffff ffff ffff ffff ffff ff00 1000 0000 ................ 0x00a0: 0001 0100 .... 21:47:40.680957 IP kafka1.19092 > 172.19.0.1.61362: Flags [P.], seq 1142:1218, ack 872, win 50470, options [nop,nop,TS val 1983811796 ecr 2791324763], length 76 0x0000: 4500 0080 e6e8 4000 4006 fb64 ac13 0003 E.....@.@..d.... 0x0010: ac13 0001 4a94 efb2 d359 6879 8181 0184 ....J....Yhy.... 0x0020: 8018 c526 589d 0000 0101 080a 763e 90d4 ...&X.......v>.. 0x0030: a660 3c5b 0000 0048 0000 000a 0000 0000 .`<[...H........ 0x0040: 0000 0000 0000 0002 a5b6 e97f d812 4015 ..............@. 0x0050: 8519 da1f 397e 58e5 0200 0000 0100 0000 ....9~X......... 0x0060: 0000 0000 0000 0000 0000 0000 0000 0000 ................ 0x0070: 0000 0000 0000 0001 ffff ffff 0100 0000 ................
비어있는 Topic 을 대상으로 Fetch 를 시도하였고, fetch.wait.max.ms 는 10 초로 설정하였습니다.
위 패킷의 상세한 내용들보다 요청과 응답의 시간 간격이 10초가 발생합니다.
from confluent_kafka import Consumer, KafkaError config = { 'client.id': 'this_is_my_client', 'bootstrap.servers': 'localhost:19092', 'group.id': 'my-consumer-group', 'auto.offset.reset': 'earliest', 'enable.auto.commit': False, 'fetch.wait.max.ms': 10000 } consumer = Consumer(config) consumer.subscribe(['test-topic']) msg = consumer.poll()
비어있는 Topic 에 Polling.
만약 max wait ms 가 10초인 상황에서 비어있는 Topic 에 Polling 요청을 하게 되면 아래와 같이 10초를 주기로 요청을 반복합니다.
21:56:28.575803 IP 172.19.0.1.61362 > kafka1.19092: Flags [P.], seq 2172721511:2172721623, ack 3545856776, win 512, options [nop,nop,TS val 2791862668 ecr 1984339695], length 112 0x0000: 4500 00a4 e254 4000 4006 ffd4 ac13 0001 E....T@.@....... 0x0010: ac13 0003 efb2 4a94 8181 1967 d359 7b08 ......J....g.Y{. 0x0020: 8018 0200 58c1 0000 0101 080a a668 718c ....X........hq. 0x0030: 7646 9eef 0000 006c 0001 000d 0000 0043 vF.....l.......C 0x0040: 0011 7468 6973 5f69 735f 6d79 5f63 6c69 ..this_is_my_cli 0x0050: 656e 7400 ffff ffff 0000 2710 0000 0001 ent.......'..... 0x0060: 0320 0000 0100 0000 00ff ffff ff02 a5b6 ................ 0x0070: e97f d812 4015 8519 da1f 397e 58e5 0200 ....@.....9~X... 0x0080: 0000 0100 0000 0000 0000 0000 0000 01ff ................ 0x0090: ffff ffff ffff ffff ffff ff00 1000 0000 ................ 0x00a0: 0001 0100 .... 21:56:38.579599 IP 172.19.0.1.61362 > kafka1.19092: Flags [P.], seq 112:224, ack 77, win 512, options [nop,nop,TS val 2791872672 ecr 1984349700], length 112 0x0000: 4500 00a4 e255 4000 4006 ffd3 ac13 0001 E....U@.@....... 0x0010: ac13 0003 efb2 4a94 8181 19d7 d359 7b54 ......J......Y{T 0x0020: 8018 0200 58c1 0000 0101 080a a668 98a0 ....X........h.. 0x0030: 7646 c604 0000 006c 0001 000d 0000 0044 vF.....l.......D 0x0040: 0011 7468 6973 5f69 735f 6d79 5f63 6c69 ..this_is_my_cli 0x0050: 656e 7400 ffff ffff 0000 2710 0000 0001 ent.......'..... 0x0060: 0320 0000 0100 0000 00ff ffff ff02 a5b6 ................ 0x0070: e97f d812 4015 8519 da1f 397e 58e5 0200 ....@.....9~X... 0x0080: 0000 0100 0000 0000 0000 0000 0000 01ff ................ 0x0090: ffff ffff ffff ffff ffff ff00 1000 0000 ................ 0x00a0: 0001 0100 .... 21:56:48.586913 IP 172.19.0.1.61362 > kafka1.19092: Flags [P.], seq 224:336, ack 153, win 512, options [nop,nop,TS val 2791882680 ecr 1984359706], length 112 0x0000: 4500 00a4 e256 4000 4006 ffd2 ac13 0001 E....V@.@....... 0x0010: ac13 0003 efb2 4a94 8181 1a47 d359 7ba0 ......J....G.Y{. 0x0020: 8018 0200 58c1 0000 0101 080a a668 bfb8 ....X........h.. 0x0030: 7646 ed1a 0000 006c 0001 000d 0000 0045 vF.....l.......E 0x0040: 0011 7468 6973 5f69 735f 6d79 5f63 6c69 ..this_is_my_cli 0x0050: 656e 7400 ffff ffff 0000 2710 0000 0001 ent.......'..... 0x0060: 0320 0000 0100 0000 00ff ffff ff02 a5b6 ................ 0x0070: e97f d812 4015 8519 da1f 397e 58e5 0200 ....@.....9~X... 0x0080: 0000 0100 0000 0000 0000 0000 0000 01ff ................ 0x0090: ffff ffff ffff ffff ffff ff00 1000 0000 ................ 0x00a0: 0001 0100 .... 21:56:58.592980 IP 172.19.0.1.61362 > kafka1.19092: Flags [P.], seq 336:448, ack 229, win 512, options [nop,nop,TS val 2791892686 ecr 1984369711], length 112 0x0000: 4500 00a4 e257 4000 4006 ffd1 ac13 0001 E....W@.@....... 0x0010: ac13 0003 efb2 4a94 8181 1ab7 d359 7bec ......J......Y{. 0x0020: 8018 0200 58c1 0000 0101 080a a668 e6ce ....X........h.. 0x0030: 7647 142f 0000 006c 0001 000d 0000 0046 vG./...l.......F 0x0040: 0011 7468 6973 5f69 735f 6d79 5f63 6c69 ..this_is_my_cli 0x0050: 656e 7400 ffff ffff 0000 2710 0000 0001 ent.......'..... 0x0060: 0320 0000 0100 0000 00ff ffff ff02 a5b6 ................ 0x0070: e97f d812 4015 8519 da1f 397e 58e5 0200 ....@.....9~X... 0x0080: 0000 0100 0000 0000 0000 0000 0000 01ff ................ 0x0090: ffff ffff ffff ffff ffff ff00 1000 0000 ................ 0x00a0: 0001 0100
데이터가 충분한 Topic 에 Polling.
만약 데이터가 충분한 상황에서 Polling 요청이 이루어지면 아래와 같이 Request-Response 의 경과 시간이 짧아지고 요청 주기도 짧아지게 됩니다.
22:07:40.724444 IP 172.19.0.1.61362 > kafka1.19092: Flags [P.], seq 336:448, ack 536, win 512, options [nop,nop,TS val 2792534825 ecr 1985011854], length 112 0x0000: 4500 00a4 e29d 4000 4006 ff8b ac13 0001 E.....@.@....... 0x0010: ac13 0003 efb2 4a94 8181 3767 d359 9174 ......J...7g.Y.t 0x0020: 8018 0200 58c1 0000 0101 080a a672 b329 ....X........r.) 0x0030: 7650 e08e 0000 006c 0001 000d 0000 0088 vP.....l........ 0x0040: 0011 7468 6973 5f69 735f 6d79 5f63 6c69 ..this_is_my_cli 0x0050: 656e 7400 ffff ffff 0000 2710 0000 0001 ent.......'..... 0x0060: 0320 0000 0100 0000 00ff ffff ff02 a5b6 ................ 0x0070: e97f d812 4015 8519 da1f 397e 58e5 0200 ....@.....9~X... 0x0080: 0000 0100 0000 0000 0000 0000 0000 08ff ................ 0x0090: ffff ffff ffff ffff ffff ff00 1000 0000 ................ 0x00a0: 0001 0100 .... 22:07:43.054687 IP 172.19.0.1.61362 > kafka1.19092: Flags [P.], seq 448:560, ack 745, win 512, options [nop,nop,TS val 2792537156 ecr 1985014183], length 112 0x0000: 4500 00a4 e29f 4000 4006 ff89 ac13 0001 E.....@.@....... 0x0010: ac13 0003 efb2 4a94 8181 37d7 d359 9245 ......J...7..Y.E 0x0020: 8018 0200 58c1 0000 0101 080a a672 bc44 ....X........r.D 0x0030: 7650 e9a7 0000 006c 0001 000d 0000 0089 vP.....l........ 0x0040: 0011 7468 6973 5f69 735f 6d79 5f63 6c69 ..this_is_my_cli 0x0050: 656e 7400 ffff ffff 0000 2710 0000 0001 ent.......'..... 0x0060: 0320 0000 0100 0000 00ff ffff ff02 a5b6 ................ 0x0070: e97f d812 4015 8519 da1f 397e 58e5 0200 ....@.....9~X... 0x0080: 0000 0100 0000 0000 0000 0000 0000 0cff ................ 0x0090: ffff ffff ffff ffff ffff ff00 1000 0000 ................ 0x00a0: 0001 0100 .... 22:07:45.319852 IP 172.19.0.1.61362 > kafka1.19092: Flags [P.], seq 560:672, ack 954, win 512, options [nop,nop,TS val 2792539421 ecr 1985016449], length 112 0x0000: 4500 00a4 e2a1 4000 4006 ff87 ac13 0001 E.....@.@....... 0x0010: ac13 0003 efb2 4a94 8181 3847 d359 9316 ......J...8G.Y.. 0x0020: 8018 0200 58c1 0000 0101 080a a672 c51d ....X........r.. 0x0030: 7650 f281 0000 006c 0001 000d 0000 008a vP.....l........ 0x0040: 0011 7468 6973 5f69 735f 6d79 5f63 6c69 ..this_is_my_cli 0x0050: 656e 7400 ffff ffff 0000 2710 0000 0001 ent.......'..... 0x0060: 0320 0000 0100 0000 00ff ffff ff02 a5b6 ................ 0x0070: e97f d812 4015 8519 da1f 397e 58e5 0200 ....@.....9~X... 0x0080: 0000 0100 0000 0000 0000 0000 0000 10ff ................ 0x0090: ffff ffff ffff ffff ffff ff00 1000 0000 ................ 0x00a0: 0001 0100 .... 22:07:47.484937 IP 172.19.0.1.61362 > kafka1.19092: Flags [P.], seq 672:784, ack 1108, win 512, options [nop,nop,TS val 2792541586 ecr 1985018613], length 112 0x0000: 4500 00a4 e2a3 4000 4006 ff85 ac13 0001 E.....@.@....... 0x0010: ac13 0003 efb2 4a94 8181 38b7 d359 93b0 ......J...8..Y.. 0x0020: 8018 0200 58c1 0000 0101 080a a672 cd92 ....X........r.. 0x0030: 7650 faf5 0000 006c 0001 000d 0000 008b vP.....l........ 0x0040: 0011 7468 6973 5f69 735f 6d79 5f63 6c69 ..this_is_my_cli 0x0050: 656e 7400 ffff ffff 0000 2710 0000 0001 ent.......'..... 0x0060: 0320 0000 0100 0000 00ff ffff ff02 a5b6 ................ 0x0070: e97f d812 4015 8519 da1f 397e 58e5 0200 ....@.....9~X... 0x0080: 0000 0100 0000 0000 0000 0000 0000 11ff ................ 0x0090: ffff ffff ffff ffff ffff ff00 1000 0000 ................ 0x00a0: 0001 0100 ....
반응형'Kafka > Kafka Consumer' 카테고리의 다른 글
[Kafka] Consumer가 추가되면 Rebalance는 어떻게 동작할까? (0) 2024.06.30 [Kafka] Consumer Heartbeat 의 내부 동작 원리 (0) 2024.06.30 [Kafka] SyncGroup API 알아보기 (0) 2024.06.30 [Kafka] Consumer 는 JoinGroup API 를 어떻게 요청할까 ? (0) 2024.06.30 [Kafka] FindCoordinator API 와 Group Coordinator 알아보기 (0) 2024.06.29