ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • [Flink] KafkaSource Connector 알아보기
    Flink 2024. 2. 4. 15:08
    728x90
    반응형

    - 목차

     

    들어가며.

    이번 글에서는 Flink DataStream 에서 사용할 수 있는 Kafka Source Connector 에 대해서 알아보도록 하겠습니다.

    Flink 의 Kafka Connector 를 구현한 Source Class 는 두가지가 있습니다.

    하나는 FlinkKafkaConsumer 이고, 다른 하나는 KafkaSource 입니다.

    Flink 1.14 버전 이후부터 KafkaSource 가 추가되었구요.

    setBounded 설정을 통해서 Unbounded 가 아닌 Bounded 하게 Kafka Message 를 처리할 수 있고,

    특정 Timestamp 부터 Message 를 조회하는 기능이 추가되었습니다.

    참고로 Flink 1.17 버전 이후부터 FlinkKafkaConsumer 는 더 이상 사용할 수 없게 된 점도 함께 알려드립니다.

     

    테스트를 위한 Kafka Cluster 생성 및 Kafka Producer 구현.

    먼저 Flink 의 KafkaSource 를 구현하기 위해서 Kafka 세팅을 먼저 진행하겠습니다.

     

    아래 링크는 제가 과거에 작성한 Kafka Cluster 를 도커로 구현한 자료의 링크입니다.

    https://westlife0615.tistory.com/474

     

    Docker 로 Kafka Cluster 구축해보기.

    - 목차 소개. 저는 로컬 환경에서 카프카 관련 테스트를 진행하는 경우가 많이 생기더군요. 그래서 docker-compose 를 활용하여 Kafka, ZooKeeper 클러스터를 구축하는 내용을 작성하려고 합니다. docker-com

    westlife0615.tistory.com

     

    저는 "test-topic" 이라는 이름의 Topic 을 만들었구요. Partition 은 4개로 설정하였습니다.

     

     

    그리고 아래 코드는"test-topic" 토픽에 메시지를 생성하기 위한 간단한 KafkaProducer 관련 함수입니다.

    kafka-clients 3.2.3 버전의 패키지를 사용하였습니다.

      private static void produceTestMessages() {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:29091,localhost:29092,localhost:29093");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        String topic = "test-topic";
        KafkaProducer producer = new KafkaProducer(properties);
        int counter = 0;
        while (counter < 1000000) {
          counter++;
          ProducerRecord<String, String> record = new ProducerRecord(topic, null, "" + counter);
          producer.send(record);
        }
        producer.flush();
        producer.close();
      }

     

     

    KafkaSource 만들어보기.

    먼저 저는 Flink 1.16 버전을 사용하였구요.

    아래는 Kafka Source 를 사용하기 위한 Dependency 들입니다.

    dependencies {
        testImplementation 'org.junit.jupiter:junit-jupiter-api:5.8.1'
        testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine:5.8.1'
    
        // https://mvnrepository.com/artifact/org.apache.flink/flink-clients
        implementation group: 'org.apache.flink', name: 'flink-clients', version: '1.16.1'
        // https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java
        implementation group: 'org.apache.flink', name: 'flink-streaming-java', version: '1.16.1'
        // https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka
        implementation group: 'org.apache.flink', name: 'flink-connector-kafka', version: '1.16.1'
    }

     

    flink-streaming-java 는 DataStream API 를 사용하기 위해서 필요한 모듈입니다.

    그리고 flink-clients 는 로컬 환경에서 Flink DataStream 프로그램을 구동시키기 위해서 필요한 모듈입니다.

    Standalone Mode 로 Flink 프로그램을 실행시켜줍니다.

    그리고 flink-connector-kafka 는 Kafka Connector 를 구현한 모듈이며, 내부적으로 kafkk-clients 모듈을 내장하고 있습니다.

    flink-connector-kafka 1.16.1 버전의 경우에는 kafka-clients:3.2.3 버전을 내장하고 있습니다.

     

    KafkaSource 를 구현하는 기본적인 코드 양식은 아래와 같습니다.

      public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
                .setBootstrapServers("localhost:29091,localhost:29092,localhost:29093")
                .setTopics("test-topic")
                .setGroupId("test-topic-consumer-group")
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
                .build();
    
        DataStream<String> source = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafka-source");
        source.print();
        env.execute();
      }

     

    < 출력 내용 >

    // 이하 생략
    10> 996823
    10> 996824
    10> 996825
    10> 996826
    10> 996827
    // 이하 생략

     

     

    < 실행되는 Stream Graph >

     

     

    반응형
Designed by Tistory.