ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • [Kafka Producer] max.in.flight.requests.per.connection 알아보기
    Kafka/Kafka Producer 2024. 9. 13. 06:47
    반응형

    - 목차

     

    Record, Batch, ProduceRequest 의 관계.

    Kafka Producer 는 send() 라는 함수를 사용하여 개별적인 Record 를 Produce 할 수 있게 설계되어 있습니다.

    그래서 사용자의 레벨에서는 단일 Record 를 처리하는 것처럼 노출됩니다.

    하지만 그 내부에서는 Batch 단위로 Record 들이 모이게 되고, 실질적으로 Producer 가 Broker 에게 데이터를 전송할 때에는 Batch 단위로 전송되게 됩니다.

    이와 관련된 설정으로 batch.size, linger.ms 등이 사용됩니다.

     

    batch.size 설정은 하나의 Batch 를 구성하는 최대 용량 기준이며, linger.ms 는 하나의 Batch 를 생성하는 시간적인 기준입니다.

    일반적으로 linger.ms 는 0ms 로 설정하고, batch.size 는 16KB 로 설정되곤 합니다.

    최초로 Topic/Partition 의 레코드를 저장하기 위한 Batch 가 생성된 후에 Producer.send 함수를 통해서 Batch 내부로 Record 들이 누적됩니다.

    Batch 가 생성된 시점을 기준으로 linger.ms 가 지나게 되면 batch.size 만큼 레코드가 쌓이지 않아도 Batch 의 Append 작업이 완료되어 전송 가능한 상태가 되고 새로운 Batch 가 생성됩니다.

    이러한 과정을 통해서 Record 가 Batch 에 누적되고, Batch 는 전송될 준비를 마치게 됩니다.

     

    ProduceRequest 는 이러한 Batch 들을 Broker 에게 전송하기 위한 실질적은 객체입니다.

    ProduceRequest 는 Topic/Partition 별 Batch 들을 모아서 Produce API 를 통해 Broker 에게 전달됩니다.

    즉, Produce API 의 실질적인 Data 이자 이번 글의 주제인 max.in.flight.requests.per.connection 과 관련이 깊습니다.

     

    Producer 와 Broker 의 TCP connection.

    아래의 내용은 lsof 명령어의 출력 결과입니다.

    producer1 이라는 KafkaProducer 가 2개의 Broker 와 연결된 상태이며, 이에 대한 TCP Socket 이 출력된 결과입니다.

    COMMAND PID USER   FD   TYPE DEVICE SIZE/OFF NODE NAME
    python    1 root    6u  IPv4  19971      0t0  TCP producer1:60882->kafka2.kafka-net:9091 (ESTABLISHED)
    python    1 root    7u  IPv4  19969      0t0  TCP producer1:35164->kafka1.kafka-net:9091 (ESTABLISHED)

     

     

    KafkaProducer 가 실행되게 되면, Producer 가 Record 를 추가해야할 Topic/Partition 의 Leader Broker 와 위와 같이 연결됩니다.

    저의 경우에는 아래와 같이 test-topic 이라는 Topic 이 2개의 Partition 을 가지는 상태이며, 각 Partition 은 2개의 Broker 가 하나씩 Leader Broker 로 지정되어 있습니다.

     

    그리하여 아래의 Kafka Producer 를 실행하게 되면 2개의 TCP Connection 또는 Socket 이 생성되게 됩니다.

     

    cat <<'EOF'> /tmp/random_producer.py
    import json
    import random
    import time
    from datetime import datetime
    from kafka import KafkaProducer
    
    KAFKA_BOOTSTRAP_SERVERS = 'kafka1:9091,kafka2:9091'
    KAFKA_TOPIC = 'test-topic'
    
    producer = KafkaProducer(
        bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS,
        value_serializer=lambda v: json.dumps(v).encode('utf-8'),
        key_serializer=lambda k: k.encode('utf-8') if k else None
    )
    
    def generate_random_event():
        return {
            "event_id": random.randint(1000, 9999),
            "user_id": f'user-{random.randint(1000, 9999)}',
            "amount": round(random.uniform(10, 1000), 2)
        }
    
    for _ in range(100):
        event = generate_random_event()
        print(f"Producing: {event}")
        producer.send(KAFKA_TOPIC, value=event)
        time.sleep(1)
    
    producer.flush()
    producer.close()
    EOF
    
    docker run -it --rm --name producer1 --hostname producer1 --network kafka-net -v /tmp/random_producer.py:/tmp/random_producer.py python:3.9 bash -c "
    pip install kafka-python && python /tmp/random_producer.py
    "

     

     

    max.in.flight.requests.per.connection ?

    max.in.flight.requests.per.connection 은 하나의 TCP Connection 내에서 전달 가능한 ProduceRequest 의 갯수를 제한하는 설정입니다.

    먼저 Inflight 라는 용어에 대해서 알아보겠습니다.

     

    In Flight Requests 란 ?

    네트워크 IO 를 취급하는 Application 에서 In Flight Request 라는 것은 아직까지 응답을 받지 못한 네트워크 요청을 의미합니다.

    즉, Kafka Producer 의 관점에서 Acks 를 응답받지 못한 Request 를 의미합니다.

    그래서 결론적으로 max.in.flight.requests.per.connection 은 Acks 응답을 받지 않은 In Flight Request 의 갯수 제한을 의미하게 됩니다.

     

    Producer 와 Leader Broker 사이의 max.in.flight.requests.per.connection

    max.in.flight.requests.per.connection 은 2 로 설정된 상태를 가정하겠습니다.

    이 상태에서 Producer 는 하나의 Broker 에게 max.in.flight.requests.per.connection 인 2개의 동시 요청이 가능합니다.

    그리하여 TCP Connection 을 기준으로 한번에 2개의 Produce API Request 전송이 가능해집니다.

    만약 이 두 Request 가 Acks 응답을 받지 못하게 되면 그 이후 추가적인 Produce API Request 는 전송이 불가능해집니다.

    왜냐하면 동시에 전송 가능한 Produce API Request 의 최대 갯수가 2개로 제한되기 때문입니다.

     

    Acks=0 인 Producer 와 max.in.flight.requests.per.connection

    만약 acks=1 또는 -1 이 아닌 acks=0 인 Producer 는 Fire and Forget 방식으로 동작을 하게 됩니다.

    이 경우에는 max.in.flight.requests.per.connection 의 제약을 받지 않게 됩니다.

    왜냐하면 Broker 로부터 acks 응답을 받지 않는 상태이며, 생성되는 모든 BatchRecord 들은 곧바로 Broker 에게 전송되게 됩니다.

     

    max.in.flight.requests.per.connection 과 데이터 순서 보장

    일반적으로 max.in.flight.requests.per.connection 의 기본값은 5로 설정됩니다.

    즉, Acks 응답을 받지 않은 InFlight Request 의 최대 갯수가 5개입니다.

    이 경우에 Broker 의 처리 속도에 의해서 최대 5개의 Batch Record 들은 생성 순서가 보장되지 않을 수 있습니다.

    왜냐하면 최대 5개의 Produce Request 내부의 레코드들이 언제 브로커에 의해서 처리될지 알 수 없기 때문입니다.

    만약에 Producer 에서 Send 함수를 통해서 전달하는 레코드의 순서를 반드시 보장해야한다면, max.in.flight.requests.per.connection 을 1 로 고정하는 대안이 존재합니다.

     

     

     

     

    유용한 명령어.

    카프카 실행.

    docker network create kafka-net
    
    docker run -d --name zookeeper --hostname zookeeper --net kafka-net \
      -e ZOOKEEPER_CLIENT_PORT=2181 \
      confluentinc/cp-zookeeper:7.6.4
    
    docker run -d --name kafka1 --hostname kafka1 --net kafka-net \
      -e KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181 \
      -e KAFKA_CFG_BROKER_ID=1 \
      -e ALLOW_PLAINTEXT_LISTENER=yes \
      -e KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR=1 \
      -e KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,INNER://:9091,OUTER://:19092 \
      -e KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka1:9092,INNER://kafka1:9091,OUTER://localhost:19092 \
      -e KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,INNER:PLAINTEXT,OUTER:PLAINTEXT \
      -e KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT \
      -p 19092:19092 \
      bitnami/kafka:3.1.2
      
    docker run -d --name kafka-ui --net kafka-net \
      -e DYNAMIC_CONFIG_ENABLED='true' \
      -e KAFKA_CLUSTERS_0_NAME=cluster \
      -e KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka1:9092 \
      -p 8080:8080 \
      provectuslabs/kafka-ui:v0.7.2

     

    tcpdump -i eth0 'dst host kafka1 and not src host kafka2' -X \
    | grep -E '0x0030\:\s+([0-9a-f]{4}\s+){4}(0000)'

     

     

    반응형
Designed by Tistory.