ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • [Kafka] Pull 방식의 복제와 Producer 속도 관계 알아보기 ( replica.fetch.wait.max.ms )
    Kafka 2024. 6. 25. 06:28
    반응형

    - 목차

     

    들어가며.

    카프카는 여러 Broker 들이 서로 서로 데이터 복제를 수행합니다.

    정확히는 리더 브로커가 소유한 파티션을 여러 팔로워 브로커들이 복제를 하게 됩니다.

     

    Replication 과 관련된 첫번째 예시로 아래의 이미지를 살펴보면,

    1개의 Topic 이 3개의 Partition 으로 구성됩니다.

    이 Topic 의 Replication Factor 는 2로 설정되어 있습니다.

    따라서 각 Partiton 은 1개의 Leader Broker 와 1개의 Follower Broker 를 필요로 하게 되며,

    아래와 같은 구조를 가집니다.

    Partition 0 의 Leader Broker 는 Broker1 이고, Follower Broker 는 Broker3 입니다.

    따라서 Broker3 은 Broker1 로부터 Replication 을 수행하게 됩니다.

    출처 : https://docs.confluent.io/kafka/design/replication.html

     

     

     

    또 다른 예시로 아래와 같은 상황을 살펴봅시다.

    1개의 Topic 이 존재하고 2개의 Partition 을 가집니다.

    그리고 이 Topic 은 Replication Factor 가 2로 설정되어 있습니다.

    3개의 Broker 중 101, 102 브로커가 Partition0, Partition1 의 Leader Broker 로써 동작하며,

    Broker 102 가 Partition0 의 Follower Broker, Broker 103 이 Partition1 의 Follower Broker 로 동작합니다.

    출처 : https://learn.conduktor.io/kafka/kafka-topic-replication/

     

    카프카에서 팔로워 브로커는 자신의 템포에 맞게 리더 브로커의 Log 데이터를 복제합니다.

    다른 표현으로 Pull 방식의 복제라고도 합니다.

    MySQL 도 대표적인 Pull 방식의 복제가 이루어집니다.

    Slave MySQL 에서 Master MySQL 의 Bin Log 를 조회하여 데이터 복제가 이루어 집니다.

    반면 Push 방식으로 데이터 복제가 이루어지는 경우도 있습니다.

    대표적으로 Hadoop 에서 Data Block 들이 Push & Pipelining 방식으로 복제가 이루어집니다.

     

    이번 글에서 알아볼 내용은 Pull 방식의 복제가 데이터 생성 속도에 미치는 영향을 알아보려고 합니다.

    Push 방식이 아니기 때문에 Follower Broker 의 Replication 속도와 Data 생성 속도의 관계에 대해서 상세히 알아보도록 하겠습니다.

     

    Acks 와 min.insync.replica.

    Producer 는 Leader Broker 와 소통하게 됩니다.

    Leader Broker 는 Follower Broker 와 다르게 데이터의 쓰기가 허용되는 브로커입니다.

    그래서 Producer 는 오로지 Leader Broker 에게만 데이트 쓰기를 요청할 수 있습니다.

    Leader Broker 는 Producer 에게서 데이터를 전달받아 자신의 Log File 에 저장합니다.

    Follower Broker 는 Leader Broker 로부터 데이터를 전달받습니다.

    복제는 이러한 흐름으로 수행되게 됩니다.

     

    출처 : https://logiwa.tech/beyond-the-basics-of-kafka-producer-151a860c22a8

     

     

    Producer 는 데이터의 생성과 복제의 정도를 acks Option 을 통해서 설정합니다.

    ack 의 설정값은 0, 1, -1 가 존재합니다.

     

    ack = 0.

    ack == 0 인 Producer 는 데이터의 생성과 복제의 완료에 대해서 신경쓰지 않는 상태입니다.

    Leader Broker 가 데이터를 잘 생성했는지,

    그리고 Follower Broker 가 Leader Broker 로부터 데이터를 잘 복제했는지에 대해서 전혀 신경쓰지 않습니다.

    이러한 방식을 Fire and Forget 이라고도 부릅니다.

    ack 모드가 0 인 상태의 Producer 는 데이터의 저장과 복제에 전혀 신경을 쓰지 않기 때문에 가장 빠른 속도로 데이터 쓰기 작업을 수행합니다.

     

     

    ack = 1.

    ack 가 1 인 상태는 오직 데이터의 생성에만 집중하고 복제에 대해서는 신경을 쓰지 않습니다.

    이게 무슨 뜻이냐면,

    Leader Broker 가 자신의 Log File 에 Producer 가 전달한 Batch Record 를 잘 저장하면 Producer 에세 ack 응답을 제공합니다.

    그 이후에 Follower Broker 들이 잘 복제를 수행했는지에 대해서 신경을 쓰지 않습니다.

     

    ack = -1 (all).

    마지막으로 ack 가 -1 인 상태가 있습니다.

    ack 가 -1 이라는 의미는 데이터의 생성과 복제 모두 고려하는 상태를 의미합니다.

    즉, Follower Broker 에 데이터 복제가 마무리되어야지 ack 응답이 발생합니다.

     

    여기서 Topic 의 설정인 min.insync.replicas 의 개념이 적용되는데요.

    만약에 A Topic 의 Replication Factor 가 3 이고 min.insync.replicas 가 2, 그리고 ack 가 -1 이라면,

    Client 는 1개의 리더 브로커에 데이터가 생성되고, 1개의 팔로워 브로커에 데이터가 복제되면 ack 응답을 받습니다.

    그리고 만약 B Topic 의 Replication Factor 가 3 이고 min.insync.replicas 가 3, 그리고 ack 가 -1 이라면, 

    Client 는 1개의 리더 브로커에 데이터가 생성되고, 2개의 팔로워 브로커에 데이터가 복제되면 ack 응답을 받습니다. 

    즉, 모든 브로커가 복제를 완료한 경우에 ack 응답이 발생합니다.

    이해가 되시나요 ? min.insync.replicas 는 동일한 데이터를 (insync) 를 가지야하는 Broker (replica) 의 숫자를 의미합니다.

     

    따라서 ack 가 -1 이면서 min.insync.replicas 의 값이 Replication Factor 와 동일한 경우에는 가장 강력한 데이터 무결성이 보장됩니다.

     

     

    replica.fetch.wait.max.ms

    replica.fetch.wait.max.ms 는 팔로워 브로커의 ReplicaFetcherThread 가 데이터 복제를 수행하는 주기로 생각하시면 됩니다.

    정확한 동작의 원리는 저도 잘 모르지만, Replication 을 수행하는 ReplicaFetcherThread 의 복제 동작 사이사이에 Waiting 시간의 적용하는 개념입니다.

     

    따라서 replica.fetch.wait.max.ms 의 값에 변화를 주어서 ack=-1 의 데이터 쓰기 속도 차이를 실험해보려고 합니다.

     

     

    replica.fetch.wait.max.ms = 500 (기본값)

    아래의 Docker 명령어들을 수행하면 2개의 Kafka Broker 들을 생성할 수 있습니다.

     

    docker network create kafka-net
    
    docker run -d --name zookeeper --hostname zookeeper --net kafka-net \
      -e ZOOKEEPER_CLIENT_PORT=2181 \
      confluentinc/cp-zookeeper:7.6.4
    
    docker run -d --name kafka1 --hostname kafka1 --net kafka-net \
      -e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 \
      -e KAFKA_BROKER_ID=1 \
      -e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=2 \
      -e KAFKA_LISTENERS=PLAINTEXT://:9092,OUTER://:19092 \
      -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka1:9092,OUTER://localhost:19092 \
      -e KAFKA_REPLICA_FETCH_WAIT_MAX_MS=500 \
      -e KAFKA_REPLICA_FETCH_MIN_BYTES=1048576 \
      -e KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,OUTER:PLAINTEXT \
      -e KAFKA_INTER_BROKER_LISTENER_NAME=PLAINTEXT \
      -p 19092:19092 \
      confluentinc/cp-kafka:7.6.4
    
    docker run -d --name kafka2 --hostname kafka2 --net kafka-net \
      -e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 \
      -e KAFKA_BROKER_ID=2 \
      -e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=2 \
      -e KAFKA_LISTENERS=PLAINTEXT://:9092,OUTER://:29092 \
      -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka2:9092,OUTER://localhost:29092 \
      -e KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,OUTER:PLAINTEXT \
      -e KAFKA_INTER_BROKER_LISTENER_NAME=PLAINTEXT \
      -e KAFKA_REPLICA_FETCH_WAIT_MAX_MS=500 \
      -e KAFKA_REPLICA_FETCH_MIN_BYTES=1048576 \
      -p 29092:29092 \
      confluentinc/cp-kafka:7.6.4
    
    docker run -d --name kafka-ui --net kafka-net \
      -e DYNAMIC_CONFIG_ENABLED='true' \
      -e KAFKA_CLUSTERS_0_NAME=cluster \
      -e KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka1:9092,kafka2:9092 \
      -p 8080:8080 \
      provectuslabs/kafka-ui:v0.7.2

     

    그리고 http://localhost:8080 주소에 접속하여 Kafka UI 관리자 페이지에 접속할 수 있습니다.

    여기서 아래와 같이 Partition 2, Replication Factor 2, min.insync.replicas 2 인 Topic 을 생성하겠습니다.

     

     

    그리고 아래와 같이 Producer 스크립트를 통해서 데이터 생성 시간을 측정합니다.

    import time
    
    from confluent_kafka import Producer
    from datetime import datetime
    
    bootstrap_servers = 'localhost:19092,localhost:29092'
    
    producer_conf = {
        'bootstrap.servers': bootstrap_servers,
        'batch.size': 1,
    }
    
    producer = Producer(producer_conf)
    
    topic = "test"
    
    start = datetime.now()
    def delivery_report(err, msg):
        print(datetime.now() - start)
        if err is not None:
            print(f"Message delivery failed: {err}")
        else:
            print(f"Message delivered to {msg.topic()} [{msg.partition()}]")
    
    producer.produce(topic, key=None, value=b'1', callback=delivery_report)
    producer.flush()
    
    time.sleep(100000)

     

    먼저 replica.fetch.wait.max.ms 가 500 인 상태에선 아래와 같이 1초 미만의 짧은 시간이 소요됩니다.

    0:00:00.046372

     

     

    replica.fetch.wait.max.ms = 10000 (10초)

    이번에는 데이터 복제의 대기 시간을 10초로 설정합니다.

    docker network create kafka-net
    
    docker run -d --name zookeeper --hostname zookeeper --net kafka-net \
      -e ZOOKEEPER_CLIENT_PORT=2181 \
      confluentinc/cp-zookeeper:7.6.4
    
    docker run -d --name kafka1 --hostname kafka1 --net kafka-net \
      -e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 \
      -e KAFKA_BROKER_ID=1 \
      -e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=2 \
      -e KAFKA_LISTENERS=PLAINTEXT://:9092,OUTER://:19092 \
      -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka1:9092,OUTER://localhost:19092 \
      -e KAFKA_REPLICA_FETCH_WAIT_MAX_MS=10000 \
      -e KAFKA_REPLICA_FETCH_MIN_BYTES=1048576 \
      -e KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,OUTER:PLAINTEXT \
      -e KAFKA_INTER_BROKER_LISTENER_NAME=PLAINTEXT \
      -p 19092:19092 \
      confluentinc/cp-kafka:7.6.4
    
    docker run -d --name kafka2 --hostname kafka2 --net kafka-net \
      -e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 \
      -e KAFKA_BROKER_ID=2 \
      -e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=2 \
      -e KAFKA_LISTENERS=PLAINTEXT://:9092,OUTER://:29092 \
      -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka2:9092,OUTER://localhost:29092 \
      -e KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,OUTER:PLAINTEXT \
      -e KAFKA_INTER_BROKER_LISTENER_NAME=PLAINTEXT \
      -e KAFKA_REPLICA_FETCH_WAIT_MAX_MS=10000 \
      -e KAFKA_REPLICA_FETCH_MIN_BYTES=1048576 \
      -p 29092:29092 \
      confluentinc/cp-kafka:7.6.4
    
    docker run -d --name kafka-ui --net kafka-net \
      -e DYNAMIC_CONFIG_ENABLED='true' \
      -e KAFKA_CLUSTERS_0_NAME=cluster \
      -e KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka1:9092,kafka2:9092 \
      -p 8080:8080 \
      provectuslabs/kafka-ui:v0.7.2

     

    좀 편차가 심하긴한데, 10초라는 최대값 내에서 다양한 범위의 시간이 소요됩니다.

    0:00:01.939550
    0:00:04.229085
    0:00:09.064745

     

     

    replica.fetch.wait.max.ms = 29000 (29초)

    마지막으로 29초의 대기시간을 설정하였습니다.

     

    docker network create kafka-net
    
    docker run -d --name zookeeper --hostname zookeeper --net kafka-net \
      -e ZOOKEEPER_CLIENT_PORT=2181 \
      confluentinc/cp-zookeeper:7.6.4
    
    docker run -d --name kafka1 --hostname kafka1 --net kafka-net \
      -e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 \
      -e KAFKA_BROKER_ID=1 \
      -e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=2 \
      -e KAFKA_LISTENERS=PLAINTEXT://:9092,OUTER://:19092 \
      -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka1:9092,OUTER://localhost:19092 \
      -e KAFKA_REPLICA_FETCH_WAIT_MAX_MS=29000 \
      -e KAFKA_REPLICA_FETCH_MIN_BYTES=1048576 \
      -e KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,OUTER:PLAINTEXT \
      -e KAFKA_INTER_BROKER_LISTENER_NAME=PLAINTEXT \
      -p 19092:19092 \
      confluentinc/cp-kafka:7.6.4
    
    docker run -d --name kafka2 --hostname kafka2 --net kafka-net \
      -e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 \
      -e KAFKA_BROKER_ID=2 \
      -e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=2 \
      -e KAFKA_LISTENERS=PLAINTEXT://:9092,OUTER://:29092 \
      -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka2:9092,OUTER://localhost:29092 \
      -e KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,OUTER:PLAINTEXT \
      -e KAFKA_INTER_BROKER_LISTENER_NAME=PLAINTEXT \
      -e KAFKA_REPLICA_FETCH_WAIT_MAX_MS=29000 \
      -e KAFKA_REPLICA_FETCH_MIN_BYTES=1048576 \
      -p 29092:29092 \
      confluentinc/cp-kafka:7.6.4
    
    docker run -d --name kafka-ui --net kafka-net \
      -e DYNAMIC_CONFIG_ENABLED='true' \
      -e KAFKA_CLUSTERS_0_NAME=cluster \
      -e KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka1:9092,kafka2:9092 \
      -p 8080:8080 \
      provectuslabs/kafka-ui:v0.7.2

     

    그리고 Producer 스크립트를 실행해보면 아래와 같이 최대 29초를 기준으로 다양한 데이터 생성 시간이 관찰됩니다.

     

    0:00:24.676378

     

     

    마치며.

    Kafka Broker 는 Pull 방식으로 데이터 복제가 수행됩니다.

    따라서 ack = -1 (all) 과 같이 데이터 복제를 고려한 쓰기 방식은 복제의 속도에 영향을 받게 됩니다.

    그래서 안정성을 지키면서 빠른 데이터 쓰기 속도를 보장하기 위해서 복제의 성능을 올릴 수 있는 여러 요소에 대한 고민도 필요합니다.

     

     

    반응형
Designed by Tistory.