ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • Flink Checkpoint 알아보기
    Flink 2024. 1. 10. 21:41
    728x90
    반응형

    - 목차

     

    들어가며.

    Flink 의 Checkpoint 는 Flink Data Stream 이 어떻게 처리되고 있는지에 대한 상태를 저장하는 스냅샷입니다.

    Mapper, Filter, Window Operator 등이 끊임없이 흘러가는 Event 들을 처리합니다.

    Filter 에 의해서 불필요한 데이터는 제외되고,

    Mapper 에 의해서 데이터의 타입이 변경되거나 데이터의 내용이 바뀌기도 합니다.

    그리고 Window Operator 에 의해서 데이터들의 평균값 또는 총 합계와 같은 집계 연산 또한 수행됩니다.

    특정 시각에 각 Operator 들이 어떤 데이터를 처리하였고, 어떤 그 상태가 어떤지를 스냅샷으로 찍어 관리하게 됩니다.

    이러한 기능을 Checkpoint 라고 부릅니다.

     

    만약 Flink Stream Processing Program 이 어떠한 부정적인 이유로 종료된다면,

    종료 시점부터 재처리가 진행되어야합니다.

    이 과정에서 생성된 Checkpoint 를 통해 Flink Stream Processing Program 이 마지막 Checkpoint 시점으로 복구되죠.

     

    Checkpoint 를 알기 위해선는 Checkpoint Barrier, Checkpoint Alignment 등의 이해도가 필요하기 때문에

    이번 글에서 Checkpoint 에 대한 대략적인 정보를 제공하려고 합니다.

     

    Checkpoint Barrier.

    Checkpoint Barrier 는 Flink Data Stream 을 흐르는 하나의 레코드입니다.

    Checkpoint Barrier 는 Checkpoint 를 생성하기 위한 목적을 가지고 있구요.

    Flink 의 TaskManager 에서 실행되고 있는 여러 Task 에게 Checkpoint Barrier 는 전달됩니다.

     

    Checkpoint Barrier 의 출발은 Source Task 에서 시작됩니다.

    Checkpoint Barrier 는 Source Task 에서 시작하여 모든 Task 들을 이동하게 되구요.

    마지막으로 Sink 에 도달함을 끝으로 Checkpoint 를 생성하는 여정은 마무리됩니다.

     

    왜 Barrier 라고 부를까 ?

    Barrier 의 뜻은 벽입니다.

    Flink Data Stream 을 흐르는 데이터들은 크게 세 종류로 나뉘는데요.

    - Event

    - Checkpoint Barrier

    - Watermark

    와 같이 분류될 수 있습니다.

     

    그 중에서 Checkpoint Barrier 는 Event 와 달리 프로세싱의 대상이 아니며,

    오로지 Checkpoint 를 생성하는 목적을 위해서 Data Stream 을 흐르게 됩니다.

    아래 이미지에서 볼 수 있듯이, Data 4 와 Data 5 의 사이에 Checkpoint Barrier 가 존재합니다.

    이 Checkpoint Barrier 는 Event 관점에서 하나의 거대한 벽과 같이 인식됩니다.

    Checkpoint Barrier 를 전달받은 SubTask 는 Data 1 부터 Data 4 까지의 데이터를 처리하였고,

    State 를 업데이트한 상태입니다.

    그리고 벽 너머로의 Data 5 와 그 이후의 데이터들은 처리하지 않았고, State 에 반영되지 않습니다.

     

     

    즉, Checkpoint Barrier 는 이를 기준으로 처리가 완료된 Event 와 처리되지 않은 Event 를 나누는 벽이 됩니다.

    그리고 Checkpoint Barrier 이전의 Event 와 State 는 생성될 Checkpoint 에 포함되죠.

    Aligned Checkpoint.

    Checkpoint Barrier 는 모든 SubTask 로 전달됩니다.

     

    먼저 SubTask 와 Task 의 관계에 대해서 알아보도록 하겠습니다.

    아래와 같은 Source -> Map -> Filter -> Sink 인 데이터 스트림이 존재합니다.

    이는 Parallelism 3 에 의해서 아래와 같은 구조가 되는데요.

    이때, Parallelism 에 의해서 분산되는 각각의 노드들은 SubTask 라고 합니다.

    즉, Map Task 는 Map1, Map2, Map3 인 SubTask 들로 구성됩니다.

    Filter Task 는 Filter1, Filter2, Filter3 인 SubTask 들로 구성되고,

    Sink Task 는 Sink1, Sink2, Sink3 인 SubTask 들로 구성됩니다.

    참고로 Source 는 1개의 SubTask 로 구성된 Task 입니다.

     

    Checkpoint Barrier 는 Source 를 시작으로하여 모든 Task 와 각 Task 의 SubTask 들로 전달됩니다.

    Aligned Checkpoint 는 한 Task 의 SubTask 들이 모두 State 를 업데이트를 마치고,

    Checkpoint Coordinator 에게 State 를 모두 갱신하였음을 알리게 됩니다. (Acknowledgement)

    이렇게 동일한 Task 의 SubTask 들이 Checkpoint Barrier 에 의해서 State 를 갱신하고

    Checkpoint Coordinator 에게 Acknowledgement 를 전달하는 과정은 Checkpoint Alignment 라고 합니다.

     

     

     

    각각의 SubTask 들이 Checkpoint Barrier 이전의 Event 들을 모두 처리하고, 그 결과인 State 를 Update 한 이후에

    Checkpoint Coordinator 에게 Acknowledgement 를 전달합니다.

    이는 Task 단위로 이루어지며, 모든 Task 의 Acknowledgement 가 마무리되면 State-Backend 에 저장되어 있는 상태 정보를 기반으로 Checkpoint 가 생성됩니다.

     

     

    Unaligned Checkpoint.

    Unaligned Checkpoint 는 Checkpoint Coordinator 가 Task 의 Acknowledgement 를 기다리지 않습니다.

    그래서 Checkpoint Interval 주기별로 Checkpoint Barrier 가 Data Stream 을 흐르게 되구요.

    Task 간의 Synchronization 없이 Checkpoint 가 생성됩니다.

     

    이 과정에서 Checkpoint Barrier 보다 새로운 Event 이 SubTask 에 의해서 처리될 수 있습니다.

    그리고 만약 Checkpoint Recovery 가 발생한다면, 몇몇 Event 가 중복되어 처리될 가능성이 생깁니다.

    이러한 현상은 Data In-Flight 라고 합니다.

     

     

    코드로 구현해보기.

    먼저 Checkpoint 가 생성되는 것을 한번 확인해보겠습니다.

    아래 코드는 별 의미없는 코드입니다.

    그냥 State 를 생성하고, 10초 주기로 Checkpoint 를 생성하기 위함입니다.

    package io.bigin.flink.job;
    
    import org.apache.flink.api.common.RuntimeExecutionMode;
    import org.apache.flink.api.common.functions.RichMapFunction;
    import org.apache.flink.api.common.state.ListState;
    import org.apache.flink.api.common.state.ListStateDescriptor;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.source.SourceFunction;
    
    import java.time.Instant;
    
    public class TestStreamJob {
      public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
        env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
        env.enableCheckpointing(10 * 1000);
        env.getCheckpointConfig().setCheckpointStorage("file:///tmp/checkpoints");
        DataStream<Long> source = env.addSource(new SourceFunction<>() {
          @Override
          public void run(SourceContext<Long> ctx) throws Exception {
            while (true) {
              ctx.collect(Instant.now().getEpochSecond());
              Thread.sleep(1000);
            }
          }
    
          @Override
          public void cancel() {
    
          }
        });
    
        DataStream<Long> mapper = source.keyBy(n -> n % 10).map(new StatefulMapper());
        env.execute();
      }
    }
    
    class StatefulMapper extends RichMapFunction<Long, Long> {
    
      ListState<Long> state;
      ListStateDescriptor<Long> descriptor;
    
      @Override
      public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        this.descriptor = new ListStateDescriptor<Long>("list-state", Long.class);
        this.state = getRuntimeContext().getListState(this.descriptor);
      }
    
      @Override
      public Long map(Long value) throws Exception {
        this.state.add(value);
        return value;
      }
    
      @Override
      public void close() throws Exception {
        super.close();
        this.state.clear();
      }
    }

     

    위 코드가 실행되면 아래와 같은 단순한 DAG 가 생성됩니다.

    그리고 Map Operator 에서 State 가 차곡차곡 누적됩니다.

     

     

     

    미세하긴하지만 Checkpoined Data Size 가 조금씩 늘어나는게 보이시죠 ?

     

     

    SubTask 의 Checkpoint Alignment 도표.

    10개의 SubTask 가 각각 State 를 State-Backend 로 저장한 이후에 Acknowledged 된 상태입니다.

    각 SubTask 의 Checkpoint 에 소요된 시간과 State 의 사이즈를 보여줍니다.

     

     

    그리고 아래의 사진처럼 실제로 생성된 Checkpoint 관련 파일들을 확인할 수 있습니다.

     

     

    마무리.

    다음 글에서는 Checkpoint, Savepoint, Restore 전략 등에 대해서 알아보는 시간을 가져보겠습니다.

    읽어주셔서 감사합니다.

     

     

    반응형

    'Flink' 카테고리의 다른 글

    Flink State 알아보기  (0) 2024.01.10
    Flink Watermark 알아보기  (0) 2024.01.10
    [Flink] 바이너리 파일 실행하기 (Binary Execution File)  (0) 2023.12.29
    Flink KeyedStream 알아보기  (0) 2023.10.11
    Flink Parquet FileSink 알아보기  (0) 2023.10.04
Designed by Tistory.