ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • kafka __consumer_offsets topic 이해하기
    Kafka 2023. 9. 18. 22:16
    728x90
    반응형

    - 목차

     

    키워드.

    • Offset
    • Consumer
    • Consumer Group

     

    들어가며.

     

    카프카는 "__consumer_offsets" 이라는 이름의 토픽을 가집니다.
    "__consumer_offsets"  토픽의 목적은 Consumer Group 이 조회한 토픽의 offset 들을 기록하는 것입니다.

    예를 들어,
    "topicA" 라는 토픽이 있고 파티션의 개수는 3개입니다.
    그리고 "groupA" 라는 이름의 Consumer Group 이 있고 각 파티션 별로 1개 메시지를 조회한 상황입니다.
    이 상황의 "__consumer_offsets" 에는

    • topicA-0 & groupA : 1
    • topicA-1 & groupA : 1
    • topicA-2 & groupA : 1

    의 기록이 생성됩니다. ( 참고로 Partition 의 이름은 Topic 의 이름 뒤에 숫자가 Suffix 로 붙습니다. )

    이 기록의 의미는 groupA 가 topicA 의 0,1,2 의 파티션을 1개씩 조회하였다는 뜻입니다.

    만약 topicA 의 2번 파티션의 메시지를 2개 더 조회한 상황이라면
    topicA-2 & groupA : 3 이 추가됩니다.

    이러한 방식으로 topic : partition : group.id 별로 commit 된 offset 이 기록됩니다.

    __consumer_offsets 토픽은 internal topic 이라고 하여 기본적으로 생성되는 토픽입니다.
    생성되는 구체적인 시점은 kafka consumer 에 의해서 consume 이 시작되는 시점에 짠! 하고 생성됩니다.


    commit offset 과정.

     

     

    구조적으로 kafka consumer 의 offset 이 커밋되는 과정을 살펴보겠습니다.

    Kafka Broker 와 Kafka Consumer 가 서로를 인지하고 토픽의 메시지들을 조회하고 있다고 가정하겠습니다.

    토픽을 조회하는 과정에서 Kafka Consumer 는 몇번째 메시지까지 조회했다라는 기록을 해야합니다.

    이것을 Offset Commit 이라고 부릅니다.

    Offset Commit 이 중요한 이유는 Rebalancing 되는 상황에서 Consumer 가 새롭게 구동되어 토픽 조회를 다시 시작할 때 토픽의 몇번째 메시지부터 다시 읽어들여야하는지에 관한 문제점을 해결해주시 때문입니다.

    이를 멱등성이라고 부르는데요.

    Consumer 가 수천개의 데이터를 조회하는 과정에서 재시작되었다고해서 수천개의 데이터를 처음부터 조회할 수는 없기 때문입니다.

     

    데이터 파이프라인에서 메시지를 전달하는 방식이 크게 3가지가 존재합니다.

    - At-Least-Once

    - At-Most-Once

    - Exactly-Once

     

    At-Least-Once 는 최소 한번 이상 메시지를 전송할 수 있고, 중복을 허용하는 관점입니다.

    At-Most-Once 는 최대 한번만 메시지를 전송하고, 누락을 허용하되 중복을 허용하지 않는 관점입니다.

    Exactly-One 는 누락과 중복없이 반드시 1회의 메시지 전송을 허용하는 관점입니다.

    만약 특정 서비스가 At-Least-Once 관점을 중시한다면 Offset Commit 은 필요하지 않을 수도 있습니다.

    왜냐하면 중복을 허용하기 때문입니다.

    하지만 대부분의 서비스는 Exactly One 를 지향하기 때문에 Offset 의 관리가 중요합니다.

     

     

    Kafka Consumer들은 JoinGroup Request , SyncGroup Request 과정에서 어떤 Kafka Broker 가 Group Coordinator 인지 알 수 있습니다.

    <카프카 Consumer 의 전반적인 과정에 대한 다른 블로그 컨텐츠입니다.>

    Kafka Consumer 는 Offset Commit 하기 위해서 Group Coordinator 에게 OffsetCommitRequest 를 요청합니다.

    OffsetCommitRequest 의 요청 정보는 아래와 같습니다.

    Group ID : Consumer Group 의 이름.
    
    Member ID : Consumer Group 에 속한 개별 Consumer 들의 id
    
    Topic & Partition Offset : 현재 Commit 을 한 Topic & Partition & Offset 정보
    
    Genreation ID : Consumer Group 의 고유값으로 Consumer Group 이 생성될 때 Generation ID 는 생성됩니다.
    리밸런싱이 발생하여 Consumer Group 이 재구성될 때, Generation ID 또한 새로 만들어집니다.

    위 정보들로 구성된 OffsetCommitRequest 를 GroupCoordinator 가 받게되면,

    Group ID, Topic, Partition, Offset 등의 정보를 기반으로 __consumer_offsets 토픽에 기록합니다.

     

    * __consumer_offsets 파일 확인 및 실습

    __consumer_offsets 토픽을 저장하는 파일은 대개 /var/lib/kafka/data/ 리덱토리 하위에 저장됩니다.

    자세한 저장 위치는 server.properties 의 log.dirs 설정을 보셔야합니다.

     

    __consumer_offsets 토픽의 파티션 수는 기본적으로 50개입니다.

    그 이유는

    offsets.topic.num.partitions : 50

    의 설정을 따르기 때문입니다.

     

    그리고

    __consumer_offsets 토픽의 retention 은 7일로 기본 설정됩니다.

    offsets.retention.minutes : 7day

     

    실습을 통해서 자세한 내용을 살펴보겠습니다.



     먼저 docker-compose 로 kafka cluster 를 생성하겠습니다.

    아래의 docker-compose.yml 를 실행하여 카프카 클러스터를 생성합니다.

     

    <docker-compose.yml>

    version: '2'
    services:
      zookeeper:
        image: confluentinc/cp-zookeeper:latest
        environment:
          ZOOKEEPER_SERVER_ID: 1
          ZOOKEEPER_CLIENT_PORT: 2181
          ZOOKEEPER_TICK_TIME: 2000
          ZOOKEEPER_INIT_LIMIT: 5
          ZOOKEEPER_SYNC_LIMIT: 2
        ports:
          - "22181:2181"
      kafka:
        image: confluentinc/cp-kafka:latest
        depends_on:
          - zookeeper
        ports:
          - "29092:29092"
        environment:
          KAFKA_BROKER_ID: 1
          KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
          KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
          KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
          KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
          KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
          KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
          KAFKA_MESSAGE_MAX_BYTES: 10000000
          KAFKA_SOCKET_REQUEST_MAX_BYTES: 100001200
          KAFKA_SOCKET_RECEIVE_BUFFER_BYTES: 10000000

     

    카프카 클러스터의 컨테이너가 생성되었다면,

    docker exec -it kafka-kafka-1 /bin/bash

    또는 도커 데스크탑을 활용해서 kafka-kafka-1 컨테이너 내부로 진입합니다.

     

     

    server.properties 파일의 log.dirs 설정을 확인합니다.

    log.dirs 는 로그 파일이 생성되는 경로인데요.

    해당 위치에 __consumer_commits 로그파일이 저장됩니다.

    cat /etc/kafka/server.properties | grep log.dirs -A 5
    
    log.dirs=/var/lib/kafka
    
    # The default number of log partitions per topic. More partitions allow greater
    # parallelism for consumption, but this will also result in more files across
    # the brokers.
    num.partitions=1

     

     

    카프카 클러스터가 생성된 최초의 로그파일의 상태입니다.

    어떤 토픽도 생성하지 않았기 때문에 __consumer_offsets 파일은 존재하지 않습니다.

    ls /var/lib/kafka/data
    
    cleaner-offset-checkpoint  
    log-start-offset-checkpoint  
    meta.properties  
    recovery-point-offset-checkpoint  
    replication-offset-checkpoint

     

     
     

    토픽을 하나 생성합니다.
    토픽의 이름은 testTopic 이며 replication-factor 는 1, partitions 은 3개를 가집니다.

    /usr/bin/kafka-topics --create --topic testTopic --bootstrap-server localhost:9092 --replication-factor 1 --partitions 3
    
    Created topic testTopic.

     

    생성된 토픽 (testTopic) 에 데이터를 추가합니다.
    1부터 9 까지의 숫자를 추가하였습니다.

    /usr/bin/kafka-console-producer --bootstrap-server localhost:9092 --topic testTopic
    
    >1
    >2
    >3
    >4
    >5
    >6
    >7
    >8
    >9

     

    /var/lib/kafka/data 디렉토리 하위에 3개의 파일이 생성되었습니다.
    각 파일의 이름은 testTopic-0, testTopic-1, testTopic-2 이며
    각 파일은 testTopic 토픽의 3개의 파티션 정보가 저장됩니다.

    ls /var/lib/kafka/data
    
    testTopic-0                    
    testTopic-1
    testTopic-2
    cleaner-offset-checkpoint    
    meta.properties                   
    replication-offset-checkpoint  
    log-start-offset-checkpoint  
    recovery-point-offset-checkpoint

     

    압축과 인코딩으로 형태를 알아보긴 쉽지않지만 kafka-console-producer 를 통해서 추가한 데이터가 보입니다.

    cat /var/lib/kafka/data/testTopic-0/00000000000000000000.log
    
    TK((13
          4Tbeh((567
                    9'({(9

     

    그리고 kafka-console-consumer 를 통해서 testTopic 을 조회합니다.

    Consumer Group 의 이름은 testGroup1 입니다.

    /usr/bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic testTopic --from-beginning --group testGroup1

     

    생성된 Consumer Group 과 Offset 의 Commit 상태를 확인할 수 있습니다.

    GROUP - TOPIC - PARTITION 에 따른 Commit Offset 의 상황을 확인할 수 있습니다.

    /usr/bin/kafka-consumer-groups --bootstrap-server localhost:9092 --list
    
    testGroup1
    
    kafka-consumer-groups --bootstrap-server localhost:9092 --all-groups --describe
    
    GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
    testGroup1      testTopic       0          15              15              0               -               -               -
    testGroup1      testTopic       1          0               0               0               -               -               -
    testGroup1      testTopic       2          0               0               0               -               -               -

     

    그리고 __consumer_offsets 관련된 파일들이 생성되어 있습니다.

    (__consumer_offsets 토픽은 Consume 이 시작되는 시점에 생성됨)

     ls /var/lib/kafka/data/
    __consumer_offsets-0   __consumer_offsets-15  __consumer_offsets-21  __consumer_offsets-28  __consumer_offsets-34  __consumer_offsets-40  __consumer_offsets-47  __consumer_offsets-9		   testTopic-1
    __consumer_offsets-1   __consumer_offsets-16  __consumer_offsets-22  __consumer_offsets-29  __consumer_offsets-35  __consumer_offsets-41  __consumer_offsets-48  cleaner-offset-checkpoint	   testTopic-2
    __consumer_offsets-10  __consumer_offsets-17  __consumer_offsets-23  __consumer_offsets-3   __consumer_offsets-36  __consumer_offsets-42  __consumer_offsets-49  log-start-offset-checkpoint
    __consumer_offsets-11  __consumer_offsets-18  __consumer_offsets-24  __consumer_offsets-30  __consumer_offsets-37  __consumer_offsets-43  __consumer_offsets-5	 meta.properties
    __consumer_offsets-12  __consumer_offsets-19  __consumer_offsets-25  __consumer_offsets-31  __consumer_offsets-38  __consumer_offsets-44  __consumer_offsets-6	 recovery-point-offset-checkpoint
    __consumer_offsets-13  __consumer_offsets-2   __consumer_offsets-26  __consumer_offsets-32  __consumer_offsets-39  __consumer_offsets-45  __consumer_offsets-7	 replication-offset-checkpoint
    __consumer_offsets-14  __consumer_offsets-20  __consumer_offsets-27  __consumer_offsets-33  __consumer_offsets-4   __consumer_offsets-46  __consumer_offsets-8	 testTopic-0

     

     


     그리고 du 명령어로 __consumer_offsets 의 용량 상태를 확인해보겠습니다.

    du -sh *
    12K	__consumer_offsets-0
    12K	__consumer_offsets-1
    12K	__consumer_offsets-10
    12K	__consumer_offsets-11
    12K	__consumer_offsets-12
    12K	__consumer_offsets-13
    12K	__consumer_offsets-14
    12K	__consumer_offsets-15
    12K	__consumer_offsets-16
    12K	__consumer_offsets-17
    12K	__consumer_offsets-18
    12K	__consumer_offsets-19
    12K	__consumer_offsets-2
    12K	__consumer_offsets-20
    12K	__consumer_offsets-21
    12K	__consumer_offsets-22
    12K	__consumer_offsets-23
    12K	__consumer_offsets-24
    12K	__consumer_offsets-25
    12K	__consumer_offsets-26
    12K	__consumer_offsets-27
    12K	__consumer_offsets-28
    12K	__consumer_offsets-29
    12K	__consumer_offsets-3
    12K	__consumer_offsets-30
    12K	__consumer_offsets-31
    12K	__consumer_offsets-32
    12K	__consumer_offsets-33
    12K	__consumer_offsets-34
    12K	__consumer_offsets-35
    12K	__consumer_offsets-36
    12K	__consumer_offsets-37
    12K	__consumer_offsets-38
    12K	__consumer_offsets-39
    12K	__consumer_offsets-4
    12K	__consumer_offsets-40
    12K	__consumer_offsets-41
    12K	__consumer_offsets-42
    12K	__consumer_offsets-43
    20K	__consumer_offsets-44
    12K	__consumer_offsets-45
    12K	__consumer_offsets-46
    12K	__consumer_offsets-47
    12K	__consumer_offsets-48
    12K	__consumer_offsets-49
    12K	__consumer_offsets-5
    12K	__consumer_offsets-6
    12K	__consumer_offsets-7
    12K	__consumer_offsets-8
    12K	__consumer_offsets-9

    다른 __consumer_offsets 파일들의 사용 용량은 12K인데 반해 20K 인 __consumer_offsets-44 이 존재합니다.

    해당 파일에 Commit Offset 의 데이터가 기록되어 있습니다.

     

    로그파일을 확인해보겠습니다.

    로그파일의 위치는 __consumer_offsets-44/00000000000000000000.log 입니다.

    cat /var/lib/kafka/data/__consumer_offsets-44/00000000000000000000.log
    
    N^????@
           D??@
               D???????????????
    testGroup1consumerrange5console-consumer-5fb62126-13a5-4f91-bda7-58d7b15cbc0d??@
                                                                                    :5console-consumer-5fb62126-13a5-4f91-bda7-58d7b15cbc0d??console-consumer
                                                                                                                                                             /172.24.0.3????	testTopic??????????%	testTopic?????B4C??@???@???????????v:
    testGroup1	testTopic0??????@?v:
    testGroup1	testTopic0??@?v:
    testGroup1	testTopic0??????@??mO0B??@3j??@3j??????????v:
    testGroup1	testTopic0??????@3gv:
    testGroup1	testTopic0??@3gv:
    testGroup1	testTopic0??????@3g?՜?]??@F???@F???????????v:
    testGroup1	testTopic0??????@F?v:
    testGroup1	testTopic0??@F?v:
    testGroup1	testTopic0??????@F?
    ??;\%??@Zr??@Zr??????????v:
    testGroup1	testTopic0??????@Zqv:
    testGroup1	testTopic0??@Zqv:
    ???EP??@m???@m???????????v:?????@Zq
    testGroup1	testTopic0??????@m?v:
    testGroup1	testTopic0??@m?v:
    testGroup1	testTopic0??????@m??F????@????@????????????v:
    testGroup1	testTopic0??????@??v:
    testGroup1	testTopic0??@??v:
    testGroup1	testTopic0??????@?????r??@???@???????????v:
    testGroup1	testTopic0??????@?v:
    testGroup1	testTopic0??@?v:
    testGroup1	testTopic0??????@???G??@????@????????????v:
    testGroup1	testTopic0??????@??v:
    testGroup1	testTopic0??@??v:
    testGroup1	testTopic0??????@?????|??@?'??@?'??????????v:
    testGroup1	testTopic0??????@?%v:
    testGroup1	testTopic0??@?%v:
    testGroup1	testTopic0??????@?%???7???@ϲ??@ϲ??????????v:
    testGroup1	testTopic0??????@ϯv:
    testGroup1	testTopic0??@ϯv:
    testGroup1	testTopic0??????@ϯ??.[??@?7??@?7??????????v:
    testGroup1	testTopic0??????@?5v:
    testGroup1	testTopic0??@?5v:
    testGroup1	testTopic0??????@?5"?y?K*??@????@????????????v:
    testGroup1	testTopic0??????@??v:
    testGroup1	testTopic0??@??v:
    testGroup1	testTopic0??????@??%?-W%??A

     

    역시 압축과 인코딩으로 인해서 쉽게 파악할 순 없지만, 해당 로그파일에 Commit 된 Offset 정보들이 기록됨을 알 수 있습니다.

     

     

     

    반응형

    'Kafka' 카테고리의 다른 글

    Kafka-Connect & MySQL 구현하기  (0) 2023.12.09
    Kafka Replication (메시지 복제) 이해하기  (0) 2023.09.21
    Kafka Consumer 개념  (0) 2023.09.13
    Kafka Producer Process (카프카 프로듀서 과정)  (0) 2023.09.09
    kafka consumer  (0) 2023.03.02
Designed by Tistory.