-
[Kafka] Pull 방식의 복제와 Producer 속도 관계 알아보기 ( replica.fetch.wait.max.ms )Kafka 2024. 6. 25. 06:28반응형
- 목차
들어가며.
카프카는 여러 Broker 들이 서로 서로 데이터 복제를 수행합니다.
정확히는 리더 브로커가 소유한 파티션을 여러 팔로워 브로커들이 복제를 하게 됩니다.
Replication 과 관련된 첫번째 예시로 아래의 이미지를 살펴보면,
1개의 Topic 이 3개의 Partition 으로 구성됩니다.
이 Topic 의 Replication Factor 는 2로 설정되어 있습니다.
따라서 각 Partiton 은 1개의 Leader Broker 와 1개의 Follower Broker 를 필요로 하게 되며,
아래와 같은 구조를 가집니다.
Partition 0 의 Leader Broker 는 Broker1 이고, Follower Broker 는 Broker3 입니다.
따라서 Broker3 은 Broker1 로부터 Replication 을 수행하게 됩니다.
출처 : https://docs.confluent.io/kafka/design/replication.html 또 다른 예시로 아래와 같은 상황을 살펴봅시다.
1개의 Topic 이 존재하고 2개의 Partition 을 가집니다.
그리고 이 Topic 은 Replication Factor 가 2로 설정되어 있습니다.
3개의 Broker 중 101, 102 브로커가 Partition0, Partition1 의 Leader Broker 로써 동작하며,
Broker 102 가 Partition0 의 Follower Broker, Broker 103 이 Partition1 의 Follower Broker 로 동작합니다.
출처 : https://learn.conduktor.io/kafka/kafka-topic-replication/ 카프카에서 팔로워 브로커는 자신의 템포에 맞게 리더 브로커의 Log 데이터를 복제합니다.
다른 표현으로 Pull 방식의 복제라고도 합니다.
MySQL 도 대표적인 Pull 방식의 복제가 이루어집니다.
Slave MySQL 에서 Master MySQL 의 Bin Log 를 조회하여 데이터 복제가 이루어 집니다.
반면 Push 방식으로 데이터 복제가 이루어지는 경우도 있습니다.
대표적으로 Hadoop 에서 Data Block 들이 Push & Pipelining 방식으로 복제가 이루어집니다.
이번 글에서 알아볼 내용은 Pull 방식의 복제가 데이터 생성 속도에 미치는 영향을 알아보려고 합니다.
Push 방식이 아니기 때문에 Follower Broker 의 Replication 속도와 Data 생성 속도의 관계에 대해서 상세히 알아보도록 하겠습니다.
Acks 와 min.insync.replica.
Producer 는 Leader Broker 와 소통하게 됩니다.
Leader Broker 는 Follower Broker 와 다르게 데이터의 쓰기가 허용되는 브로커입니다.
그래서 Producer 는 오로지 Leader Broker 에게만 데이트 쓰기를 요청할 수 있습니다.
Leader Broker 는 Producer 에게서 데이터를 전달받아 자신의 Log File 에 저장합니다.
Follower Broker 는 Leader Broker 로부터 데이터를 전달받습니다.
복제는 이러한 흐름으로 수행되게 됩니다.
출처 : https://logiwa.tech/beyond-the-basics-of-kafka-producer-151a860c22a8 Producer 는 데이터의 생성과 복제의 정도를 acks Option 을 통해서 설정합니다.
ack 의 설정값은 0, 1, -1 가 존재합니다.
ack = 0.
ack == 0 인 Producer 는 데이터의 생성과 복제의 완료에 대해서 신경쓰지 않는 상태입니다.
Leader Broker 가 데이터를 잘 생성했는지,
그리고 Follower Broker 가 Leader Broker 로부터 데이터를 잘 복제했는지에 대해서 전혀 신경쓰지 않습니다.
이러한 방식을 Fire and Forget 이라고도 부릅니다.
ack 모드가 0 인 상태의 Producer 는 데이터의 저장과 복제에 전혀 신경을 쓰지 않기 때문에 가장 빠른 속도로 데이터 쓰기 작업을 수행합니다.
ack = 1.
ack 가 1 인 상태는 오직 데이터의 생성에만 집중하고 복제에 대해서는 신경을 쓰지 않습니다.
이게 무슨 뜻이냐면,
Leader Broker 가 자신의 Log File 에 Producer 가 전달한 Batch Record 를 잘 저장하면 Producer 에세 ack 응답을 제공합니다.
그 이후에 Follower Broker 들이 잘 복제를 수행했는지에 대해서 신경을 쓰지 않습니다.
ack = -1 (all).
마지막으로 ack 가 -1 인 상태가 있습니다.
ack 가 -1 이라는 의미는 데이터의 생성과 복제 모두 고려하는 상태를 의미합니다.
즉, Follower Broker 에 데이터 복제가 마무리되어야지 ack 응답이 발생합니다.
여기서 Topic 의 설정인 min.insync.replicas 의 개념이 적용되는데요.
만약에 A Topic 의 Replication Factor 가 3 이고 min.insync.replicas 가 2, 그리고 ack 가 -1 이라면,
Client 는 1개의 리더 브로커에 데이터가 생성되고, 1개의 팔로워 브로커에 데이터가 복제되면 ack 응답을 받습니다.
그리고 만약 B Topic 의 Replication Factor 가 3 이고 min.insync.replicas 가 3, 그리고 ack 가 -1 이라면,
Client 는 1개의 리더 브로커에 데이터가 생성되고, 2개의 팔로워 브로커에 데이터가 복제되면 ack 응답을 받습니다.
즉, 모든 브로커가 복제를 완료한 경우에 ack 응답이 발생합니다.
이해가 되시나요 ? min.insync.replicas 는 동일한 데이터를 (insync) 를 가지야하는 Broker (replica) 의 숫자를 의미합니다.
따라서 ack 가 -1 이면서 min.insync.replicas 의 값이 Replication Factor 와 동일한 경우에는 가장 강력한 데이터 무결성이 보장됩니다.
replica.fetch.wait.max.ms
replica.fetch.wait.max.ms 는 팔로워 브로커의 ReplicaFetcherThread 가 데이터 복제를 수행하는 주기로 생각하시면 됩니다.
정확한 동작의 원리는 저도 잘 모르지만, Replication 을 수행하는 ReplicaFetcherThread 의 복제 동작 사이사이에 Waiting 시간의 적용하는 개념입니다.
따라서 replica.fetch.wait.max.ms 의 값에 변화를 주어서 ack=-1 의 데이터 쓰기 속도 차이를 실험해보려고 합니다.
replica.fetch.wait.max.ms = 500 (기본값)
아래의 Docker 명령어들을 수행하면 2개의 Kafka Broker 들을 생성할 수 있습니다.
docker network create kafka-net docker run -d --name zookeeper --hostname zookeeper --net kafka-net \ -e ZOOKEEPER_CLIENT_PORT=2181 \ confluentinc/cp-zookeeper:7.6.4 docker run -d --name kafka1 --hostname kafka1 --net kafka-net \ -e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 \ -e KAFKA_BROKER_ID=1 \ -e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=2 \ -e KAFKA_LISTENERS=PLAINTEXT://:9092,OUTER://:19092 \ -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka1:9092,OUTER://localhost:19092 \ -e KAFKA_REPLICA_FETCH_WAIT_MAX_MS=500 \ -e KAFKA_REPLICA_FETCH_MIN_BYTES=1048576 \ -e KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,OUTER:PLAINTEXT \ -e KAFKA_INTER_BROKER_LISTENER_NAME=PLAINTEXT \ -p 19092:19092 \ confluentinc/cp-kafka:7.6.4 docker run -d --name kafka2 --hostname kafka2 --net kafka-net \ -e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 \ -e KAFKA_BROKER_ID=2 \ -e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=2 \ -e KAFKA_LISTENERS=PLAINTEXT://:9092,OUTER://:29092 \ -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka2:9092,OUTER://localhost:29092 \ -e KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,OUTER:PLAINTEXT \ -e KAFKA_INTER_BROKER_LISTENER_NAME=PLAINTEXT \ -e KAFKA_REPLICA_FETCH_WAIT_MAX_MS=500 \ -e KAFKA_REPLICA_FETCH_MIN_BYTES=1048576 \ -p 29092:29092 \ confluentinc/cp-kafka:7.6.4 docker run -d --name kafka-ui --net kafka-net \ -e DYNAMIC_CONFIG_ENABLED='true' \ -e KAFKA_CLUSTERS_0_NAME=cluster \ -e KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka1:9092,kafka2:9092 \ -p 8080:8080 \ provectuslabs/kafka-ui:v0.7.2
그리고 http://localhost:8080 주소에 접속하여 Kafka UI 관리자 페이지에 접속할 수 있습니다.
여기서 아래와 같이 Partition 2, Replication Factor 2, min.insync.replicas 2 인 Topic 을 생성하겠습니다.
그리고 아래와 같이 Producer 스크립트를 통해서 데이터 생성 시간을 측정합니다.
import time from confluent_kafka import Producer from datetime import datetime bootstrap_servers = 'localhost:19092,localhost:29092' producer_conf = { 'bootstrap.servers': bootstrap_servers, 'batch.size': 1, } producer = Producer(producer_conf) topic = "test" start = datetime.now() def delivery_report(err, msg): print(datetime.now() - start) if err is not None: print(f"Message delivery failed: {err}") else: print(f"Message delivered to {msg.topic()} [{msg.partition()}]") producer.produce(topic, key=None, value=b'1', callback=delivery_report) producer.flush() time.sleep(100000)
먼저 replica.fetch.wait.max.ms 가 500 인 상태에선 아래와 같이 1초 미만의 짧은 시간이 소요됩니다.
0:00:00.046372
replica.fetch.wait.max.ms = 10000 (10초)
이번에는 데이터 복제의 대기 시간을 10초로 설정합니다.
docker network create kafka-net docker run -d --name zookeeper --hostname zookeeper --net kafka-net \ -e ZOOKEEPER_CLIENT_PORT=2181 \ confluentinc/cp-zookeeper:7.6.4 docker run -d --name kafka1 --hostname kafka1 --net kafka-net \ -e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 \ -e KAFKA_BROKER_ID=1 \ -e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=2 \ -e KAFKA_LISTENERS=PLAINTEXT://:9092,OUTER://:19092 \ -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka1:9092,OUTER://localhost:19092 \ -e KAFKA_REPLICA_FETCH_WAIT_MAX_MS=10000 \ -e KAFKA_REPLICA_FETCH_MIN_BYTES=1048576 \ -e KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,OUTER:PLAINTEXT \ -e KAFKA_INTER_BROKER_LISTENER_NAME=PLAINTEXT \ -p 19092:19092 \ confluentinc/cp-kafka:7.6.4 docker run -d --name kafka2 --hostname kafka2 --net kafka-net \ -e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 \ -e KAFKA_BROKER_ID=2 \ -e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=2 \ -e KAFKA_LISTENERS=PLAINTEXT://:9092,OUTER://:29092 \ -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka2:9092,OUTER://localhost:29092 \ -e KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,OUTER:PLAINTEXT \ -e KAFKA_INTER_BROKER_LISTENER_NAME=PLAINTEXT \ -e KAFKA_REPLICA_FETCH_WAIT_MAX_MS=10000 \ -e KAFKA_REPLICA_FETCH_MIN_BYTES=1048576 \ -p 29092:29092 \ confluentinc/cp-kafka:7.6.4 docker run -d --name kafka-ui --net kafka-net \ -e DYNAMIC_CONFIG_ENABLED='true' \ -e KAFKA_CLUSTERS_0_NAME=cluster \ -e KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka1:9092,kafka2:9092 \ -p 8080:8080 \ provectuslabs/kafka-ui:v0.7.2
좀 편차가 심하긴한데, 10초라는 최대값 내에서 다양한 범위의 시간이 소요됩니다.
0:00:01.939550 0:00:04.229085 0:00:09.064745
replica.fetch.wait.max.ms = 29000 (29초)
마지막으로 29초의 대기시간을 설정하였습니다.
docker network create kafka-net docker run -d --name zookeeper --hostname zookeeper --net kafka-net \ -e ZOOKEEPER_CLIENT_PORT=2181 \ confluentinc/cp-zookeeper:7.6.4 docker run -d --name kafka1 --hostname kafka1 --net kafka-net \ -e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 \ -e KAFKA_BROKER_ID=1 \ -e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=2 \ -e KAFKA_LISTENERS=PLAINTEXT://:9092,OUTER://:19092 \ -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka1:9092,OUTER://localhost:19092 \ -e KAFKA_REPLICA_FETCH_WAIT_MAX_MS=29000 \ -e KAFKA_REPLICA_FETCH_MIN_BYTES=1048576 \ -e KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,OUTER:PLAINTEXT \ -e KAFKA_INTER_BROKER_LISTENER_NAME=PLAINTEXT \ -p 19092:19092 \ confluentinc/cp-kafka:7.6.4 docker run -d --name kafka2 --hostname kafka2 --net kafka-net \ -e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 \ -e KAFKA_BROKER_ID=2 \ -e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=2 \ -e KAFKA_LISTENERS=PLAINTEXT://:9092,OUTER://:29092 \ -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka2:9092,OUTER://localhost:29092 \ -e KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,OUTER:PLAINTEXT \ -e KAFKA_INTER_BROKER_LISTENER_NAME=PLAINTEXT \ -e KAFKA_REPLICA_FETCH_WAIT_MAX_MS=29000 \ -e KAFKA_REPLICA_FETCH_MIN_BYTES=1048576 \ -p 29092:29092 \ confluentinc/cp-kafka:7.6.4 docker run -d --name kafka-ui --net kafka-net \ -e DYNAMIC_CONFIG_ENABLED='true' \ -e KAFKA_CLUSTERS_0_NAME=cluster \ -e KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka1:9092,kafka2:9092 \ -p 8080:8080 \ provectuslabs/kafka-ui:v0.7.2
그리고 Producer 스크립트를 실행해보면 아래와 같이 최대 29초를 기준으로 다양한 데이터 생성 시간이 관찰됩니다.
0:00:24.676378
마치며.
Kafka Broker 는 Pull 방식으로 데이터 복제가 수행됩니다.
따라서 ack = -1 (all) 과 같이 데이터 복제를 고려한 쓰기 방식은 복제의 속도에 영향을 받게 됩니다.
그래서 안정성을 지키면서 빠른 데이터 쓰기 속도를 보장하기 위해서 복제의 성능을 올릴 수 있는 여러 요소에 대한 고민도 필요합니다.
반응형'Kafka' 카테고리의 다른 글
[Kafka] Producer 와 Idempotence 알아보기 ( InitProducerId, Epoch, Sequence Number ) (0) 2024.06.25 [Kafka] request.timeout.ms 와 데이터 중복 생성 알아보기 (0) 2024.06.25 [Kafka] Topic 의 retention.bytes & retention.ms 알아보기 (0) 2024.06.24 [Kafka] num.replica.fetchers 와 ReplicaFetcherThread 알아보기 (0) 2024.06.24 [Kafka] advertised.listeners 와 Socket Acceptor Listener Thread 알아보기 (0) 2024.06.22