ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • Kafka Consumer Configuration 알아보기 ( session.timeout.ms, heartbeat.interval.ms, auto.offset.reset, auto.commit.interval.ms )
    Kafka 2024. 1. 13. 05:31
    728x90
    반응형

    - 목차

     

    관련있는 다른 글.

    https://westlife0615.tistory.com/587

     

    Kafka Consumer Configuration 알아보기 (fetch.min.bytes, fetch.max.wait.ms, max.partition.fetch.bytes)

    - 목차 관련있는 다른 글. https://westlife0615.tistory.com/588 Kafka Consumer Configuration 알아보기 ( session.timeout.ms, heartbeat.interval.ms, auto.offset.reset, auto.commit.interval.m - 목차 들어가며. 이번 글에서는 Kafka Consume

    westlife0615.tistory.com

     

    들어가며.

    이번 글에서는 Kafka Consumer 의 Configuration 들 중에서

    heartbeat.interval.ms, session.timeout.ms, auto.offset.reset, auto.commit.interval.ms 등

    에 대해서 알아보도록 하겠습니다.

     

    heartbeat.interval.ms

    Kafka Consumer 는 Broker 와 통신을 주고 받습니다.

    이렇게 Kafka Consumer 와 통신하는 Broker 를 Group Coordinator 라고 부르는데요.

    여러 개의 Broker 들 중에서 특정 Consumer Group 들과 전적으로 통신하는 Broker 를 Group Coordinator 라고 합니다.

    그리고 Group Coordinator 는 Consumer Group 마다 매칭되는 Broker 가 개별적으로 존재합니다.

    아래 이미지와 같은 느낌으로 Broker 와 Consumer Group 이 매칭됩니다.

    Broker 1 은 Consumer Group 1 과 Consumer Group 4 의 Group Coordinator

    Broker 2 은 Consumer Group 2 의 Group Coordinator

    Broker 3 은 Consumer Group 3 의 Group Coordinator 가 됩니다.

     

    그리고 Consumer Group 은 여러 Consumer 들로 구성됩니다.

    Consumer 는 반드시 Broker 에게 자신이 살아았음을 알려야하는데요.

    이렇게 자신이 살아있음을 알리는 신호를 heartbeat 라고 부릅니다.

    heartbeat.interval.ms 는 Consumer 가 Broker 에게 heartbeat 를 전달하는 주기입니다.

    heartbeat.interval.ms 의 기본값은 3초입니다.

    Consumer 들은 3초마다 한번씩 자신의 생존 여부를 Broker 에게 전달합니다.

     

     

    session.timeout.ms

    session.timeout.ms 는 Heartbeat 와 연관이 있는 설정값입니다.

    session.timeout.ms 는 Consumer 가 Heartbeat 를 전송하지 않을 수 있는 제한 시간인데요.

    다른 표현으로 Consumer 의 Idle 상태를 허용하는 시간입니다.

    어떠한 이유로 Consumer 가 Heartbeat 를 Broker 에게 전달할 수 없을 수도 있습니다.

    예를 들어 네트워크 지연이 원인을 수도 있죠.

    이러한 경우를 고려하여 Broker 는 session.timeout.ms 에 정해진 시간만큼 Consumer 가 Heartbeat 를 전송하지 않는 것을 허용합니다.

    session.timeout.ms 의 기본값은 10초, heartbeat.interval.ms 는 3초입니다.

    즉, Consumer 는 3초에 한번씩 Heartbeat 를 전송하며, 혹여나 전송이 실패하더라도 10초 동안은 다음 Heartbeat 시도를 기다리겠다는 의미입니다.

     

    아래의 이미지처럼 2번의 Hearbeat 전송을 실패하였지만,

    session.timeout.ms 인 10초 내로 Heartbeat 가 성공한다면 Broker 는 Consumer 의 생존을 확인할 수 있습니다.

    만약 모든 Heartbeat 가 실패한다면, Rebalancing 이 발생합니다.

    Rebalancing 에 대해서는 다른 글에서 설명해보도록 하겠습니다.

     

     

    auto.offset.reset

    auto.offset.reset 은 Topic 의 레코드를 어디에서부터 읽어들일지에 대한 설정입니다.

    auto.offset.reset 은 earliest 와 latest 가 있습니다.

    earlist 는 Topic 의 첫 레코드부터 읽어들이는 설정입니다.

    latest 는 새롭게 추가된 레코드부터 읽어들이는 설정이구요.

    아래 이미지와 같이 Record1 ~ Record4 까지 존재하는 Topic 의 경우에 auto.offset.reset 은 earliest 로 Consume 하게 되면,

    Record1, Record2, Record3, Record4 순서로 데이터를 읽어들이게 됩니다.

    반면, auto.offset.reset 이 latest 인 경우에는 새롭게 추가되는 Record 부터 읽어들이게 됩니다.

     

     

    그리고 auto.offset.reset 은 Consumer Group 과 Commit Offset 과도 관련이 있습니다.

    Commit Offset 이란 Consumer Group 이 읽어들인 Record 의 순서인데요.

    즉, " N 번째 Record 까지 읽었고, 다음엔 N + 1 번째 Record 부터 읽겠다." 라는 기능을 수행합니다.

    auto.offset.reset 은 Commit Offset 이 설정되지 않은 경우 또는 Commit Offset 이 Outdated 되어 Invalid 한 케이스에서 적용됩니다.

    그래서 Commit Offset 이 존재하는 상황에서는 auto.offset.reset 이 적용되지 않습니다.

    아래 사진처럼 auto-offset-reset Topic 에 1부터 9까지 데이터가 존재합니다.

     

    auto.offset.reset : earliest.

    auto.offset.reset 은 earliest 로 설정한 이후에 조회를 하게 되면, 모든 Record 들이 조회됩니다.

    from kafka import KafkaConsumer
    
    consumer = KafkaConsumer(
        "auto-offset-reset",
        bootstrap_servers="localhost:29091,localhost:29092,localhost:29093",
        group_id="test-consumer-group1",
        auto_offset_reset="earliest",
        session_timeout_ms=10000,
        heartbeat_interval_ms=3000
    )
    while True:
        response = consumer.poll(timeout_ms=5 * 1000)
        for _, records in response.items():
            for record in records:
                print(record.value)
    b'1'
    b'2'
    b'3'
    b'4'
    b'5'
    b'6'
    b'7'
    b'8'
    b'9'

     

    auto.offset.reset : latest.

    반면 auto.offset.reset 설정을 latest 로 하게 되면 새롭게 추가된 레코드부터 조회됩니다.

    아래의 예시는 {key : null, value : 10} 인 레코드를 추가하였기 때문에 b'10' 이 출력됩니다.

    from kafka import KafkaConsumer, KafkaProducer
    
    consumer = KafkaConsumer(
        "auto-offset-reset",
        bootstrap_servers="localhost:29091,localhost:29092,localhost:29093",
        group_id="test-consumer-group2",
        auto_offset_reset="latest",
        session_timeout_ms=10000,
        heartbeat_interval_ms=3000
    )
    producer = KafkaProducer(
        bootstrap_servers="localhost:29091,localhost:29092,localhost:29093",
        value_serializer=lambda x: x.encode("utf-8"),
        key_serializer=None,
    )
    producer.send(topic="auto-offset-reset", key=None, value="10")
    producer.flush()
    
    while True:
        response = consumer.poll(timeout_ms=5 * 1000)
        for _, records in response.items():
            for record in records:
                print(record.value)
    b'10'

     

    Consumer Group 의 Committed Offset 이 존재하는 경우.

    아래 사진과 같이 Consumer Group 의 Committed Offset 이 존재하는 상황에선 auto.offset.reset 이 적용되지 않습니다.

    위 auto.offset.reset : earliest 예시코드를 다시 시도해보면, 새롭게 추가된 10 이라는 값을 Consume 합니다.

    이는 더 이상 auto.offset.reset 정책이 적용되지 않음을 의미합니다.

    from kafka import KafkaConsumer
    
    consumer = KafkaConsumer(
        "auto-offset-reset",
        bootstrap_servers="localhost:29091,localhost:29092,localhost:29093",
        group_id="test-consumer-group1",
        auto_offset_reset="earliest",
        session_timeout_ms=10000,
        heartbeat_interval_ms=3000
    )
    while True:
        response = consumer.poll(timeout_ms=5 * 1000)
        for _, records in response.items():
            for record in records:
                print(record.value)
    b'10'

     

    보통 Kafka 의 Consumer 들은 분산 클러스터에서 동작하는 경우가 많습니다.

    Spark Stream, Flink Stream, Kafka Stream 등이 있죠.

    그리고 대용량의 스트림 처리를 해야하는 구조이기에 서버가 다운되고 Recovery 되는 상황이 잦죠.

    이때 Kafka Consumer 가 재시작할 때마다 auto.offset.reset 을 따르게 된다면, 데이터가 중복되거나 손실되는 경우가 발생합니다.

    그래서 아래 사진과 같이 Committed Offset 이 존재하는 상황이라면 Committed Offset 을 따르게 됩니다.

     

     

    enable.auto.commit & auto.commit.interval.ms

    enable.auto.commit 은 Consumer 가 조회한 레코드의 Offset 을 자동으로 commit 하는 설정입니다.

    만약 enable.auto.commit 을 설정하지 않으면 코드 레벨에서 commit 명령을 해주어야합니다.

    아래의 코드 예시가 enable.auto.commit 을 Disabled 시킨 상태이구요.

    아래의 코드를 실행시키면 Offset Commit 동작이 수행되지 않습니다.

    그래서 Committed Offset 이 존재하지 않아서 매번 earliest 부터 레코드를 읽어들입니다.

    from kafka import KafkaConsumer
    
    consumer = KafkaConsumer(
        "auto-offset-reset",
        bootstrap_servers="localhost:29091,localhost:29092,localhost:29093",
        group_id="test-consumer-group7",
        auto_offset_reset="earliest",
        enable_auto_commit=False
    )
    
    while True:
        response = consumer.poll(timeout_ms=5 * 1000)
        for _, records in response.items():
            for record in records:
                print(record.value)
    b'1'
    b'2'
    b'3'
    b'4'
    b'5'
    b'6'
    b'7'
    b'8'
    b'9'
    b'10'

     

    명시적으로 commit 을 시도하는 케이스.

    아래의 코드 예시는 enable.auto.commit 이 Disabled 된 상태에서 명시적으로 commit 을 시도하는 코드입니다.

    from kafka import KafkaConsumer
    
    consumer = KafkaConsumer(
        "auto-offset-reset",
        bootstrap_servers="localhost:29091,localhost:29092,localhost:29093",
        group_id="test-consumer-group7",
        auto_offset_reset="earliest",
        enable_auto_commit=False
    )
    
    while True:
        response = consumer.poll(timeout_ms=5 * 1000)
        for _, records in response.items():
            for record in records:
                print(record.value)
                consumer.commit()

     

    그리고 Committed Offset 이 아래와 같이 생성됩니다.

     

     

    auto.commit.interval.ms 는 auto.commit 을 위한 주기입니다.

    Heartbeat 처럼 interval 를 설정할 수 있구요.

    enable.auto.commit 과 auto.commit.interval.ms 을 함께 설정하여 사용합니다.

     

     

    반응형
Designed by Tistory.