-
[Kafka] Topic 의 retention.bytes & retention.ms 알아보기Kafka 2024. 6. 24. 05:28반응형
- 목차
들어가며.
카프카는 레코드가 저장되는 토픽과 파티션에 Retention 을 설정할 수 있습니다.
Retention 의 적용 기준은 용량과 시간입니다.
즉, 일정 바이트 이상으로 데이터가 쌓이거나 설정된 시간을 초과한 오래된 레코드는 삭제됩니다.
일반적으로 아래와 같은 방식으로 Topic 의 생성과 Retention 을 설정할 수 있습니다.
kafka-topics.sh --create --bootstrap-server localhost:9092 \ --topic my-topic \ --partitions 3 \ --replication-factor 2 \ --config retention.ms=86400000 \ --config retention.bytes=1073741824
간단히 Retention 이 실제 어떤 식으로 적용되는지 간단하게 알아보겠습니다.
우선 아래의 도커 명령어를 실행하게 되면, 1개의 Zookeeper 와 Broker 를 실행할 수 있습니다.
docker network create kafka-net docker run -d --name zookeeper --hostname zookeeper --net kafka-net \ -e ZOOKEEPER_CLIENT_PORT=2181 \ confluentinc/cp-zookeeper:7.6.4 docker run -d --name kafka1 --hostname kafka1 --net kafka-net \ -e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 \ -e KAFKA_BROKER_ID=1 \ -e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=2 \ -e KAFKA_LISTENERS=PLAINTEXT://:9092 \ -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka1:9092 \ -e KAFKA_LOG_RETENTION_CHECK_INTERVAL_MS=10000 \ confluentinc/cp-kafka:7.6.4 docker run -d --name kafka-ui --net kafka-net \ -e DYNAMIC_CONFIG_ENABLED='true' \ -e KAFKA_CLUSTERS_0_NAME=cluster \ -e KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka1:9092 \ -p 8080:8080 \ provectuslabs/kafka-ui:v0.7.2
저는 "KAFKA_LOG_RETENTION_CHECK_INTERVAL_MS" 이라는 환경변수를 Kafka Broker Container 실행 명령어에서 사용하였는데요.
이는 server.properties 파일에 추가되어야 할 log.retention.check.interval.ms 에 해당하는 정보를 환경변수로써 추가하였습니다.
( 이러한 동작은 confluentinc/cp-kafka 도커 이미지에서 활용이 가능합니다. )
log.retention.check.interval.ms 를 10초로 설정하여 10초 마다 Retention 를 수행합니다.
다만 10초마다 Retention 이 수행되지는 않고, 제거할 수 있는 Log Segment 파일이 존재하는 경우에만 Retention 이 적용됩니다.
그리고 Retention 이 1분인 Topic 을 생성합니다.
kafka-topics --create --bootstrap-server kafka1:9092 \ --topic test \ --partitions 1 \ --replication-factor 1 \ --config retention.ms=60000
그리고 kafka-console-producer 명령어를 통해서 데이터를 추가할 수 있습니다.
kafka-console-producer --bootstrap-server kafka1:9092 --topic test
지금까지의 상황을 정리해보면 Retention 이 1분인 Kafka Topic 을 생성하였습니다.
그리고 10초마다 Kafka 는 Retention 을 수행하게 됩니다.
그래서 대략 1분의 시간이 지나면, 브로커는 아래의 로그와 함께 데이터를 삭제하게 됩니다.
INFO Deleted log for partition test-0 in /var/lib/kafka/data/test-0.1d4c4983e5834af3a64ffccc517edcfd-delete. (kafka.log.LogManager) INFO [LocalLog partition=test-0, dir=/var/lib/kafka/data] Rolled new log segment at offset 9 in 7 ms. (kafka.log.LocalLog) INFO [ProducerStateManager partition=test-0]Wrote producer snapshot at offset 9 with 1 producer ids in 4 ms. (org.apache.kafka.storage.internals.log.ProducerStateManager) INFO [UnifiedLog partition=test-0, dir=/var/lib/kafka/data] Incremented log start offset to 9 due to segment deletion (kafka.log.UnifiedLog)
이때의 Kafka Log Segment 파일들의 상태를 확인해보면 아래와 같이 기존의 Log Segment 파일들은 .deleted 라는 Suffix 가 붙은 파일명으로 변경됩니다.
이는 곧 삭제될 파일임을 의미하며, 카프카의 Retention 은 File 단위로 데이터가 삭제됨을 의미하기도 합니다.
그래서 데이터의 생성이 진행 중일 때에는 Log Segment File 이 Active 한 상태이기에 Retention 이 진행되지 않습니다.
00000000000000000000.index.deleted 00000000000000000000.timeindex.deleted 00000000000000000000.log.deleted 00000000000000000009.log 00000000000000000009.snapshot 00000000000000000009.log 00000000000000000009.timeindex partition.metadata leader-epoch-checkpoint
retention.bytes 이 설정된 Topic 의 데이터 삭제
빠른 Retention 의 확인을 위해서 retention.bytes 로 1KB 를 설정하였습니다.
kafka-topics --create --bootstrap-server kafka1:9092 \ --topic test2 \ --partitions 1 \ --replication-factor 1 \ --config retention.bytes=1024 \ --config segment.bytes=1024
위의 과정과 동일하게 데이터의 생성 후 Log Segment File 의 상태를 확인합니다.
주목할 점은 segment.bytes 크기와 retention.bytes 의 크기를 동일하게 설정해야지, retention.bytes 의 동작을 쉽게 확인할 수 있습니다.
그 이유는 Retention 에 의한 데이터 삭제는 파일 단위로 이루어지며, Active 상태의 Log Segment File 을 삭제가 되지 않습니다.
그래서 segment.bytes 사이즈를 작게 설정하여 Inactive 상태의 Log Segment File 이 만들어져야 합니다.
그리고 시간이 경과한 후에 아래와 같이 Retention Delete 가 가능한 Log Segment File 은 삭제되게 됩니다.
00000000000000000000.index.deleted 00000000000000000000.log.deleted 00000000000000000000.timeindex.deleted 00000000000000000017.timeindex.deleted 00000000000000000017.index.deleted 00000000000000000017.log.deleted 00000000000000000017.snapshot.deleted 00000000000000000036.index 00000000000000000036.snapshot 00000000000000000036.log 00000000000000000036.timeindex 00000000000000000054.snapshot 00000000000000000054.log 00000000000000000054.timeindex 00000000000000000054.index partition.metadata leader-epoch-checkpoint
반응형'Kafka' 카테고리의 다른 글
[Kafka] request.timeout.ms 와 데이터 중복 생성 알아보기 (0) 2024.06.25 [Kafka] Pull 방식의 복제와 Producer 속도 관계 알아보기 ( replica.fetch.wait.max.ms ) (0) 2024.06.25 [Kafka] num.replica.fetchers 와 ReplicaFetcherThread 알아보기 (0) 2024.06.24 [Kafka] advertised.listeners 와 Socket Acceptor Listener Thread 알아보기 (0) 2024.06.22 [Kafka] Adaptive Partitioning 코드로 살펴보는 내부 동작 원리 ( BuiltInPartitioner ) (0) 2024.06.09