-
[Kafka Consumer] Exactly-Once 구현하기Kafka 2024. 1. 13. 05:32반응형
- 목차
들어가며.
Kafka Consumer 는 하나의 Data Processor 로써 동작합니다.
그리고 처리한 데이터를 토대로 외부 Application 들과 상호작용하게 되는데요.
예를 들어, 조회한 Topic 의 레코드를 처리하여 데이터베이스에 삽입을 한다던지.
처리한 데이터를 또 다른 Topic 으로 Produce 하거나, Email 이나 SMS 전송을 시도할 수 있습니다.
이러한 상황에서 Kafka Consumer 가 FailOver 또는 Repartitioning 에 의해서 재시작하게 되면,
일부 데이터의 손실이나 중복 처리의 위험이 발생합니다.
데이터의 손실이 발생하는 경우.
데이터의 손실이 발생하는 경우는 Offset Commit 을 너무 빨리 시도하는 경우에 발생합니다.
먼저 아래의 케이스를 살펴보시죠.
1. Topic 에 3개의 Record 가 존재합니다.
Kafka -> Processor -> Database 로 이어지는 파이프라인입니다.
Consumer 는 Record1 부터 Record3 까지 처리하여 Database 로 처리된 데이터를 삽입해야합니다.
2. Record 1 을 처리하고 Database 에 삽입합니다.
그리고 Record 1 을 처리하였기 때문에 Commit Offset 을 수행합니다.
3. Record 2 를 처리합니다.
이 과정에서 데이터의 처리와 데이터의 삽입이 완료되기 전에 Offset Commit 이 발생합니다.
4. 하지만 Consumer 는 Processing 과 Sink 를 실패합니다.
5. 다음 Offset 인 Record 3 을 조회합니다.
이 과정에서 Data Loss 가 발생할 수 있습니다.
즉, Processing 과 Sink 과정이 완료되기 이전에 Offset 을 Commit 하게 되면 위와 같은 데이터 손실이 발생할 가능성이 생깁니다.
데이터의 중복 처리가 발생하는 경우.
Kafka Consumer 는 브로커로부터 1개의 Record 만을 조회하지 않습니다.
max.fetch.bytes, max.poll.records, 등의 옵션들을 통해서 Record 의 용량과 갯수가 허용하는 선에서 Batch Read 를 수행합니다.
그래서 1000개의 레코드를 조회한 후 40개 정도의 데이터를 처리한 후에 Restart 하게 된다면,
그 다음 시도에서 40개의 레코드가 중복으로 처리되게 됩니다.
아래의 상황을 보면서 자세히 설명드리겠습니다.
1. Topic 에는 총 300개의 Record 가 존재하며, Consumer 는 100개씩 Batch Read 수행합니다.
2. 첫 100개의 Record 들은 처리 후 데이터베이스로 정상적으로 삽입됩니다.
3. 두번째 Processing 과정에서 40개의 데이터는 성공적으로 처리합니다.
하지만 나머지 60개의 데이터를 처리하는 과정에서 Consumer 는 종료됩니다.
이 상황에서 40개의 데이터가 Database 로 삽입됩니다.
4. Consumer 의 Restart 이후 정상 가동됩니다.
그리고 40개의 데이터 중복이 발생합니다.
Data Loss 재현하기.
먼저 아래의 코드를 실행시켜 1 부터 100 까지의 Record 를 가지는 nums 토픽을 생성하였습니다.
from kafka import KafkaProducer producer = KafkaProducer( bootstrap_servers="localhost:29091,localhost:29092,localhost:29093", value_serializer=lambda x: x.encode("utf-8"), key_serializer=None, ) for num in range(1, 101): producer.send(topic="nums", key=None, value=f"{num}") producer.flush()
먼저 nums 토픽을 조회해보겠습니다.
from kafka import KafkaConsumer consumer = KafkaConsumer( "nums", bootstrap_servers="localhost:29091,localhost:29092,localhost:29093", group_id="num-consumer-group", auto_offset_reset="earliest", enable_auto_commit=False ) while True: response = consumer.poll(timeout_ms=5 * 1000) for _, records in response.items(): for record in records: print(record.value)
b'1' b'2' b'3' b'4' ... b'97' b'98' b'99' b'100'
Data Loss 의 케이스는 다음과 같습니다.
Kafka Consumer 는 1 부터 100 까지의 100 개의 데이터를 10개 단위로 조회합니다.
그리고 Kafka Consumer 는 레코드 조회 이후에 바로 Commit Offset 을 수행합니다.
즉, Database 로의 Sink 를 고려하지 않습니다.
Database 로의 Sink 가 마무리되지 않은 상황에서 프로그램이 재시작된다면 데이터의 손실이 발생하게 됩니다.
아래의 예시 코드가 Data Loss 의 가능성을 내포하는 코드입니다.
from kafka import KafkaConsumer, TopicPartition import mysql.connector import time consumer = KafkaConsumer( "nums", bootstrap_servers="localhost:29091,localhost:29092,localhost:29093", group_id="num-consumer-group3", auto_offset_reset="earliest", fetch_min_bytes=1024, max_poll_records=10, enable_auto_commit=False ) connection = mysql.connector.connect( host="localhost", port=3306, user="root", password="1234", database="mysql", ) connection.autocommit = True cursor = connection.cursor() while True: response = consumer.poll(timeout_ms=5 * 1000) for _, records in response.items(): consumer.commit() committed_offset = consumer.committed(TopicPartition("nums", 0)) print(F"committed_offset : {committed_offset}") time.sleep(5) for record in records: cursor.execute(f"insert into test_table(id) values({record.value.decode('utf-8')});") time.sleep(1)
참고로 test_table 의 스키마는 아래와 같습니다.
CREATE TABLE `test_table` ( `id` int DEFAULT NULL ) ENGINE=InnoDB
위 Python 프로그램은 실행 과정에서 몇 차례 강제로 재시작하였는데요.
결과는 아래처럼 9개의 Loss 가 발생합니다.
select count(*) from test_table; +----------+ | count(*) | +----------+ | 91 | +----------+ 1 row in set (0.00 sec)
데이터 손실을 방지하는 코드 예시.
Commit Offset 의 위치를 MySQL Insert 쿼리 실행문보다 아래에 둠으로써 일차적인 방어를 수행할 수 있습니다.
먼저 MySQL Insert 를 Batch Insert 쿼리문으로 변경하고 auto-commit 이 아닌 Transaction 처리를 합니다.
MySQL Insert 가 성공한다면, Kafka Consumer 의 Commit Offset 을 수행합니다.
이로써 Consumer 프로그램이 재시작하더라도 데이터의 손실을 최소화할 수 있습니다.
즉, 두가지 데이터 소스인 Kafka 와 MySQL 의 트랜잭션 처리의 동기화가 매우 중요합니다.
from kafka import KafkaConsumer, TopicPartition import mysql.connector import time consumer = KafkaConsumer( "nums", bootstrap_servers="localhost:29091,localhost:29092,localhost:29093", group_id="num-consumer-group5", auto_offset_reset="earliest", fetch_min_bytes=1024, max_poll_records=10, enable_auto_commit=False ) connection = mysql.connector.connect( host="localhost", port=3306, user="root", password="1234", database="mysql", ) connection.autocommit = False cursor = connection.cursor() while True: response = consumer.poll(timeout_ms=5 * 1000) for _, records in response.items(): committed_offset = consumer.committed(TopicPartition("nums", 0)) print(F"committed_offset : {committed_offset}") time.sleep(5) success_or_not = cursor.execute( f"insert into test_table(id) values {','.join(['(' + record.value.decode('utf-8') + ')' for record in records])};") connection.commit() consumer.commit() time.sleep(1)
마치며.
내용이 길어지는 관계로 Consumer 의 Rebalancing 케이스와 데이터 중복에 대한 처리 내용은 다른 글에서 이어 설명하도록 하겠습니다.
Kafka Consumer 가 외부 데이터 소스와 연결된 상태에서 Exactly-Once 처리는
위와 같이 두 데이터 소스의 트랜잭션의 동기화가 매우 중요합니다.
반응형'Kafka' 카테고리의 다른 글
Kafka Controller 알아보기 (0) 2024.01.17 [Kafka] Replication 의 시간은 얼마나 걸릴까 ? ( kafka-reassign-partitions ) (0) 2024.01.14 Kafka Consumer Configuration 알아보기 ( session.timeout.ms, heartbeat.interval.ms, auto.offset.reset, auto.commit.interval.ms ) (0) 2024.01.13 Kafka Consumer Configuration 알아보기 (fetch.min.bytes, fetch.max.wait.ms, max.parti (1) 2024.01.12 Kafka Log Compaction 알아보기 (0) 2024.01.12