ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • [Flink] Checkpoint Alignment 알아보기
    Flink 2024. 3. 28. 06:39
    728x90
    반응형

    - 목차

     

    함께 보면 좋은 글.

    https://westlife0615.tistory.com/572

     

    Flink Checkpoint 알아보기

    - 목차 들어가며. Flink 의 Checkpoint 는 Flink Data Stream 이 어떻게 처리되고 있는지에 대한 상태를 저장하는 스냅샷입니다. Mapper, Filter, Window Operator 등이 끊임없이 흘러가는 Event 들을 처리합니다. Filte

    westlife0615.tistory.com

     

    들어가며.

    이번 글에서는 Checkpoint Alignment 에 대해서 알아보려고 합니다.

    Checkpoint Alignment 를 이해하기 이전에 Checkpoint Barrier 그리고 Checkpoint 가 수행되는 방법에 대해서 먼저 알아보겠습니다.

     

    Checkpoint Barrier.

    Checkpoint Barrier 는 Flink Application 의 Checkpoint 를 생성하기 위해서

    Flink Application 전역으로 흐르는 하나의 이벤트입니다.

    Source 에서 생성되는 데이터와 Watermark 와 유사하게 Checkpoint Barrier 또한 Flink Application Pipeline 을 흐르게 됩니다.

    그럼 Checkpoint Barrier 의 역할을 무엇일까요 ?

    Checkpoint Barrier 는 여러 Checkpoint 의 경계를 구분합니다.

    예를 들어보겠습니다.

     

    먼저 Source 에서 Sink 로 이벤트 데이터들이 생성되어 흘러갑니다.

    Sink 는 State 로써 전달받은 이벤트의 갯수를 저장합니다.

     

    첫번째 Checkpoint Barrier 인 CB1 이 생성됩니다.

    CB1 는 먼저 생성된 5개의 이벤트의 바로 다음에 위치합니다.

    Checkpoint Barrier 는 Event 데이터를 앞지를 수 없습니다.

     

    아래 이미지처럼 5개의 이벤트들이 모두 Sink 로 전달된 이후에 Checkpoint Barrier 가 Sink 에 전달됩니다.

    이 상황에서 Sink 는 Checkpoint Snapshot 을 생성하게 되며, State 는 TotalCount 5 가 됩니다.

     

     

    즉, Checkpoint Barrier 는 Flink 에 존재하는 모든 SubTask 들에게 전달됩니다.

    그리고 SubTask 들은 Checkpoint Barrier 가 전달되면 Checkpoint Snapshot 을 생성할 준비를 합니다.

    이렇게 Checkpoint Barrier 는 Checkpoint 생성을 트리거하기 위해서 사용됩니다.

     

    Checkpoint Alignment 란 ?

    이제 Checkpoint Barrier 가 무엇인지는 알았는데, Checkpoint Alignment 는 무엇일까요 ?

    Checkpoint Alignment 는 모든 SubTask 들이 Checkpoint 생성을 마무리할 수 있도록 이벤트 처리 통제하는 방식인데요.

    Flink 의 여러 Operator 들은 rebalance, keyBy 등의 파티셔닝에 의해서 복잡한 연결 관계를 가집니다.

    예를 들어, 아래의 예시처럼 Parallelism 3 으로 설정되어 SubTask 가 3인 Flink Application 이 존재합니다.

     

     

    이때, Filter 와 Map 이 Forward 방식으로 연결되어있다면,

    Map Subtask 의 입장에서 Checkpoint Barrier 는 하나씩만 유입됩니다.

     

     

    하지만 아래와 같이 Hash Partitioning 이 된 경우에는 3개의 Checkpoint Barrier 들이 유입되게 됩니다.

    그리고 Filter1 -> Map1, Filter2 -> Map1, Filter3 -> Map1 와 같은 형식의 경로가 생기게 되는데요.

    Filter1 -> Map1 경로의 이벤트 갯수는 1000개

    Filter2 -> Map1 경로의 이벤트 갯수는 10개

    Filter3 -> Map1 경로의 이벤트 갯수는 1개

    라고 가정할 때에 Filter1 -> Map1 경로의 이벤트들을 처리하는데에 더 많은 시간이 소요됩니다.

     

    그리고 "Filter1 -> Map1 경로" 의 모든 이벤트 데이터를 처리하기 전까지

    "Filter2 -> Map1 경로", "Filter3 -> Map1 경로" 의 데이터 흐름이 멈추게 됩니다.

    이렇게 Stop The World 현상이 발생하게 되고, 이러한 현상을 Checkpoint Alignment 라고 합니다.

    즉, 모든 Checkpoint Barrier 보다 선행되어 유입된 이벤트를 처리하고 Sync 를 맞추는 작업이라고 보시면 됩니다.

    "Filter1 -> Map1 경로" 처럼 이벤트가 적절히 분배되지 않은 경우에는 Checkpont Alignment 과정에서 데이터 처리 속도가 낮아지며,

    병목현상으로 이어질 수 있습니다.

     

    실습.

    실제 Checkpoint Alignment 를 테스트해보기 위한 실습을 진행하도록 하겠습니다.

    예시 코드는 아래와 같습니다.

    package com.westlife.jobs;
    
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.common.state.ListState;
    import org.apache.flink.api.common.state.ListStateDescriptor;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.configuration.ConfigConstants;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.configuration.RestOptions;
    import org.apache.flink.core.fs.Path;
    import org.apache.flink.runtime.state.FunctionInitializationContext;
    import org.apache.flink.runtime.state.FunctionSnapshotContext;
    import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.source.SourceFunction;
    
    import java.util.HashMap;
    import java.util.Map;
    
    public class TestCheckpointAlignment {
    
      public static void main (String[] args) throws Exception {
        Configuration configuration = new Configuration();
        configuration.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);
        configuration.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 8081);
        configuration.setBoolean(RestOptions.ENABLE_FLAMEGRAPH, true);
    
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(2, configuration);
        env.disableOperatorChaining();
        env.enableCheckpointing(10000);
        env.getCheckpointConfig().setCheckpointStorage(new Path("file:///tmp/checkpoint"));
        DataStream<Integer> source = env.addSource(new SourceFunction<Integer>() {
          @Override
          public void run(SourceContext<Integer> ctx) throws Exception {
            int counter = 0;
            while (true) {
              if (counter % 1000 == 0) ctx.collect(2);
              else ctx.collect(1);
              counter++;
              Thread.sleep(10);
            }
          }
    
          @Override
          public void cancel() {}
        }).name("source").uid("source");
    
        DataStream<Integer> map1 = source.keyBy(v -> v % 2 == 0)
                .map((MapFunction<Integer, Integer>) value -> value)
                .returns(Integer.class).name("map1").uid("map1").setParallelism(2);
    
        map1.map(new CheckpointMapper()).name("map2").uid("map2");
        env.execute();
      }
    
      public static class CheckpointMapper implements MapFunction<Integer, Integer>, CheckpointedFunction {
        private String stateName = "map-state";
        private ListState<Map<Integer, Integer>> state;
        private ListStateDescriptor<Map<Integer, Integer>> desc;
        private Map<Integer, Integer> counterMap;
    
        @Override
        public Integer map(Integer value) {
          if (counterMap.containsKey(value)) counterMap.put(value, counterMap.get(value) + 1);
          else counterMap.put(value, 1);
    
          long waitCounter = 0;
          while (++waitCounter < 100000000) {}
          return value;
        }
    
        @Override
        public void snapshotState(FunctionSnapshotContext context) throws Exception {
          if (counterMap.get(1) != null) System.out.printf("### start snapshot checkpoint : %d : %d\n", 1, counterMap.get(1));
          else System.out.printf("### start snapshot checkpoint : %d : %d\n", 2, counterMap.get(2));
    
          this.state.clear();
          this.state.add(counterMap);
        }
    
        @Override
        public void initializeState(FunctionInitializationContext context) throws Exception {
          this.desc = new ListStateDescriptor<>(stateName, Types.MAP(Types.INT, Types.INT));
          this.state = context.getOperatorStateStore().getListState(desc);
          if (context.isRestored()) this.counterMap = this.state.get().iterator().next();
          else this.counterMap = new HashMap<>();
        }
      }
    }

     

    Stream Graph 는 아래와 같이 구성되구요.

    Parallelism 은 2로써 각 Task 는 2개의 SubTask 를 가집니다.

    Source 는 1 또는 2 의 데이터를 생성합니다.

    10초에 1000개의 데이터를 생성하도록 설계하였구요.  1 과 2의 데이터 생성 비율은 1000 : 1 입니다.

    Source 와 map1 사이에 KeyBy Partitioning 을 적용하여 데이터 1 을 처리하는 SubTask 로 많은 양의 데이터가 유입됩니다.

    즉, Data Skew 를 강제로 구현하였습니다.  0 번 SubTask 로 데이터가 집중적으로 유입됩니다.

     

     

    그리고 10초마다 Checkpoint 가 생성되는데요.

    아래 이미지는 Checkpoint 가 어떻게 생성되는지 나타내는 지표입니다.

     

    0번 SubTask 는 체크포인트 스냅샷을 생성하는데에 2분 14초가 소요됩니다.

    반면 데이터 유입이 적은 1번 SubTask 는 3ms 가 걸리죠.

    하지만, 이 과정동안 Checkpoint Alignment 를 위해서 2m 14s 이상으로 데이터의 처리가 중단됩니다.

     

     

    참고한 글.

    https://aws.amazon.com/ko/blogs/big-data/part-1-optimize-checkpointing-in-your-amazon-managed-service-for-apache-flink-applications-with-buffer-debloating-and-unaligned-checkpoints/

     

    Optimize checkpointing in your Amazon Managed Service for Apache Flink applications with buffer debloating and unaligned checkpo

    This post is the first of a two-part series regarding checkpointing mechanisms and in-flight data buffering. In this first part, we explain some of the fundamental Apache Flink internals and cover the buffer debloating feature. In the second part, we focus

    aws.amazon.com

     

     

     

     

    반응형
Designed by Tistory.