-
[ Kafka Producer ] 불안정한 네트워크에서 데이터 생성하기 ( Acks, Retries )Kafka 2024. 1. 9. 05:22728x90반응형
- 목차
들어가며.
이번 글에서는 카프카 클러스터의 네트워크를 의도적으로 불안정하게 만든 이후에
Kafka Producer 가 어떻게 해야 안정적으로 데이터를 생성할 수 있는지 알아보려고 합니다.
아래의 링크는 Docker 를 활용하여 카프카 클러스터를 구축하는 내용이 적힌 페이지의 주소입니다.
https://westlife0615.tistory.com/474
실험.
아래의 명령어는 의도적으로 Kafka Docker Container 를 네트워크에서 제거하여 Retry 상황을 만듭니다.
kafka_kafka 라는 Docker Bridge Network 에서 kafka1, kafka2, kafka3 컨테이너를 의도적으로 제거합니다.
for i in {0..10000000}; do docker network disconnect kafka_kafka kafka1 docker network disconnect kafka_kafka kafka1 sleep 10 docker network connect kafka_kafka kafka1 docker network connect kafka_kafka kafka1 sleep 10 docker network disconnect kafka_kafka kafka2 docker network disconnect kafka_kafka kafka2 sleep 10 docker network connect kafka_kafka kafka2 docker network connect kafka_kafka kafka2 sleep 10 docker network disconnect kafka_kafka kafka3 docker network disconnect kafka_kafka kafka3 sleep 10 docker network connect kafka_kafka kafka3 docker network connect kafka_kafka kafka3 sleep 10 done
위 명령어를 통해서 카프카 컨테이너를 Docker Network 에서 의도적으로 제거하게 되면 아래와 같은 에러 로그를 확인할 수 있습니다.
java.net.SocketException: Network is unreachable at java.base/sun.nio.ch.Net.connect0(Native Method) at java.base/sun.nio.ch.Net.connect(Net.java:483) at java.base/sun.nio.ch.Net.connect(Net.java:472) at java.base/sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:692) at org.apache.zookeeper.ClientCnxnSocketNIO.registerAndConnect(ClientCnxnSocketNIO.java:260) at org.apache.zookeeper.ClientCnxnSocketNIO.connect(ClientCnxnSocketNIO.java:270) at org.apache.zookeeper.ClientCnxn$SendThread.startConnect(ClientCnxn.java:1177) at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1211) INFO Opening socket connection to server zookeeper/172.24.0.2:2181. (org.apache.zookeeper.ClientCnxn)
acks 0 & retries 0.
먼저 "topic-acks-0-retries-0" 라는 이름의 토픽을 생성하겠습니다.
kafka-topics --bootstrap-server localhost:9092 \ --create --topic topic-acks-0-retries-0 \ --partitions 3 --replication-factor 3 \ --config min.insync.replicas=2
그리고 ProducerConfig 로써 Acks 0, Reties 0 을 설정하였습니다.
카프카가 불안정한 네트워크 상황 속에서 재시도없이 데이터가 손실되는 상황을 재현합니다.
package com.westlife.producers; import org.apache.kafka.clients.producer.*; import java.util.Properties; public class SimpleKafkaProducer { public static void main (String[] args) { String topic = "topic-acks-0-retries-0"; Properties properties = new Properties(); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:29091,localhost:29092,localhost:29093"); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); properties.put(ProducerConfig.ACKS_CONFIG, "0"); properties.put(ProducerConfig.RETRIES_CONFIG, "0"); KafkaProducer<String, String> producer = new KafkaProducer(properties); for (long i = 1; i <= 30000000L; i++) { ProducerRecord<String, String> record = new ProducerRecord(topic, null, "" + i); producer.send(record); } producer.flush(); producer.close(); } }
저는 3억개의 Record 를 전송하였구요. 대략적인 시간은 10분 정도가 소요되었습니다.
아래 이미지처럼 29290540 레코드가 생성이 되었고, 7094599 개의 손실이 발생하였습니다.
손실 비율은 2.3 % 에 해당합니다.
acks 0 & retries 2147483647.
이번엔 retries 를 최대치로 설정한 이후에 위와 같은 테스트를 진행해보겠습니다.
아래와 같이 새로운 Topic 을 생성합니다.
kafka-topics --bootstrap-server localhost:9092 \ --create --topic topic-acks-0-retries-2147483647 \ --partitions 3 --replication-factor 3 \ --config min.insync.replicas=2
package com.westlife.producers; import org.apache.kafka.clients.producer.*; import java.util.Properties; public class SimpleKafkaProducer { public static void main (String[] args) { String topic = "topic-acks-0-retries-2147483647"; Properties properties = new Properties(); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:29091,localhost:29092,localhost:29093"); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); properties.put(ProducerConfig.ACKS_CONFIG, "0"); properties.put(ProducerConfig.RETRIES_CONFIG, "2147483647"); KafkaProducer<String, String> producer = new KafkaProducer(properties); for (long i = 1; i <= 10000000L; i++) { ProducerRecord<String, String> record = new ProducerRecord(topic, null, "" + i); producer.send(record); } producer.flush(); producer.close(); } }
1천만건의 레코드의 생성을 시도하였고, 생성된 레코드는 9999166 입니다.
834 개의 데이터 손실이 발생하였습니다.
Retries 설정을 추가함으로써 손실의 비율이 확연히 줄어들긴 합니다.
acks all & retries 2147483647.
이번에는 데이터의 무손실을 위해서 acks 설정을 추가해보도록 하겠습니다.
kafka-topics --bootstrap-server localhost:9092 \ --create --topic topic-acks-all-retries-2147483647 \ --partitions 3 --replication-factor 3 \ --config min.insync.replicas=2
package com.westlife.producers; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import java.util.Properties; public class SimpleKafkaProducer { public static void main (String[] args) { String topic = "topic-acks-all-retries-2147483647"; Properties properties = new Properties(); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:29091,localhost:29092,localhost:29093"); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); properties.put(ProducerConfig.ACKS_CONFIG, "all"); properties.put(ProducerConfig.RETRIES_CONFIG, "2147483647"); KafkaProducer<String, String> producer = new KafkaProducer(properties); for (long i = 1; i <= 100000L; i++) { ProducerRecord<String, String> record = new ProducerRecord(topic, null, "" + i); producer.send(record); } producer.flush(); producer.close(); } }
100만개의 레코드를 생성합니다.
Acks 와 Retries 설정만으로도 손실없이 데이터를 생성할 수 있습니다.
request.timeout.ms 가 replica.lag.time.max.ms 보다 짧은 경우.
request.timeout.ms 가 replica.lag.time.max.ms 보다 짧은 경우에 데이터의 중복이 발생할 수 있습니다.
먼저 request.timeout.ms 는 Producer 가 레코드를 생성하는 시간 범위입니다.
request.timeout.ms 동안에 메시지의 생성이 실패한다면 지정된 retries 횟수만큼 재시도를 하게 됩니다.
그래서 request.timeout.ms 동안에 메시지 생성에 성공하지 못하면 더 이상 retry 하지 않고 메시지 생성에 실패하게 됩니다.
그리고 replica.lag.time.max.ms 는 Replica Broker 가 Leader Broker 의 Partition 을 복제하는 주기입니다.
일반적으로 replica.lag.time.max.ms 는 30초를 기본값으로 가집니다.
즉, 최대 30초를 주기로 Replica Broker 는 Leader Broker 의 Partition 을 복제합니다.
문제 상황은 request.timeout.ms 가 replica.lag.time.max.ms 보다 짧은 경우에 아직 복제를 수행할 시간이 아닌데,
Producer 는 복제가 시작하기 이전에 데이터 생성이 실패하였다고 새롭게 Retry 하게 됩니다.
그래서 데이터 중복을 방지하기 위해서 request.timeout.ms 를 replica.lag.time.max.ms 보다 큰 값으로 설정해야합니다.
kafka-topics --bootstrap-server localhost:9092 \ --create --topic topic-request-timeout-10 \ --partitions 3 --replication-factor 3 \ --config min.insync.replicas=2
package com.westlife.producers; import org.apache.kafka.clients.consumer.*; import org.apache.kafka.clients.producer.*; import java.time.Duration; import java.util.*; public class SimpleKafkaProducer { public static void main (String[] args) { String topic = "topic-request-timeout-10"; Properties properties = new Properties(); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:29091,localhost:29092,localhost:29093"); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); properties.put(ProducerConfig.ACKS_CONFIG, "all"); properties.put(ProducerConfig.RETRIES_CONFIG, "" + Integer.MAX_VALUE); properties.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 1000 * 5); KafkaProducer<String, String> producer = new KafkaProducer(properties); for (long i = 1; i <= 100000L; i++) { ProducerRecord<String, String> record = new ProducerRecord(topic, null, "" + i); producer.send(record, (metadata, exception) -> { if (exception != null) { System.out.printf("%s %s %s\n", metadata.offset(), metadata.partition(), exception.getMessage()); } }); } producer.flush(); producer.close(); } }
10만건의 메시지 생성을 시도하였구요.
아래 이미지처럼 2465 건 만큼이 중복 생성된 것을 확인할 수 있습니다.
반응형'Kafka' 카테고리의 다른 글
Kafka Consumer Configuration 알아보기 (fetch.min.bytes, fetch.max.wait.ms, max.parti (1) 2024.01.12 Kafka Log Compaction 알아보기 (0) 2024.01.12 [Kafka-Streams] mapValues 알아보기 (0) 2024.01.09 [Kafka-Connect] S3 Sink Connector 따라해보기 1 (0) 2024.01.08 [Kafka-Streams] KStream 알아보기 (0) 2024.01.06