-
[Flink] Checkpoint Alignment 알아보기Flink 2024. 3. 28. 06:39728x90반응형
- 목차
함께 보면 좋은 글.
https://westlife0615.tistory.com/572
들어가며.
이번 글에서는 Checkpoint Alignment 에 대해서 알아보려고 합니다.
Checkpoint Alignment 를 이해하기 이전에 Checkpoint Barrier 그리고 Checkpoint 가 수행되는 방법에 대해서 먼저 알아보겠습니다.
Checkpoint Barrier.
Checkpoint Barrier 는 Flink Application 의 Checkpoint 를 생성하기 위해서
Flink Application 전역으로 흐르는 하나의 이벤트입니다.
Source 에서 생성되는 데이터와 Watermark 와 유사하게 Checkpoint Barrier 또한 Flink Application Pipeline 을 흐르게 됩니다.
그럼 Checkpoint Barrier 의 역할을 무엇일까요 ?
Checkpoint Barrier 는 여러 Checkpoint 의 경계를 구분합니다.
예를 들어보겠습니다.
먼저 Source 에서 Sink 로 이벤트 데이터들이 생성되어 흘러갑니다.
Sink 는 State 로써 전달받은 이벤트의 갯수를 저장합니다.
첫번째 Checkpoint Barrier 인 CB1 이 생성됩니다.
CB1 는 먼저 생성된 5개의 이벤트의 바로 다음에 위치합니다.
Checkpoint Barrier 는 Event 데이터를 앞지를 수 없습니다.
아래 이미지처럼 5개의 이벤트들이 모두 Sink 로 전달된 이후에 Checkpoint Barrier 가 Sink 에 전달됩니다.
이 상황에서 Sink 는 Checkpoint Snapshot 을 생성하게 되며, State 는 TotalCount 5 가 됩니다.
즉, Checkpoint Barrier 는 Flink 에 존재하는 모든 SubTask 들에게 전달됩니다.
그리고 SubTask 들은 Checkpoint Barrier 가 전달되면 Checkpoint Snapshot 을 생성할 준비를 합니다.
이렇게 Checkpoint Barrier 는 Checkpoint 생성을 트리거하기 위해서 사용됩니다.
Checkpoint Alignment 란 ?
이제 Checkpoint Barrier 가 무엇인지는 알았는데, Checkpoint Alignment 는 무엇일까요 ?
Checkpoint Alignment 는 모든 SubTask 들이 Checkpoint 생성을 마무리할 수 있도록 이벤트 처리 통제하는 방식인데요.
Flink 의 여러 Operator 들은 rebalance, keyBy 등의 파티셔닝에 의해서 복잡한 연결 관계를 가집니다.
예를 들어, 아래의 예시처럼 Parallelism 3 으로 설정되어 SubTask 가 3인 Flink Application 이 존재합니다.
이때, Filter 와 Map 이 Forward 방식으로 연결되어있다면,
Map Subtask 의 입장에서 Checkpoint Barrier 는 하나씩만 유입됩니다.
하지만 아래와 같이 Hash Partitioning 이 된 경우에는 3개의 Checkpoint Barrier 들이 유입되게 됩니다.
그리고 Filter1 -> Map1, Filter2 -> Map1, Filter3 -> Map1 와 같은 형식의 경로가 생기게 되는데요.
Filter1 -> Map1 경로의 이벤트 갯수는 1000개
Filter2 -> Map1 경로의 이벤트 갯수는 10개
Filter3 -> Map1 경로의 이벤트 갯수는 1개
라고 가정할 때에 Filter1 -> Map1 경로의 이벤트들을 처리하는데에 더 많은 시간이 소요됩니다.
그리고 "Filter1 -> Map1 경로" 의 모든 이벤트 데이터를 처리하기 전까지
"Filter2 -> Map1 경로", "Filter3 -> Map1 경로" 의 데이터 흐름이 멈추게 됩니다.
이렇게 Stop The World 현상이 발생하게 되고, 이러한 현상을 Checkpoint Alignment 라고 합니다.
즉, 모든 Checkpoint Barrier 보다 선행되어 유입된 이벤트를 처리하고 Sync 를 맞추는 작업이라고 보시면 됩니다.
"Filter1 -> Map1 경로" 처럼 이벤트가 적절히 분배되지 않은 경우에는 Checkpont Alignment 과정에서 데이터 처리 속도가 낮아지며,
병목현상으로 이어질 수 있습니다.
실습.
실제 Checkpoint Alignment 를 테스트해보기 위한 실습을 진행하도록 하겠습니다.
예시 코드는 아래와 같습니다.
package com.westlife.jobs; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.RestOptions; import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.state.FunctionInitializationContext; import org.apache.flink.runtime.state.FunctionSnapshotContext; import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.source.SourceFunction; import java.util.HashMap; import java.util.Map; public class TestCheckpointAlignment { public static void main (String[] args) throws Exception { Configuration configuration = new Configuration(); configuration.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true); configuration.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 8081); configuration.setBoolean(RestOptions.ENABLE_FLAMEGRAPH, true); StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(2, configuration); env.disableOperatorChaining(); env.enableCheckpointing(10000); env.getCheckpointConfig().setCheckpointStorage(new Path("file:///tmp/checkpoint")); DataStream<Integer> source = env.addSource(new SourceFunction<Integer>() { @Override public void run(SourceContext<Integer> ctx) throws Exception { int counter = 0; while (true) { if (counter % 1000 == 0) ctx.collect(2); else ctx.collect(1); counter++; Thread.sleep(10); } } @Override public void cancel() {} }).name("source").uid("source"); DataStream<Integer> map1 = source.keyBy(v -> v % 2 == 0) .map((MapFunction<Integer, Integer>) value -> value) .returns(Integer.class).name("map1").uid("map1").setParallelism(2); map1.map(new CheckpointMapper()).name("map2").uid("map2"); env.execute(); } public static class CheckpointMapper implements MapFunction<Integer, Integer>, CheckpointedFunction { private String stateName = "map-state"; private ListState<Map<Integer, Integer>> state; private ListStateDescriptor<Map<Integer, Integer>> desc; private Map<Integer, Integer> counterMap; @Override public Integer map(Integer value) { if (counterMap.containsKey(value)) counterMap.put(value, counterMap.get(value) + 1); else counterMap.put(value, 1); long waitCounter = 0; while (++waitCounter < 100000000) {} return value; } @Override public void snapshotState(FunctionSnapshotContext context) throws Exception { if (counterMap.get(1) != null) System.out.printf("### start snapshot checkpoint : %d : %d\n", 1, counterMap.get(1)); else System.out.printf("### start snapshot checkpoint : %d : %d\n", 2, counterMap.get(2)); this.state.clear(); this.state.add(counterMap); } @Override public void initializeState(FunctionInitializationContext context) throws Exception { this.desc = new ListStateDescriptor<>(stateName, Types.MAP(Types.INT, Types.INT)); this.state = context.getOperatorStateStore().getListState(desc); if (context.isRestored()) this.counterMap = this.state.get().iterator().next(); else this.counterMap = new HashMap<>(); } } }
Stream Graph 는 아래와 같이 구성되구요.
Parallelism 은 2로써 각 Task 는 2개의 SubTask 를 가집니다.
Source 는 1 또는 2 의 데이터를 생성합니다.
10초에 1000개의 데이터를 생성하도록 설계하였구요. 1 과 2의 데이터 생성 비율은 1000 : 1 입니다.
Source 와 map1 사이에 KeyBy Partitioning 을 적용하여 데이터 1 을 처리하는 SubTask 로 많은 양의 데이터가 유입됩니다.
즉, Data Skew 를 강제로 구현하였습니다. 0 번 SubTask 로 데이터가 집중적으로 유입됩니다.
그리고 10초마다 Checkpoint 가 생성되는데요.
아래 이미지는 Checkpoint 가 어떻게 생성되는지 나타내는 지표입니다.
0번 SubTask 는 체크포인트 스냅샷을 생성하는데에 2분 14초가 소요됩니다.
반면 데이터 유입이 적은 1번 SubTask 는 3ms 가 걸리죠.
하지만, 이 과정동안 Checkpoint Alignment 를 위해서 2m 14s 이상으로 데이터의 처리가 중단됩니다.
참고한 글.
반응형'Flink' 카테고리의 다른 글
[Flink] Operator Chaining 알아보기 (0) 2024.03.25 [Flink] Async IO Retry Strategy 알아보기 (0) 2024.03.23 [Flink] Async IO 알아보기 ( AsyncDataStream ) (0) 2024.03.23 [Flink] JSON FileSink 만들어보기 (Row Format) (0) 2024.02.12 [Flink] Tumbling Time Window 알아보기 (TumblingEventTimeWindows) (0) 2024.02.12