-
kafka __consumer_offsets topic 이해하기Kafka 2023. 9. 18. 22:16728x90반응형
- 목차
키워드.
- Offset
- Consumer
- Consumer Group
들어가며.
카프카는 "__consumer_offsets" 이라는 이름의 토픽을 가집니다.
"__consumer_offsets" 토픽의 목적은 Consumer Group 이 조회한 토픽의 offset 들을 기록하는 것입니다.
예를 들어,
"topicA" 라는 토픽이 있고 파티션의 개수는 3개입니다.
그리고 "groupA" 라는 이름의 Consumer Group 이 있고 각 파티션 별로 1개 메시지를 조회한 상황입니다.
이 상황의 "__consumer_offsets" 에는- topicA-0 & groupA : 1
- topicA-1 & groupA : 1
- topicA-2 & groupA : 1
의 기록이 생성됩니다. ( 참고로 Partition 의 이름은 Topic 의 이름 뒤에 숫자가 Suffix 로 붙습니다. )
이 기록의 의미는 groupA 가 topicA 의 0,1,2 의 파티션을 1개씩 조회하였다는 뜻입니다.
만약 topicA 의 2번 파티션의 메시지를 2개 더 조회한 상황이라면
topicA-2 & groupA : 3 이 추가됩니다.이러한 방식으로 topic : partition : group.id 별로 commit 된 offset 이 기록됩니다.
__consumer_offsets 토픽은 internal topic 이라고 하여 기본적으로 생성되는 토픽입니다.
생성되는 구체적인 시점은 kafka consumer 에 의해서 consume 이 시작되는 시점에 짠! 하고 생성됩니다.commit offset 과정.
구조적으로 kafka consumer 의 offset 이 커밋되는 과정을 살펴보겠습니다.
Kafka Broker 와 Kafka Consumer 가 서로를 인지하고 토픽의 메시지들을 조회하고 있다고 가정하겠습니다.
토픽을 조회하는 과정에서 Kafka Consumer 는 몇번째 메시지까지 조회했다라는 기록을 해야합니다.
이것을 Offset Commit 이라고 부릅니다.
Offset Commit 이 중요한 이유는 Rebalancing 되는 상황에서 Consumer 가 새롭게 구동되어 토픽 조회를 다시 시작할 때 토픽의 몇번째 메시지부터 다시 읽어들여야하는지에 관한 문제점을 해결해주시 때문입니다.
이를 멱등성이라고 부르는데요.
Consumer 가 수천개의 데이터를 조회하는 과정에서 재시작되었다고해서 수천개의 데이터를 처음부터 조회할 수는 없기 때문입니다.
데이터 파이프라인에서 메시지를 전달하는 방식이 크게 3가지가 존재합니다.
- At-Least-Once
- At-Most-Once
- Exactly-Once
At-Least-Once 는 최소 한번 이상 메시지를 전송할 수 있고, 중복을 허용하는 관점입니다.
At-Most-Once 는 최대 한번만 메시지를 전송하고, 누락을 허용하되 중복을 허용하지 않는 관점입니다.
Exactly-One 는 누락과 중복없이 반드시 1회의 메시지 전송을 허용하는 관점입니다.
만약 특정 서비스가 At-Least-Once 관점을 중시한다면 Offset Commit 은 필요하지 않을 수도 있습니다.
왜냐하면 중복을 허용하기 때문입니다.
하지만 대부분의 서비스는 Exactly One 를 지향하기 때문에 Offset 의 관리가 중요합니다.
Kafka Consumer들은 JoinGroup Request , SyncGroup Request 과정에서 어떤 Kafka Broker 가 Group Coordinator 인지 알 수 있습니다.
<카프카 Consumer 의 전반적인 과정에 대한 다른 블로그 컨텐츠입니다.>
Kafka Consumer 는 Offset Commit 하기 위해서 Group Coordinator 에게 OffsetCommitRequest 를 요청합니다.
OffsetCommitRequest 의 요청 정보는 아래와 같습니다.
Group ID : Consumer Group 의 이름. Member ID : Consumer Group 에 속한 개별 Consumer 들의 id Topic & Partition Offset : 현재 Commit 을 한 Topic & Partition & Offset 정보 Genreation ID : Consumer Group 의 고유값으로 Consumer Group 이 생성될 때 Generation ID 는 생성됩니다. 리밸런싱이 발생하여 Consumer Group 이 재구성될 때, Generation ID 또한 새로 만들어집니다.
위 정보들로 구성된 OffsetCommitRequest 를 GroupCoordinator 가 받게되면,
Group ID, Topic, Partition, Offset 등의 정보를 기반으로 __consumer_offsets 토픽에 기록합니다.
* __consumer_offsets 파일 확인 및 실습
__consumer_offsets 토픽을 저장하는 파일은 대개 /var/lib/kafka/data/ 리덱토리 하위에 저장됩니다.
자세한 저장 위치는 server.properties 의 log.dirs 설정을 보셔야합니다.
__consumer_offsets 토픽의 파티션 수는 기본적으로 50개입니다.
그 이유는
offsets.topic.num.partitions : 50
의 설정을 따르기 때문입니다.
그리고
__consumer_offsets 토픽의 retention 은 7일로 기본 설정됩니다.
offsets.retention.minutes : 7day
실습을 통해서 자세한 내용을 살펴보겠습니다.
먼저 docker-compose 로 kafka cluster 를 생성하겠습니다.아래의 docker-compose.yml 를 실행하여 카프카 클러스터를 생성합니다.
<docker-compose.yml>
version: '2' services: zookeeper: image: confluentinc/cp-zookeeper:latest environment: ZOOKEEPER_SERVER_ID: 1 ZOOKEEPER_CLIENT_PORT: 2181 ZOOKEEPER_TICK_TIME: 2000 ZOOKEEPER_INIT_LIMIT: 5 ZOOKEEPER_SYNC_LIMIT: 2 ports: - "22181:2181" kafka: image: confluentinc/cp-kafka:latest depends_on: - zookeeper ports: - "29092:29092" environment: KAFKA_BROKER_ID: 1 KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181' KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092 KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0 KAFKA_MESSAGE_MAX_BYTES: 10000000 KAFKA_SOCKET_REQUEST_MAX_BYTES: 100001200 KAFKA_SOCKET_RECEIVE_BUFFER_BYTES: 10000000
카프카 클러스터의 컨테이너가 생성되었다면,
docker exec -it kafka-kafka-1 /bin/bash
또는 도커 데스크탑을 활용해서 kafka-kafka-1 컨테이너 내부로 진입합니다.
server.properties 파일의 log.dirs 설정을 확인합니다.
log.dirs 는 로그 파일이 생성되는 경로인데요.
해당 위치에 __consumer_commits 로그파일이 저장됩니다.
cat /etc/kafka/server.properties | grep log.dirs -A 5 log.dirs=/var/lib/kafka # The default number of log partitions per topic. More partitions allow greater # parallelism for consumption, but this will also result in more files across # the brokers. num.partitions=1
카프카 클러스터가 생성된 최초의 로그파일의 상태입니다.
어떤 토픽도 생성하지 않았기 때문에 __consumer_offsets 파일은 존재하지 않습니다.
ls /var/lib/kafka/data cleaner-offset-checkpoint log-start-offset-checkpoint meta.properties recovery-point-offset-checkpoint replication-offset-checkpoint
토픽을 하나 생성합니다.
토픽의 이름은 testTopic 이며 replication-factor 는 1, partitions 은 3개를 가집니다./usr/bin/kafka-topics --create --topic testTopic --bootstrap-server localhost:9092 --replication-factor 1 --partitions 3 Created topic testTopic.
생성된 토픽 (testTopic) 에 데이터를 추가합니다.
1부터 9 까지의 숫자를 추가하였습니다./usr/bin/kafka-console-producer --bootstrap-server localhost:9092 --topic testTopic >1 >2 >3 >4 >5 >6 >7 >8 >9
/var/lib/kafka/data 디렉토리 하위에 3개의 파일이 생성되었습니다.
각 파일의 이름은 testTopic-0, testTopic-1, testTopic-2 이며
각 파일은 testTopic 토픽의 3개의 파티션 정보가 저장됩니다.ls /var/lib/kafka/data testTopic-0 testTopic-1 testTopic-2 cleaner-offset-checkpoint meta.properties replication-offset-checkpoint log-start-offset-checkpoint recovery-point-offset-checkpoint
압축과 인코딩으로 형태를 알아보긴 쉽지않지만 kafka-console-producer 를 통해서 추가한 데이터가 보입니다.
cat /var/lib/kafka/data/testTopic-0/00000000000000000000.log TK((13 4Tbeh((567 9'({(9
그리고 kafka-console-consumer 를 통해서 testTopic 을 조회합니다.
Consumer Group 의 이름은 testGroup1 입니다.
/usr/bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic testTopic --from-beginning --group testGroup1
생성된 Consumer Group 과 Offset 의 Commit 상태를 확인할 수 있습니다.
GROUP - TOPIC - PARTITION 에 따른 Commit Offset 의 상황을 확인할 수 있습니다.
/usr/bin/kafka-consumer-groups --bootstrap-server localhost:9092 --list testGroup1 kafka-consumer-groups --bootstrap-server localhost:9092 --all-groups --describe GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID testGroup1 testTopic 0 15 15 0 - - - testGroup1 testTopic 1 0 0 0 - - - testGroup1 testTopic 2 0 0 0 - - -
그리고 __consumer_offsets 관련된 파일들이 생성되어 있습니다.
(__consumer_offsets 토픽은 Consume 이 시작되는 시점에 생성됨)
ls /var/lib/kafka/data/ __consumer_offsets-0 __consumer_offsets-15 __consumer_offsets-21 __consumer_offsets-28 __consumer_offsets-34 __consumer_offsets-40 __consumer_offsets-47 __consumer_offsets-9 testTopic-1 __consumer_offsets-1 __consumer_offsets-16 __consumer_offsets-22 __consumer_offsets-29 __consumer_offsets-35 __consumer_offsets-41 __consumer_offsets-48 cleaner-offset-checkpoint testTopic-2 __consumer_offsets-10 __consumer_offsets-17 __consumer_offsets-23 __consumer_offsets-3 __consumer_offsets-36 __consumer_offsets-42 __consumer_offsets-49 log-start-offset-checkpoint __consumer_offsets-11 __consumer_offsets-18 __consumer_offsets-24 __consumer_offsets-30 __consumer_offsets-37 __consumer_offsets-43 __consumer_offsets-5 meta.properties __consumer_offsets-12 __consumer_offsets-19 __consumer_offsets-25 __consumer_offsets-31 __consumer_offsets-38 __consumer_offsets-44 __consumer_offsets-6 recovery-point-offset-checkpoint __consumer_offsets-13 __consumer_offsets-2 __consumer_offsets-26 __consumer_offsets-32 __consumer_offsets-39 __consumer_offsets-45 __consumer_offsets-7 replication-offset-checkpoint __consumer_offsets-14 __consumer_offsets-20 __consumer_offsets-27 __consumer_offsets-33 __consumer_offsets-4 __consumer_offsets-46 __consumer_offsets-8 testTopic-0
그리고 du 명령어로 __consumer_offsets 의 용량 상태를 확인해보겠습니다.du -sh * 12K __consumer_offsets-0 12K __consumer_offsets-1 12K __consumer_offsets-10 12K __consumer_offsets-11 12K __consumer_offsets-12 12K __consumer_offsets-13 12K __consumer_offsets-14 12K __consumer_offsets-15 12K __consumer_offsets-16 12K __consumer_offsets-17 12K __consumer_offsets-18 12K __consumer_offsets-19 12K __consumer_offsets-2 12K __consumer_offsets-20 12K __consumer_offsets-21 12K __consumer_offsets-22 12K __consumer_offsets-23 12K __consumer_offsets-24 12K __consumer_offsets-25 12K __consumer_offsets-26 12K __consumer_offsets-27 12K __consumer_offsets-28 12K __consumer_offsets-29 12K __consumer_offsets-3 12K __consumer_offsets-30 12K __consumer_offsets-31 12K __consumer_offsets-32 12K __consumer_offsets-33 12K __consumer_offsets-34 12K __consumer_offsets-35 12K __consumer_offsets-36 12K __consumer_offsets-37 12K __consumer_offsets-38 12K __consumer_offsets-39 12K __consumer_offsets-4 12K __consumer_offsets-40 12K __consumer_offsets-41 12K __consumer_offsets-42 12K __consumer_offsets-43 20K __consumer_offsets-44 12K __consumer_offsets-45 12K __consumer_offsets-46 12K __consumer_offsets-47 12K __consumer_offsets-48 12K __consumer_offsets-49 12K __consumer_offsets-5 12K __consumer_offsets-6 12K __consumer_offsets-7 12K __consumer_offsets-8 12K __consumer_offsets-9
다른 __consumer_offsets 파일들의 사용 용량은 12K인데 반해 20K 인 __consumer_offsets-44 이 존재합니다.
해당 파일에 Commit Offset 의 데이터가 기록되어 있습니다.
로그파일을 확인해보겠습니다.
로그파일의 위치는 __consumer_offsets-44/00000000000000000000.log 입니다.
cat /var/lib/kafka/data/__consumer_offsets-44/00000000000000000000.log N^????@ D??@ D??????????????? testGroup1consumerrange5console-consumer-5fb62126-13a5-4f91-bda7-58d7b15cbc0d??@ :5console-consumer-5fb62126-13a5-4f91-bda7-58d7b15cbc0d??console-consumer /172.24.0.3???? testTopic??????????% testTopic?????B4C??@???@???????????v: testGroup1 testTopic0??????@?v: testGroup1 testTopic0??@?v: testGroup1 testTopic0??????@??mO0B??@3j??@3j??????????v: testGroup1 testTopic0??????@3gv: testGroup1 testTopic0??@3gv: testGroup1 testTopic0??????@3g?՜?]??@F???@F???????????v: testGroup1 testTopic0??????@F?v: testGroup1 testTopic0??@F?v: testGroup1 testTopic0??????@F? ??;\%??@Zr??@Zr??????????v: testGroup1 testTopic0??????@Zqv: testGroup1 testTopic0??@Zqv: ???EP??@m???@m???????????v:?????@Zq testGroup1 testTopic0??????@m?v: testGroup1 testTopic0??@m?v: testGroup1 testTopic0??????@m??F????@????@????????????v: testGroup1 testTopic0??????@??v: testGroup1 testTopic0??@??v: testGroup1 testTopic0??????@?????r??@???@???????????v: testGroup1 testTopic0??????@?v: testGroup1 testTopic0??@?v: testGroup1 testTopic0??????@???G??@????@????????????v: testGroup1 testTopic0??????@??v: testGroup1 testTopic0??@??v: testGroup1 testTopic0??????@?????|??@?'??@?'??????????v: testGroup1 testTopic0??????@?%v: testGroup1 testTopic0??@?%v: testGroup1 testTopic0??????@?%???7???@ϲ??@ϲ??????????v: testGroup1 testTopic0??????@ϯv: testGroup1 testTopic0??@ϯv: testGroup1 testTopic0??????@ϯ??.[??@?7??@?7??????????v: testGroup1 testTopic0??????@?5v: testGroup1 testTopic0??@?5v: testGroup1 testTopic0??????@?5"?y?K*??@????@????????????v: testGroup1 testTopic0??????@??v: testGroup1 testTopic0??@??v: testGroup1 testTopic0??????@??%?-W%??A
역시 압축과 인코딩으로 인해서 쉽게 파악할 순 없지만, 해당 로그파일에 Commit 된 Offset 정보들이 기록됨을 알 수 있습니다.
반응형'Kafka' 카테고리의 다른 글
Kafka-Connect & MySQL 구현하기 (0) 2023.12.09 Kafka Replication (메시지 복제) 이해하기 (0) 2023.09.21 Kafka Consumer 개념 (0) 2023.09.13 Kafka Producer Process (카프카 프로듀서 과정) (0) 2023.09.09 kafka consumer (0) 2023.03.02