-
Flink Checkpoint 알아보기Flink 2024. 1. 10. 21:41728x90반응형
- 목차
들어가며.
Flink 의 Checkpoint 는 Flink Data Stream 이 어떻게 처리되고 있는지에 대한 상태를 저장하는 스냅샷입니다.
Mapper, Filter, Window Operator 등이 끊임없이 흘러가는 Event 들을 처리합니다.
Filter 에 의해서 불필요한 데이터는 제외되고,
Mapper 에 의해서 데이터의 타입이 변경되거나 데이터의 내용이 바뀌기도 합니다.
그리고 Window Operator 에 의해서 데이터들의 평균값 또는 총 합계와 같은 집계 연산 또한 수행됩니다.
특정 시각에 각 Operator 들이 어떤 데이터를 처리하였고, 어떤 그 상태가 어떤지를 스냅샷으로 찍어 관리하게 됩니다.
이러한 기능을 Checkpoint 라고 부릅니다.
만약 Flink Stream Processing Program 이 어떠한 부정적인 이유로 종료된다면,
종료 시점부터 재처리가 진행되어야합니다.
이 과정에서 생성된 Checkpoint 를 통해 Flink Stream Processing Program 이 마지막 Checkpoint 시점으로 복구되죠.
Checkpoint 를 알기 위해선는 Checkpoint Barrier, Checkpoint Alignment 등의 이해도가 필요하기 때문에
이번 글에서 Checkpoint 에 대한 대략적인 정보를 제공하려고 합니다.
Checkpoint Barrier.
Checkpoint Barrier 는 Flink Data Stream 을 흐르는 하나의 레코드입니다.
Checkpoint Barrier 는 Checkpoint 를 생성하기 위한 목적을 가지고 있구요.
Flink 의 TaskManager 에서 실행되고 있는 여러 Task 에게 Checkpoint Barrier 는 전달됩니다.
Checkpoint Barrier 의 출발은 Source Task 에서 시작됩니다.
Checkpoint Barrier 는 Source Task 에서 시작하여 모든 Task 들을 이동하게 되구요.
마지막으로 Sink 에 도달함을 끝으로 Checkpoint 를 생성하는 여정은 마무리됩니다.
왜 Barrier 라고 부를까 ?
Barrier 의 뜻은 벽입니다.
Flink Data Stream 을 흐르는 데이터들은 크게 세 종류로 나뉘는데요.
- Event
- Checkpoint Barrier
- Watermark
와 같이 분류될 수 있습니다.
그 중에서 Checkpoint Barrier 는 Event 와 달리 프로세싱의 대상이 아니며,
오로지 Checkpoint 를 생성하는 목적을 위해서 Data Stream 을 흐르게 됩니다.
아래 이미지에서 볼 수 있듯이, Data 4 와 Data 5 의 사이에 Checkpoint Barrier 가 존재합니다.
이 Checkpoint Barrier 는 Event 관점에서 하나의 거대한 벽과 같이 인식됩니다.
Checkpoint Barrier 를 전달받은 SubTask 는 Data 1 부터 Data 4 까지의 데이터를 처리하였고,
State 를 업데이트한 상태입니다.
그리고 벽 너머로의 Data 5 와 그 이후의 데이터들은 처리하지 않았고, State 에 반영되지 않습니다.
즉, Checkpoint Barrier 는 이를 기준으로 처리가 완료된 Event 와 처리되지 않은 Event 를 나누는 벽이 됩니다.
그리고 Checkpoint Barrier 이전의 Event 와 State 는 생성될 Checkpoint 에 포함되죠.
Aligned Checkpoint.
Checkpoint Barrier 는 모든 SubTask 로 전달됩니다.
먼저 SubTask 와 Task 의 관계에 대해서 알아보도록 하겠습니다.
아래와 같은 Source -> Map -> Filter -> Sink 인 데이터 스트림이 존재합니다.
이는 Parallelism 3 에 의해서 아래와 같은 구조가 되는데요.
이때, Parallelism 에 의해서 분산되는 각각의 노드들은 SubTask 라고 합니다.
즉, Map Task 는 Map1, Map2, Map3 인 SubTask 들로 구성됩니다.
Filter Task 는 Filter1, Filter2, Filter3 인 SubTask 들로 구성되고,
Sink Task 는 Sink1, Sink2, Sink3 인 SubTask 들로 구성됩니다.
참고로 Source 는 1개의 SubTask 로 구성된 Task 입니다.
Checkpoint Barrier 는 Source 를 시작으로하여 모든 Task 와 각 Task 의 SubTask 들로 전달됩니다.
Aligned Checkpoint 는 한 Task 의 SubTask 들이 모두 State 를 업데이트를 마치고,
Checkpoint Coordinator 에게 State 를 모두 갱신하였음을 알리게 됩니다. (Acknowledgement)
이렇게 동일한 Task 의 SubTask 들이 Checkpoint Barrier 에 의해서 State 를 갱신하고
Checkpoint Coordinator 에게 Acknowledgement 를 전달하는 과정은 Checkpoint Alignment 라고 합니다.
각각의 SubTask 들이 Checkpoint Barrier 이전의 Event 들을 모두 처리하고, 그 결과인 State 를 Update 한 이후에
Checkpoint Coordinator 에게 Acknowledgement 를 전달합니다.
이는 Task 단위로 이루어지며, 모든 Task 의 Acknowledgement 가 마무리되면 State-Backend 에 저장되어 있는 상태 정보를 기반으로 Checkpoint 가 생성됩니다.
Unaligned Checkpoint.
Unaligned Checkpoint 는 Checkpoint Coordinator 가 Task 의 Acknowledgement 를 기다리지 않습니다.
그래서 Checkpoint Interval 주기별로 Checkpoint Barrier 가 Data Stream 을 흐르게 되구요.
Task 간의 Synchronization 없이 Checkpoint 가 생성됩니다.
이 과정에서 Checkpoint Barrier 보다 새로운 Event 이 SubTask 에 의해서 처리될 수 있습니다.
그리고 만약 Checkpoint Recovery 가 발생한다면, 몇몇 Event 가 중복되어 처리될 가능성이 생깁니다.
이러한 현상은 Data In-Flight 라고 합니다.
코드로 구현해보기.
먼저 Checkpoint 가 생성되는 것을 한번 확인해보겠습니다.
아래 코드는 별 의미없는 코드입니다.
그냥 State 를 생성하고, 10초 주기로 Checkpoint 를 생성하기 위함입니다.
package io.bigin.flink.job; import org.apache.flink.api.common.RuntimeExecutionMode; import org.apache.flink.api.common.functions.RichMapFunction; import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.source.SourceFunction; import java.time.Instant; public class TestStreamJob { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration()); env.setRuntimeMode(RuntimeExecutionMode.STREAMING); env.enableCheckpointing(10 * 1000); env.getCheckpointConfig().setCheckpointStorage("file:///tmp/checkpoints"); DataStream<Long> source = env.addSource(new SourceFunction<>() { @Override public void run(SourceContext<Long> ctx) throws Exception { while (true) { ctx.collect(Instant.now().getEpochSecond()); Thread.sleep(1000); } } @Override public void cancel() { } }); DataStream<Long> mapper = source.keyBy(n -> n % 10).map(new StatefulMapper()); env.execute(); } } class StatefulMapper extends RichMapFunction<Long, Long> { ListState<Long> state; ListStateDescriptor<Long> descriptor; @Override public void open(Configuration parameters) throws Exception { super.open(parameters); this.descriptor = new ListStateDescriptor<Long>("list-state", Long.class); this.state = getRuntimeContext().getListState(this.descriptor); } @Override public Long map(Long value) throws Exception { this.state.add(value); return value; } @Override public void close() throws Exception { super.close(); this.state.clear(); } }
위 코드가 실행되면 아래와 같은 단순한 DAG 가 생성됩니다.
그리고 Map Operator 에서 State 가 차곡차곡 누적됩니다.
미세하긴하지만 Checkpoined Data Size 가 조금씩 늘어나는게 보이시죠 ?
SubTask 의 Checkpoint Alignment 도표.
10개의 SubTask 가 각각 State 를 State-Backend 로 저장한 이후에 Acknowledged 된 상태입니다.
각 SubTask 의 Checkpoint 에 소요된 시간과 State 의 사이즈를 보여줍니다.
그리고 아래의 사진처럼 실제로 생성된 Checkpoint 관련 파일들을 확인할 수 있습니다.
마무리.
다음 글에서는 Checkpoint, Savepoint, Restore 전략 등에 대해서 알아보는 시간을 가져보겠습니다.
읽어주셔서 감사합니다.
반응형'Flink' 카테고리의 다른 글
Flink State 알아보기 (0) 2024.01.10 Flink Watermark 알아보기 (0) 2024.01.10 [Flink] 바이너리 파일 실행하기 (Binary Execution File) (0) 2023.12.29 Flink KeyedStream 알아보기 (0) 2023.10.11 Flink Parquet FileSink 알아보기 (0) 2023.10.04