ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • [Kafka Consumer] Exactly-Once 구현하기
    Kafka 2024. 1. 13. 05:32
    반응형

    - 목차

     

    들어가며.

    Kafka Consumer 는 하나의 Data Processor 로써 동작합니다.

    그리고 처리한 데이터를 토대로 외부 Application 들과 상호작용하게 되는데요.

    예를 들어, 조회한 Topic 의 레코드를 처리하여 데이터베이스에 삽입을 한다던지.

    처리한 데이터를 또 다른 Topic 으로 Produce 하거나, Email 이나 SMS 전송을 시도할 수 있습니다.

    이러한 상황에서 Kafka Consumer 가 FailOver 또는 Repartitioning 에 의해서 재시작하게 되면,

    일부 데이터의 손실이나 중복 처리의 위험이 발생합니다.

     

    데이터의 손실이 발생하는 경우.

    데이터의 손실이 발생하는 경우는 Offset Commit 을 너무 빨리 시도하는 경우에 발생합니다.

    먼저 아래의 케이스를 살펴보시죠.

     

    1. Topic 에 3개의 Record 가 존재합니다.

    Kafka -> Processor -> Database 로 이어지는 파이프라인입니다.

    Consumer 는 Record1 부터 Record3 까지 처리하여 Database 로 처리된 데이터를 삽입해야합니다.

    2. Record 1 을 처리하고 Database 에 삽입합니다.

    그리고 Record 1 을 처리하였기 때문에 Commit Offset 을 수행합니다.

     

    3. Record 2 를 처리합니다.

    이 과정에서 데이터의 처리와 데이터의 삽입이 완료되기 전에 Offset Commit 이 발생합니다.

     

    4. 하지만 Consumer 는 Processing 과 Sink 를 실패합니다.

     

    5. 다음 Offset 인 Record 3 을 조회합니다.

    이 과정에서 Data Loss 가 발생할 수 있습니다.

     

    즉, Processing 과 Sink 과정이 완료되기 이전에 Offset 을 Commit 하게 되면 위와 같은 데이터 손실이 발생할 가능성이 생깁니다.

     

     

    데이터의 중복 처리가 발생하는 경우.

    Kafka Consumer 는 브로커로부터 1개의 Record 만을 조회하지 않습니다.

    max.fetch.bytes, max.poll.records, 등의 옵션들을 통해서 Record 의 용량과 갯수가 허용하는 선에서 Batch Read 를 수행합니다.

    그래서 1000개의 레코드를 조회한 후 40개 정도의 데이터를 처리한 후에 Restart 하게 된다면,

    그 다음 시도에서 40개의 레코드가 중복으로 처리되게 됩니다.

     

    아래의 상황을 보면서 자세히 설명드리겠습니다.

     

    1. Topic 에는 총 300개의 Record 가 존재하며, Consumer 는 100개씩 Batch Read 수행합니다.

     

     

     

    2. 첫 100개의 Record 들은 처리 후 데이터베이스로 정상적으로 삽입됩니다.

     

     

    3. 두번째 Processing 과정에서 40개의 데이터는 성공적으로 처리합니다.

    하지만 나머지 60개의 데이터를 처리하는 과정에서 Consumer 는 종료됩니다.

    이 상황에서 40개의 데이터가 Database 로 삽입됩니다.

     

     

     

    4. Consumer 의 Restart 이후 정상 가동됩니다.

    그리고 40개의 데이터 중복이 발생합니다.

     

     

     

    Data Loss 재현하기.

    먼저 아래의 코드를 실행시켜 1 부터 100 까지의 Record 를 가지는 nums 토픽을 생성하였습니다.

    from kafka import KafkaProducer
    
    producer = KafkaProducer(
        bootstrap_servers="localhost:29091,localhost:29092,localhost:29093",
        value_serializer=lambda x: x.encode("utf-8"),
        key_serializer=None,
    )
    for num in range(1, 101):
        producer.send(topic="nums", key=None, value=f"{num}")
    producer.flush()

     

    먼저 nums 토픽을 조회해보겠습니다.

    from kafka import KafkaConsumer
    
    consumer = KafkaConsumer(
        "nums",
        bootstrap_servers="localhost:29091,localhost:29092,localhost:29093",
        group_id="num-consumer-group",
        auto_offset_reset="earliest",
        enable_auto_commit=False
    )
    
    while True:
        response = consumer.poll(timeout_ms=5 * 1000)
        for _, records in response.items():
            for record in records:
                print(record.value)

     

    b'1'
    b'2'
    b'3'
    b'4'
    ...
    b'97'
    b'98'
    b'99'
    b'100'

     

     

    Data Loss 의 케이스는 다음과 같습니다.

    Kafka Consumer 는 1 부터 100 까지의 100 개의 데이터를 10개 단위로 조회합니다.

    그리고 Kafka Consumer 는 레코드 조회 이후에 바로 Commit Offset 을 수행합니다.

    즉, Database 로의 Sink 를 고려하지 않습니다.

    Database 로의 Sink 가 마무리되지 않은 상황에서 프로그램이 재시작된다면 데이터의 손실이 발생하게 됩니다.

     

    아래의 예시 코드가 Data Loss 의 가능성을 내포하는 코드입니다.

    from kafka import KafkaConsumer, TopicPartition
    import mysql.connector
    import time
    
    consumer = KafkaConsumer(
        "nums",
        bootstrap_servers="localhost:29091,localhost:29092,localhost:29093",
        group_id="num-consumer-group3",
        auto_offset_reset="earliest",
        fetch_min_bytes=1024,
        max_poll_records=10,
        enable_auto_commit=False
    )
    
    connection = mysql.connector.connect(
        host="localhost",
        port=3306,
        user="root",
        password="1234",
        database="mysql",
    
    )
    connection.autocommit = True
    cursor = connection.cursor()
    
    while True:
        response = consumer.poll(timeout_ms=5 * 1000)
        for _, records in response.items():
            consumer.commit()
            committed_offset = consumer.committed(TopicPartition("nums", 0))
            print(F"committed_offset : {committed_offset}")
            time.sleep(5)
            for record in records:
                cursor.execute(f"insert into test_table(id) values({record.value.decode('utf-8')});")
                time.sleep(1)

     

    참고로 test_table 의 스키마는 아래와 같습니다.

    CREATE TABLE `test_table` (
      `id` int DEFAULT NULL
    ) ENGINE=InnoDB

     

    위 Python 프로그램은 실행 과정에서 몇 차례 강제로 재시작하였는데요.

    결과는 아래처럼 9개의 Loss 가 발생합니다.

    select count(*) from test_table;
    +----------+
    | count(*) |
    +----------+
    |       91 |
    +----------+
    1 row in set (0.00 sec)

     

    데이터 손실을 방지하는 코드 예시.

    Commit Offset 의 위치를 MySQL Insert 쿼리 실행문보다 아래에 둠으로써 일차적인 방어를 수행할 수 있습니다.

    먼저 MySQL Insert 를 Batch Insert 쿼리문으로 변경하고 auto-commit 이 아닌 Transaction 처리를 합니다.

    MySQL Insert 가 성공한다면, Kafka Consumer 의 Commit Offset 을 수행합니다.

    이로써 Consumer 프로그램이 재시작하더라도 데이터의 손실을 최소화할 수 있습니다.

     

    즉, 두가지 데이터 소스인 Kafka 와 MySQL 의 트랜잭션 처리의 동기화가 매우 중요합니다.

     

    from kafka import KafkaConsumer, TopicPartition
    import mysql.connector
    import time
    
    consumer = KafkaConsumer(
        "nums",
        bootstrap_servers="localhost:29091,localhost:29092,localhost:29093",
        group_id="num-consumer-group5",
        auto_offset_reset="earliest",
        fetch_min_bytes=1024,
        max_poll_records=10,
        enable_auto_commit=False
    )
    
    connection = mysql.connector.connect(
        host="localhost",
        port=3306,
        user="root",
        password="1234",
        database="mysql",
    
    )
    connection.autocommit = False
    cursor = connection.cursor()
    
    while True:
        response = consumer.poll(timeout_ms=5 * 1000)
        for _, records in response.items():
            committed_offset = consumer.committed(TopicPartition("nums", 0))
            print(F"committed_offset : {committed_offset}")
            time.sleep(5)
            success_or_not = cursor.execute(
                f"insert into test_table(id) values {','.join(['(' + record.value.decode('utf-8') + ')' for record in records])};")
            connection.commit()
            consumer.commit()
            time.sleep(1)

     

     

    마치며.

    내용이 길어지는 관계로 Consumer 의 Rebalancing 케이스와 데이터 중복에 대한 처리 내용은 다른 글에서 이어 설명하도록 하겠습니다.

    Kafka Consumer 가 외부 데이터 소스와 연결된 상태에서 Exactly-Once 처리는

    위와 같이 두 데이터 소스의 트랜잭션의 동기화가 매우 중요합니다.

     

     

     

    반응형
Designed by Tistory.