-
Flink StreamGraph 알아보기Flink 2024. 1. 11. 06:58728x90반응형
- 목차
들어가며.
StreamGraph 는 Flink DataStream API 로 작성된 프로그램이 변환되는 최초의 모습입니다.
이를 다른 말로 실행 계획이라고 부르는데요.
일반적인 Java Program 이 컴파일되어 bytecode 가 되고, JVM 에서 실행되는 절차가 있듯이,
Flink Program 또한 이러한 절차들이 존재합니다.
대략적인 절차를 작성해보면,
1. Flink DataStream API 로 작성한 Program 을 만든다.
2. 최초의 실행 계획인 StreamGraph 를 만든다.
3. StreamGraph 는 Flink Optimizer 에 의해서 JobGraph 가 된다.
4. JobGraph 는 Flink JobManager 에게 전달된다.
5. Flink JobManager 는 TaskManager, ResourceManager 들로 부터 현재 클러스터 상태를 확인하고,
최종적인 Physical Execution Plan 을 생성한다.
이번 글에서 알아보고자하는 StreamGraph 는 Flink Runtime 이 최초로 생성하는 대략적인 Stream Topology 입니다.
몇차리 Optimizing 과 Scheduling 을 위한 "스트림 프로세싱 도안" 정도 생각하시면 됩니다.
StreamGraph 란 ?
StreamGraph 는 Flink 스트림 처리를 위한 논리적인 실행 계획입니다.
Flink Optimizer 에 의한 최적화와, JobManager 에 의한 실질적인 스케줄링을 하기 위해서 간단한 도안을 생성하는 느낌입니다.
StreamGraph 는 최종적으로 Task 라는 이름의 JVM Thread 가 되기까지 최적화와 변환을 반복합니다.
아래의 이미지가 대표적인 Execution Plan 을 생성하는 과정을 그린 예시입니다.
내용이 좀 복잡하긴 한데요.
이번 글에서 StreamGraph 에 대해서 상세히 설명을 드릴 것이구요.
또 다른 글들에서 이어지는 JobGraph, Optimization, Physical Execution Plan 들에 대해서 설명하도록 하겠습니다.
StreamGraph 는 StreamNode 와 StreamEdge 로 구성됩니다.
StreamNode 는 Flink DataStream API 에서 제공하는 여러 Operator 들입니다.
예를 들어,
map, filter, flatMap 등의 Operator 가 곧 StreamNode 가 됩니다.
StreamNode 와 Operator 는 유사한 의미로써 서로 호환이 가능한 표현입니다.
그리고 StreamEdge 는 이어지는 두개의 StreamNode 또는 Operator 를 어떻게 연결할지에 대한 설정입니다.
keyBy, rebalance 등을 통해서 StreamEdge 를 설정합니다.
이는 다른 표현으로 Partitioning 이라고도 합니다.
간단한 psuedo code 로 예를 들어보도록 하겠습니다.
그리고 psuedo code 의 StreamGraph 도 함께 그려보겠습니다.
DataStreamSource source = env.from([1,2,3,4,5]) DataStream mapper = source.map(num -> num * 2) mapper = mapper.rebalance() DataStream filter = mapper.filter(num -> num > 4) filter = filter.keyBy(num -> num % 2) DataStreamSink sink = filter.sink()
위 StreamGraph 는 4개의 StreamNode 와 3개의 StreamEdge 로 구성됩니다.
StreamGraph 는 Flink Optimizer 에 의해서 JobGraph 를 생성하기 위한 입력 데이터에 지나지 않습니다.
StreamGraph 를 통해서 실질적인 스트림 처리를 할 수 없기 때문이죠.
그래서 Flink Datastream API 로 작성된 프로그램이 어떻게 StreamGraph 로 변환되는지에 대해 아는 것이 중요합니다.
StreamNode 란 무엇일까 ?
StreamNode 는 Flink DataStream API 로 작성된 프로그램의 Operator 에 해당합니다.
source, map, filter, sink 등과 같이 스트림 이벤트를 처리할 수 있는 로직을 가지는 부분이죠.
StreamNode 는 input 데이터를 처리하여 output 데이터로 변환하는 역할을 수행합니다.
그래서 거대한 데이터 스트림의 특별한 처리를 담당하는 작은 부분들로 정의하셔도 됩니다.
그리고 흘러가는 이벤트들을 하나씩 Transformation 을 수행합니다.
Map, FlatMap Transformation 에 의해서 데이터의 타입이 변형될 수도 있고, 데이터의 내용이 바뀔 수도 있습니다.
하나의 데이터가 100개의 데이터로 Enrich 될 수도 있습니다.
Filter Transformation 에 의해서 이상한 데이터들은 걸러질 수도 있습니다.
즉, Flink DataStream API 를 통해서 구현되는 Transformation Operator 들이 곧 StreamNode 이며
"내가 작성하는 Transformation Operator 가 Graph 상에서 이렇게 변환되겠군" 정도의 이해도만 있으시면 좋을 것 같습니다.
StreamEdge 는 무엇인가?
StreamEdge 는 StreamNode 를 잇는 네트워크입니다.
먼저 가볍게 예시를 먼저 확인해보도록 하겠습니다.
아래와 같은 Flink DataStream Program 을 작성하였습니다.
그리고 Flink DataStream Program 은 아래의 이미지처럼 Logical Execution Plan 을 가지게 됩니다.
DataStreamSource source = env.source(); DataStream mapper = source.map(x -> x * 2); DataStream filter = mapper.filter(x -> x > 10); DataStreamSink sink = flink.sink();
Source, Map, Filter, Sink 는 각각 StreamNode 이자 Operator 에 해당합니다.
그리고 그 사이를 잇는 Forward 라는 StreamEdge 가 존재합니다.
이처럼 StreamEdge 는 StreamNode 는 잇는 네트워크 정보입니다.
StreamEdge 의 종류는 무엇이 있을까 ?
Forward.
가장 일반적인 StreamEdge 종류입니다.
Forward 는 StreamNode 와 StreamNode 의 일대일 매핑 방식으로
StreamNode1 -> StreamNode2 의 관계에서 StreamNode1 에서 발생한 이벤트는 반드시 StreamNode2 로 전달됩니다.
아래의 이미지처럼 Forward 로 연결된 StreamGraph 는 최종적인 실행 계획에선
Upstream Task 와 Downstream Task 가 반드시 일대일로 매칭됩니다.
그리고 이러한 관계를 Chaning Operator 라고 합니다.
StreamGraph 는 논리적인 실행 계획이라고 말씀드렸죠?
StreamGraph 는 Flink Optimizer 에 의해서 실질적인 실행 계획으로 변환되는데요.
StreamGraph 는 Physical Execution Plan 으로 변환되기 위한 밑그림이라고 생각하시면 됩니다.
Flink Optimizer 가 최적화하는 여러 기준들 중 하나가 바로 Chaining Operator 의 유무이며,
Forward StreamEdge 가 바로 Chaining Operator 의 중요한 기준이 됩니다.
Hash.
Hash StreamEdge 는 Flink DataStream API 의 keyBy Operation 에 해당합니다.
이는 Upstream StreamNode 에서 발생하는 이벤트를 Hashing 하여 Downstream StreamNode 로 분산시켜 전달합니다.
다른 표현으로 Hash Partitioning 이라고도 합니다.
예를 들어, 1 부터 10000 까지 숫자 데이터가 발생한다고 가정하겠습니다.
그리고 홀수인지 짝수인지를 기준으로 Partitioning 되도록 설계보겠습니다.
DataStreamSource source = env.from(range(1, 10000)) DataStream mapper = source.map(num -> { if (num % 2 == 0) return num * 2; else return num; }) mapper = mapper.keyBy(num -> num % 2); DataStream filter = mapper.filter(num -> num > 100) DataStreamSink sink = filter.sink();
아래 이미지처럼 이벤트의 Shuffling 이 발생하게 됩니다.
실제로 Flink 의 Task 들이 실행될 때, Task 들을 여러 컴퓨팅 환경으로 분산되어 실행됩니다.
그래서 Hash 와 같은 네트워크 연결을 원격 서버들 사이의 네트워크 통신을 발생시킬 수 있습니다.
그리고 Hash 로 연결된 StreamNode 들은 Chaining Operator 관계가 아니므로 최적화의 대상이 아니게 됩니다.
Rebalance.
Rebalance StreamEdge 가 존재합니다.
이는 Hash StreamEdge 처럼 Node 사이의 Shuffling 을 야기합니다.
Hash 는 명확한 Partitioning 기준이 있었다고 한다면,
Rebalance 는 Round-Robin 방식으로 데이터를 Shuffling 합니다.
제 경험상 Rebalancing 방식의 Partitioning 은 로드 밸런싱을 사용하였습니다.
데이터베이스를 조회하는 것과 같은 Network IO 는 병목현상이 발생할 수 있는 주요한 지점입니다.
이러한 병목현상이 발생하는 StreamNode 에 대해서 균등한 수의 이벤트를 처리할 수 있도록 Rebalancing 을 사용했던 경험이 있습니다.
그 외의 경우라면, 무의미한 Shuffling 이 발생하지 않도록 주의할 필요가 있습니다.
Broadcast.
마지막으로 Broadcast 방식이 존재합니다.
Broadcast 를 일반적인 스트림 프로세싱 어플리케이션에서 흔히 등장하는 Partitioning 방식인데요.
Upstream Task 에서 Downstream Task 로 Broadcasting 을 하게 되면,
동일 이벤트가 복사되어 모든 Downstream Task 로 전송됩니다.
즉, Partitioning 이 아닌 Replication 이 되는 것이죠.
아래 이미지와 같이 동일한 메시지가 복사되어 모든 Downstream Task 로 전달됩니다.
반응형'Flink' 카테고리의 다른 글
[Flink] Stateless Transform Operator 알아보기 (Map, Filter, FlatMap) (0) 2024.01.25 Flink Window 이해하기 (0) 2024.01.13 Flink State 알아보기 (0) 2024.01.10 Flink Watermark 알아보기 (0) 2024.01.10 Flink Checkpoint 알아보기 (0) 2024.01.10