-
[Kafka Producer] max.in.flight.requests.per.connection 알아보기Kafka/Kafka Producer 2024. 9. 13. 06:47반응형
- 목차
Record, Batch, ProduceRequest 의 관계.
Kafka Producer 는 send() 라는 함수를 사용하여 개별적인 Record 를 Produce 할 수 있게 설계되어 있습니다.
그래서 사용자의 레벨에서는 단일 Record 를 처리하는 것처럼 노출됩니다.
하지만 그 내부에서는 Batch 단위로 Record 들이 모이게 되고, 실질적으로 Producer 가 Broker 에게 데이터를 전송할 때에는 Batch 단위로 전송되게 됩니다.
이와 관련된 설정으로 batch.size, linger.ms 등이 사용됩니다.
batch.size 설정은 하나의 Batch 를 구성하는 최대 용량 기준이며, linger.ms 는 하나의 Batch 를 생성하는 시간적인 기준입니다.
일반적으로 linger.ms 는 0ms 로 설정하고, batch.size 는 16KB 로 설정되곤 합니다.
최초로 Topic/Partition 의 레코드를 저장하기 위한 Batch 가 생성된 후에 Producer.send 함수를 통해서 Batch 내부로 Record 들이 누적됩니다.
Batch 가 생성된 시점을 기준으로 linger.ms 가 지나게 되면 batch.size 만큼 레코드가 쌓이지 않아도 Batch 의 Append 작업이 완료되어 전송 가능한 상태가 되고 새로운 Batch 가 생성됩니다.
이러한 과정을 통해서 Record 가 Batch 에 누적되고, Batch 는 전송될 준비를 마치게 됩니다.
ProduceRequest 는 이러한 Batch 들을 Broker 에게 전송하기 위한 실질적은 객체입니다.
ProduceRequest 는 Topic/Partition 별 Batch 들을 모아서 Produce API 를 통해 Broker 에게 전달됩니다.
즉, Produce API 의 실질적인 Data 이자 이번 글의 주제인 max.in.flight.requests.per.connection 과 관련이 깊습니다.
Producer 와 Broker 의 TCP connection.
아래의 내용은 lsof 명령어의 출력 결과입니다.
producer1 이라는 KafkaProducer 가 2개의 Broker 와 연결된 상태이며, 이에 대한 TCP Socket 이 출력된 결과입니다.
COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME python 1 root 6u IPv4 19971 0t0 TCP producer1:60882->kafka2.kafka-net:9091 (ESTABLISHED) python 1 root 7u IPv4 19969 0t0 TCP producer1:35164->kafka1.kafka-net:9091 (ESTABLISHED)KafkaProducer 가 실행되게 되면, Producer 가 Record 를 추가해야할 Topic/Partition 의 Leader Broker 와 위와 같이 연결됩니다.
저의 경우에는 아래와 같이 test-topic 이라는 Topic 이 2개의 Partition 을 가지는 상태이며, 각 Partition 은 2개의 Broker 가 하나씩 Leader Broker 로 지정되어 있습니다.

그리하여 아래의 Kafka Producer 를 실행하게 되면 2개의 TCP Connection 또는 Socket 이 생성되게 됩니다.
cat <<'EOF'> /tmp/random_producer.py import json import random import time from datetime import datetime from kafka import KafkaProducer KAFKA_BOOTSTRAP_SERVERS = 'kafka1:9091,kafka2:9091' KAFKA_TOPIC = 'test-topic' producer = KafkaProducer( bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS, value_serializer=lambda v: json.dumps(v).encode('utf-8'), key_serializer=lambda k: k.encode('utf-8') if k else None ) def generate_random_event(): return { "event_id": random.randint(1000, 9999), "user_id": f'user-{random.randint(1000, 9999)}', "amount": round(random.uniform(10, 1000), 2) } for _ in range(100): event = generate_random_event() print(f"Producing: {event}") producer.send(KAFKA_TOPIC, value=event) time.sleep(1) producer.flush() producer.close() EOF docker run -it --rm --name producer1 --hostname producer1 --network kafka-net -v /tmp/random_producer.py:/tmp/random_producer.py python:3.9 bash -c " pip install kafka-python && python /tmp/random_producer.py "max.in.flight.requests.per.connection ?
max.in.flight.requests.per.connection 은 하나의 TCP Connection 내에서 전달 가능한 ProduceRequest 의 갯수를 제한하는 설정입니다.
먼저 Inflight 라는 용어에 대해서 알아보겠습니다.
In Flight Requests 란 ?
네트워크 IO 를 취급하는 Application 에서 In Flight Request 라는 것은 아직까지 응답을 받지 못한 네트워크 요청을 의미합니다.
즉, Kafka Producer 의 관점에서 Acks 를 응답받지 못한 Request 를 의미합니다.
그래서 결론적으로 max.in.flight.requests.per.connection 은 Acks 응답을 받지 않은 In Flight Request 의 갯수 제한을 의미하게 됩니다.
Producer 와 Leader Broker 사이의 max.in.flight.requests.per.connection
max.in.flight.requests.per.connection 은 2 로 설정된 상태를 가정하겠습니다.
이 상태에서 Producer 는 하나의 Broker 에게 max.in.flight.requests.per.connection 인 2개의 동시 요청이 가능합니다.
그리하여 TCP Connection 을 기준으로 한번에 2개의 Produce API Request 전송이 가능해집니다.
만약 이 두 Request 가 Acks 응답을 받지 못하게 되면 그 이후 추가적인 Produce API Request 는 전송이 불가능해집니다.
왜냐하면 동시에 전송 가능한 Produce API Request 의 최대 갯수가 2개로 제한되기 때문입니다.
Acks=0 인 Producer 와 max.in.flight.requests.per.connection
만약 acks=1 또는 -1 이 아닌 acks=0 인 Producer 는 Fire and Forget 방식으로 동작을 하게 됩니다.
이 경우에는 max.in.flight.requests.per.connection 의 제약을 받지 않게 됩니다.
왜냐하면 Broker 로부터 acks 응답을 받지 않는 상태이며, 생성되는 모든 BatchRecord 들은 곧바로 Broker 에게 전송되게 됩니다.
max.in.flight.requests.per.connection 과 데이터 순서 보장
일반적으로 max.in.flight.requests.per.connection 의 기본값은 5로 설정됩니다.
즉, Acks 응답을 받지 않은 InFlight Request 의 최대 갯수가 5개입니다.
이 경우에 Broker 의 처리 속도에 의해서 최대 5개의 Batch Record 들은 생성 순서가 보장되지 않을 수 있습니다.
왜냐하면 최대 5개의 Produce Request 내부의 레코드들이 언제 브로커에 의해서 처리될지 알 수 없기 때문입니다.
만약에 Producer 에서 Send 함수를 통해서 전달하는 레코드의 순서를 반드시 보장해야한다면, max.in.flight.requests.per.connection 을 1 로 고정하는 대안이 존재합니다.
유용한 명령어.
카프카 실행.
docker network create kafka-net docker run -d --name zookeeper --hostname zookeeper --net kafka-net \ -e ZOOKEEPER_CLIENT_PORT=2181 \ confluentinc/cp-zookeeper:7.6.4 docker run -d --name kafka1 --hostname kafka1 --net kafka-net \ -e KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181 \ -e KAFKA_CFG_BROKER_ID=1 \ -e ALLOW_PLAINTEXT_LISTENER=yes \ -e KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR=1 \ -e KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,INNER://:9091,OUTER://:19092 \ -e KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka1:9092,INNER://kafka1:9091,OUTER://localhost:19092 \ -e KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,INNER:PLAINTEXT,OUTER:PLAINTEXT \ -e KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT \ -p 19092:19092 \ bitnami/kafka:3.1.2 docker run -d --name kafka-ui --net kafka-net \ -e DYNAMIC_CONFIG_ENABLED='true' \ -e KAFKA_CLUSTERS_0_NAME=cluster \ -e KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka1:9092 \ -p 8080:8080 \ provectuslabs/kafka-ui:v0.7.2tcpdump -i eth0 'dst host kafka1 and not src host kafka2' -X \ | grep -E '0x0030\:\s+([0-9a-f]{4}\s+){4}(0000)'반응형'Kafka > Kafka Producer' 카테고리의 다른 글
[Kafka] Producer 의 EndTxn API 알아보기 ( Commit or Abort , Transaction Coordinator ) (0) 2024.07.04 [Kafka] FindCoordinator 와 Transaction Coordinator 알아보기 (0) 2024.06.29 [Kafka] Transaction 과 Commit / Abort Maker 알아보기 (0) 2024.06.29 [Kafka] Producer 와 Idempotence 알아보기 ( InitProducerId, Epoch, Sequence Number ) (0) 2024.06.25 [Kafka] request.timeout.ms 와 데이터 중복 생성 알아보기 (0) 2024.06.25