ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • [Kafka] __consumer_offsets Topic 알아보기
    Kafka/Kafka Consumer 2024. 7. 1. 06:15
    반응형

    - 목차

     

    __consumer_offsets Topic 의 생성 시점.

    "__consumer_offsets" Topic 은 Kafka 의 내장된 토픽입니다.

    그래서 이는 "kafka-topics.sh" 명령어를 사용하여 사용자가 직접 생성하는 토픽이 아닙니다.

    하지만 빌트인 형태로 존재하는 토픽이더라도 __consumer_offsets 토픽은 카프카 클러스터가 생성됨과 동시에 생성되지 않습니다.

    __consumer_offsets 토픽은 최초로 Consumer 가 실행되는 시점에 생성되며, Lazy 한 방식으로 __consumer_offsets 토픽을 생성합니다.

     

    아래의 명령어는 현재 생성된 토픽의 목록을 출력하는 명령어입니다.

    ( 저의 경우에는 카프카 브로커의 advertised.listeners 주소가 kafka1:9092 입니다. )

    kafka-topics.sh --bootstrap-server kafka1:9092 --list
    
    > test-topic

     

    사전에 생성해준 "test-topic" 이라는 이름의 토픽명이 출력됩니다.

    아직까지 최초의 Consumer 가 실행되지 않았기 때문에 __consumer_offsets 토픽은 생성되지 않았습니다.

     

    그리고 Consumer 가 실행된 이후에 kafka-topics --list 명령을 실행합니다.

    그럼 아래와 같이 __consumer_offsets 토픽이 생성됨을 확인할 수 있습니다.

    kafka-topics.sh --bootstrap-server kafka1:9092 --list
    
    __consumer_offsets
    test-topic

     

     

    __consumer_offsets Topic 은 FindCoordinator API 이 호출될 때에 생성.

    __consumer_offsets 토픽의 정확한 생성 시점은 Client 가 FindCoordinator API 를 호출하는 시점에 생성됩니다.

    아래의 이미지는 Kafka Consumer 가 Broker 에게 FindCoordinator API 를 요청하는 상황입니다.

    FindCoordinator 는 Consumer Group 의 Heartbeat, Rebalancing 등의 관리하는 브로커입니다.

    Consumer 가 여러 브로커들 중에서 적절한 Group Coordinator 를 찾기 위해서 FindCoordinator API 를 전달하는데요.

    이 과정에서 브로커는 __consumer_offsets 토픽을 생성하게 됩니다.

     

     

    출처 : https://developer.confluent.io/courses/architecture/consumer-group-protocol/

     

     

    cleanup.policy == compact

    Kafka 의 토픽은 2개의 cleanup.policy 를 가집니다.

    하나가 delete 그리고 다른 하나는 compact 입니다.

    먼저 delete cleanup.policy 에 대해서 말씀드리겠습니다.

    delete cleanup.policy 는 일반적은 Topic 에 적용되는 cleanup.policy 입니다.

    이는 시간과 용량에 대한 조건이 있으며, 지정된 시간보다 오래거나 지정된 용량을 초과하게 되면 오래된 레코드 순서로 제거됩니다.

    delete cleanup.policy 를 적용하기 위해서 retention.ms 나 retention.bytes 설정을 토픽 레벨에서 지정할 수 있습니다.

     

    https://westlife0615.tistory.com/930

     

    [Kafka] Topic 의 retention.bytes & retention.ms 알아보기

    - 목차 들어가며.카프카는 레코드가 저장되는 토픽과 파티션에 Retention 을 설정할 수 있습니다.Retention 의 적용 기준은 용량과 시간입니다.즉, 일정 바이트 이상으로 데이터가 쌓이거나 설정된

    westlife0615.tistory.com

     

    그리고 compact cleanup.policy 는 Record 의 Key 를 기준으로 동일한 Key 를 가지는 오래된 레코드들을 삭제합니다.

    따라서 이론적으로 가장 최신의 Key 가 보존되는 방식입니다. 

     

    일반적으로 __consumer_offsets 에 저장되는 레코드들은 Consumer Group 의 상태를 나태냅니다.

    아래와 같은 형식으로 Key 특정 Consumer Group 이 어떤 Member 들로 구성되어 있는지에 대한 정보를 포함합니다.

    이 또한 Compact cleanup.policy 에 의거해서 가장 최신의 레코드를 확인하면 현재 시점의 Consumer Group 의 Membership 정보를 알 수 있습니다.

    Key = {
    	"group": "my-consumer-group"
    }
    
    
    Value = {
    	"protocol_type": "consumer",
    	"generation": 13,
    	"protocol": "roundrobin",
    	"leader": "this_is_client_id-cb21db20-1cd7-4d01-8f41-777285c36caa",
    	"current_state_timestamp": 1740622550866,
    	"members": [
    		{
    			"member_id": "this_is_client_id-1cfb4161-58b9-404d-bc83-db49e2cd029a",
    			"group_instance_id": null,
    			"client_id": "this_is_client_id",
    			"client_host": "/172.19.0.8",
    			"rebalance_timeout": 300000,
    			"session_timeout": 45000,
    			"subscription": "AAMAAAABAAp0ZXN0LXRvcGljAAAAAAAAAAD/////AAA=",
    			"assignment": "AAAAAAABAAp0ZXN0LXRvcGljAAAAAgAAAAAAAAACAAAAAA=="
    		},
    		{
    			"member_id": "this_is_client_id-cb21db20-1cd7-4d01-8f41-777285c36caa",
    			"group_instance_id": null,
    			"client_id": "this_is_client_id",
    			"client_host": "/172.19.0.6",
    			"rebalance_timeout": 300000,
    			"session_timeout": 45000,
    			"subscription": "AAMAAAABAAp0ZXN0LXRvcGljAAAAAAAAAAD/////AAA=",
    			"assignment": "AAAAAAABAAp0ZXN0LXRvcGljAAAAAQAAAAEAAAAA"
    		}
    	]
    }

     

     

    또한 아래와 같은 형식으로 Consumer Group 와 Topic/Partition 의 커밋된 Offset 기록 또한 확인할 수 있습니다.

     

    Key = {
    	"group": "my-consumer-group",
    	"topic": "test-topic",
    	"partition": 1
    }
    
    Value = {
    	"offset": 7482233,
    	"leader_epoch": 0,
    	"metadata": "",
    	"commit_timestamp": 1610622416392
    }

     

     

     

    __consumer_offsets Topic 읽는 방법.

    Kafdrop 이나 Kafka UI 와 같은 툴을 활용하여 __consumer_offsets Topic 을 열람할 수 있습니다.

    뿐만 아니라 kafka-console-consumer.sh 명령어를 통해서도 __consumer_offsets 토픽의 열람이 가능합니다.

     

    kafka-console-consumer.sh --bootstrap-server kafka1:9092 \
                           --topic __consumer_offsets \
                           --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" \
                           --from-beginning --property print.value=true

     

    아래와 같은 형식으로 레코드들이 출력되게 됩니다.

     

    [my-consumer-group,test-topic,1]::OffsetAndMetadata(offset=10042309, leaderEpoch=Optional[0], metadata=, commitTimestamp=1610622720392, expireTimestamp=None)
    [my-consumer-group,test-topic,1]::OffsetAndMetadata(offset=10042309, leaderEpoch=Optional[0], metadata=, commitTimestamp=1610622720401, expireTimestamp=None)

     

     

    Group Coordinator 와 __consumer_offsets Partition Leader.

    여러 Broker 들 중에서 Group Coordinator 가 선택되는 방식은 group.id 를 Hashing 한 결과를 따릅니다.

    Group Coordinator 을 결정하는 Hashing 의 구체적은 구현 내용을 모르지만,

    Consumer 는 FindCoordinator API 를 통해서 특정 Group Coordinator 와 통신을 수행하게 됩니다.

     

    그리고 Consumer Group 은 일반적으로 50개의 Partition 을 가지는 __consumer_offsets Topic 의 단 하나의 Partition 에 Group 의 상태를 기록합니다.

    그리고 이 Partition 의 Leader Broker 가 바로 Group Coordinator 가 됩니다.

     

    현재 저는 3개의 Broker 와 1개의 Consumer 를 실행 중입니다.

    아래의 기록은 Consumer 가 Broker 와 맺고 있는 Socket 의 정보입니다.

    주목할 점은 "kafka1.kafka-net:9091" 와 2개의 Socket 을 맺고있습니다.

    하나의 소켓은 데이터를 Fetch 하기 위한 소켓이며, 다른 하나는 Heartbeat 를 전송하기 위한 소켓입니다.

    python   11 root   11u  IPv4 2209667      0t0     TCP 3d3fc962b9b0:56506->kafka1.kafka-net:9091 (ESTABLISHED)
    python   11 root   12u  IPv4 2219207      0t0     TCP 3d3fc962b9b0:49888->kafka1.kafka-net:9091 (ESTABLISHED)
    python   11 root   15u  IPv4 2214419      0t0     TCP 3d3fc962b9b0:53182->kafka3.kafka-net:9091 (ESTABLISHED)
    python   11 root   16u  IPv4 2218236      0t0     TCP 3d3fc962b9b0:49900->kafka2.kafka-net:9091 (ESTABLISHED)

     

    Fetch API 는 각 Partition 의 Leader Broker 와 통신을 수행합니다.

    그리고 Heartbeat API 의 경우에는 Group Coordinator 와 통신을 맺게 됩니다.

    따라서 현재 Kafka1 브로커가 Consumer Group 의 Coordinator 가 됩니다.

     

    그리고 아래의 명령어를 통해서 현재 __consumer_offsets 토픽의 Leader Broker 를 파악할 수 있습니다.

    kafka-topics.sh --bootstrap-server kafka1:9092 --topic __consumer_offsets --partition 13 --describe
    Topic: __consumer_offsets	Partition: 13	Leader: 1	Replicas: 1,2,3	Isr: 2,3,1

     

     

     

    반응형
Designed by Tistory.