-
[Kafka] __consumer_offsets Topic 알아보기Kafka/Kafka Consumer 2024. 7. 1. 06:15반응형
- 목차
__consumer_offsets Topic 의 생성 시점.
"__consumer_offsets" Topic 은 Kafka 의 내장된 토픽입니다.
그래서 이는 "kafka-topics.sh" 명령어를 사용하여 사용자가 직접 생성하는 토픽이 아닙니다.
하지만 빌트인 형태로 존재하는 토픽이더라도 __consumer_offsets 토픽은 카프카 클러스터가 생성됨과 동시에 생성되지 않습니다.
__consumer_offsets 토픽은 최초로 Consumer 가 실행되는 시점에 생성되며, Lazy 한 방식으로 __consumer_offsets 토픽을 생성합니다.
아래의 명령어는 현재 생성된 토픽의 목록을 출력하는 명령어입니다.
( 저의 경우에는 카프카 브로커의 advertised.listeners 주소가 kafka1:9092 입니다. )
kafka-topics.sh --bootstrap-server kafka1:9092 --list > test-topic
사전에 생성해준 "test-topic" 이라는 이름의 토픽명이 출력됩니다.
아직까지 최초의 Consumer 가 실행되지 않았기 때문에 __consumer_offsets 토픽은 생성되지 않았습니다.
그리고 Consumer 가 실행된 이후에 kafka-topics --list 명령을 실행합니다.
그럼 아래와 같이 __consumer_offsets 토픽이 생성됨을 확인할 수 있습니다.
kafka-topics.sh --bootstrap-server kafka1:9092 --list __consumer_offsets test-topic
__consumer_offsets Topic 은 FindCoordinator API 이 호출될 때에 생성.
__consumer_offsets 토픽의 정확한 생성 시점은 Client 가 FindCoordinator API 를 호출하는 시점에 생성됩니다.
아래의 이미지는 Kafka Consumer 가 Broker 에게 FindCoordinator API 를 요청하는 상황입니다.
FindCoordinator 는 Consumer Group 의 Heartbeat, Rebalancing 등의 관리하는 브로커입니다.
Consumer 가 여러 브로커들 중에서 적절한 Group Coordinator 를 찾기 위해서 FindCoordinator API 를 전달하는데요.
이 과정에서 브로커는 __consumer_offsets 토픽을 생성하게 됩니다.
출처 : https://developer.confluent.io/courses/architecture/consumer-group-protocol/ cleanup.policy == compact
Kafka 의 토픽은 2개의 cleanup.policy 를 가집니다.
하나가 delete 그리고 다른 하나는 compact 입니다.
먼저 delete cleanup.policy 에 대해서 말씀드리겠습니다.
delete cleanup.policy 는 일반적은 Topic 에 적용되는 cleanup.policy 입니다.
이는 시간과 용량에 대한 조건이 있으며, 지정된 시간보다 오래거나 지정된 용량을 초과하게 되면 오래된 레코드 순서로 제거됩니다.
delete cleanup.policy 를 적용하기 위해서 retention.ms 나 retention.bytes 설정을 토픽 레벨에서 지정할 수 있습니다.
https://westlife0615.tistory.com/930
[Kafka] Topic 의 retention.bytes & retention.ms 알아보기
- 목차 들어가며.카프카는 레코드가 저장되는 토픽과 파티션에 Retention 을 설정할 수 있습니다.Retention 의 적용 기준은 용량과 시간입니다.즉, 일정 바이트 이상으로 데이터가 쌓이거나 설정된
westlife0615.tistory.com
그리고 compact cleanup.policy 는 Record 의 Key 를 기준으로 동일한 Key 를 가지는 오래된 레코드들을 삭제합니다.
따라서 이론적으로 가장 최신의 Key 가 보존되는 방식입니다.
일반적으로 __consumer_offsets 에 저장되는 레코드들은 Consumer Group 의 상태를 나태냅니다.
아래와 같은 형식으로 Key 특정 Consumer Group 이 어떤 Member 들로 구성되어 있는지에 대한 정보를 포함합니다.
이 또한 Compact cleanup.policy 에 의거해서 가장 최신의 레코드를 확인하면 현재 시점의 Consumer Group 의 Membership 정보를 알 수 있습니다.
Key = { "group": "my-consumer-group" } Value = { "protocol_type": "consumer", "generation": 13, "protocol": "roundrobin", "leader": "this_is_client_id-cb21db20-1cd7-4d01-8f41-777285c36caa", "current_state_timestamp": 1740622550866, "members": [ { "member_id": "this_is_client_id-1cfb4161-58b9-404d-bc83-db49e2cd029a", "group_instance_id": null, "client_id": "this_is_client_id", "client_host": "/172.19.0.8", "rebalance_timeout": 300000, "session_timeout": 45000, "subscription": "AAMAAAABAAp0ZXN0LXRvcGljAAAAAAAAAAD/////AAA=", "assignment": "AAAAAAABAAp0ZXN0LXRvcGljAAAAAgAAAAAAAAACAAAAAA==" }, { "member_id": "this_is_client_id-cb21db20-1cd7-4d01-8f41-777285c36caa", "group_instance_id": null, "client_id": "this_is_client_id", "client_host": "/172.19.0.6", "rebalance_timeout": 300000, "session_timeout": 45000, "subscription": "AAMAAAABAAp0ZXN0LXRvcGljAAAAAAAAAAD/////AAA=", "assignment": "AAAAAAABAAp0ZXN0LXRvcGljAAAAAQAAAAEAAAAA" } ] }
또한 아래와 같은 형식으로 Consumer Group 와 Topic/Partition 의 커밋된 Offset 기록 또한 확인할 수 있습니다.
Key = { "group": "my-consumer-group", "topic": "test-topic", "partition": 1 } Value = { "offset": 7482233, "leader_epoch": 0, "metadata": "", "commit_timestamp": 1610622416392 }
__consumer_offsets Topic 읽는 방법.
Kafdrop 이나 Kafka UI 와 같은 툴을 활용하여 __consumer_offsets Topic 을 열람할 수 있습니다.
뿐만 아니라 kafka-console-consumer.sh 명령어를 통해서도 __consumer_offsets 토픽의 열람이 가능합니다.
kafka-console-consumer.sh --bootstrap-server kafka1:9092 \ --topic __consumer_offsets \ --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" \ --from-beginning --property print.value=true
아래와 같은 형식으로 레코드들이 출력되게 됩니다.
[my-consumer-group,test-topic,1]::OffsetAndMetadata(offset=10042309, leaderEpoch=Optional[0], metadata=, commitTimestamp=1610622720392, expireTimestamp=None) [my-consumer-group,test-topic,1]::OffsetAndMetadata(offset=10042309, leaderEpoch=Optional[0], metadata=, commitTimestamp=1610622720401, expireTimestamp=None)
Group Coordinator 와 __consumer_offsets Partition Leader.
여러 Broker 들 중에서 Group Coordinator 가 선택되는 방식은 group.id 를 Hashing 한 결과를 따릅니다.
Group Coordinator 을 결정하는 Hashing 의 구체적은 구현 내용을 모르지만,
Consumer 는 FindCoordinator API 를 통해서 특정 Group Coordinator 와 통신을 수행하게 됩니다.
그리고 Consumer Group 은 일반적으로 50개의 Partition 을 가지는 __consumer_offsets Topic 의 단 하나의 Partition 에 Group 의 상태를 기록합니다.
그리고 이 Partition 의 Leader Broker 가 바로 Group Coordinator 가 됩니다.
현재 저는 3개의 Broker 와 1개의 Consumer 를 실행 중입니다.
아래의 기록은 Consumer 가 Broker 와 맺고 있는 Socket 의 정보입니다.
주목할 점은 "kafka1.kafka-net:9091" 와 2개의 Socket 을 맺고있습니다.
하나의 소켓은 데이터를 Fetch 하기 위한 소켓이며, 다른 하나는 Heartbeat 를 전송하기 위한 소켓입니다.
python 11 root 11u IPv4 2209667 0t0 TCP 3d3fc962b9b0:56506->kafka1.kafka-net:9091 (ESTABLISHED) python 11 root 12u IPv4 2219207 0t0 TCP 3d3fc962b9b0:49888->kafka1.kafka-net:9091 (ESTABLISHED) python 11 root 15u IPv4 2214419 0t0 TCP 3d3fc962b9b0:53182->kafka3.kafka-net:9091 (ESTABLISHED) python 11 root 16u IPv4 2218236 0t0 TCP 3d3fc962b9b0:49900->kafka2.kafka-net:9091 (ESTABLISHED)
Fetch API 는 각 Partition 의 Leader Broker 와 통신을 수행합니다.
그리고 Heartbeat API 의 경우에는 Group Coordinator 와 통신을 맺게 됩니다.
따라서 현재 Kafka1 브로커가 Consumer Group 의 Coordinator 가 됩니다.
그리고 아래의 명령어를 통해서 현재 __consumer_offsets 토픽의 Leader Broker 를 파악할 수 있습니다.
kafka-topics.sh --bootstrap-server kafka1:9092 --topic __consumer_offsets --partition 13 --describe
Topic: __consumer_offsets Partition: 13 Leader: 1 Replicas: 1,2,3 Isr: 2,3,1
반응형'Kafka > Kafka Consumer' 카테고리의 다른 글
[Kafka] Static Membership 과 Partition Assignment 관계 알아보기 ( group.instance.id ) (0) 2024.06.30 [Kafka] Consumer 의 max.poll.records 와 Offset Commit 알아보기 (0) 2024.06.30 [Kafka] Consumer가 추가되면 Rebalance는 어떻게 동작할까? (0) 2024.06.30 [Kafka] Consumer Heartbeat 의 내부 동작 원리 (0) 2024.06.30 [Kafka] Consumer 의 Fetch Request 와 max_wait_ms 관계 알아보기 ( fetch.wait.max.ms ) (0) 2024.06.30