ABOUT ME

와주셔서 감사합니다. 좋은 글을 많이 쓰겠습니다.

Today
Yesterday
Total
  • kafka __consumer_offsets topic 이해하기
    Kafka 2023. 9. 18. 22:16
    반응형

    - 목차

     

    키워드.

    • Offset
    • Consumer
    • Consumer Group

     

    들어가며.

     

    카프카는 "__consumer_offsets" 이라는 이름의 토픽을 가집니다.
    "__consumer_offsets"  토픽의 목적은 Consumer Group 이 조회한 토픽의 offset 들을 기록하는 것입니다.

    예를 들어,
    "topicA" 라는 토픽이 있고 파티션의 개수는 3개입니다.
    그리고 "groupA" 라는 이름의 Consumer Group 이 있고 각 파티션 별로 1개 메시지를 조회한 상황입니다.
    이 상황의 "__consumer_offsets" 에는

    • topicA-0 & groupA : 1
    • topicA-1 & groupA : 1
    • topicA-2 & groupA : 1

    의 기록이 생성됩니다. ( 참고로 Partition 의 이름은 Topic 의 이름 뒤에 숫자가 Suffix 로 붙습니다. )

    이 기록의 의미는 groupA 가 topicA 의 0,1,2 의 파티션을 1개씩 조회하였다는 뜻입니다.

    만약 topicA 의 2번 파티션의 메시지를 2개 더 조회한 상황이라면
    topicA-2 & groupA : 3 이 추가됩니다.

    이러한 방식으로 topic : partition : group.id 별로 commit 된 offset 이 기록됩니다.

    __consumer_offsets 토픽은 internal topic 이라고 하여 기본적으로 생성되는 토픽입니다.
    생성되는 구체적인 시점은 kafka consumer 에 의해서 consume 이 시작되는 시점에 짠! 하고 생성됩니다.


    commit offset 과정.

     

     

    구조적으로 kafka consumer 의 offset 이 커밋되는 과정을 살펴보겠습니다.

    Kafka Broker 와 Kafka Consumer 가 서로를 인지하고 토픽의 메시지들을 조회하고 있다고 가정하겠습니다.

    토픽을 조회하는 과정에서 Kafka Consumer 는 몇번째 메시지까지 조회했다라는 기록을 해야합니다.

    이것을 Offset Commit 이라고 부릅니다.

    Offset Commit 이 중요한 이유는 Rebalancing 되는 상황에서 Consumer 가 새롭게 구동되어 토픽 조회를 다시 시작할 때 토픽의 몇번째 메시지부터 다시 읽어들여야하는지에 관한 문제점을 해결해주시 때문입니다.

    이를 멱등성이라고 부르는데요.

    Consumer 가 수천개의 데이터를 조회하는 과정에서 재시작되었다고해서 수천개의 데이터를 처음부터 조회할 수는 없기 때문입니다.

     

    데이터 파이프라인에서 메시지를 전달하는 방식이 크게 3가지가 존재합니다.

    - At-Least-Once

    - At-Most-Once

    - Exactly-Once

     

    At-Least-Once 는 최소 한번 이상 메시지를 전송할 수 있고, 중복을 허용하는 관점입니다.

    At-Most-Once 는 최대 한번만 메시지를 전송하고, 누락을 허용하되 중복을 허용하지 않는 관점입니다.

    Exactly-One 는 누락과 중복없이 반드시 1회의 메시지 전송을 허용하는 관점입니다.

    만약 특정 서비스가 At-Least-Once 관점을 중시한다면 Offset Commit 은 필요하지 않을 수도 있습니다.

    왜냐하면 중복을 허용하기 때문입니다.

    하지만 대부분의 서비스는 Exactly One 를 지향하기 때문에 Offset 의 관리가 중요합니다.

     

     

    Kafka Consumer들은 JoinGroup Request , SyncGroup Request 과정에서 어떤 Kafka Broker 가 Group Coordinator 인지 알 수 있습니다.

    <카프카 Consumer 의 전반적인 과정에 대한 다른 블로그 컨텐츠입니다.>

    Kafka Consumer 는 Offset Commit 하기 위해서 Group Coordinator 에게 OffsetCommitRequest 를 요청합니다.

    OffsetCommitRequest 의 요청 정보는 아래와 같습니다.

    Group ID : Consumer Group 의 이름.
    
    Member ID : Consumer Group 에 속한 개별 Consumer 들의 id
    
    Topic & Partition Offset : 현재 Commit 을 한 Topic & Partition & Offset 정보
    
    Genreation ID : Consumer Group 의 고유값으로 Consumer Group 이 생성될 때 Generation ID 는 생성됩니다.
    리밸런싱이 발생하여 Consumer Group 이 재구성될 때, Generation ID 또한 새로 만들어집니다.

    위 정보들로 구성된 OffsetCommitRequest 를 GroupCoordinator 가 받게되면,

    Group ID, Topic, Partition, Offset 등의 정보를 기반으로 __consumer_offsets 토픽에 기록합니다.

     

    * __consumer_offsets 파일 확인 및 실습

    __consumer_offsets 토픽을 저장하는 파일은 대개 /var/lib/kafka/data/ 리덱토리 하위에 저장됩니다.

    자세한 저장 위치는 server.properties 의 log.dirs 설정을 보셔야합니다.

     

    __consumer_offsets 토픽의 파티션 수는 기본적으로 50개입니다.

    그 이유는

    offsets.topic.num.partitions : 50

    의 설정을 따르기 때문입니다.

     

    그리고

    __consumer_offsets 토픽의 retention 은 7일로 기본 설정됩니다.

    offsets.retention.minutes : 7day

     

    실습을 통해서 자세한 내용을 살펴보겠습니다.



     먼저 docker-compose 로 kafka cluster 를 생성하겠습니다.

    아래의 docker-compose.yml 를 실행하여 카프카 클러스터를 생성합니다.

     

    <docker-compose.yml>

    version: '2'
    services:
      zookeeper:
        image: confluentinc/cp-zookeeper:latest
        environment:
          ZOOKEEPER_SERVER_ID: 1
          ZOOKEEPER_CLIENT_PORT: 2181
          ZOOKEEPER_TICK_TIME: 2000
          ZOOKEEPER_INIT_LIMIT: 5
          ZOOKEEPER_SYNC_LIMIT: 2
        ports:
          - "22181:2181"
      kafka:
        image: confluentinc/cp-kafka:latest
        depends_on:
          - zookeeper
        ports:
          - "29092:29092"
        environment:
          KAFKA_BROKER_ID: 1
          KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
          KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
          KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
          KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
          KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
          KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
          KAFKA_MESSAGE_MAX_BYTES: 10000000
          KAFKA_SOCKET_REQUEST_MAX_BYTES: 100001200
          KAFKA_SOCKET_RECEIVE_BUFFER_BYTES: 10000000

     

    카프카 클러스터의 컨테이너가 생성되었다면,

    docker exec -it kafka-kafka-1 /bin/bash

    또는 도커 데스크탑을 활용해서 kafka-kafka-1 컨테이너 내부로 진입합니다.

     

     

    server.properties 파일의 log.dirs 설정을 확인합니다.

    log.dirs 는 로그 파일이 생성되는 경로인데요.

    해당 위치에 __consumer_commits 로그파일이 저장됩니다.

    cat /etc/kafka/server.properties | grep log.dirs -A 5
    
    log.dirs=/var/lib/kafka
    
    # The default number of log partitions per topic. More partitions allow greater
    # parallelism for consumption, but this will also result in more files across
    # the brokers.
    num.partitions=1

     

     

    카프카 클러스터가 생성된 최초의 로그파일의 상태입니다.

    어떤 토픽도 생성하지 않았기 때문에 __consumer_offsets 파일은 존재하지 않습니다.

    ls /var/lib/kafka/data
    
    cleaner-offset-checkpoint  
    log-start-offset-checkpoint  
    meta.properties  
    recovery-point-offset-checkpoint  
    replication-offset-checkpoint

     

     
     

    토픽을 하나 생성합니다.
    토픽의 이름은 testTopic 이며 replication-factor 는 1, partitions 은 3개를 가집니다.

    /usr/bin/kafka-topics --create --topic testTopic --bootstrap-server localhost:9092 --replication-factor 1 --partitions 3
    
    Created topic testTopic.

     

    생성된 토픽 (testTopic) 에 데이터를 추가합니다.
    1부터 9 까지의 숫자를 추가하였습니다.

    /usr/bin/kafka-console-producer --bootstrap-server localhost:9092 --topic testTopic
    
    >1
    >2
    >3
    >4
    >5
    >6
    >7
    >8
    >9

     

    /var/lib/kafka/data 디렉토리 하위에 3개의 파일이 생성되었습니다.
    각 파일의 이름은 testTopic-0, testTopic-1, testTopic-2 이며
    각 파일은 testTopic 토픽의 3개의 파티션 정보가 저장됩니다.

    ls /var/lib/kafka/data
    
    testTopic-0                    
    testTopic-1
    testTopic-2
    cleaner-offset-checkpoint    
    meta.properties                   
    replication-offset-checkpoint  
    log-start-offset-checkpoint  
    recovery-point-offset-checkpoint

     

    압축과 인코딩으로 형태를 알아보긴 쉽지않지만 kafka-console-producer 를 통해서 추가한 데이터가 보입니다.

    cat /var/lib/kafka/data/testTopic-0/00000000000000000000.log
    
    TK((13
          4Tbeh((567
                    9'({(9

     

    그리고 kafka-console-consumer 를 통해서 testTopic 을 조회합니다.

    Consumer Group 의 이름은 testGroup1 입니다.

    /usr/bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic testTopic --from-beginning --group testGroup1

     

    생성된 Consumer Group 과 Offset 의 Commit 상태를 확인할 수 있습니다.

    GROUP - TOPIC - PARTITION 에 따른 Commit Offset 의 상황을 확인할 수 있습니다.

    /usr/bin/kafka-consumer-groups --bootstrap-server localhost:9092 --list
    
    testGroup1
    
    kafka-consumer-groups --bootstrap-server localhost:9092 --all-groups --describe
    
    GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
    testGroup1      testTopic       0          15              15              0               -               -               -
    testGroup1      testTopic       1          0               0               0               -               -               -
    testGroup1      testTopic       2          0               0               0               -               -               -

     

    그리고 __consumer_offsets 관련된 파일들이 생성되어 있습니다.

    (__consumer_offsets 토픽은 Consume 이 시작되는 시점에 생성됨)

     ls /var/lib/kafka/data/
    __consumer_offsets-0   __consumer_offsets-15  __consumer_offsets-21  __consumer_offsets-28  __consumer_offsets-34  __consumer_offsets-40  __consumer_offsets-47  __consumer_offsets-9		   testTopic-1
    __consumer_offsets-1   __consumer_offsets-16  __consumer_offsets-22  __consumer_offsets-29  __consumer_offsets-35  __consumer_offsets-41  __consumer_offsets-48  cleaner-offset-checkpoint	   testTopic-2
    __consumer_offsets-10  __consumer_offsets-17  __consumer_offsets-23  __consumer_offsets-3   __consumer_offsets-36  __consumer_offsets-42  __consumer_offsets-49  log-start-offset-checkpoint
    __consumer_offsets-11  __consumer_offsets-18  __consumer_offsets-24  __consumer_offsets-30  __consumer_offsets-37  __consumer_offsets-43  __consumer_offsets-5	 meta.properties
    __consumer_offsets-12  __consumer_offsets-19  __consumer_offsets-25  __consumer_offsets-31  __consumer_offsets-38  __consumer_offsets-44  __consumer_offsets-6	 recovery-point-offset-checkpoint
    __consumer_offsets-13  __consumer_offsets-2   __consumer_offsets-26  __consumer_offsets-32  __consumer_offsets-39  __consumer_offsets-45  __consumer_offsets-7	 replication-offset-checkpoint
    __consumer_offsets-14  __consumer_offsets-20  __consumer_offsets-27  __consumer_offsets-33  __consumer_offsets-4   __consumer_offsets-46  __consumer_offsets-8	 testTopic-0

     

     


     그리고 du 명령어로 __consumer_offsets 의 용량 상태를 확인해보겠습니다.

    du -sh *
    12K	__consumer_offsets-0
    12K	__consumer_offsets-1
    12K	__consumer_offsets-10
    12K	__consumer_offsets-11
    12K	__consumer_offsets-12
    12K	__consumer_offsets-13
    12K	__consumer_offsets-14
    12K	__consumer_offsets-15
    12K	__consumer_offsets-16
    12K	__consumer_offsets-17
    12K	__consumer_offsets-18
    12K	__consumer_offsets-19
    12K	__consumer_offsets-2
    12K	__consumer_offsets-20
    12K	__consumer_offsets-21
    12K	__consumer_offsets-22
    12K	__consumer_offsets-23
    12K	__consumer_offsets-24
    12K	__consumer_offsets-25
    12K	__consumer_offsets-26
    12K	__consumer_offsets-27
    12K	__consumer_offsets-28
    12K	__consumer_offsets-29
    12K	__consumer_offsets-3
    12K	__consumer_offsets-30
    12K	__consumer_offsets-31
    12K	__consumer_offsets-32
    12K	__consumer_offsets-33
    12K	__consumer_offsets-34
    12K	__consumer_offsets-35
    12K	__consumer_offsets-36
    12K	__consumer_offsets-37
    12K	__consumer_offsets-38
    12K	__consumer_offsets-39
    12K	__consumer_offsets-4
    12K	__consumer_offsets-40
    12K	__consumer_offsets-41
    12K	__consumer_offsets-42
    12K	__consumer_offsets-43
    20K	__consumer_offsets-44
    12K	__consumer_offsets-45
    12K	__consumer_offsets-46
    12K	__consumer_offsets-47
    12K	__consumer_offsets-48
    12K	__consumer_offsets-49
    12K	__consumer_offsets-5
    12K	__consumer_offsets-6
    12K	__consumer_offsets-7
    12K	__consumer_offsets-8
    12K	__consumer_offsets-9

    다른 __consumer_offsets 파일들의 사용 용량은 12K인데 반해 20K 인 __consumer_offsets-44 이 존재합니다.

    해당 파일에 Commit Offset 의 데이터가 기록되어 있습니다.

     

    로그파일을 확인해보겠습니다.

    로그파일의 위치는 __consumer_offsets-44/00000000000000000000.log 입니다.

    cat /var/lib/kafka/data/__consumer_offsets-44/00000000000000000000.log
    
    N^????@
           D??@
               D???????????????
    testGroup1consumerrange5console-consumer-5fb62126-13a5-4f91-bda7-58d7b15cbc0d??@
                                                                                    :5console-consumer-5fb62126-13a5-4f91-bda7-58d7b15cbc0d??console-consumer
                                                                                                                                                             /172.24.0.3????	testTopic??????????%	testTopic?????B4C??@???@???????????v:
    testGroup1	testTopic0??????@?v:
    testGroup1	testTopic0??@?v:
    testGroup1	testTopic0??????@??mO0B??@3j??@3j??????????v:
    testGroup1	testTopic0??????@3gv:
    testGroup1	testTopic0??@3gv:
    testGroup1	testTopic0??????@3g?՜?]??@F???@F???????????v:
    testGroup1	testTopic0??????@F?v:
    testGroup1	testTopic0??@F?v:
    testGroup1	testTopic0??????@F?
    ??;\%??@Zr??@Zr??????????v:
    testGroup1	testTopic0??????@Zqv:
    testGroup1	testTopic0??@Zqv:
    ???EP??@m???@m???????????v:?????@Zq
    testGroup1	testTopic0??????@m?v:
    testGroup1	testTopic0??@m?v:
    testGroup1	testTopic0??????@m??F????@????@????????????v:
    testGroup1	testTopic0??????@??v:
    testGroup1	testTopic0??@??v:
    testGroup1	testTopic0??????@?????r??@???@???????????v:
    testGroup1	testTopic0??????@?v:
    testGroup1	testTopic0??@?v:
    testGroup1	testTopic0??????@???G??@????@????????????v:
    testGroup1	testTopic0??????@??v:
    testGroup1	testTopic0??@??v:
    testGroup1	testTopic0??????@?????|??@?'??@?'??????????v:
    testGroup1	testTopic0??????@?%v:
    testGroup1	testTopic0??@?%v:
    testGroup1	testTopic0??????@?%???7???@ϲ??@ϲ??????????v:
    testGroup1	testTopic0??????@ϯv:
    testGroup1	testTopic0??@ϯv:
    testGroup1	testTopic0??????@ϯ??.[??@?7??@?7??????????v:
    testGroup1	testTopic0??????@?5v:
    testGroup1	testTopic0??@?5v:
    testGroup1	testTopic0??????@?5"?y?K*??@????@????????????v:
    testGroup1	testTopic0??????@??v:
    testGroup1	testTopic0??@??v:
    testGroup1	testTopic0??????@??%?-W%??A

     

    역시 압축과 인코딩으로 인해서 쉽게 파악할 순 없지만, 해당 로그파일에 Commit 된 Offset 정보들이 기록됨을 알 수 있습니다.

     

     

     

    반응형

    'Kafka' 카테고리의 다른 글

Designed by Tistory.