ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • Kafka Consumer Configuration 알아보기 (fetch.min.bytes, fetch.max.wait.ms, max.parti
    Kafka 2024. 1. 12. 20:09
    728x90
    반응형

    - 목차

     

    관련있는 다른 글.

    https://westlife0615.tistory.com/588

    Kafka Consumer Configuration 알아보기 ( session.timeout.ms, heartbeat.interval.ms, auto.offset.reset, auto.commit.interval.m

    - 목차 들어가며. 이번 글에서는 Kafka Consumer 의 Configuration 들 중에서 heartbeat.interval.ms, session.timeout.ms, auto.offset.reset, auto.commit.interval.ms 등 에 대해서 알아보도록 하겠습니다. heartbeat.interval.ms Kafka C

    westlife0615.tistory.com

     

    들어가며.

    이번 글에서는 Kafka Consumer 의 설정들에 대해서 자세히 알아보도록 하겠습니다.


     

    fetch.min.bytes

    fetch.min.bytes 설정은 Kafka Consumer 가 조회할 수 있는 최소한의 Record 의 크기입니다.
    KafkaConsumer.poll 과 같이 1회 요청으로 조회할 수 있는 Record 들의 총 용량이구요.
    만약 Topic 에 저장된 Record 의 크기가 fetch.min.bytes 의 크기보다 작다면 Kafka Consumer 는 Record 를 조회할 수 없습니다.
    fetch.min.bytes 의 기본값은 1 byte 로 설정되어 있으며 왠만한 Record 의 크기가 1byte  보다 크기 때문에
    fetch.min.bytes 설정으로 인해서 Consumer 가 Wait 상태에 빠지는 경우는 거의 없습니다.
     
    아래의 예시처럼 Broker 의 Partition 은 1 Byte 인 Record 4개를 가집니다.
    그리고 2개의 Consumer 는 각각 fetch.min.bytes 가 3 과 100 인 상태입니다.
    미리 말씀드리는 것은 아래는 예시일 뿐입니다.
    실제 Record 의 byte 를 측정하는게 까다로워서 아래와 같은 비유를 들었다는 점을 참고하시면 좋을 것 같습니다.
    key, value, timestamp 등을 고려한 Record 의 정확한 byte 크기는 다음에 새로운 글로 전달드리도록 하겠습니다.
    다시 돌아와서,
    현재 Broker 의 Partition 은 4개의 Record 를 가지며, 4 Bytes 의 용량을 차지합니다.
    이떄, fetch.min.bytes 가 3bytes 인 Consumer A 는 즉각적인 Response 를 받을 수 있습니다.
    반면, fetch.min.bytes 가 100bytes 인 Consumer B 는 새로운 Record 들이 추가되어야 Response 를 받을 수 있습니다.

     
    실제 코드로 예를 들어 보겠습니다.
    현재 test 라는 Topic 에 {key : null, value : 1, timestamp : 2024-01-16 XXX} 인 Record 1개가 존재합니다.
    일부로 작은 크기의 Record 를 생성하였구요.

     

    fetch.min.bytes 가 1 인 경우.

    fetch.min.bytes 가 1 인 경우는 즉각적인 응답이 이루어집니다.
    그 이유는 Partition 에 존재하는 { key : Null, value : 1 } 인 Record 는 1 byte 가 넘는 크기이기 때문입니다.

    from kafka import KafkaConsumer
    from datetime import datetime
    import random
    consumer = KafkaConsumer(
        "test",
        bootstrap_servers="localhost:29091,localhost:29092,localhost:29093",
        group_id="test-consumer-group" + str(random.choice(range(0, 1000000))),
        auto_offset_reset="earliest",
        fetch_min_bytes=1,
        fetch_max_wait_ms=5 * 1000
    )
    start_time = datetime.now()
    response = consumer.poll(timeout_ms=60 * 1000)
    for partition, records in response.items():
        print(f"value : {records[0].value}")
    print(f"time : {datetime.now() - start_time}")
    value : b'1'
    time : 0:00:00.360588

     

    fetch.min.bytes 가 100000 인 경우.

    fetch.min.bytes 의 값을 매우 큰 값으로 설정해보았습니다.
    큰 의미는 없습니다.
    이 경우에는 Broker 의 Partition 에 fetch.min.bytes 만큼의 Record 가 존재하지 않습니다.
    따라서 즉각적으로 응답이 이루어지지 않습니다.
    이러한 케이스에서 fetch.max.wait.ms 으로 설정된 시간만큼 대기 후에 응답이 이루어지는데요.
    이 설정은 이어지는 내용에서 설명을 할 것이구요.
    아래의 결과처럼 fetch.max.wait.ms 인 5초 이후에 Record 조회가 발생하는 것을 확인할 수 있습니다.
     

    from kafka import KafkaConsumer
    from datetime import datetime
    import random
    consumer = KafkaConsumer(
        "test",
        bootstrap_servers="localhost:29091,localhost:29092,localhost:29093",
        group_id="test-consumer-group" + str(random.choice(range(0, 1000000))),
        auto_offset_reset="earliest",
        fetch_min_bytes=1000000,
        fetch_max_wait_ms=5 * 1000
    )
    start_time = datetime.now()
    response = consumer.poll(timeout_ms=60 * 1000)
    for partition, records in response.items():
        print(f"value : {records[0].value}")
    print(f"time : {datetime.now() - start_time}")

     

    value : b'1'
    time : 0:00:05.373158

     
    그리고 fetch.min.bytes 값을 테스트해본 결과 {key : Null, value : 1} 레코드에 한하여
    fetch.min.bytes 값이 69 바이트일 때에 즉각적인 조회가 발생합니다.

    from kafka import KafkaConsumer
    from datetime import datetime
    import random
    consumer = KafkaConsumer(
        "test",
        bootstrap_servers="localhost:29091,localhost:29092,localhost:29093",
        group_id="test-consumer-group" + str(random.choice(range(0, 1000000))),
        auto_offset_reset="earliest",
        fetch_min_bytes=69,
        fetch_max_wait_ms=5 * 1000
    )
    start_time = datetime.now()
    response = consumer.poll(timeout_ms=60 * 1000)
    for partition, records in response.items():
        print(f"value : {records[0].value}")
    print(f"time : {datetime.now() - start_time}")
    value : b'1'
    time : 0:00:00.266172

     
     

    fetch.max.wait.ms

    fetch.max.wait.ms 와 fetch.min.bytes 는 카프카 레코드의 1회성 조회를 위한 또 다른 조건입니다.
    fetch.min.bytes 는 카프카 레코드의 조회를 위한 최소 용량이라면,
    fetch.max.wait.ms 는 fetch.min.bytes 만큼의 카프카 레코드가 없더라도 지정된 시간이 지나면 응답이 이뤄집니다.
    즉, Consumer 의 레코드 조회 요청은 시간조건과 크기조건이 필요하게 됩니다.
     
    아래의 상황에서 Partition 의 Record 들은 100 Bytes 가 되지 않습니다.
    반면, Consumer B 는 fetch.min.bytes : 100 & fetch.max.wait.ms : 5000 으로 데이터를 조회하고 있습니다.
    이 상황에서 Broker 는 100 Bytes 의 레코드가 존재하지 않으므로 5초 이후에 데이터를 제공해줍니다.

     
     

    max.partition.fetch.bytes

    max.partition.fetch.bytes 는 Broker 가 Consumer 에게 제공할 수 있는 데이터의 Bytes 제한입니다.
    이 Bytes 제한은 Partition 별로 적용되며 하나의 Consumer 가 2개의 Partition 의 Onwership 을 가진다면
    두 Partition 의 max.partition.fetch.bytes 를 수용할 수 있는 메모리가 필요합니다.
     
    먼저 test 라는 Topic 에 1.5Mb 정도의 데이터를 추가하였습니다.
    제 케이스에선 Log File 의 위치가 아래와 같으며, server.properties 의 log.dir 설정값이 곧 로그 파일의 위치입니다.

    ls -lh /var/lib/kafka/data/test-0
    total 1.5M
    -rw-r--r-- 1 appuser appuser  10M Jan 16 22:14 00000000000000000000.index
    -rw-r--r-- 1 appuser appuser 1.5M Jan 16 22:14 00000000000000000000.log
    -rw-r--r-- 1 appuser appuser  10M Jan 16 22:14 00000000000000000000.timeindex
    -rw-r--r-- 1 appuser appuser    8 Jan 16 12:21 leader-epoch-checkpoint
    -rw-r--r-- 1 appuser appuser   43 Jan 16 12:21 partition.metadata

     
     

    max.partition.fetch.bytes : 1Mb 씩 2번 Polling 하기.

    총 Record 들의 크기가 1.5Mb 이므로 max.partition.fetch.bytes 를 1Mb 로 설정한다면 2번의 Polling 으로 조회가 가능합니다.

    from kafka import KafkaConsumer
    from datetime import datetime
    import random
    
    consumer = KafkaConsumer(
        "test",
        bootstrap_servers="localhost:29091,localhost:29092,localhost:29093",
        group_id="test-consumer-group" + str(random.choice(range(0, 1000000))),
        auto_offset_reset="earliest",
        max_poll_records=1000000000,
        max_partition_fetch_bytes=1 * 1024 * 1024,
        fetch_max_bytes=1 * 1024 * 1024,
    )
    start_time = datetime.now()
    response = consumer.poll(timeout_ms=5 * 1000)
    record_list = [[record for record in records] for _, records in response.items()]
    print(f"first poll : {len(record_list[0])}")
    response = consumer.poll(timeout_ms=5 * 1000)
    
    record_list = [[record for record in records] for _, records in response.items()]
    print(f"second poll : {len(record_list[0])}")
    print(f"time : {datetime.now() - start_time}")
    consumer.commit()
    first poll : 61115
    second poll : 24887
    time : 0:00:02.498157

     
     

    max.partition.fetch.bytes : 2Mb 씩 1번만에 Polling 하기.

    총 Record 의 크기가 1.5Mb 이므로 max.partition.fetch.bytes 를 2MB 로 설정한다면,
    1번의 Polling 으로 모든 레코드들을 조회할 수 있습니다.
     

    from kafka import KafkaConsumer
    from datetime import datetime
    import random
    
    consumer = KafkaConsumer(
        "test",
        bootstrap_servers="localhost:29091,localhost:29092,localhost:29093",
        group_id="test-consumer-group" + str(random.choice(range(0, 1000000))),
        auto_offset_reset="earliest",
        max_poll_records=1000000000,
        max_partition_fetch_bytes=2 * 1024 * 1024,
        fetch_max_bytes=2 * 1024 * 1024,
    )
    start_time = datetime.now()
    response = consumer.poll(timeout_ms=5 * 1000)
    record_list = [[record for record in records] for _, records in response.items()]
    print(f"first poll : {len(record_list[0])}")
    response = consumer.poll(timeout_ms=5 * 1000)
    
    record_list = [[record for record in records] for _, records in response.items()]
    print(f"second poll : {len(record_list[0] if len(record_list) > 0 else [])}")
    print(f"time : {datetime.now() - start_time}")
    consumer.commit()
    first poll : 86002
    second poll : 0
    time : 0:00:07.613935

     
     

    max.poll.records

    max.poll.records 는 설정은 한번에 응답받을 수 있는 카프카 레코드의 갯수 제한을 위한 설정입니다.
    위의 max.partition.fetch.bytes 예시에서 제가 max.poll.records 값은 10000000 으로 설정하고 진행하였는데요.
    만약 max.poll.records 의 값이 500 이라면 max.partition.fetch.bytes 와 무관하게 최대 500개의 레코드만을 조회합니다.
    그리고 기본값 또한 500 개 입니다.
     
    아래의 예시에서 확인할 수 있듯이 각 Polling 마다 500개의 레코드를 조회합니다.

    from kafka import KafkaConsumer
    from datetime import datetime
    import random
    
    consumer = KafkaConsumer(
        "test",
        bootstrap_servers="localhost:29091,localhost:29092,localhost:29093",
        group_id="test-consumer-group" + str(random.choice(range(0, 1000000))),
        auto_offset_reset="earliest",
        max_poll_records=500,
        max_partition_fetch_bytes=2 * 1024 * 1024,
        fetch_max_bytes=2 * 1024 * 1024,
    )
    start_time = datetime.now()
    response = consumer.poll(timeout_ms=5 * 1000)
    record_list = [[record for record in records] for _, records in response.items()]
    print(f"first poll : {len(record_list[0])}")
    response = consumer.poll(timeout_ms=5 * 1000)
    
    record_list = [[record for record in records] for _, records in response.items()]
    print(f"second poll : {len(record_list[0] if len(record_list) > 0 else [])}")
    print(f"time : {datetime.now() - start_time}")
    consumer.commit()
    first poll : 500
    second poll : 500
    time : 0:00:02.562194

     

     

    마치며.

    내용이 너무 길어지는 관계로 다음 글에서 나머지 설정들에 대해서 알아보도록 하겠습니다.
    감사합니다.

    반응형
Designed by Tistory.