-
[Flink] Stateless Transform Operator 알아보기 (Map, Filter, FlatMap)Flink 2024. 1. 25. 21:07728x90반응형
- 목차
들어가며.
이번 글에서는 Flink DataStream API 의 기본적인 Stateless Operator 들에 대해서 알아보려고 합니다.
살펴볼 대상은 Map, Filter, Flatmap Operator 입니다.
먼저 Stateless Operator 에 대해서 간단히 알아보도록 하겠습니다.
Stateless Operator.
Stream Processing Application 영역에서 Stateful 과 Stateless 라는 두가지 처리 기법이 존재합니다.
Stateless 는 Event-by-Event 형태로 데이터를 처리하는 일반적으로 기법을 의미합니다.
즉, 단일 Event 하나를 처리하는데에만 집중합니다.
다른 표현으로는 과거의 상태가 현재의 Event 처리에 영향을 주지 않습니다.
예를 들어볼까요 ?
만약 1 부터 1000 까지의 숫자가 존재할 때에 홀수는 1을 더하고, 짝수는 2 로 나누는 프로그램이 존재한다고 가정해보겠습니다.
이를 Psuedo Code 로 나타내면 아래와 같이 나타낼 수 있습니다.
Stream = IntStream.range(1, 1000); ProcessedStream = Stream.map(num -> isOdd(num) ? num + 1 : num % 2);
이와 같은 경우는 단일 Event 의 처리에 집중합니다.
이벤트가 홀수인지 짝수인지에 초점을 맞추고, 그에 상응하는 Transformation 을 수행합니다.
과거의 상태가 현재의 Event 처리에 영향을 전혀 주지 않습니다.
반면 Stateful 한 데이터 처리 요구사항이 있을 수 있습니다.
첫번째 Event 부터 1만번째 Event 까지만 위와 같은 처리를 수행하고, 그 이외의 데이터들은 처리하지 않는다.
이를 Psuedo Code 로 표현하면 아래와 같습니다.
아래의 프로그램은 Stateful 한 데이터 처리 프로그램입니다.
왜냐하면 처리된 Event 들의 누적된 갯수가 Transformation 에 영향을 끼치기 때문입니다.
즉, 과거의 상태가 현재의 데이터 처리에 영향을 끼칩니다.
ProcessedCount = 0; Stream = IntStream.range(1, 1000); FilteredStream = Stream.filter( _ -> ProcessedCount <= 100000 ); ProcessedStream = FilteredStream.map(num -> { ProcessedCount = ProcessedCount + 1; boolean oddOrEvent = isOdd(num); if (oddOrEvent) { return num + 1; } else { return num % 2; } });
Stateless 와 Stateful 을 비교하기 위한 단순한 예시를 들긴 했지만, 이러한 컨셉이 둘을 구분짓는 큰 요소입니다.
Stateless Transformation.
Stateless Transformation 은 아래 3가지 중 하나 이상의 기준을 만족해야합니다.
첫번째로 Input Event 와 Output Event 의 갯수가 달라져야 합니다.
두번째로 Input Event 의 값과 Output Event 의 값이 달라져야 합니다.
마지막으로 Input Event 의 DataType 과 Output Event 의 DataType 이 달라져야합니다.
즉, 이벤트의 갯수 또는 값과 데이터 타입이 변경된다면 이는 Transformation 이라고 부를 수 있습니다.
Map Operator.
Map Operator 는 입력 데이터의 수와 출력 데이터의 수는 일치합니다.
즉, 일대일 관계를 반드시 유지합니다.
대신 Event 의 Value 혹은 Data Type 이 변경될 수 있습니다.
IntStream 을 읽어들여 간단한 Transformation 을 수행하는 Map Operator 를 구현해보겠습니다.
< Event Value 가 변경되는 케이스 >
public static void main (String[] args) throws Exception { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStream<Integer> source = env.fromCollection( IntStream.range(0, 10).boxed().collect(Collectors.toList()) ); DataStream<Integer> mappedStream = source.map(num -> { if (num % 2 == 0) return num + 1; else return num / 2; }); mappedStream.print(); env.execute(); }
< Event DataType 이 변경되는 케이스 >
public static void main (String[] args) throws Exception { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStream<Integer> source = env.fromCollection( IntStream.range(0, 10).boxed().collect(Collectors.toList()) ); DataStream<String> mappedStream = source.map(num -> String.valueOf(num)); mappedStream.print(); env.execute(); }
Filter Operator.
Filter Operator 는 입력 이벤트의 수와 출력 이벤트의 수가 달라집니다.
Filter Operator 를 거치게 되면서 더 이상 일대일 관계가 성립되지 않게 됩니다.
대신 Value 또는 DataType 이 변경되어도 되고, 유지되어도 됩니다.
Filter Operator 의 큰 의의는 입력 이벤트 중의 일부를 제외시키는데에 존재합니다.
보통 데이터 처리의 요구사항에 합당하지 않는 데이터를 제외시키는 용도로 활용되곤 합니다.
< 5 보다 큰 값을 제외시키는 Filter Operator >
public static void main (String[] args) throws Exception { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStream<Integer> source = env.fromCollection( IntStream.range(0, 10).boxed().collect(Collectors.toList()) ); DataStream<String> mappedStream = source .filter(num -> num < 5) .map(num -> String.valueOf(num)); mappedStream.print(); env.execute(); }
FlatMap Operator.
FlatMap Transformation 은 N:M 의 입력 데이터와 출력 데이터의 비율을 가집니다.
다른 식으로 표현하면 하나의 입력 값에 대해서 0개 이상의 데이터를 출력합니다.
어떠한 출력 데이터도 생성하지 않을 수 있고, 많은 양의 데이터를 생성할 수도 있습니다.
예를 들어보면 아래와 같습니다.
아래 예시는 입력된 수치보다 작은 모든 자연수를 출력하는 FlatMap Transformation 을 구현한 케이스입니다.
public static void main (String[] args) throws Exception { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStream<Integer> source = env.fromCollection( IntStream.range(0, 10).boxed().collect(Collectors.toList()) ); DataStream<Integer> mappedStream = source .flatMap(new FlatMapFunction<Integer, Integer>() { @Override public void flatMap(Integer value, Collector<Integer> out) throws Exception { while (value > 0) { out.collect(value); value--; } } }); mappedStream.print(); env.execute(); }
출력 아래처럼 입력 데이터보다 작은 자연수들을 내림차순으로 출력합니다.
결과 내용이 조금 생략되긴 했지만, 10>, 9> 와 같이 > 등호의 앞 부분은 병렬적으로 수행된 SubTask 의 번호를 뜻합니다.
10> 9 9> 8 8> 7 7> 6 6> 5 10> 8 9> 7 8> 6 7> 5 6> 4 10> 7 9> 6 8> 5 7> 2 6> 3 10> 6 9> 5 8> 4 7> 1 6> 2 10> 5 9> 4 8> 3 7> 4 6> 1 10> 4 9> 3 8> 2 7> 3 10> 3 9> 2 8> 1 10> 2 9> 1 10> 1
마치며.
대표적인 Stateless Transformation 인 Map, Filter, FlatMap 에 대해서 알아보았습니다.
저는 편의를 위해서 모든 Transformation 의 구현을 Lambda Function 과 익명클래스로 구현하였는데요.
다음 글에서는 각 Operator 를 구현하는 다양한 방법들에 대해서 알아보도록 하겠습니다.
언제든 질문주시면 답변해드리도록 하겠습니다.
감사합니다.
반응형'Flink' 카테고리의 다른 글
[Flink] FileSource 알아보기 (json, csv, parquet, avro) (0) 2024.02.04 [Flink] KafkaSource Connector 알아보기 (0) 2024.02.04 Flink Window 이해하기 (0) 2024.01.13 Flink StreamGraph 알아보기 (0) 2024.01.11 Flink State 알아보기 (0) 2024.01.10