ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • [ Kafka Producer ] 불안정한 네트워크에서 데이터 생성하기 ( Acks, Retries )
    Kafka 2024. 1. 9. 05:22
    728x90
    반응형

     

    - 목차

     

    들어가며.

    이번 글에서는 카프카 클러스터의 네트워크를 의도적으로 불안정하게 만든 이후에

    Kafka Producer 가 어떻게 해야 안정적으로 데이터를 생성할 수 있는지 알아보려고 합니다.

     

    아래의 링크는 Docker 를 활용하여 카프카 클러스터를 구축하는 내용이 적힌 페이지의 주소입니다.

    https://westlife0615.tistory.com/474

     

    Docker 로 Kafka Cluster 구축해보기.

    - 목차 소개. 저는 로컬 환경에서 카프카 관련 테스트를 진행하는 경우가 많이 생기더군요. 그래서 docker-compose 를 활용하여 Kafka, ZooKeeper 클러스터를 구축하는 내용을 작성하려고 합니다. docker-com

    westlife0615.tistory.com

     

     

    실험.

    아래의 명령어는 의도적으로 Kafka Docker Container 를 네트워크에서 제거하여 Retry 상황을 만듭니다.

    kafka_kafka 라는 Docker Bridge Network 에서 kafka1, kafka2, kafka3 컨테이너를 의도적으로 제거합니다.

    for i in {0..10000000};
    do 
    docker network disconnect kafka_kafka kafka1
    docker network disconnect kafka_kafka kafka1
    sleep 10
    docker network connect kafka_kafka kafka1
    docker network connect kafka_kafka kafka1
    sleep 10
    docker network disconnect kafka_kafka kafka2
    docker network disconnect kafka_kafka kafka2
    sleep 10
    docker network connect kafka_kafka kafka2
    docker network connect kafka_kafka kafka2
    sleep 10
    docker network disconnect kafka_kafka kafka3
    docker network disconnect kafka_kafka kafka3
    sleep 10
    docker network connect kafka_kafka kafka3
    docker network connect kafka_kafka kafka3
    sleep 10
    done

     

    위 명령어를 통해서 카프카 컨테이너를 Docker Network 에서 의도적으로 제거하게 되면 아래와 같은 에러 로그를 확인할 수 있습니다.

     java.net.SocketException: Network is unreachable
         at java.base/sun.nio.ch.Net.connect0(Native Method)
         at java.base/sun.nio.ch.Net.connect(Net.java:483)
         at java.base/sun.nio.ch.Net.connect(Net.java:472)
         at java.base/sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:692)
         at org.apache.zookeeper.ClientCnxnSocketNIO.registerAndConnect(ClientCnxnSocketNIO.java:260)
         at org.apache.zookeeper.ClientCnxnSocketNIO.connect(ClientCnxnSocketNIO.java:270)
         at org.apache.zookeeper.ClientCnxn$SendThread.startConnect(ClientCnxn.java:1177)
         at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1211)
      INFO Opening socket connection to server zookeeper/172.24.0.2:2181. (org.apache.zookeeper.ClientCnxn)

     

    acks 0 & retries 0.

    먼저 "topic-acks-0-retries-0" 라는 이름의 토픽을 생성하겠습니다.

    kafka-topics --bootstrap-server localhost:9092 \
    --create --topic topic-acks-0-retries-0 \
    --partitions 3 --replication-factor 3 \
    --config min.insync.replicas=2

     

    그리고 ProducerConfig 로써 Acks 0, Reties 0 을 설정하였습니다.

    카프카가 불안정한 네트워크 상황 속에서 재시도없이 데이터가 손실되는 상황을 재현합니다.

    package com.westlife.producers;
    
    import org.apache.kafka.clients.producer.*;
    import java.util.Properties;
    
    public class SimpleKafkaProducer {
      public static void main (String[] args) {
        String topic = "topic-acks-0-retries-0";
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:29091,localhost:29092,localhost:29093");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.ACKS_CONFIG, "0");
        properties.put(ProducerConfig.RETRIES_CONFIG, "0");
    
        KafkaProducer<String, String> producer = new KafkaProducer(properties);
        for (long i = 1; i <= 30000000L; i++) {
          ProducerRecord<String, String> record = new ProducerRecord(topic, null, "" + i);
          producer.send(record);
        }
        producer.flush();
        producer.close();
      }
    }

     

    저는 3억개의 Record 를 전송하였구요. 대략적인 시간은 10분 정도가 소요되었습니다.

    아래 이미지처럼 29290540 레코드가 생성이 되었고, 7094599 개의 손실이 발생하였습니다.

    손실 비율은 2.3 % 에 해당합니다.

     

     

     

    acks 0 & retries 2147483647.

    이번엔 retries 를 최대치로 설정한 이후에 위와 같은 테스트를 진행해보겠습니다.

    아래와 같이 새로운 Topic 을 생성합니다.

    kafka-topics --bootstrap-server localhost:9092 \
    --create --topic topic-acks-0-retries-2147483647 \
    --partitions 3 --replication-factor 3 \
    --config min.insync.replicas=2

     

    package com.westlife.producers;
    
    import org.apache.kafka.clients.producer.*;
    import java.util.Properties;
    
    public class SimpleKafkaProducer {
      public static void main (String[] args) {
        String topic = "topic-acks-0-retries-2147483647";
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:29091,localhost:29092,localhost:29093");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.ACKS_CONFIG, "0");
        properties.put(ProducerConfig.RETRIES_CONFIG, "2147483647");
    
        KafkaProducer<String, String> producer = new KafkaProducer(properties);
        for (long i = 1; i <= 10000000L; i++) {
          ProducerRecord<String, String> record = new ProducerRecord(topic, null, "" + i);
          producer.send(record);
        }
        producer.flush();
        producer.close();
      }
    }

     

    1천만건의 레코드의 생성을 시도하였고, 생성된 레코드는 9999166 입니다.

    834 개의 데이터 손실이 발생하였습니다.

    Retries 설정을 추가함으로써 손실의 비율이 확연히 줄어들긴 합니다.

     

     

    acks all & retries 2147483647.

    이번에는 데이터의 무손실을 위해서 acks 설정을 추가해보도록 하겠습니다.

    kafka-topics --bootstrap-server localhost:9092 \
    --create --topic topic-acks-all-retries-2147483647 \
    --partitions 3 --replication-factor 3 \
    --config min.insync.replicas=2

     

    package com.westlife.producers;
    
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    
    import java.util.Properties;
    
    public class SimpleKafkaProducer {
      public static void main (String[] args) {
        String topic = "topic-acks-all-retries-2147483647";
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:29091,localhost:29092,localhost:29093");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.ACKS_CONFIG, "all");
        properties.put(ProducerConfig.RETRIES_CONFIG, "2147483647");
    
        KafkaProducer<String, String> producer = new KafkaProducer(properties);
        for (long i = 1; i <= 100000L; i++) {
          ProducerRecord<String, String> record = new ProducerRecord(topic, null, "" + i);
          producer.send(record);
        }
        producer.flush();
        producer.close();
      }
    }

     

    100만개의 레코드를 생성합니다.

    Acks 와 Retries 설정만으로도 손실없이 데이터를 생성할 수 있습니다.

     

     

    request.timeout.ms 가 replica.lag.time.max.ms 보다 짧은 경우.

    request.timeout.ms 가 replica.lag.time.max.ms 보다 짧은 경우에 데이터의 중복이 발생할 수 있습니다.

    먼저 request.timeout.ms 는 Producer 가 레코드를 생성하는 시간 범위입니다.

    request.timeout.ms 동안에 메시지의 생성이 실패한다면 지정된 retries 횟수만큼 재시도를 하게 됩니다.

    그래서 request.timeout.ms 동안에 메시지 생성에 성공하지 못하면 더 이상 retry 하지 않고 메시지 생성에 실패하게 됩니다.

     

    그리고 replica.lag.time.max.ms 는 Replica Broker 가 Leader Broker 의 Partition 을 복제하는 주기입니다.

    일반적으로 replica.lag.time.max.ms 는 30초를 기본값으로 가집니다.

    즉, 최대 30초를 주기로 Replica Broker 는 Leader Broker 의 Partition 을 복제합니다.

     

    문제 상황은 request.timeout.ms 가 replica.lag.time.max.ms 보다 짧은 경우에 아직 복제를 수행할 시간이 아닌데,

    Producer 는 복제가 시작하기 이전에 데이터 생성이 실패하였다고 새롭게 Retry 하게 됩니다.

    그래서 데이터 중복을 방지하기 위해서 request.timeout.ms 를 replica.lag.time.max.ms 보다 큰 값으로 설정해야합니다.

     

    kafka-topics --bootstrap-server localhost:9092 \
    --create --topic topic-request-timeout-10 \
    --partitions 3 --replication-factor 3 \
    --config min.insync.replicas=2
    package com.westlife.producers;
    
    import org.apache.kafka.clients.consumer.*;
    import org.apache.kafka.clients.producer.*;
    import java.time.Duration;
    import java.util.*;
    
    public class SimpleKafkaProducer {
      public static void main (String[] args) {
        String topic = "topic-request-timeout-10";
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:29091,localhost:29092,localhost:29093");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.ACKS_CONFIG, "all");
        properties.put(ProducerConfig.RETRIES_CONFIG, "" + Integer.MAX_VALUE);
        properties.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 1000 * 5);
    
        KafkaProducer<String, String> producer = new KafkaProducer(properties);
        for (long i = 1; i <= 100000L; i++) {
          ProducerRecord<String, String> record = new ProducerRecord(topic, null, "" + i);
          producer.send(record, (metadata, exception) -> {
            if (exception != null) {
              System.out.printf("%s %s %s\n", metadata.offset(), metadata.partition(), exception.getMessage());
            }
          });
        }
        producer.flush();
        producer.close();
      }
    }

     

    10만건의 메시지 생성을 시도하였구요.

    아래 이미지처럼 2465 건 만큼이 중복 생성된 것을 확인할 수 있습니다.

     

     

    반응형
Designed by Tistory.