ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • [Flink] Stateless Transform Operator 알아보기 (Map, Filter, FlatMap)
    Flink 2024. 1. 25. 21:07
    728x90
    반응형

    - 목차

     

    들어가며.

    이번 글에서는 Flink DataStream API 의 기본적인 Stateless Operator 들에 대해서 알아보려고 합니다.

    살펴볼 대상은 Map, Filter, Flatmap Operator 입니다.

    먼저 Stateless Operator 에 대해서 간단히 알아보도록 하겠습니다.

     

    Stateless Operator.

    Stream Processing Application 영역에서 Stateful 과 Stateless 라는 두가지 처리 기법이 존재합니다.

    Stateless 는 Event-by-Event 형태로 데이터를 처리하는 일반적으로 기법을 의미합니다.

    즉, 단일 Event 하나를 처리하는데에만 집중합니다.

    다른 표현으로는 과거의 상태가 현재의 Event 처리에 영향을 주지 않습니다.

    예를 들어볼까요 ?

     

    만약 1 부터 1000 까지의 숫자가 존재할 때에 홀수는 1을 더하고, 짝수는 2 로 나누는 프로그램이 존재한다고 가정해보겠습니다.

    이를 Psuedo Code 로 나타내면 아래와 같이 나타낼 수 있습니다.

    Stream = IntStream.range(1, 1000);
    ProcessedStream = Stream.map(num -> isOdd(num) ? num + 1 : num % 2);

     

    이와 같은 경우는 단일 Event 의 처리에 집중합니다.

    이벤트가 홀수인지 짝수인지에 초점을 맞추고, 그에 상응하는 Transformation 을 수행합니다.

    과거의 상태가 현재의 Event 처리에 영향을 전혀 주지 않습니다.

     

    반면 Stateful 한 데이터 처리 요구사항이 있을 수 있습니다.

    첫번째 Event 부터 1만번째 Event 까지만 위와 같은 처리를 수행하고, 그 이외의 데이터들은 처리하지 않는다.

    이를 Psuedo Code 로 표현하면 아래와 같습니다.

    아래의 프로그램은 Stateful 한 데이터 처리 프로그램입니다.

    왜냐하면 처리된 Event 들의 누적된 갯수가 Transformation 에 영향을 끼치기 때문입니다.

    즉, 과거의 상태가 현재의 데이터 처리에 영향을 끼칩니다.

    ProcessedCount = 0;
    Stream = IntStream.range(1, 1000);
    FilteredStream = Stream.filter( _ -> ProcessedCount <= 100000 );
    ProcessedStream = FilteredStream.map(num -> {
    
        ProcessedCount = ProcessedCount + 1;
        
        boolean oddOrEvent = isOdd(num);
        
        if (oddOrEvent) {
        	return num + 1;
        } else {
            return num % 2;
        }
        
    });

     

    Stateless 와 Stateful 을 비교하기 위한 단순한 예시를 들긴 했지만, 이러한 컨셉이 둘을 구분짓는 큰 요소입니다.

     

    Stateless Transformation.

    Stateless Transformation 은 아래 3가지 중 하나 이상의 기준을 만족해야합니다.

    첫번째로 Input Event 와 Output Event 의 갯수가 달라져야 합니다.

    두번째로 Input Event 의 값과 Output Event 의 값이 달라져야 합니다.

    마지막으로 Input Event 의 DataType 과 Output Event 의 DataType 이 달라져야합니다.

    즉, 이벤트의 갯수 또는 값과 데이터 타입이 변경된다면 이는 Transformation 이라고 부를 수 있습니다.

     

    Map Operator.

    Map Operator 는 입력 데이터의 수와 출력 데이터의 수는 일치합니다.

    즉, 일대일 관계를 반드시 유지합니다.

    대신 Event 의 Value 혹은 Data Type 이 변경될 수 있습니다.

     

    IntStream 을 읽어들여 간단한 Transformation 을 수행하는 Map Operator 를 구현해보겠습니다.

     

    < Event Value 가 변경되는 케이스 >

      public static void main (String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
        DataStream<Integer> source = env.fromCollection(
                IntStream.range(0, 10).boxed().collect(Collectors.toList())
        );
        DataStream<Integer> mappedStream = source.map(num -> {
          if (num % 2 == 0) return num + 1;
          else return num / 2;
        });
        mappedStream.print();
        env.execute();
      }

     

    < Event DataType 이 변경되는 케이스 >

      public static void main (String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
        DataStream<Integer> source = env.fromCollection(
                IntStream.range(0, 10).boxed().collect(Collectors.toList())
        );
        DataStream<String> mappedStream = source.map(num -> String.valueOf(num));
        mappedStream.print();
        env.execute();
      }

     

     

    Filter Operator.

    Filter Operator 는 입력 이벤트의 수와 출력 이벤트의 수가 달라집니다.

    Filter Operator 를 거치게 되면서 더 이상 일대일 관계가 성립되지 않게 됩니다.

    대신 Value 또는 DataType 이 변경되어도 되고, 유지되어도 됩니다.

    Filter Operator 의 큰 의의는 입력 이벤트 중의 일부를 제외시키는데에 존재합니다.

     

    보통 데이터 처리의 요구사항에 합당하지 않는 데이터를 제외시키는 용도로 활용되곤 합니다.

     

    < 5 보다 큰 값을 제외시키는 Filter Operator >

      public static void main (String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
        DataStream<Integer> source = env.fromCollection(
                IntStream.range(0, 10).boxed().collect(Collectors.toList())
        );
        DataStream<String> mappedStream = source
                .filter(num -> num < 5)
                .map(num -> String.valueOf(num));
        mappedStream.print();
        env.execute();
      }

     

     

    FlatMap Operator.

    FlatMap Transformation 은 N:M 의 입력 데이터와 출력 데이터의 비율을 가집니다.

    다른 식으로 표현하면 하나의 입력 값에 대해서 0개 이상의 데이터를 출력합니다.

    어떠한 출력 데이터도 생성하지 않을 수 있고, 많은 양의 데이터를 생성할 수도 있습니다.

    예를 들어보면 아래와 같습니다.

     

    아래 예시는 입력된 수치보다 작은 모든 자연수를 출력하는 FlatMap Transformation 을 구현한 케이스입니다.

     

      public static void main (String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
        DataStream<Integer> source = env.fromCollection(
                IntStream.range(0, 10).boxed().collect(Collectors.toList())
        );
        DataStream<Integer> mappedStream = source
                .flatMap(new FlatMapFunction<Integer, Integer>() {
                  @Override
                  public void flatMap(Integer value, Collector<Integer> out) throws Exception {
                    while (value > 0) {
                      out.collect(value);
                      value--;
                    }
                  }
                });
        mappedStream.print();
        env.execute();
      }

     

    출력 아래처럼 입력 데이터보다 작은 자연수들을 내림차순으로 출력합니다.

    결과 내용이 조금 생략되긴 했지만, 10>, 9> 와 같이 > 등호의 앞 부분은 병렬적으로 수행된 SubTask 의 번호를 뜻합니다.

     

    10> 9  9> 8  8> 7  7> 6  6> 5
    10> 8  9> 7  8> 6  7> 5  6> 4
    10> 7  9> 6  8> 5  7> 2  6> 3
    10> 6  9> 5  8> 4  7> 1  6> 2
    10> 5  9> 4  8> 3  7> 4  6> 1
    10> 4  9> 3  8> 2  7> 3
    10> 3  9> 2  8> 1
    10> 2  9> 1
    10> 1

     

     

    마치며.

    대표적인 Stateless Transformation 인 Map, Filter, FlatMap 에 대해서 알아보았습니다.

    저는 편의를 위해서 모든 Transformation 의 구현을 Lambda Function 과 익명클래스로 구현하였는데요.

    다음 글에서는 각 Operator 를 구현하는 다양한 방법들에 대해서 알아보도록 하겠습니다.

    언제든 질문주시면 답변해드리도록 하겠습니다.

    감사합니다.

     

    반응형

    'Flink' 카테고리의 다른 글

    [Flink] FileSource 알아보기 (json, csv, parquet, avro)  (0) 2024.02.04
    [Flink] KafkaSource Connector 알아보기  (0) 2024.02.04
    Flink Window 이해하기  (0) 2024.01.13
    Flink StreamGraph 알아보기  (0) 2024.01.11
    Flink State 알아보기  (0) 2024.01.10
Designed by Tistory.