-
[Kafka] Consumer 의 max.poll.records 와 Offset Commit 알아보기Kafka/Kafka Consumer 2024. 6. 30. 14:56반응형
- 목차
들어가며.
카프카 컨슈머는 max.poll.records 설정을 통해서 Consumer Poll 함수를 통해서 반환되는 레코드의 갯수를 제한할 수 있습니다.
예를 들어서 아래와 같은 형식으로 획득한 ConsumerRecords 를 최대 500개의 Record 들을 가지게 됩니다.
Properties props = new Properties(); props.put("max.poll.records", 500); Consumer<String, String> consumer = new KafkaConsumer<>(props); ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(10000)); System.out.println("Fetched Records: " + records.count());
하지만 실제 Kafka Consumer 가 Broker 에게 데이터를 요청하는 Fetch API 는 max.poll.records 의 정보는 제공되지 않습니다.
왜냐하며 카프카의 Fetch API 는 min-max bytes 의 용량를 기준으로 데이터를 교환하지 갯수를 기반으로 통신이 이루어지지 않습니다.
그래서 실제로는 max.poll.records 보다 더 많은 양의 데이터가 실제로 Consumer 에게 전달됩니다.
카프카 컨슈머 내부적으로 캐싱하고 있는 데이터의 갯수와 Poll Function 을 통해서 처리 중인 데이터의 값이 일치하지 않으며,
버퍼링되어 있는 일부 데이터가 항상 존재합니다.
따라서 Consumer Offset 의 관리와 Consumer 의 갑작스런 종료 시에 캐싱된 데이터의 유실에 대해서 항상 고려해야합니다.
이어지는 글에서 max.poll.records 와 관련된 여러가지 테스트와 실험을 진행해보도록 하겠습니다.
Fetch API ?
카프카 컨슈머는 Fetch API 를 활용하여 리더 브로커로부터 데이터를 획득합니다.
우선 Fetch API 의 포맷에 대해서 알아봅니다.
아래는 Version 0 인 초기버전의 Fetch API Format 입니다.
Fetch Request (Version: 0) => replica_id max_wait_ms min_bytes [topics] replica_id => INT32 max_wait_ms => INT32 min_bytes => INT32 topics => topic [partitions] topic => STRING partitions => partition fetch_offset partition_max_bytes partition => INT32 fetch_offset => INT64 partition_max_bytes => INT32
- replica_id
- replica_id 는 브로커의 식별값인 Broker ID 를 의미합니다.
- 일반적으로 1, 2, 3 과 같은 값이 사용되며, 사용자가 직접 지정하는 값입니다.
- max_wait_ms
- max_wait_ms 는 컨슈머가 브로커로부터 데이터를 응답받을 수 있는 최대 시간입니다.
- 이는 내부적으로 리더 브로커가 min_bytes (fetch.min.bytes) 만큼의 데이터를 모아서 컨슈머에게 전송합니다.
- 만약 Log Segment File 에 충분한 데이터가 없다면 max_wait_ms 만큼 데이터가 쌓이길 기다린 후 컨슈머에게 데이터를 전달합니다.
- min_bytes
- max_wait_ms 와 함께 사용되는 컨슈머의 속성입니다.
- 리더 브로커는 min_bytes 만큼의 데이터를 컨슈머에게 제공하기를 보장합니다.
- 하지만 min_bytes 만큼의 데이터가 없다면, max_wait_ms 이후에 부족한 양의 데이터라도 응답합니다.
위와 같은 형식으로 컨슈머는 브로커에게 데이터를 요청할 수 있습니다.
이 단계에서 사용되는 API 가 바로 Fetch API 입니다.
아래의 포맷은 Fetch Response 의 포맷입니다.
아래의 포맷은 직관적으로 이해가 가능할 것 같아서 상세한 설명은 생략하도록 하겠습니다.
Fetch Response (Version: 0) => [responses] responses => topic [partitions] topic => STRING partitions => partition_index error_code high_watermark records partition_index => INT32 error_code => INT16 high_watermark => INT64 records => RECORDS
max.poll.records
max.poll.records 에 대해서 알아보기 이전에,우선 아래의 명령어를 통해서 Topic 와 Record 를 생성합니다.
"test-topic" 이라는 이름의 토픽과 10개의 레코드가 생성됩니다.
kafka-topics.sh --bootstrap-server kafka1:9092 --topic test-topic --create --partitions 1 --replication-factor 1 touch /tmp/text.txt for i in {1..10}; do echo record_${i}_end >> /tmp/text.txt; done; cat /tmp/text.txt | kafka-console-producer.sh --bootstrap-server kafka1:9092 --topic test-topic
그리고 아래와 같이 Consumer 를 생성하고, 1회 Polling 을 수행합니다.
import org.apache.kafka.clients.consumer.*; import org.apache.kafka.common.serialization.StringDeserializer; import java.time.Duration; import java.util.Collections; import java.util.Properties; public class KafkaConsumerExample { public static void main(String[] args) { Properties props = new Properties(); props.put("bootstrap.servers", "kafka1:9091"); props.put("group.id", "my-consumer-group"); props.put("max.poll.records", 5); props.put("key.deserializer", StringDeserializer.class.getName()); props.put("value.deserializer", StringDeserializer.class.getName()); props.put("auto.offset.reset", "earliest"); Consumer<String, String> consumer = new KafkaConsumer<>(props); consumer.subscribe(Collections.singletonList("test-topic")); System.out.println("Kafka Consumer started. Waiting for messages..."); try { ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(10000)); System.out.println("Fetched Records: " + records.count()); consumer.commitSync(); } catch (Exception e) { e.printStackTrace(); } finally { consumer.close(); } } }
아래는 Leader Broker 로부터 Record 를 전달받은 TCP Packet 입니다.
아래 Packet 의 Payload 부분을 확인해보면 record 1 ~ record 10 까지의 데이터가 모두 조회됨을 확인할 수 있습니다.
실제 max.poll.records 의 값과는 별개로 Fetch API 를 통해서 가능한 많은 양의 데이터를 획득하여,
Consumer 의 내부 메모리 영역에 캐싱해두게 됩니다.
04:51:01.574972 IP kafka1.9091 > competent_golick.kafka-net.43386: Flags [P.], seq 569:821, ack 277, win 50542, options [nop,nop,TS val 4176800674 ecr 1571073923], length 252 0x0000: 4500 0130 b3db 4000 4006 2dbf ac13 0003 E..0..@.@.-..... 0x0010: ac13 0004 2383 a97a b60f 9f60 f70f cdc3 ....#..z...`.... 0x0020: 8018 c56e 5950 0000 0101 080a f8f4 eba2 ...nYP.......... 0x0030: 5da4 af83 0000 0000 0000 0000 0000 00f0 ]............... 0x0040: 0000 0000 02e9 d4a9 b500 0000 0000 0900 ................ 0x0050: 0001 9540 97f2 b900 0001 9540 97f2 cc00 ...@.......@.... 0x0060: 0000 0000 0000 0300 0000 0000 0000 0000 ................ 0x0070: 0a24 0000 0001 1872 6563 6f72 645f 315f .$.....record_1_ 0x0080: 656e 6400 2400 2402 0118 7265 636f 7264 end.$.$...record 0x0090: 5f32 5f65 6e64 0024 0026 0401 1872 6563 _2_end.$.&...rec 0x00a0: 6f72 645f 335f 656e 6400 2400 2606 0118 ord_3_end.$.&... 0x00b0: 7265 636f 7264 5f34 5f65 6e64 0024 0026 record_4_end.$.& 0x00c0: 0801 1872 6563 6f72 645f 355f 656e 6400 ...record_5_end. 0x00d0: 2400 260a 0118 7265 636f 7264 5f36 5f65 $.&...record_6_e 0x00e0: 6e64 0024 0026 0c01 1872 6563 6f72 645f nd.$.&...record_ 0x00f0: 375f 656e 6400 2400 260e 0118 7265 636f 7_end.$.&...reco 0x0100: 7264 5f38 5f65 6e64 0024 0026 1001 1872 rd_8_end.$.&...r 0x0110: 6563 6f72 645f 395f 656e 6400 2600 2612 ecord_9_end.&.&. 0x0120: 011a 7265 636f 7264 5f31 305f 656e 6400 ..record_10_end.
그리고 아래의 Packet 은 Consumer 의 commitSync 함수에 의해서 요청되는 CommitOffset API Request 입니다.
0x0038 ~ 0x0039 번째 바이트의 값인 "0008" 이 CommitOffset API 의 Key 에 해당하는 값이구요.
0x00d6 의 바이트에 작성된 5 가 실제 max.poll.records 에 의해서 조회한 레코드의 Offset 에 해당합니다.
여기서 중요한 점은 실제로 Fetch API 를 통해서 조회한 레코드는 0 ~ 9 Offset 에 해당하는 모든 레코드를 다 조회하였습니다.
하지만 이는 카프카 컨슈머의 내부 캐싱 영역에서 보관되며, 실제 데이터가 처리되는 영역에서는 0 ~ 4 Offset 의 레코드들이 처리됩니다.
즉, 캐싱 영역과 처리 영역에서 관찰되는 데이터의 갯수가 다르다는 점입니다.
Commit Offset API 의 경우에는 캐싱 영역의 Offset 이 아니라 처리 영역에서의 Offset 을 기준으로 동작하게 됩니다.
04:51:01.585952 IP competent_golick.kafka-net.43370 > kafka1.9091: Flags [P.], seq 845:1016, ack 941, win 32044, options [nop,nop,TS val 1571073939 ecr 4176800656], length 171 0x0000: 4500 00df 3216 4000 4006 afd5 ac13 0004 E...2.@.@....... 0x0010: ac13 0003 a96a 2383 c4c3 3746 57d9 ffe5 .....j#...7FW... 0x0020: 8018 7d2c 58ff 0000 0101 080a 5da4 af93 ..},X.......]... 0x0030: f8f4 eb90 0000 00a7 0008 0008 0000 000b ................ 0x0040: 001e 636f 6e73 756d 6572 2d6d 792d 636f ..consumer-my-co 0x0050: 6e73 756d 6572 2d67 726f 7570 3232 2d31 nsumer-group22-1 0x0060: 0014 6d79 2d63 6f6e 7375 6d65 722d 6772 ..my-consumer-gr 0x0070: 6f75 7032 3200 0000 0144 636f 6e73 756d oup22....Dconsum 0x0080: 6572 2d6d 792d 636f 6e73 756d 6572 2d67 er-my-consumer-g 0x0090: 726f 7570 3232 2d31 2d39 6466 6362 3239 roup22-1-9dfcb29 0x00a0: 632d 3330 3937 2d34 6339 662d 6261 3965 c-3097-4c9f-ba9e 0x00b0: 2d35 3434 6135 3165 3965 6664 3700 020b -544a51e9efd7... 0x00c0: 7465 7374 2d74 6f70 6963 0200 0000 0000 test-topic...... 0x00d0: 0000 0000 0000 0500 0000 0001 0000 00 ...............
많은 수의 레코드로 테스트.
이번에는 1만개의 레코드가 존재하는 Topic 과 max.poll.records 의 값을 500 으로 설정합니다.
kafka-topics.sh --bootstrap-server kafka1:9092 --topic test-topic --create --partitions 1 --replication-factor 1 touch /tmp/text.txt for i in {1..10000}; do echo record_${i}_end >> /tmp/text.txt; done; cat /tmp/text.txt | kafka-console-producer.sh --bootstrap-server kafka1:9092 --topic test-topic
import org.apache.kafka.clients.consumer.*; import org.apache.kafka.common.serialization.StringDeserializer; import java.time.Duration; import java.util.Collections; import java.util.Properties; public class KafkaConsumerExample { public static void main(String[] args) { Properties props = new Properties(); props.put("bootstrap.servers", "kafka1:9091"); props.put("group.id", "my-consumer-group"); props.put("max.poll.records", 5); props.put("key.deserializer", StringDeserializer.class.getName()); props.put("value.deserializer", StringDeserializer.class.getName()); props.put("auto.offset.reset", "earliest"); Consumer<String, String> consumer = new KafkaConsumer<>(props); consumer.subscribe(Collections.singletonList("test-topic")); System.out.println("Kafka Consumer started. Waiting for messages..."); try { ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(10000)); System.out.println("Fetched Records: " + records.count()); consumer.commitSync(); } catch (Exception e) { e.printStackTrace(); } finally { consumer.close(); } } }
max.poll.records 를 500으로 설정하더라도 아래와 같이 1만개의 레코드들이 한번에 Consumer 로 전달됩니다.
그리고 Consumer 는 캐싱 영역에 1만개의 레코드를 보유하고 있지만, 데이터 처리 영역으로 500개의 레코드만을 전달하는 구조입니다.
04:27:52.194110 IP kafka1.9091 > lucid_kapitsa.kafka-net.48536: Flags [P.], seq 570:7810, ack 286, win 50538, options [nop,nop,TS val 4175411249 ecr 1569684501], length 7240 0x0000: 4500 1c7c 7628 4000 4006 5026 ac13 0003 E..|v(@.@.P&.... 0x0010: ac13 0004 2383 bd98 4c20 6e50 ceeb 017b ....#...L.nP...{ 0x0020: 8018 c56a 749c 0000 0101 080a f8df b831 ...jt..........1 0x0030: 5d8f 7c15 0000 0000 0000 0000 0000 3fe3 ].|...........?. 0x0040: 0000 0000 02a9 1064 7300 0000 0002 ec00 .......ds....... 0x0050: 0001 9540 81c9 3a00 0001 9540 81c9 6900 ...@..:....@..i. 0x0060: 0000 0000 0000 0000 0000 0000 0000 0002 ................ 0x0070: ed24 0000 0001 1872 6563 6f72 645f 315f .$.....record_1_ 0x0080: 656e 6400 2400 3002 0118 7265 636f 7264 end.$.0...record 0x0090: 5f32 5f65 6e64 0024 0030 0401 1872 6563 _2_end.$.0...rec 0x00a0: 6f72 645f 335f 656e 6400 2400 3006 0118 ord_3_end.$.0... // ... 중략... 0x6390: 7265 636f 7264 5f39 3939 355f 656e 6400 record_9995_end. 0x63a0: 2c00 06c6 0a01 1e72 6563 6f72 645f 3939 ,......record_99 0x63b0: 3936 5f65 6e64 002c 0006 c80a 011e 7265 96_end.,......re 0x63c0: 636f 7264 5f39 3939 375f 656e 6400 2c00 cord_9997_end.,. 0x63d0: 06ca 0a01 1e72 6563 6f72 645f 3939 3938 .....record_9998 0x63e0: 5f65 6e64 002c 0006 cc0a 011e 7265 636f _end.,......reco 0x63f0: 7264 5f39 3939 395f 656e 6400 2e00 06ce rd_9999_end..... 0x6400: 0a01 2072 6563 6f72 645f 3130 3030 305f ...record_10000_ 0x6410: 656e 6400 0000 00 end....
이 경우의 Offset Commit API 를 확인해보면, 아래와 같이 0x00de, 0x00df 바이트 위치에 "01f4" 인 500 이 설정됩니다.
즉, Offset 500 을 Commit 하게 됩니다.
04:27:52.207433 IP lucid_kapitsa.kafka-net.48522 > kafka1.9091: Flags [P.], seq 881:1061, ack 956, win 32044, options [nop,nop,TS val 1569684516 ecr 4175411235], length 180 0x0000: 4500 00e8 3d18 4000 4006 a4ca ac13 0004 E...=.@.@....... 0x0010: ac13 0003 bd8a 2383 d19e 555a c5a5 5a1b ......#...UZ..Z. 0x0020: 8018 7d2c 5908 0000 0101 080a 5d8f 7c24 ..},Y.......].|$ 0x0030: f8df b823 0000 00b0 0008 0008 0000 000b ...#............ 0x0040: 0021 636f 6e73 756d 6572 2d6d 792d 636f .!consumer-my-co 0x0050: 6e73 756d 6572 2d67 726f 7570 3131 3131 nsumer-group1111 0x0060: 312d 3100 176d 792d 636f 6e73 756d 6572 1-1..my-consumer 0x0070: 2d67 726f 7570 3131 3131 3100 0000 0147 -group11111....G 0x0080: 636f 6e73 756d 6572 2d6d 792d 636f 6e73 consumer-my-cons 0x0090: 756d 6572 2d67 726f 7570 3131 3131 312d umer-group11111- 0x00a0: 312d 3261 6637 6232 6239 2d61 6431 612d 1-2af7b2b9-ad1a- 0x00b0: 3430 3934 2d39 3437 642d 6136 3532 3264 4094-947d-a6522d 0x00c0: 6534 6439 3433 0002 0b74 6573 742d 746f e4d943...test-to 0x00d0: 7069 6302 0000 0000 0000 0000 0000 01f4 pic............. 0x00e0: 0000 0000 0100 0000 ........
카프카 클러스터 실행.
docker network create kafka-net docker run -d --name zookeeper --hostname zookeeper --net kafka-net \ -e ZOOKEEPER_CLIENT_PORT=2181 \ confluentinc/cp-zookeeper:7.6.4 docker run -d --name kafka1 --hostname kafka1 --net kafka-net \ -e KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181 \ -e KAFKA_CFG_BROKER_ID=1 \ -e ALLOW_PLAINTEXT_LISTENER=yes \ -e KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR=1 \ -e KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,INNER://:9091,OUTER://:19092 \ -e KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka1:9092,INNER://kafka1:9091,OUTER://localhost:19092 \ -e KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,INNER:PLAINTEXT,OUTER:PLAINTEXT \ -e KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT \ -p 19092:19092 \ bitnami/kafka:3.1.2
반응형'Kafka > Kafka Consumer' 카테고리의 다른 글
[Kafka] __consumer_offsets Topic 알아보기 (0) 2024.07.01 [Kafka] Static Membership 과 Partition Assignment 관계 알아보기 ( group.instance.id ) (0) 2024.06.30 [Kafka] Consumer가 추가되면 Rebalance는 어떻게 동작할까? (0) 2024.06.30 [Kafka] Consumer Heartbeat 의 내부 동작 원리 (0) 2024.06.30 [Kafka] Consumer 의 Fetch Request 와 max_wait_ms 관계 알아보기 ( fetch.wait.max.ms ) (0) 2024.06.30 - replica_id