ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • [Kafka] Timeindex 알아보기
    Kafka 2024. 6. 1. 09:26
    728x90
    반응형

     

    - 목차

     

    들어가며.

    카프카는 Producer 로부터 전달받은 데이터들을 저장합니다.

    이 데이터들은 Log Segment 라는 단위로 파일로써 저장되는데요.

    Log Segment 는 log.segment.bytes, log.segment.ms 라는 기준에 따라서 Flush 가 진행됩니다.

    예를 들어, log.segment.bytes 가 1KB 이고 2KB 만큼의 데이터가 Producer 로부터 생성된다면 총 2개의 Log Segment 파일이 생성되는 구조입니다.

    또한 Producer 로부터 생성되는 Record 는 저마다 Timestamp 를 가지게 되며,

    log.segment.ms 가 1분이라면 1분을 기준으로 Log Segment 파일이 생성됩니다.

    Log Segment File 을 생성하기 위한 기준에 대해 대략적인 느낌은 받으셨나요 ?

     

    그리고 이러한 Log Segment 들은 아래의 이미지와 같이 .log, .index, .timeindex 와 같은 파일들로 표현됩니다.

     

    이번 글에서 알아볼 timeindex 는 .log 파일에 저장된 Log Segment 데이터의 Timestamp 의 Indexing 을 지원하는 파일이자 기능입니다.

    timeindex 를 통해서 Kafka Consumer 는 Timestamp 를 활용한 Consume 이 가능해지죠.

    이어지는 글에서 자세히 알아보도록 하겠습니다.

     

    https://westlife0615.tistory.com/514

     

    Kafka Log Segment 알아보기

    - 목차 함께 보면 좋은 글. https://westlife0615.tistory.com/474 Docker 로 Kafka Cluster 구축해보기. - 목차 소개. 저는 로컬 환경에서 카프카 관련 테스트를 진행하는 경우가 많이 생기더군요. 그래서 docker-comp

    westlife0615.tistory.com

     

     

    Sparse Index.

    kafka 는 Dense Index 가 아닌 Sparse Index 를 지원합니다.

    MySQL 과 같은 Database 는 Primary Key 를 활용한 Dense Index 를 지원합니다.

    이는 모든 데이터 또는 Row 들이 저마다의 Index 를 가지며, Primary Key 를 통해서 빠른 조회가 가능합니다.

    즉, id 를 명시하여 데이터를 조회하는 경우에는 id 에 따른 Physical Location 을 알 수 있기 때문에 빠른 조회가 가능해집니다.

     

    반면, Kafka 의 경우에는 Record 의 Timestamp 에 따른 Dense Index 를 지원하지 않습니다.

    따라서 특정 Timestamp 에 해당하는 Record 를 조회할 때에 해당하는 Physical Location 정보를 확보할 순 없죠.

    Timeindex 는 시간의 범위를 지정하여 하나의 Index 로 시간 범위의 모든 데이터를 인덱싱합니다.

    예를 들어, 1분 기준으로 Indexing 을 지원한다고 가정하겠습니다.

    그럼 01:00 ~ 01:59 까지의 데이터들이 하나의 단위로 묶입니다.

    02:00 ~ 02:59 범위의 데이터들도 마찬가지죠.

    이렇게 Sparse Indexing 을 적용함으로써 03:44 의 Record 를 조회할 때에 03:00 ~ 03:59 에 해당하는 Index 를 Lookup 하고,

    이를 토대로 빠른 조회가 가능해집니다.

     

    아래의 예시는 .timeindex 파일에 존재하는 데이터를 읽어들인 결과입니다.

    timestamp: 1717393739592 offset: 17
    timestamp: 1717393739615 offset: 26
    timestamp: 1717393739640 offset: 35
    timestamp: 1717393739663 offset: 44
    timestamp: 1717393755966 offset: 53
    timestamp: 1717393759124 offset: 62
    timestamp: 1717393762255 offset: 71
    timestamp: 1717393765436 offset: 80
    timestamp: 1717393768599 offset: 89

     

    아래와 같은 명령어를 통해서 특정 timeindex 파일을 읽기 좋은 형태로 변형할 수 있습니다.

    kafka-run-class kafka.tools.DumpLogSegments --files /var/lib/kafka/data/test_topic-0/00000000000000000000.timeindex

     

     

    1717393755966 ~ 1717393759123 까지 Unixtime 범위의 데이터들은 53 Offset 을 Indexing 하게 되며, 이러한 방식으로 빠른 조회가 가능하게 됩니다.

     

    log.index.interval.bytes

    하나의 timeindex 파일은 하나의 Log Segment 의 Indexing 을 담당합니다.

    동일한 File Name 을 가지는 .log, .index, .timeindex 파일은 동일한 Log Segment 를 관리하게 되죠.

    예를 들어, 0000.log, 0000.index, 0000.timeindex 인 세개의 파일은 0 번 Offset 부터 101118 번 까지의 Offset 에 해당하는 데이터들을 관리합니다.

    그리고 이러한 데이터의 묶음을 Log Segment 라고 부르죠.

     

    하나의 Log Segment 를 Indexing 하기 위해서 적절한 단위로 또는 범위로 Grouping 하는 것이 중요한데요.

    이러한 설정값이 log.index.interval.bytes 입니다.

     

    log.index.interval.bytes 의 기본값은 4KB 입니다.

    하나의 Log Segment 의 크기를 결정하는 log.segment.bytes 의 기본값이 1GB 인 것을 감안하면,

    하나의 timeindex 내부에는 대략 최대 250,000 개의 Timeindex Entry 들이 생성될 수 있습니다.

     

    Timestamp 를 기준으로 데이터 조회하기.

    Timeindex 를 통해서 데이터를 조회하는 예시를 살펴보기 위해서 간단한 실습을 진행해보도록 하겠습니다.

    아래의 링크를 통해서 간단하게 Kafka Cluster 를 Docker Container 를 통해 실행할 수 있습니다.

     

    https://westlife0615.tistory.com/474

     

    Docker 로 Kafka Cluster 구축해보기.

    - 목차 소개. 저는 로컬 환경에서 카프카 관련 테스트를 진행하는 경우가 많이 생기더군요. 그래서 docker-compose 를 활용하여 Kafka, ZooKeeper 클러스터를 구축하는 내용을 작성하려고 합니다. docker-com

    westlife0615.tistory.com

     

    아래의 코드 예시는 test_topic 이라는 이름의 Topic 에 메시지를 생성하는 내용입니다.

    아래의 Producer 코드 예시는 간단히 요약해드리면,

    test_topic 이라는 Topic 으로 데이터를 Producing 합니다.

    1초 간격으로 데이터를 생성하며, 1초 간격을 주어 각 Record 의 Timestamp 마다 1초의 Gap 을 줍니다.

    그리고 Record 의 메시지는 메시지의 생성 시각 정보를 담고 있습니다.

    from kafka import KafkaProducer
    import time
    from datetime import datetime
    
    bootstrap_servers = ['localhost:29091', 'localhost:29092', 'localhost:29093']
    value_serializer = lambda v: v.encode("utf-8")
    
    producer = KafkaProducer(
        bootstrap_servers=bootstrap_servers,
        key_serializer=None,
        value_serializer=value_serializer,
    )
    
    topic = "test_topic"
    for i in range(0, 1000):
        msg = datetime.now().strftime("%H: %M: %S")
        producer.send(topic, value=msg)
        time.sleep(1)
    
    producer.flush()
    producer.close()
    
    print("## finished")

     

    아래의 이미지는 Producer 에 의해서 메시지가 생성된 결과입니다.

     

     

     

    아래의 내용은 메시지가 생성된 이후에 timeindex 파일의 내용입니다.

    아래와 같이 timestamp 별로 offset 을 확인할 수 있도록 Indexing 이 적용되어 있습니다.

    그럼 아래의 timestamp 내용을 기반으로 데이터 조회를 수행해보겠습니다.

    Dumping /var/lib/kafka/data/test_topic-0/00000000000000000000.timeindex
    timestamp: 1727468028404 offset: 53
    timestamp: 1727468081708 offset: 106
    timestamp: 1727468134983 offset: 159
    timestamp: 1727468188234 offset: 212
    timestamp: 1727468241479 offset: 265
    timestamp: 1727468294733 offset: 318
    timestamp: 1727468348005 offset: 371
    timestamp: 1727468401283 offset: 424
    timestamp: 1727468454569 offset: 477
    timestamp: 1727468507827 offset: 530
    timestamp: 1727468561090 offset: 583
    timestamp: 1727468614349 offset: 636
    timestamp: 1727468667608 offset: 689
    timestamp: 1727468720880 offset: 742
    timestamp: 1727468774122 offset: 795
    timestamp: 1727468827416 offset: 848

     

     

    Index Interval 사이의 Timestamp 로 조회하기.

    Index Interval 사이에 해당하는 Timestamp 값으로 데이터를 조회해보겠습니다.

    timestamp: 1727468028404 offset: 53
    timestamp: 1727468081708 offset: 106

     

    1727468028404 Timestamp 는 53 번 Offset 을 가리키며,

    1727468081708 Timestamp 는 106 번 Offset 을 가리킵니다.

    그럼 이 사이값 중 하나인 1727468029404 으로 데이터 조회를 시도해보겠습니다.

     

    from kafka import KafkaConsumer
    from kafka.structs import TopicPartition
    
    consumer = KafkaConsumer(
        bootstrap_servers=['localhost:29091', 'localhost:29092', 'localhost:29093'],
        auto_offset_reset='earliest',
        enable_auto_commit=False,
        group_id='my-group',
        consumer_timeout_ms=1000
    )
    
    topic = 'test_topic'
    partition = 0
    topic_partition = TopicPartition(topic, partition)
    consumer.assign([topic_partition])
    offsets_for_times = consumer.offsets_for_times({topic_partition: 1727468029404})
    
    offset = offsets_for_times[topic_partition].offset
    consumer.seek(topic_partition, offset)
    
    for message in consumer:
        print(f"Offset: {message.offset}, Key: {message.key}, Value: {message.value}, Timestamp: {message.timestamp}")
    
    consumer.close()

     

    저는 의도적으로 생성되는 Record 의 시간 간격을 1초로 유지해두었습니다.

    Offset 이 53 번인 1727468028404 Timestamp 보다 1초 이후인 1727468029404 를 기준으로 조회를 하였기 때문에 Offset 54 번의 레코드가 조회됩니다.

    Offset: 54, Key: None, Value: b'05: 13: 49', Timestamp: 1727468029409

     

    Indexing 된 Timestamp 로 조회하기.

    Indexing 된 Timestamp 를 기준으로 조회를 하게 된다면, 아래와 같이 Timestamp 에 해당하는 Offset 을 조회할 수 있습니다.

     

    offsets_for_times = consumer.offsets_for_times({topic_partition: 1727468028404})
    offset = offsets_for_times[topic_partition].offset
    consumer.seek(topic_partition, offset)
    Offset: 53, Key: None, Value: b'05: 13: 48', Timestamp: 1727468028404

     

     

     

    https://westlife0615.tistory.com/536

     

    [Kafka] Log Index File 알아보기

    - 목차  들어가며.Kafka 는 Log Management 를 위해서 3가지 파일을 사용합니다.첫번째 파일은 Log 파일로써 실질적인 Log Segment 를 저장합니다.Kafka Producer 가 생성하는 레코드들이 파티션 별로 저장되

    westlife0615.tistory.com

     

    반응형
Designed by Tistory.