ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • [Kafka] Consumer 의 max.poll.records 와 Offset Commit 알아보기
    Kafka/Kafka Consumer 2024. 6. 30. 14:56
    반응형

    - 목차

     

    들어가며.

    카프카 컨슈머는 max.poll.records 설정을 통해서 Consumer Poll 함수를 통해서 반환되는 레코드의 갯수를 제한할 수 있습니다.

    예를 들어서 아래와 같은 형식으로 획득한 ConsumerRecords 를 최대 500개의 Record 들을 가지게 됩니다. 

    Properties props = new Properties();
    props.put("max.poll.records", 500);
    Consumer<String, String> consumer = new KafkaConsumer<>(props);
    
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(10000));
    System.out.println("Fetched Records: " + records.count());

     

    하지만 실제 Kafka Consumer 가 Broker 에게 데이터를 요청하는 Fetch API 는 max.poll.records 의 정보는 제공되지 않습니다. 

    왜냐하며 카프카의 Fetch API 는 min-max bytes 의 용량를 기준으로 데이터를 교환하지 갯수를 기반으로 통신이 이루어지지 않습니다.

    그래서 실제로는 max.poll.records 보다 더 많은 양의 데이터가 실제로 Consumer 에게 전달됩니다.

     

    카프카 컨슈머 내부적으로 캐싱하고 있는 데이터의 갯수와 Poll Function 을 통해서 처리 중인 데이터의 값이 일치하지 않으며,

    버퍼링되어 있는 일부 데이터가 항상 존재합니다.

    따라서 Consumer Offset 의 관리와 Consumer 의 갑작스런 종료 시에 캐싱된 데이터의 유실에 대해서 항상 고려해야합니다.

     

    이어지는 글에서 max.poll.records 와 관련된 여러가지 테스트와 실험을 진행해보도록 하겠습니다.

    Fetch API ?

    카프카 컨슈머는 Fetch API 를 활용하여 리더 브로커로부터 데이터를 획득합니다.

    우선 Fetch API 의 포맷에 대해서 알아봅니다.

    아래는 Version 0 인 초기버전의 Fetch API Format 입니다.

    Fetch Request (Version: 0) => replica_id max_wait_ms min_bytes [topics] 
      replica_id => INT32
      max_wait_ms => INT32
      min_bytes => INT32
      topics => topic [partitions] 
        topic => STRING
        partitions => partition fetch_offset partition_max_bytes 
          partition => INT32
          fetch_offset => INT64
          partition_max_bytes => INT32

     

    • replica_id
      • replica_id 는 브로커의 식별값인 Broker ID 를 의미합니다.
      • 일반적으로 1, 2, 3 과 같은 값이 사용되며, 사용자가 직접 지정하는 값입니다.

     

    • max_wait_ms
      • max_wait_ms 는 컨슈머가 브로커로부터 데이터를 응답받을 수 있는 최대 시간입니다.
      • 이는 내부적으로 리더 브로커가 min_bytes (fetch.min.bytes) 만큼의 데이터를 모아서 컨슈머에게 전송합니다.
      • 만약 Log Segment File 에 충분한 데이터가 없다면 max_wait_ms 만큼 데이터가 쌓이길 기다린 후 컨슈머에게 데이터를 전달합니다.

     

    • min_bytes
      • max_wait_ms 와 함께 사용되는 컨슈머의 속성입니다.
      • 리더 브로커는 min_bytes 만큼의 데이터를 컨슈머에게 제공하기를 보장합니다.
      • 하지만 min_bytes 만큼의 데이터가 없다면, max_wait_ms 이후에 부족한 양의 데이터라도 응답합니다.

     

    위와 같은 형식으로 컨슈머는 브로커에게 데이터를 요청할 수 있습니다.

    이 단계에서 사용되는 API 가 바로 Fetch API 입니다.

     

    아래의 포맷은 Fetch Response 의 포맷입니다. 

    아래의 포맷은 직관적으로 이해가 가능할 것 같아서 상세한 설명은 생략하도록 하겠습니다.

    Fetch Response (Version: 0) => [responses] 
      responses => topic [partitions] 
        topic => STRING
        partitions => partition_index error_code high_watermark records 
          partition_index => INT32
          error_code => INT16
          high_watermark => INT64
          records => RECORDS

     

     

    max.poll.records

    max.poll.records 에 대해서 알아보기 이전에,우선 아래의 명령어를 통해서 Topic 와 Record 를 생성합니다.

    "test-topic" 이라는 이름의 토픽과 10개의 레코드가 생성됩니다. 

    kafka-topics.sh --bootstrap-server kafka1:9092 --topic test-topic --create --partitions 1 --replication-factor 1
    
    touch /tmp/text.txt
    for i in {1..10};
    do echo record_${i}_end >> /tmp/text.txt;
    done;
    
    cat /tmp/text.txt | kafka-console-producer.sh --bootstrap-server kafka1:9092 --topic test-topic

     

    그리고 아래와 같이 Consumer 를 생성하고, 1회 Polling 을 수행합니다. 

     

    import org.apache.kafka.clients.consumer.*;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;
    
    public class KafkaConsumerExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "kafka1:9091");
            props.put("group.id", "my-consumer-group");
            props.put("max.poll.records", 5);
            props.put("key.deserializer", StringDeserializer.class.getName());
            props.put("value.deserializer", StringDeserializer.class.getName());
            props.put("auto.offset.reset", "earliest");
    
            Consumer<String, String> consumer = new KafkaConsumer<>(props);
            consumer.subscribe(Collections.singletonList("test-topic"));
    
            System.out.println("Kafka Consumer started. Waiting for messages...");
    
            try {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(10000));
                System.out.println("Fetched Records: " + records.count());
                consumer.commitSync();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                consumer.close();
            }
        }
    }

     

    아래는 Leader Broker 로부터 Record 를 전달받은 TCP Packet 입니다.

    아래 Packet 의 Payload 부분을 확인해보면 record 1 ~ record 10 까지의 데이터가 모두 조회됨을 확인할 수 있습니다.

    실제 max.poll.records 의 값과는 별개로 Fetch API 를 통해서 가능한 많은 양의 데이터를 획득하여,

    Consumer 의 내부 메모리 영역에 캐싱해두게 됩니다.

     

    04:51:01.574972 IP kafka1.9091 > competent_golick.kafka-net.43386: Flags [P.], seq 569:821, ack 277, win 50542, options [nop,nop,TS val 4176800674 ecr 1571073923], length 252
    	0x0000:  4500 0130 b3db 4000 4006 2dbf ac13 0003  E..0..@.@.-.....
    	0x0010:  ac13 0004 2383 a97a b60f 9f60 f70f cdc3  ....#..z...`....
    	0x0020:  8018 c56e 5950 0000 0101 080a f8f4 eba2  ...nYP..........
    	0x0030:  5da4 af83 0000 0000 0000 0000 0000 00f0  ]...............
    	0x0040:  0000 0000 02e9 d4a9 b500 0000 0000 0900  ................
    	0x0050:  0001 9540 97f2 b900 0001 9540 97f2 cc00  ...@.......@....
    	0x0060:  0000 0000 0000 0300 0000 0000 0000 0000  ................
    	0x0070:  0a24 0000 0001 1872 6563 6f72 645f 315f  .$.....record_1_
    	0x0080:  656e 6400 2400 2402 0118 7265 636f 7264  end.$.$...record
    	0x0090:  5f32 5f65 6e64 0024 0026 0401 1872 6563  _2_end.$.&...rec
    	0x00a0:  6f72 645f 335f 656e 6400 2400 2606 0118  ord_3_end.$.&...
    	0x00b0:  7265 636f 7264 5f34 5f65 6e64 0024 0026  record_4_end.$.&
    	0x00c0:  0801 1872 6563 6f72 645f 355f 656e 6400  ...record_5_end.
    	0x00d0:  2400 260a 0118 7265 636f 7264 5f36 5f65  $.&...record_6_e
    	0x00e0:  6e64 0024 0026 0c01 1872 6563 6f72 645f  nd.$.&...record_
    	0x00f0:  375f 656e 6400 2400 260e 0118 7265 636f  7_end.$.&...reco
    	0x0100:  7264 5f38 5f65 6e64 0024 0026 1001 1872  rd_8_end.$.&...r
    	0x0110:  6563 6f72 645f 395f 656e 6400 2600 2612  ecord_9_end.&.&.
    	0x0120:  011a 7265 636f 7264 5f31 305f 656e 6400  ..record_10_end.

     

    그리고 아래의 Packet 은 Consumer 의 commitSync 함수에 의해서 요청되는 CommitOffset API Request 입니다.

    0x0038 ~ 0x0039 번째 바이트의 값인 "0008" 이 CommitOffset API 의 Key 에 해당하는 값이구요.

    0x00d6 의 바이트에 작성된 5 가 실제 max.poll.records 에 의해서 조회한 레코드의 Offset 에 해당합니다.

     

    여기서 중요한 점은 실제로 Fetch API 를 통해서 조회한 레코드는 0 ~ 9 Offset 에 해당하는 모든 레코드를 다 조회하였습니다.

    하지만 이는 카프카 컨슈머의 내부 캐싱 영역에서 보관되며, 실제 데이터가 처리되는 영역에서는 0 ~ 4 Offset 의 레코드들이 처리됩니다.

    즉, 캐싱 영역과 처리 영역에서 관찰되는 데이터의 갯수가 다르다는 점입니다. 

    Commit Offset API 의 경우에는 캐싱 영역의 Offset 이 아니라 처리 영역에서의 Offset 을 기준으로 동작하게 됩니다.

     

    04:51:01.585952 IP competent_golick.kafka-net.43370 > kafka1.9091: Flags [P.], seq 845:1016, ack 941, win 32044, options [nop,nop,TS val 1571073939 ecr 4176800656], length 171
    	0x0000:  4500 00df 3216 4000 4006 afd5 ac13 0004  E...2.@.@.......
    	0x0010:  ac13 0003 a96a 2383 c4c3 3746 57d9 ffe5  .....j#...7FW...
    	0x0020:  8018 7d2c 58ff 0000 0101 080a 5da4 af93  ..},X.......]...
    	0x0030:  f8f4 eb90 0000 00a7 0008 0008 0000 000b  ................
    	0x0040:  001e 636f 6e73 756d 6572 2d6d 792d 636f  ..consumer-my-co
    	0x0050:  6e73 756d 6572 2d67 726f 7570 3232 2d31  nsumer-group22-1
    	0x0060:  0014 6d79 2d63 6f6e 7375 6d65 722d 6772  ..my-consumer-gr
    	0x0070:  6f75 7032 3200 0000 0144 636f 6e73 756d  oup22....Dconsum
    	0x0080:  6572 2d6d 792d 636f 6e73 756d 6572 2d67  er-my-consumer-g
    	0x0090:  726f 7570 3232 2d31 2d39 6466 6362 3239  roup22-1-9dfcb29
    	0x00a0:  632d 3330 3937 2d34 6339 662d 6261 3965  c-3097-4c9f-ba9e
    	0x00b0:  2d35 3434 6135 3165 3965 6664 3700 020b  -544a51e9efd7...
    	0x00c0:  7465 7374 2d74 6f70 6963 0200 0000 0000  test-topic......
    	0x00d0:  0000 0000 0000 0500 0000 0001 0000 00    ...............

     

     

    많은 수의 레코드로 테스트.

    이번에는 1만개의 레코드가 존재하는 Topic 과 max.poll.records 의 값을 500 으로 설정합니다.

     

    kafka-topics.sh --bootstrap-server kafka1:9092 --topic test-topic --create --partitions 1 --replication-factor 1
    
    touch /tmp/text.txt
    for i in {1..10000};
    do echo record_${i}_end >> /tmp/text.txt;
    done;
    
    cat /tmp/text.txt | kafka-console-producer.sh --bootstrap-server kafka1:9092 --topic test-topic

     

    import org.apache.kafka.clients.consumer.*;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;
    
    public class KafkaConsumerExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "kafka1:9091");
            props.put("group.id", "my-consumer-group");
            props.put("max.poll.records", 5);
            props.put("key.deserializer", StringDeserializer.class.getName());
            props.put("value.deserializer", StringDeserializer.class.getName());
            props.put("auto.offset.reset", "earliest");
    
            Consumer<String, String> consumer = new KafkaConsumer<>(props);
            consumer.subscribe(Collections.singletonList("test-topic"));
    
            System.out.println("Kafka Consumer started. Waiting for messages...");
    
            try {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(10000));
                System.out.println("Fetched Records: " + records.count());
                consumer.commitSync();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                consumer.close();
            }
        }
    }

     

     

    max.poll.records 를 500으로 설정하더라도 아래와 같이 1만개의 레코드들이 한번에 Consumer 로 전달됩니다.

    그리고 Consumer 는 캐싱 영역에 1만개의 레코드를 보유하고 있지만, 데이터 처리 영역으로 500개의 레코드만을 전달하는 구조입니다.

     

    04:27:52.194110 IP kafka1.9091 > lucid_kapitsa.kafka-net.48536: Flags [P.], seq 570:7810, ack 286, win 50538, options [nop,nop,TS val 4175411249 ecr 1569684501], length 7240
    	0x0000:  4500 1c7c 7628 4000 4006 5026 ac13 0003  E..|v(@.@.P&....
    	0x0010:  ac13 0004 2383 bd98 4c20 6e50 ceeb 017b  ....#...L.nP...{
    	0x0020:  8018 c56a 749c 0000 0101 080a f8df b831  ...jt..........1
    	0x0030:  5d8f 7c15 0000 0000 0000 0000 0000 3fe3  ].|...........?.
    	0x0040:  0000 0000 02a9 1064 7300 0000 0002 ec00  .......ds.......
    	0x0050:  0001 9540 81c9 3a00 0001 9540 81c9 6900  ...@..:....@..i.
    	0x0060:  0000 0000 0000 0000 0000 0000 0000 0002  ................
    	0x0070:  ed24 0000 0001 1872 6563 6f72 645f 315f  .$.....record_1_
    	0x0080:  656e 6400 2400 3002 0118 7265 636f 7264  end.$.0...record
    	0x0090:  5f32 5f65 6e64 0024 0030 0401 1872 6563  _2_end.$.0...rec
    	0x00a0:  6f72 645f 335f 656e 6400 2400 3006 0118  ord_3_end.$.0...
    	// ... 중략...
    	0x6390:  7265 636f 7264 5f39 3939 355f 656e 6400  record_9995_end.
    	0x63a0:  2c00 06c6 0a01 1e72 6563 6f72 645f 3939  ,......record_99
    	0x63b0:  3936 5f65 6e64 002c 0006 c80a 011e 7265  96_end.,......re
    	0x63c0:  636f 7264 5f39 3939 375f 656e 6400 2c00  cord_9997_end.,.
    	0x63d0:  06ca 0a01 1e72 6563 6f72 645f 3939 3938  .....record_9998
    	0x63e0:  5f65 6e64 002c 0006 cc0a 011e 7265 636f  _end.,......reco
    	0x63f0:  7264 5f39 3939 395f 656e 6400 2e00 06ce  rd_9999_end.....
    	0x6400:  0a01 2072 6563 6f72 645f 3130 3030 305f  ...record_10000_
    	0x6410:  656e 6400 0000 00                        end....

     

     

    이 경우의 Offset Commit API 를 확인해보면, 아래와 같이 0x00de, 0x00df 바이트 위치에 "01f4" 인 500 이 설정됩니다.

    즉, Offset 500 을 Commit 하게 됩니다.

     

    04:27:52.207433 IP lucid_kapitsa.kafka-net.48522 > kafka1.9091: Flags [P.], seq 881:1061, ack 956, win 32044, options [nop,nop,TS val 1569684516 ecr 4175411235], length 180
    	0x0000:  4500 00e8 3d18 4000 4006 a4ca ac13 0004  E...=.@.@.......
    	0x0010:  ac13 0003 bd8a 2383 d19e 555a c5a5 5a1b  ......#...UZ..Z.
    	0x0020:  8018 7d2c 5908 0000 0101 080a 5d8f 7c24  ..},Y.......].|$
    	0x0030:  f8df b823 0000 00b0 0008 0008 0000 000b  ...#............
    	0x0040:  0021 636f 6e73 756d 6572 2d6d 792d 636f  .!consumer-my-co
    	0x0050:  6e73 756d 6572 2d67 726f 7570 3131 3131  nsumer-group1111
    	0x0060:  312d 3100 176d 792d 636f 6e73 756d 6572  1-1..my-consumer
    	0x0070:  2d67 726f 7570 3131 3131 3100 0000 0147  -group11111....G
    	0x0080:  636f 6e73 756d 6572 2d6d 792d 636f 6e73  consumer-my-cons
    	0x0090:  756d 6572 2d67 726f 7570 3131 3131 312d  umer-group11111-
    	0x00a0:  312d 3261 6637 6232 6239 2d61 6431 612d  1-2af7b2b9-ad1a-
    	0x00b0:  3430 3934 2d39 3437 642d 6136 3532 3264  4094-947d-a6522d
    	0x00c0:  6534 6439 3433 0002 0b74 6573 742d 746f  e4d943...test-to
    	0x00d0:  7069 6302 0000 0000 0000 0000 0000 01f4  pic.............
    	0x00e0:  0000 0000 0100 0000                      ........

     

     

     

     

     

    카프카 클러스터 실행.

    docker network create kafka-net
    
    docker run -d --name zookeeper --hostname zookeeper --net kafka-net \
      -e ZOOKEEPER_CLIENT_PORT=2181 \
      confluentinc/cp-zookeeper:7.6.4
    
    docker run -d --name kafka1 --hostname kafka1 --net kafka-net \
      -e KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181 \
      -e KAFKA_CFG_BROKER_ID=1 \
      -e ALLOW_PLAINTEXT_LISTENER=yes \
      -e KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR=1 \
      -e KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,INNER://:9091,OUTER://:19092 \
      -e KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka1:9092,INNER://kafka1:9091,OUTER://localhost:19092 \
      -e KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,INNER:PLAINTEXT,OUTER:PLAINTEXT \
      -e KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT \
      -p 19092:19092 \
      bitnami/kafka:3.1.2

     

     

     

     

     

    반응형
Designed by Tistory.