-
[Flink] KafkaSource Connector 알아보기Flink 2024. 2. 4. 15:08728x90반응형
- 목차
들어가며.
이번 글에서는 Flink DataStream 에서 사용할 수 있는 Kafka Source Connector 에 대해서 알아보도록 하겠습니다.
Flink 의 Kafka Connector 를 구현한 Source Class 는 두가지가 있습니다.
하나는 FlinkKafkaConsumer 이고, 다른 하나는 KafkaSource 입니다.
Flink 1.14 버전 이후부터 KafkaSource 가 추가되었구요.
setBounded 설정을 통해서 Unbounded 가 아닌 Bounded 하게 Kafka Message 를 처리할 수 있고,
특정 Timestamp 부터 Message 를 조회하는 기능이 추가되었습니다.
참고로 Flink 1.17 버전 이후부터 FlinkKafkaConsumer 는 더 이상 사용할 수 없게 된 점도 함께 알려드립니다.
테스트를 위한 Kafka Cluster 생성 및 Kafka Producer 구현.
먼저 Flink 의 KafkaSource 를 구현하기 위해서 Kafka 세팅을 먼저 진행하겠습니다.
아래 링크는 제가 과거에 작성한 Kafka Cluster 를 도커로 구현한 자료의 링크입니다.
https://westlife0615.tistory.com/474
저는 "test-topic" 이라는 이름의 Topic 을 만들었구요. Partition 은 4개로 설정하였습니다.
그리고 아래 코드는"test-topic" 토픽에 메시지를 생성하기 위한 간단한 KafkaProducer 관련 함수입니다.
kafka-clients 3.2.3 버전의 패키지를 사용하였습니다.
private static void produceTestMessages() { Properties properties = new Properties(); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:29091,localhost:29092,localhost:29093"); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); String topic = "test-topic"; KafkaProducer producer = new KafkaProducer(properties); int counter = 0; while (counter < 1000000) { counter++; ProducerRecord<String, String> record = new ProducerRecord(topic, null, "" + counter); producer.send(record); } producer.flush(); producer.close(); }
KafkaSource 만들어보기.
먼저 저는 Flink 1.16 버전을 사용하였구요.
아래는 Kafka Source 를 사용하기 위한 Dependency 들입니다.
dependencies { testImplementation 'org.junit.jupiter:junit-jupiter-api:5.8.1' testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine:5.8.1' // https://mvnrepository.com/artifact/org.apache.flink/flink-clients implementation group: 'org.apache.flink', name: 'flink-clients', version: '1.16.1' // https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java implementation group: 'org.apache.flink', name: 'flink-streaming-java', version: '1.16.1' // https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka implementation group: 'org.apache.flink', name: 'flink-connector-kafka', version: '1.16.1' }
flink-streaming-java 는 DataStream API 를 사용하기 위해서 필요한 모듈입니다.
그리고 flink-clients 는 로컬 환경에서 Flink DataStream 프로그램을 구동시키기 위해서 필요한 모듈입니다.
Standalone Mode 로 Flink 프로그램을 실행시켜줍니다.
그리고 flink-connector-kafka 는 Kafka Connector 를 구현한 모듈이며, 내부적으로 kafkk-clients 모듈을 내장하고 있습니다.
flink-connector-kafka 1.16.1 버전의 경우에는 kafka-clients:3.2.3 버전을 내장하고 있습니다.
KafkaSource 를 구현하는 기본적인 코드 양식은 아래와 같습니다.
public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); KafkaSource<String> kafkaSource = KafkaSource.<String>builder() .setBootstrapServers("localhost:29091,localhost:29092,localhost:29093") .setTopics("test-topic") .setGroupId("test-topic-consumer-group") .setValueOnlyDeserializer(new SimpleStringSchema()) .setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false") .build(); DataStream<String> source = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafka-source"); source.print(); env.execute(); }
< 출력 내용 >
// 이하 생략 10> 996823 10> 996824 10> 996825 10> 996826 10> 996827 // 이하 생략
< 실행되는 Stream Graph >
반응형'Flink' 카테고리의 다른 글
[Kryo] Kyro Serialization 알아보기 (0) 2024.02.05 [Flink] FileSource 알아보기 (json, csv, parquet, avro) (0) 2024.02.04 [Flink] Stateless Transform Operator 알아보기 (Map, Filter, FlatMap) (0) 2024.01.25 Flink Window 이해하기 (0) 2024.01.13 Flink StreamGraph 알아보기 (0) 2024.01.11