-
Kafka Consumer Configuration 알아보기 (fetch.min.bytes, fetch.max.wait.ms, max.partiKafka 2024. 1. 12. 20:09728x90반응형
- 목차
관련있는 다른 글.
https://westlife0615.tistory.com/588
들어가며.
이번 글에서는 Kafka Consumer 의 설정들에 대해서 자세히 알아보도록 하겠습니다.
fetch.min.bytes
fetch.min.bytes 설정은 Kafka Consumer 가 조회할 수 있는 최소한의 Record 의 크기입니다.
KafkaConsumer.poll 과 같이 1회 요청으로 조회할 수 있는 Record 들의 총 용량이구요.
만약 Topic 에 저장된 Record 의 크기가 fetch.min.bytes 의 크기보다 작다면 Kafka Consumer 는 Record 를 조회할 수 없습니다.
fetch.min.bytes 의 기본값은 1 byte 로 설정되어 있으며 왠만한 Record 의 크기가 1byte 보다 크기 때문에
fetch.min.bytes 설정으로 인해서 Consumer 가 Wait 상태에 빠지는 경우는 거의 없습니다.
아래의 예시처럼 Broker 의 Partition 은 1 Byte 인 Record 4개를 가집니다.
그리고 2개의 Consumer 는 각각 fetch.min.bytes 가 3 과 100 인 상태입니다.
미리 말씀드리는 것은 아래는 예시일 뿐입니다.
실제 Record 의 byte 를 측정하는게 까다로워서 아래와 같은 비유를 들었다는 점을 참고하시면 좋을 것 같습니다.
key, value, timestamp 등을 고려한 Record 의 정확한 byte 크기는 다음에 새로운 글로 전달드리도록 하겠습니다.
다시 돌아와서,
현재 Broker 의 Partition 은 4개의 Record 를 가지며, 4 Bytes 의 용량을 차지합니다.
이떄, fetch.min.bytes 가 3bytes 인 Consumer A 는 즉각적인 Response 를 받을 수 있습니다.
반면, fetch.min.bytes 가 100bytes 인 Consumer B 는 새로운 Record 들이 추가되어야 Response 를 받을 수 있습니다.
실제 코드로 예를 들어 보겠습니다.
현재 test 라는 Topic 에 {key : null, value : 1, timestamp : 2024-01-16 XXX} 인 Record 1개가 존재합니다.
일부로 작은 크기의 Record 를 생성하였구요.fetch.min.bytes 가 1 인 경우.
fetch.min.bytes 가 1 인 경우는 즉각적인 응답이 이루어집니다.
그 이유는 Partition 에 존재하는 { key : Null, value : 1 } 인 Record 는 1 byte 가 넘는 크기이기 때문입니다.from kafka import KafkaConsumer from datetime import datetime import random consumer = KafkaConsumer( "test", bootstrap_servers="localhost:29091,localhost:29092,localhost:29093", group_id="test-consumer-group" + str(random.choice(range(0, 1000000))), auto_offset_reset="earliest", fetch_min_bytes=1, fetch_max_wait_ms=5 * 1000 ) start_time = datetime.now() response = consumer.poll(timeout_ms=60 * 1000) for partition, records in response.items(): print(f"value : {records[0].value}") print(f"time : {datetime.now() - start_time}")
value : b'1' time : 0:00:00.360588
fetch.min.bytes 가 100000 인 경우.
fetch.min.bytes 의 값을 매우 큰 값으로 설정해보았습니다.
큰 의미는 없습니다.
이 경우에는 Broker 의 Partition 에 fetch.min.bytes 만큼의 Record 가 존재하지 않습니다.
따라서 즉각적으로 응답이 이루어지지 않습니다.
이러한 케이스에서 fetch.max.wait.ms 으로 설정된 시간만큼 대기 후에 응답이 이루어지는데요.
이 설정은 이어지는 내용에서 설명을 할 것이구요.
아래의 결과처럼 fetch.max.wait.ms 인 5초 이후에 Record 조회가 발생하는 것을 확인할 수 있습니다.
from kafka import KafkaConsumer from datetime import datetime import random consumer = KafkaConsumer( "test", bootstrap_servers="localhost:29091,localhost:29092,localhost:29093", group_id="test-consumer-group" + str(random.choice(range(0, 1000000))), auto_offset_reset="earliest", fetch_min_bytes=1000000, fetch_max_wait_ms=5 * 1000 ) start_time = datetime.now() response = consumer.poll(timeout_ms=60 * 1000) for partition, records in response.items(): print(f"value : {records[0].value}") print(f"time : {datetime.now() - start_time}")
value : b'1' time : 0:00:05.373158
그리고 fetch.min.bytes 값을 테스트해본 결과 {key : Null, value : 1} 레코드에 한하여
fetch.min.bytes 값이 69 바이트일 때에 즉각적인 조회가 발생합니다.from kafka import KafkaConsumer from datetime import datetime import random consumer = KafkaConsumer( "test", bootstrap_servers="localhost:29091,localhost:29092,localhost:29093", group_id="test-consumer-group" + str(random.choice(range(0, 1000000))), auto_offset_reset="earliest", fetch_min_bytes=69, fetch_max_wait_ms=5 * 1000 ) start_time = datetime.now() response = consumer.poll(timeout_ms=60 * 1000) for partition, records in response.items(): print(f"value : {records[0].value}") print(f"time : {datetime.now() - start_time}")
value : b'1' time : 0:00:00.266172
fetch.max.wait.ms
fetch.max.wait.ms 와 fetch.min.bytes 는 카프카 레코드의 1회성 조회를 위한 또 다른 조건입니다.
fetch.min.bytes 는 카프카 레코드의 조회를 위한 최소 용량이라면,
fetch.max.wait.ms 는 fetch.min.bytes 만큼의 카프카 레코드가 없더라도 지정된 시간이 지나면 응답이 이뤄집니다.
즉, Consumer 의 레코드 조회 요청은 시간조건과 크기조건이 필요하게 됩니다.
아래의 상황에서 Partition 의 Record 들은 100 Bytes 가 되지 않습니다.
반면, Consumer B 는 fetch.min.bytes : 100 & fetch.max.wait.ms : 5000 으로 데이터를 조회하고 있습니다.
이 상황에서 Broker 는 100 Bytes 의 레코드가 존재하지 않으므로 5초 이후에 데이터를 제공해줍니다.
max.partition.fetch.bytes
max.partition.fetch.bytes 는 Broker 가 Consumer 에게 제공할 수 있는 데이터의 Bytes 제한입니다.
이 Bytes 제한은 Partition 별로 적용되며 하나의 Consumer 가 2개의 Partition 의 Onwership 을 가진다면
두 Partition 의 max.partition.fetch.bytes 를 수용할 수 있는 메모리가 필요합니다.
먼저 test 라는 Topic 에 1.5Mb 정도의 데이터를 추가하였습니다.
제 케이스에선 Log File 의 위치가 아래와 같으며, server.properties 의 log.dir 설정값이 곧 로그 파일의 위치입니다.ls -lh /var/lib/kafka/data/test-0
total 1.5M -rw-r--r-- 1 appuser appuser 10M Jan 16 22:14 00000000000000000000.index -rw-r--r-- 1 appuser appuser 1.5M Jan 16 22:14 00000000000000000000.log -rw-r--r-- 1 appuser appuser 10M Jan 16 22:14 00000000000000000000.timeindex -rw-r--r-- 1 appuser appuser 8 Jan 16 12:21 leader-epoch-checkpoint -rw-r--r-- 1 appuser appuser 43 Jan 16 12:21 partition.metadata
max.partition.fetch.bytes : 1Mb 씩 2번 Polling 하기.
총 Record 들의 크기가 1.5Mb 이므로 max.partition.fetch.bytes 를 1Mb 로 설정한다면 2번의 Polling 으로 조회가 가능합니다.
from kafka import KafkaConsumer from datetime import datetime import random consumer = KafkaConsumer( "test", bootstrap_servers="localhost:29091,localhost:29092,localhost:29093", group_id="test-consumer-group" + str(random.choice(range(0, 1000000))), auto_offset_reset="earliest", max_poll_records=1000000000, max_partition_fetch_bytes=1 * 1024 * 1024, fetch_max_bytes=1 * 1024 * 1024, ) start_time = datetime.now() response = consumer.poll(timeout_ms=5 * 1000) record_list = [[record for record in records] for _, records in response.items()] print(f"first poll : {len(record_list[0])}") response = consumer.poll(timeout_ms=5 * 1000) record_list = [[record for record in records] for _, records in response.items()] print(f"second poll : {len(record_list[0])}") print(f"time : {datetime.now() - start_time}") consumer.commit()
first poll : 61115 second poll : 24887 time : 0:00:02.498157
max.partition.fetch.bytes : 2Mb 씩 1번만에 Polling 하기.
총 Record 의 크기가 1.5Mb 이므로 max.partition.fetch.bytes 를 2MB 로 설정한다면,
1번의 Polling 으로 모든 레코드들을 조회할 수 있습니다.
from kafka import KafkaConsumer from datetime import datetime import random consumer = KafkaConsumer( "test", bootstrap_servers="localhost:29091,localhost:29092,localhost:29093", group_id="test-consumer-group" + str(random.choice(range(0, 1000000))), auto_offset_reset="earliest", max_poll_records=1000000000, max_partition_fetch_bytes=2 * 1024 * 1024, fetch_max_bytes=2 * 1024 * 1024, ) start_time = datetime.now() response = consumer.poll(timeout_ms=5 * 1000) record_list = [[record for record in records] for _, records in response.items()] print(f"first poll : {len(record_list[0])}") response = consumer.poll(timeout_ms=5 * 1000) record_list = [[record for record in records] for _, records in response.items()] print(f"second poll : {len(record_list[0] if len(record_list) > 0 else [])}") print(f"time : {datetime.now() - start_time}") consumer.commit()
first poll : 86002 second poll : 0 time : 0:00:07.613935
max.poll.records
max.poll.records 는 설정은 한번에 응답받을 수 있는 카프카 레코드의 갯수 제한을 위한 설정입니다.
위의 max.partition.fetch.bytes 예시에서 제가 max.poll.records 값은 10000000 으로 설정하고 진행하였는데요.
만약 max.poll.records 의 값이 500 이라면 max.partition.fetch.bytes 와 무관하게 최대 500개의 레코드만을 조회합니다.
그리고 기본값 또한 500 개 입니다.
아래의 예시에서 확인할 수 있듯이 각 Polling 마다 500개의 레코드를 조회합니다.from kafka import KafkaConsumer from datetime import datetime import random consumer = KafkaConsumer( "test", bootstrap_servers="localhost:29091,localhost:29092,localhost:29093", group_id="test-consumer-group" + str(random.choice(range(0, 1000000))), auto_offset_reset="earliest", max_poll_records=500, max_partition_fetch_bytes=2 * 1024 * 1024, fetch_max_bytes=2 * 1024 * 1024, ) start_time = datetime.now() response = consumer.poll(timeout_ms=5 * 1000) record_list = [[record for record in records] for _, records in response.items()] print(f"first poll : {len(record_list[0])}") response = consumer.poll(timeout_ms=5 * 1000) record_list = [[record for record in records] for _, records in response.items()] print(f"second poll : {len(record_list[0] if len(record_list) > 0 else [])}") print(f"time : {datetime.now() - start_time}") consumer.commit()
first poll : 500 second poll : 500 time : 0:00:02.562194
마치며.
내용이 너무 길어지는 관계로 다음 글에서 나머지 설정들에 대해서 알아보도록 하겠습니다.
감사합니다.반응형'Kafka' 카테고리의 다른 글
[Kafka Consumer] Exactly-Once 구현하기 (0) 2024.01.13 Kafka Consumer Configuration 알아보기 ( session.timeout.ms, heartbeat.interval.ms, auto.offset.reset, auto.commit.interval.ms ) (0) 2024.01.13 Kafka Log Compaction 알아보기 (0) 2024.01.12 [ Kafka Producer ] 불안정한 네트워크에서 데이터 생성하기 ( Acks, Retries ) (0) 2024.01.09 [Kafka-Streams] mapValues 알아보기 (0) 2024.01.09