-
[Kafka] Timeindex 알아보기Kafka 2024. 6. 1. 09:26728x90반응형
- 목차
들어가며.
카프카는 Producer 로부터 전달받은 데이터들을 저장합니다.
이 데이터들은 Log Segment 라는 단위로 파일로써 저장되는데요.
Log Segment 는 log.segment.bytes, log.segment.ms 라는 기준에 따라서 Flush 가 진행됩니다.
예를 들어, log.segment.bytes 가 1KB 이고 2KB 만큼의 데이터가 Producer 로부터 생성된다면 총 2개의 Log Segment 파일이 생성되는 구조입니다.
또한 Producer 로부터 생성되는 Record 는 저마다 Timestamp 를 가지게 되며,
log.segment.ms 가 1분이라면 1분을 기준으로 Log Segment 파일이 생성됩니다.
Log Segment File 을 생성하기 위한 기준에 대해 대략적인 느낌은 받으셨나요 ?
그리고 이러한 Log Segment 들은 아래의 이미지와 같이 .log, .index, .timeindex 와 같은 파일들로 표현됩니다.
이번 글에서 알아볼 timeindex 는 .log 파일에 저장된 Log Segment 데이터의 Timestamp 의 Indexing 을 지원하는 파일이자 기능입니다.
timeindex 를 통해서 Kafka Consumer 는 Timestamp 를 활용한 Consume 이 가능해지죠.
이어지는 글에서 자세히 알아보도록 하겠습니다.
https://westlife0615.tistory.com/514
Sparse Index.
kafka 는 Dense Index 가 아닌 Sparse Index 를 지원합니다.
MySQL 과 같은 Database 는 Primary Key 를 활용한 Dense Index 를 지원합니다.
이는 모든 데이터 또는 Row 들이 저마다의 Index 를 가지며, Primary Key 를 통해서 빠른 조회가 가능합니다.
즉, id 를 명시하여 데이터를 조회하는 경우에는 id 에 따른 Physical Location 을 알 수 있기 때문에 빠른 조회가 가능해집니다.
반면, Kafka 의 경우에는 Record 의 Timestamp 에 따른 Dense Index 를 지원하지 않습니다.
따라서 특정 Timestamp 에 해당하는 Record 를 조회할 때에 해당하는 Physical Location 정보를 확보할 순 없죠.
Timeindex 는 시간의 범위를 지정하여 하나의 Index 로 시간 범위의 모든 데이터를 인덱싱합니다.
예를 들어, 1분 기준으로 Indexing 을 지원한다고 가정하겠습니다.
그럼 01:00 ~ 01:59 까지의 데이터들이 하나의 단위로 묶입니다.
02:00 ~ 02:59 범위의 데이터들도 마찬가지죠.
이렇게 Sparse Indexing 을 적용함으로써 03:44 의 Record 를 조회할 때에 03:00 ~ 03:59 에 해당하는 Index 를 Lookup 하고,
이를 토대로 빠른 조회가 가능해집니다.
아래의 예시는 .timeindex 파일에 존재하는 데이터를 읽어들인 결과입니다.
timestamp: 1717393739592 offset: 17 timestamp: 1717393739615 offset: 26 timestamp: 1717393739640 offset: 35 timestamp: 1717393739663 offset: 44 timestamp: 1717393755966 offset: 53 timestamp: 1717393759124 offset: 62 timestamp: 1717393762255 offset: 71 timestamp: 1717393765436 offset: 80 timestamp: 1717393768599 offset: 89
아래와 같은 명령어를 통해서 특정 timeindex 파일을 읽기 좋은 형태로 변형할 수 있습니다.
kafka-run-class kafka.tools.DumpLogSegments --files /var/lib/kafka/data/test_topic-0/00000000000000000000.timeindex
1717393755966 ~ 1717393759123 까지 Unixtime 범위의 데이터들은 53 Offset 을 Indexing 하게 되며, 이러한 방식으로 빠른 조회가 가능하게 됩니다.
log.index.interval.bytes
하나의 timeindex 파일은 하나의 Log Segment 의 Indexing 을 담당합니다.
동일한 File Name 을 가지는 .log, .index, .timeindex 파일은 동일한 Log Segment 를 관리하게 되죠.
예를 들어, 0000.log, 0000.index, 0000.timeindex 인 세개의 파일은 0 번 Offset 부터 101118 번 까지의 Offset 에 해당하는 데이터들을 관리합니다.
그리고 이러한 데이터의 묶음을 Log Segment 라고 부르죠.
하나의 Log Segment 를 Indexing 하기 위해서 적절한 단위로 또는 범위로 Grouping 하는 것이 중요한데요.
이러한 설정값이 log.index.interval.bytes 입니다.
log.index.interval.bytes 의 기본값은 4KB 입니다.
하나의 Log Segment 의 크기를 결정하는 log.segment.bytes 의 기본값이 1GB 인 것을 감안하면,
하나의 timeindex 내부에는 대략 최대 250,000 개의 Timeindex Entry 들이 생성될 수 있습니다.
Timestamp 를 기준으로 데이터 조회하기.
Timeindex 를 통해서 데이터를 조회하는 예시를 살펴보기 위해서 간단한 실습을 진행해보도록 하겠습니다.
아래의 링크를 통해서 간단하게 Kafka Cluster 를 Docker Container 를 통해 실행할 수 있습니다.
https://westlife0615.tistory.com/474
아래의 코드 예시는 test_topic 이라는 이름의 Topic 에 메시지를 생성하는 내용입니다.
아래의 Producer 코드 예시는 간단히 요약해드리면,
test_topic 이라는 Topic 으로 데이터를 Producing 합니다.
1초 간격으로 데이터를 생성하며, 1초 간격을 주어 각 Record 의 Timestamp 마다 1초의 Gap 을 줍니다.
그리고 Record 의 메시지는 메시지의 생성 시각 정보를 담고 있습니다.
from kafka import KafkaProducer import time from datetime import datetime bootstrap_servers = ['localhost:29091', 'localhost:29092', 'localhost:29093'] value_serializer = lambda v: v.encode("utf-8") producer = KafkaProducer( bootstrap_servers=bootstrap_servers, key_serializer=None, value_serializer=value_serializer, ) topic = "test_topic" for i in range(0, 1000): msg = datetime.now().strftime("%H: %M: %S") producer.send(topic, value=msg) time.sleep(1) producer.flush() producer.close() print("## finished")
아래의 이미지는 Producer 에 의해서 메시지가 생성된 결과입니다.
아래의 내용은 메시지가 생성된 이후에 timeindex 파일의 내용입니다.
아래와 같이 timestamp 별로 offset 을 확인할 수 있도록 Indexing 이 적용되어 있습니다.
그럼 아래의 timestamp 내용을 기반으로 데이터 조회를 수행해보겠습니다.
Dumping /var/lib/kafka/data/test_topic-0/00000000000000000000.timeindex timestamp: 1727468028404 offset: 53 timestamp: 1727468081708 offset: 106 timestamp: 1727468134983 offset: 159 timestamp: 1727468188234 offset: 212 timestamp: 1727468241479 offset: 265 timestamp: 1727468294733 offset: 318 timestamp: 1727468348005 offset: 371 timestamp: 1727468401283 offset: 424 timestamp: 1727468454569 offset: 477 timestamp: 1727468507827 offset: 530 timestamp: 1727468561090 offset: 583 timestamp: 1727468614349 offset: 636 timestamp: 1727468667608 offset: 689 timestamp: 1727468720880 offset: 742 timestamp: 1727468774122 offset: 795 timestamp: 1727468827416 offset: 848
Index Interval 사이의 Timestamp 로 조회하기.
Index Interval 사이에 해당하는 Timestamp 값으로 데이터를 조회해보겠습니다.
timestamp: 1727468028404 offset: 53 timestamp: 1727468081708 offset: 106
1727468028404 Timestamp 는 53 번 Offset 을 가리키며,
1727468081708 Timestamp 는 106 번 Offset 을 가리킵니다.
그럼 이 사이값 중 하나인 1727468029404 으로 데이터 조회를 시도해보겠습니다.
from kafka import KafkaConsumer from kafka.structs import TopicPartition consumer = KafkaConsumer( bootstrap_servers=['localhost:29091', 'localhost:29092', 'localhost:29093'], auto_offset_reset='earliest', enable_auto_commit=False, group_id='my-group', consumer_timeout_ms=1000 ) topic = 'test_topic' partition = 0 topic_partition = TopicPartition(topic, partition) consumer.assign([topic_partition]) offsets_for_times = consumer.offsets_for_times({topic_partition: 1727468029404}) offset = offsets_for_times[topic_partition].offset consumer.seek(topic_partition, offset) for message in consumer: print(f"Offset: {message.offset}, Key: {message.key}, Value: {message.value}, Timestamp: {message.timestamp}") consumer.close()
저는 의도적으로 생성되는 Record 의 시간 간격을 1초로 유지해두었습니다.
Offset 이 53 번인 1727468028404 Timestamp 보다 1초 이후인 1727468029404 를 기준으로 조회를 하였기 때문에 Offset 54 번의 레코드가 조회됩니다.
Offset: 54, Key: None, Value: b'05: 13: 49', Timestamp: 1727468029409
Indexing 된 Timestamp 로 조회하기.
Indexing 된 Timestamp 를 기준으로 조회를 하게 된다면, 아래와 같이 Timestamp 에 해당하는 Offset 을 조회할 수 있습니다.
offsets_for_times = consumer.offsets_for_times({topic_partition: 1727468028404}) offset = offsets_for_times[topic_partition].offset consumer.seek(topic_partition, offset)
Offset: 53, Key: None, Value: b'05: 13: 48', Timestamp: 1727468028404
https://westlife0615.tistory.com/536
반응형'Kafka' 카테고리의 다른 글
[Kafka] Partition 알아보기 (0) 2024.05.12 [Kafka] max.block.ms 알아보기 (0) 2024.02.18 [Kafka-Connect] Debezium MySQL Connector 구현하기 (0) 2024.02.18 [Kafka-Streams] Json 기반 Custom Serdes 구현하기 (0) 2024.02.17 [Kafka] Transaction Coordinator 알아보기 (0) 2024.02.07