ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • kafka producer
    Kafka 2023. 2. 23. 05:45
    728x90
    반응형

    개요

    카프카를 활용하기 위한 cli 들이 존재합니다.
    예를 들어, kafka-topics, kafka-console-consumer, kafka-console-producer 등이 쉘스크립트 형태로 존재하는데요.
    해당 스크립트들은 카프카 콘솔 명령어라고도 불리며
    1. 토픽을 관리하고,
    2. 메시지의 생성과 소비를 담당하는 등
    여러 명령어들이 존재합니다.
     

    <kafka 가 설치된 상태에서 bin 폴더 아래에 해당 script 들이 존재함.>
    
    kafka-acls
    kafka-broker-api-versions
    kafka-cluster
    kafka-configs
    kafka-console-consumer
    kafka-console-producer
    kafka-consumer-groups
    kafka-consumer-perf-test
    kafka-delegation-tokens
    kafka-delete-records
    kafka-dump-log
    kafka-features
    kafka-leader-election
    kafka-log-dirs
    kafka-metadata-shell
    kafka-mirror-maker
    kafka-preferred-replica-election
    kafka-producer-perf-test
    kafka-reassign-partitions
    kafka-replica-verification
    kafka-run-class
    kafka-server-start
    kafka-server-stop
    kafka-storage
    kafka-streams-application-reset
    kafka-topics
    kafka-verifiable-consumer
    kafka-verifiable-producer

     

    그리고 이러한 콘솔들을 카프카와 연결된 여러 어플리케이션에서 해당 콘솔 명령어를 활용할 수 있도록 라이브라리가 제공되는데, 오늘은 kafka client library 에 대해서 알아보고자 합니다.

    1. flink, spark 와 같이 메시지를 생산/소비하는 앱,
    2. kafdrop 처럼 관리하는 앱에서
    해당 라이브러리가 사용됩니다.

    producer 와 관련된 속성

    producer 는 메시지의 생산자입니다. 
    생산자는 대표적인 카프카의 클라이언트입니다. 

    bootstrap.servers

    프로듀서는 메시지의 생산자입니다.
    프로듀서에서 생성된 메시지는 카프카에게 전송됩니다.
    메시지는 네트워크 통신을 통하여 브로커에게 전달되며, 이때 생산자와 브로커가 연결되어 있어야합니다.
    bootstrap.servers 는 브로커의 네트워크 주소인데요.
    host:port 형태로 프로듀서의 속성값으로 설정된며, 프로듀서와 브로커는 연결됩니다.

    각각의 브로커들은 리더 - 팔로워의 관계가 아니기 때문에 모든 브로커의 주소를 bootstrap.servers 의 속성값으로 활용하면 하나의 브로커에 이상이 생겼을 때, 메시지는 다른 브로커로 메시지의 손실없이 저장될 수 있습니다.

    acks

    acks 는 생산된 메시지를 얼마나 복제할지를 결정합니다.
    acks 는 일종의 트랜잭션 처리에 가깝습니다. 복제 횟수만큼 트랜잭션을 보장합니다.

    복제의 횟수에 대한 설정값은 여러가지가 있습니다.
    1. 우선적으로 토픽을 생성할 때 replication factor 를 1 이상으로 설정해 복제본이 있는 토픽이 필요하고,
    2. in-sync replica 설정을 통해서도 리더와 팔로워 토픽 사이의 복제 관계를 강제할 수 있습니다.

    또한 acks 를 2이상으로 설정해 복제 상태를 만들 수 있는데요. 이 설정은 메시지의 생산 시점에 메시지의 복제를 유도합니다.

    acks 가 1인 경우, 리더에만 메시지가 저장된다면 메시지 생성은 정상 종료됩니다.

    acks 가 2인 경우, 리더와 팔로워 1에 메시지가 저장되어야 메시지 생성이 저장 종료됩니다. 만약 팔로워에 메시지를 저장하는 과정에서 문제가 발생한다면 해당 요청은 모드 fail 처리되며 생산자의 입장에서 재시도를 할 수 있습니다.

    key.serializer

    생산된 카프카 메시지는 key 와 value 를 가집니다. 
    value 는 실제 메시지의 내용에 해당하는 값이며, key 는 해당 메시지의 식별값입니다. 
    key 의 용도는 메시지가 저장된 파티션의 위치를 지정하는데에 사용됩니다. 
     
    예를 들어, key 가 1인 데이터는 파티션 1에만 저장되고, key가 2인 경우에는 파티션 2에만 저장되는 방식으로 key는 활용됩니다. 
    key 가 null 일수도 있습니다. 
    이 경우에는 Round-Robin 방식으로 파티션에 저장됩니다. 
     
    key.serializer 는 key 에 해당하는 값을 어떻게 직렬화/역직렬화 할지에 대한 방식을 설정합니다. 
    대개
    org.apache.kafka.common.serialization.ByteArraySerializer
    (https://kafka.apache.org/20/javadoc/org/apache/kafka/common/serialization/ByteArraySerializer.html)
    org.apache.kafka.common.serialization.StringSerializer
    (https://kafka.apache.org/0102/javadoc/org/apache/kafka/common/serialization/StringSerializer.html)
    등을 사용합니다. 
     
    key 는 단순한 값으로 직렬화/역직렬화 가 까다롭지 않습니다. 
     

    value.serializer

    카프카 메시지는 key 와 value 로 구성되며, 실질적인 메시지의 데이터는 value 에 해당합니다. 
    value 에 해당하는 데이터는 직렬화/역직렬화 가 까다로울 수 있습니다. 
    json, avro, protocol buffer 등의 여러 데이터 형태가 존재하기 때문입니다. 
    그리고 스키마의 변경에 대하여 스키마 레지스트리를 적용해야할 경우도 생깁니다. 
     
    json 과 같은 텍스트 기반 데이터는 key.serializer 와 같이 ByteArraySerializer, StringSerializer 와 같은 직렬화 방식을 사용해도 무방하지만,
    avro, protocol buffer 와 같은 이진 데이터는 특정 직렬화 방식을 명시해주어야합니다. 
    io.confluent.kafka.serializers.KafkaAvroSerializer
    io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer
     
     

    retries

    프로듀서가 카프카 메시지를 브로커에 전송 실패하였을 경우, 재시도하는 횟수를 의미합니다. 
     
     
     
     

    반응형
Designed by Tistory.