-
Kafka Consumer Configuration 알아보기 ( session.timeout.ms, heartbeat.interval.ms, auto.offset.reset, auto.commit.interval.ms )Kafka 2024. 1. 13. 05:31728x90반응형
- 목차
관련있는 다른 글.
https://westlife0615.tistory.com/587
들어가며.
이번 글에서는 Kafka Consumer 의 Configuration 들 중에서
heartbeat.interval.ms, session.timeout.ms, auto.offset.reset, auto.commit.interval.ms 등
에 대해서 알아보도록 하겠습니다.
heartbeat.interval.ms
Kafka Consumer 는 Broker 와 통신을 주고 받습니다.
이렇게 Kafka Consumer 와 통신하는 Broker 를 Group Coordinator 라고 부르는데요.
여러 개의 Broker 들 중에서 특정 Consumer Group 들과 전적으로 통신하는 Broker 를 Group Coordinator 라고 합니다.
그리고 Group Coordinator 는 Consumer Group 마다 매칭되는 Broker 가 개별적으로 존재합니다.
아래 이미지와 같은 느낌으로 Broker 와 Consumer Group 이 매칭됩니다.
Broker 1 은 Consumer Group 1 과 Consumer Group 4 의 Group Coordinator
Broker 2 은 Consumer Group 2 의 Group Coordinator
Broker 3 은 Consumer Group 3 의 Group Coordinator 가 됩니다.
그리고 Consumer Group 은 여러 Consumer 들로 구성됩니다.
Consumer 는 반드시 Broker 에게 자신이 살아았음을 알려야하는데요.
이렇게 자신이 살아있음을 알리는 신호를 heartbeat 라고 부릅니다.
heartbeat.interval.ms 는 Consumer 가 Broker 에게 heartbeat 를 전달하는 주기입니다.
heartbeat.interval.ms 의 기본값은 3초입니다.
Consumer 들은 3초마다 한번씩 자신의 생존 여부를 Broker 에게 전달합니다.
session.timeout.ms
session.timeout.ms 는 Heartbeat 와 연관이 있는 설정값입니다.
session.timeout.ms 는 Consumer 가 Heartbeat 를 전송하지 않을 수 있는 제한 시간인데요.
다른 표현으로 Consumer 의 Idle 상태를 허용하는 시간입니다.
어떠한 이유로 Consumer 가 Heartbeat 를 Broker 에게 전달할 수 없을 수도 있습니다.
예를 들어 네트워크 지연이 원인을 수도 있죠.
이러한 경우를 고려하여 Broker 는 session.timeout.ms 에 정해진 시간만큼 Consumer 가 Heartbeat 를 전송하지 않는 것을 허용합니다.
session.timeout.ms 의 기본값은 10초, heartbeat.interval.ms 는 3초입니다.
즉, Consumer 는 3초에 한번씩 Heartbeat 를 전송하며, 혹여나 전송이 실패하더라도 10초 동안은 다음 Heartbeat 시도를 기다리겠다는 의미입니다.
아래의 이미지처럼 2번의 Hearbeat 전송을 실패하였지만,
session.timeout.ms 인 10초 내로 Heartbeat 가 성공한다면 Broker 는 Consumer 의 생존을 확인할 수 있습니다.
만약 모든 Heartbeat 가 실패한다면, Rebalancing 이 발생합니다.
Rebalancing 에 대해서는 다른 글에서 설명해보도록 하겠습니다.
auto.offset.reset
auto.offset.reset 은 Topic 의 레코드를 어디에서부터 읽어들일지에 대한 설정입니다.
auto.offset.reset 은 earliest 와 latest 가 있습니다.
earlist 는 Topic 의 첫 레코드부터 읽어들이는 설정입니다.
latest 는 새롭게 추가된 레코드부터 읽어들이는 설정이구요.
아래 이미지와 같이 Record1 ~ Record4 까지 존재하는 Topic 의 경우에 auto.offset.reset 은 earliest 로 Consume 하게 되면,
Record1, Record2, Record3, Record4 순서로 데이터를 읽어들이게 됩니다.
반면, auto.offset.reset 이 latest 인 경우에는 새롭게 추가되는 Record 부터 읽어들이게 됩니다.
그리고 auto.offset.reset 은 Consumer Group 과 Commit Offset 과도 관련이 있습니다.
Commit Offset 이란 Consumer Group 이 읽어들인 Record 의 순서인데요.
즉, " N 번째 Record 까지 읽었고, 다음엔 N + 1 번째 Record 부터 읽겠다." 라는 기능을 수행합니다.
auto.offset.reset 은 Commit Offset 이 설정되지 않은 경우 또는 Commit Offset 이 Outdated 되어 Invalid 한 케이스에서 적용됩니다.
그래서 Commit Offset 이 존재하는 상황에서는 auto.offset.reset 이 적용되지 않습니다.
아래 사진처럼 auto-offset-reset Topic 에 1부터 9까지 데이터가 존재합니다.
auto.offset.reset : earliest.
auto.offset.reset 은 earliest 로 설정한 이후에 조회를 하게 되면, 모든 Record 들이 조회됩니다.
from kafka import KafkaConsumer consumer = KafkaConsumer( "auto-offset-reset", bootstrap_servers="localhost:29091,localhost:29092,localhost:29093", group_id="test-consumer-group1", auto_offset_reset="earliest", session_timeout_ms=10000, heartbeat_interval_ms=3000 ) while True: response = consumer.poll(timeout_ms=5 * 1000) for _, records in response.items(): for record in records: print(record.value)
b'1' b'2' b'3' b'4' b'5' b'6' b'7' b'8' b'9'
auto.offset.reset : latest.
반면 auto.offset.reset 설정을 latest 로 하게 되면 새롭게 추가된 레코드부터 조회됩니다.
아래의 예시는 {key : null, value : 10} 인 레코드를 추가하였기 때문에 b'10' 이 출력됩니다.
from kafka import KafkaConsumer, KafkaProducer consumer = KafkaConsumer( "auto-offset-reset", bootstrap_servers="localhost:29091,localhost:29092,localhost:29093", group_id="test-consumer-group2", auto_offset_reset="latest", session_timeout_ms=10000, heartbeat_interval_ms=3000 ) producer = KafkaProducer( bootstrap_servers="localhost:29091,localhost:29092,localhost:29093", value_serializer=lambda x: x.encode("utf-8"), key_serializer=None, ) producer.send(topic="auto-offset-reset", key=None, value="10") producer.flush() while True: response = consumer.poll(timeout_ms=5 * 1000) for _, records in response.items(): for record in records: print(record.value)
b'10'
Consumer Group 의 Committed Offset 이 존재하는 경우.
아래 사진과 같이 Consumer Group 의 Committed Offset 이 존재하는 상황에선 auto.offset.reset 이 적용되지 않습니다.
위 auto.offset.reset : earliest 예시코드를 다시 시도해보면, 새롭게 추가된 10 이라는 값을 Consume 합니다.
이는 더 이상 auto.offset.reset 정책이 적용되지 않음을 의미합니다.
from kafka import KafkaConsumer consumer = KafkaConsumer( "auto-offset-reset", bootstrap_servers="localhost:29091,localhost:29092,localhost:29093", group_id="test-consumer-group1", auto_offset_reset="earliest", session_timeout_ms=10000, heartbeat_interval_ms=3000 ) while True: response = consumer.poll(timeout_ms=5 * 1000) for _, records in response.items(): for record in records: print(record.value)
b'10'
보통 Kafka 의 Consumer 들은 분산 클러스터에서 동작하는 경우가 많습니다.
Spark Stream, Flink Stream, Kafka Stream 등이 있죠.
그리고 대용량의 스트림 처리를 해야하는 구조이기에 서버가 다운되고 Recovery 되는 상황이 잦죠.
이때 Kafka Consumer 가 재시작할 때마다 auto.offset.reset 을 따르게 된다면, 데이터가 중복되거나 손실되는 경우가 발생합니다.
그래서 아래 사진과 같이 Committed Offset 이 존재하는 상황이라면 Committed Offset 을 따르게 됩니다.
enable.auto.commit & auto.commit.interval.ms
enable.auto.commit 은 Consumer 가 조회한 레코드의 Offset 을 자동으로 commit 하는 설정입니다.
만약 enable.auto.commit 을 설정하지 않으면 코드 레벨에서 commit 명령을 해주어야합니다.
아래의 코드 예시가 enable.auto.commit 을 Disabled 시킨 상태이구요.
아래의 코드를 실행시키면 Offset Commit 동작이 수행되지 않습니다.
그래서 Committed Offset 이 존재하지 않아서 매번 earliest 부터 레코드를 읽어들입니다.
from kafka import KafkaConsumer consumer = KafkaConsumer( "auto-offset-reset", bootstrap_servers="localhost:29091,localhost:29092,localhost:29093", group_id="test-consumer-group7", auto_offset_reset="earliest", enable_auto_commit=False ) while True: response = consumer.poll(timeout_ms=5 * 1000) for _, records in response.items(): for record in records: print(record.value)
b'1' b'2' b'3' b'4' b'5' b'6' b'7' b'8' b'9' b'10'
명시적으로 commit 을 시도하는 케이스.
아래의 코드 예시는 enable.auto.commit 이 Disabled 된 상태에서 명시적으로 commit 을 시도하는 코드입니다.
from kafka import KafkaConsumer consumer = KafkaConsumer( "auto-offset-reset", bootstrap_servers="localhost:29091,localhost:29092,localhost:29093", group_id="test-consumer-group7", auto_offset_reset="earliest", enable_auto_commit=False ) while True: response = consumer.poll(timeout_ms=5 * 1000) for _, records in response.items(): for record in records: print(record.value) consumer.commit()
그리고 Committed Offset 이 아래와 같이 생성됩니다.
auto.commit.interval.ms 는 auto.commit 을 위한 주기입니다.
Heartbeat 처럼 interval 를 설정할 수 있구요.
enable.auto.commit 과 auto.commit.interval.ms 을 함께 설정하여 사용합니다.
반응형'Kafka' 카테고리의 다른 글
[Kafka] Replication 의 시간은 얼마나 걸릴까 ? ( kafka-reassign-partitions ) (0) 2024.01.14 [Kafka Consumer] Exactly-Once 구현하기 (0) 2024.01.13 Kafka Consumer Configuration 알아보기 (fetch.min.bytes, fetch.max.wait.ms, max.parti (1) 2024.01.12 Kafka Log Compaction 알아보기 (0) 2024.01.12 [ Kafka Producer ] 불안정한 네트워크에서 데이터 생성하기 ( Acks, Retries ) (0) 2024.01.09