    이번 글에서는 Checkpoint Alignment 에 대해서 알아보려고 합니다.

    Checkpoint Alignment 를 이해하기 이전에 Checkpoint Barrier 그리고 Checkpoint 가 수행되는 방법에 대해서 먼저 알아보겠습니다.


    Checkpoint Barrier.

    Checkpoint Barrier 는 Flink Application 의 Checkpoint 를 생성하기 위해서

    Flink Application 전역으로 흐르는 하나의 이벤트입니다.

    Source 에서 생성되는 데이터와 Watermark 와 유사하게 Checkpoint Barrier 또한 Flink Application Pipeline 을 흐르게 됩니다.

    그럼 Checkpoint Barrier 의 역할을 무엇일까요 ?

    Checkpoint Barrier 는 여러 Checkpoint 의 경계를 구분합니다.

    예를 들어보겠습니다.


    먼저 Source 에서 Sink 로 이벤트 데이터들이 생성되어 흘러갑니다.

    Sink 는 State 로써 전달받은 이벤트의 갯수를 저장합니다.


    첫번째 Checkpoint Barrier 인 CB1 이 생성됩니다.

    CB1 는 먼저 생성된 5개의 이벤트의 바로 다음에 위치합니다.

    Checkpoint Barrier 는 Event 데이터를 앞지를 수 없습니다.


    아래 이미지처럼 5개의 이벤트들이 모두 Sink 로 전달된 이후에 Checkpoint Barrier 가 Sink 에 전달됩니다.

    이 상황에서 Sink 는 Checkpoint Snapshot 을 생성하게 되며, State 는 TotalCount 5 가 됩니다.



    즉, Checkpoint Barrier 는 Flink 에 존재하는 모든 SubTask 들에게 전달됩니다.

    그리고 SubTask 들은 Checkpoint Barrier 가 전달되면 Checkpoint Snapshot 을 생성할 준비를 합니다.

    이렇게 Checkpoint Barrier 는 Checkpoint 생성을 트리거하기 위해서 사용됩니다.


    Checkpoint Alignment 란 ?

    이제 Checkpoint Barrier 가 무엇인지는 알았는데, Checkpoint Alignment 는 무엇일까요 ?

    Checkpoint Alignment 는 모든 SubTask 들이 Checkpoint 생성을 마무리할 수 있도록 이벤트 처리 통제하는 방식인데요.

    Flink 의 여러 Operator 들은 rebalance, keyBy 등의 파티셔닝에 의해서 복잡한 연결 관계를 가집니다.

    예를 들어, 아래의 예시처럼 Parallelism 3 으로 설정되어 SubTask 가 3인 Flink Application 이 존재합니다.



    이때, Filter 와 Map 이 Forward 방식으로 연결되어있다면,

    Map Subtask 의 입장에서 Checkpoint Barrier 는 하나씩만 유입됩니다.



    하지만 아래와 같이 Hash Partitioning 이 된 경우에는 3개의 Checkpoint Barrier 들이 유입되게 됩니다.

    그리고 Filter1 -> Map1, Filter2 -> Map1, Filter3 -> Map1 와 같은 형식의 경로가 생기게 되는데요.

    Filter1 -> Map1 경로의 이벤트 갯수는 1000개

    Filter2 -> Map1 경로의 이벤트 갯수는 10개

    Filter3 -> Map1 경로의 이벤트 갯수는 1개

    라고 가정할 때에 Filter1 -> Map1 경로의 이벤트들을 처리하는데에 더 많은 시간이 소요됩니다.


    그리고 "Filter1 -> Map1 경로" 의 모든 이벤트 데이터를 처리하기 전까지

    "Filter2 -> Map1 경로", "Filter3 -> Map1 경로" 의 데이터 흐름이 멈추게 됩니다.

    이렇게 Stop The World 현상이 발생하게 되고, 이러한 현상을 Checkpoint Alignment 라고 합니다.

    즉, 모든 Checkpoint Barrier 보다 선행되어 유입된 이벤트를 처리하고 Sync 를 맞추는 작업이라고 보시면 됩니다.

    "Filter1 -> Map1 경로" 처럼 이벤트가 적절히 분배되지 않은 경우에는 Checkpont Alignment 과정에서 데이터 처리 속도가 낮아지며,

    병목현상으로 이어질 수 있습니다.



    실제 Checkpoint Alignment 를 테스트해보기 위한 실습을 진행하도록 하겠습니다.

    예시 코드는 아래와 같습니다.

    package com.westlife.jobs;
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.common.state.ListState;
    import org.apache.flink.api.common.state.ListStateDescriptor;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.configuration.ConfigConstants;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.configuration.RestOptions;
    import org.apache.flink.core.fs.Path;
    import org.apache.flink.runtime.state.FunctionInitializationContext;
    import org.apache.flink.runtime.state.FunctionSnapshotContext;
    import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.source.SourceFunction;
    import java.util.HashMap;
    import java.util.Map;
    public class TestCheckpointAlignment {
      public static void main (String[] args) throws Exception {
        Configuration configuration = new Configuration();
        configuration.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);
        configuration.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 8081);
        configuration.setBoolean(RestOptions.ENABLE_FLAMEGRAPH, true);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(2, configuration);
        env.getCheckpointConfig().setCheckpointStorage(new Path("file:///tmp/checkpoint"));
        DataStream<Integer> source = env.addSource(new SourceFunction<Integer>() {
          public void run(SourceContext<Integer> ctx) throws Exception {
            int counter = 0;
            while (true) {
              if (counter % 1000 == 0) ctx.collect(2);
              else ctx.collect(1);
          public void cancel() {}
        DataStream<Integer> map1 = source.keyBy(v -> v % 2 == 0)
                .map((MapFunction<Integer, Integer>) value -> value)
        map1.map(new CheckpointMapper()).name("map2").uid("map2");
      public static class CheckpointMapper implements MapFunction<Integer, Integer>, CheckpointedFunction {
        private String stateName = "map-state";
        private ListState<Map<Integer, Integer>> state;
        private ListStateDescriptor<Map<Integer, Integer>> desc;
        private Map<Integer, Integer> counterMap;
        public Integer map(Integer value) {
          if (counterMap.containsKey(value)) counterMap.put(value, counterMap.get(value) + 1);
          else counterMap.put(value, 1);
          long waitCounter = 0;
          while (++waitCounter < 100000000) {}
          return value;
        public void snapshotState(FunctionSnapshotContext context) throws Exception {
          if (counterMap.get(1) != null) System.out.printf("### start snapshot checkpoint : %d : %d\n", 1, counterMap.get(1));
          else System.out.printf("### start snapshot checkpoint : %d : %d\n", 2, counterMap.get(2));
        public void initializeState(FunctionInitializationContext context) throws Exception {
          this.desc = new ListStateDescriptor<>(stateName, Types.MAP(Types.INT, Types.INT));
          this.state = context.getOperatorStateStore().getListState(desc);
          if (context.isRestored()) this.counterMap = this.state.get().iterator().next();
          else this.counterMap = new HashMap<>();


    Stream Graph 는 아래와 같이 구성되구요.

    Parallelism 은 2로써 각 Task 는 2개의 SubTask 를 가집니다.

    Source 는 1 또는 2 의 데이터를 생성합니다.

    10초에 1000개의 데이터를 생성하도록 설계하였구요.  1 과 2의 데이터 생성 비율은 1000 : 1 입니다.

    Source 와 map1 사이에 KeyBy Partitioning 을 적용하여 데이터 1 을 처리하는 SubTask 로 많은 양의 데이터가 유입됩니다.

    즉, Data Skew 를 강제로 구현하였습니다.  0 번 SubTask 로 데이터가 집중적으로 유입됩니다.



    그리고 10초마다 Checkpoint 가 생성되는데요.

    아래 이미지는 Checkpoint 가 어떻게 생성되는지 나타내는 지표입니다.


    0번 SubTask 는 체크포인트 스냅샷을 생성하는데에 2분 14초가 소요됩니다.

    반면 데이터 유입이 적은 1번 SubTask 는 3ms 가 걸리죠.

    하지만, 이 과정동안 Checkpoint Alignment 를 위해서 2m 14s 이상으로 데이터의 처리가 중단됩니다.



