ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • [Kafka-Streams] KStream 알아보기
    Kafka 2024. 1. 6. 06:38
    728x90
    반응형

    - 목차

     

    함께 보면 좋은 글.

    https://westlife0615.tistory.com/474

     

    Docker 로 Kafka Cluster 구축해보기.

    - 목차 소개. 저는 로컬 환경에서 카프카 관련 테스트를 진행하는 경우가 많이 생기더군요. 그래서 docker-compose 를 활용하여 Kafka, ZooKeeper 클러스터를 구축하는 내용을 작성하려고 합니다. docker-com

    westlife0615.tistory.com

     

    소개.

    이번 글에서 Kafka Streams 의 KStream 에 대해서 알아보려고 합니다.

    KStream카프카 스트림 프로세싱 어플리케이션에서 mutation 되는 Stream 의 단위입니다.

    그러니깐 끊임없이 이벤트가 유입되는 스트림 환경에서 여러 Transformation 기능들이 적용되는데요.

    KStream 은 map, filter, flatMap 등의 Transformation 에 의해서 구분되는 Stream 의 단위입니다.

     

    KStream 은 순서대로 정렬됩니다.

    이전 단계의 KStream 은 다음 단계의 KStream 에게 Kafka Record 를 하나씩 전송합니다.

    KStream 은 이전 단계의 KStream 에게서 전달받은 Record 를 작성된 로직에 맞게 수정하게 되죠.

    그리고 Processing 을 마친 KStream 은 다음 단계의 KStream 에게로 Record 를 전달합니다.

     

     

    토폴로지 (Topology).

    대부분의 데이터 처리 또는 스트림 처리 어플리케이션들은 토폴로지라는 개념을 가집니다.

    Source 부터 Transformation 을 거처 Sink 까지 이어지는 데이터 처리 흐름은 그래프로 간단히 표현하게 됩니다.

    이를 DAG, 실행 계획 등으로 표현하기도 하죠.

    아래 이미지는 Flink, Spark 에서 사용되는 Execution Plan 을 도식화한 예시들입니다.

    apache flink execution plan

     

    apache spark execution plan

     

    카프카 스트림 또한 Topology 라는 명칭으로 이러한 실행 계획을 사용하죠.

     

    이러한 구조를 가지는 큰 이유 중의 하나는 데이터 처리 또는 스트림 처리 어플리케이션을 보통 분산 환경에서 동작하기 때문입니다.

    그래서 Task 라고 불리는 가장 작은 데이터 처리 단위가 분산 컴퓨팅 환경으로 흩어져 있으며, 이들은 네트워크적으로 연결됩니다.

    그래서 Topology 라는 정확한 지도 또는 구성이 필요하며, Task 들은 연결됩니다.

     

    아래 이미지는 카프카 스트림의 토폴로지는 표현하는 대표적인 이미지 예시입니다.

    Source Node -> Processor Node -> Sink Node 로 이어지며,

    각 Node 가 KStream 에 대응한다고 가볍게 생각하시면 됩니다.

     

     

     

    Immutable.

    각 노드는 토폴로지에 따라 다음 노드에게 데이터를 전달합니다.

    이 과정에서 반드시 데이터의 복사본을 다음 노드에게 전달하게 됩니다.

    그래서 원본 데이터의 변형을 발생하지 않습니다.

    이는 분산 컴퓨팅 환경에서 본다면 매우 자연스러운 현상입니다.

    동일 서버에서의 프로세스 끼리의 통신이나 원격 서버 간의 통신에서는 반드시 직렬화는 필수입니다.

    그리고 직렬화의 대상은 byte 로 표현되는 데이터 그 자체이지 Java Object 의 Reference 같은 개념은 결코 아닙니다.

    그래서 Processor Node 에서 처리하게 되는 데이터는 이전 노드가 처리했던 원본 데이터에 영향을 주지 않습니다.

    이러한 관점에서 Node 또는 KStream 은 Immutable 한 상태를 유지하게 됩니다.

     

    구현해보기.

    kafka streams 의 KStream 을 간단히 구현해보도록 하겠습니다.

    아래 이미지와 같은 구조의 스트림 프로세싱 어플리케이션입니다.

    "words" Topic 으로부터 데이터를 입력받습니다.

    "words" Topic 은 랜덤한 길이의 난수 데이터를 저장합니다.

    그리고 Uppercase Mapperlong words Filter 를 통해서 프로세싱됩니다.

    마지막으로 "words-output" Topic 으로 프로세싱된 데이터들이 저장됩니다.

     

    package com.westlife.kafka;
    
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.Topology;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.Produced;
    
    import java.util.Properties;
    
    public class KafkaStreamExample {
      public static void main (String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "words-processing-stream");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:29091,localhost:29093,localhost:29093");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> source = builder.stream("words");
        KStream<String, String> uppercased = source.mapValues(value -> value.toUpperCase());
        KStream<String, String> longWordFiltered = uppercased.filter((key, value) -> value.length() < 10);
        longWordFiltered.to("words-out", Produced.with(Serdes.String(),Serdes.String()));
    
        Topology topology = builder.build();
        KafkaStreams streams = new KafkaStreams(topology, props);
        streams.start();
      }
    }

     

    위 코드 예시에서 볼 수 있듯이,

    KStream 은 Transformation 단위로 분리됩니다.

    mapValues 를 경계로 source stream 과 uppercased stream 이 나뉘어집니다.

    filter 또한 uppercased stream 과 longwordFiltered stream 을 나누게 되죠.

     

     

     

     

    반응형
Designed by Tistory.