-
Flink State 알아보기Flink 2024. 1. 10. 23:15728x90반응형
- 목차
들어가며.
Flink 의 DataStream API 는 Stateful Stream Processing 을 지원합니다.
그럼 Stateful Stream Processing 은 무엇일까요 ?
Stateful 과 Stream Processing 을 나눠서 생각해보겠습니다.
Stream Processing 은 끊임없이 유입되는 데이터를 하나씩 처리하는 방식을 의미합니다.
예를 들기 위해서 간단한 psuedo code 를 작성해보았습니다.
var source = [1,2,3,4,5,6,7,8,9, ...] source.filter(x -> x % 2) source.map(x -> x * 2)
위 예시에서 Filter 에 의해 홀수 자연수는 제외됩니다.
그리고 Mapper 에 의해서 필터링된 자연수들은 x2 가 됩니다.
적용된 Filter 와 Mapper 는 데이터 하나 하나에 대한 Transformation 을 수행하게 됩니다.
이러한 방식을 Stream Processing 이라고 부를 수 있습니다.
여기서 중요한 점은 Filter 와 Mapper 가 순수함수로 작성되었다는 점입니다.
어떤 Input 에 대해 상응하는 Output 을 생산하고, 어떠한 Side-Effect 가 없습니다.
그리고 이렇게 순수함수로 구현된 Stream Processing 은 Stateless Stream Processing 이라고 합니다.
즉, 데이터를 처리하기 위해서 어떠한 상태나 조건이 필요하지 않습니다.
어떤 입력 데이터가 주어지더라도 항상 동일한 결과를 도출합니다.
그럼 Stateful 은 무엇일까요 ?
Stream Processing 에서 Stateful 의 의미는 지금까지 처리한 Event 들이 다음에 처리한 Event 에 영향을 주는 구조입니다.
예를 들어, Event 의 총 갯수를 셈하는 Stream Processing 은 아래와 같이 나타낼 수 있습니다.
Count State 라는 State 를 둠으로써 Window Operator 로 유입되는 Event 의 갯수를 누적하게 셈하게 됩니다.
즉, 누적된 Count State 를 활용함으로써 41 이라는 한개의 Event 의 유입되더라도 유지하고 있던 Count State : 40 에 의해서
41 개라는 결과를 얻을 수 있습니다.
이러한 방식으로 Stateful Stream Processing Application 이 구현됩니다.
Flink 의 State 의 종류.
Flink 의 각 State 들을 살펴보고 간단한 구현을 해보도록 하겠습니다.
먼저 State 는 사용하기 위해서는 Hash Partition 이 선행되어야합니다.
이게 무슨 뜻인가하면 State 의 Locality 를 구현하기 위함인데요.
State 의 Locality 에 대해서 설명드리도록 하겠습니다.
State Locality.
아래의 이미지는 keyBy Function 을 사용하여 KeyedStream 을 구현한 내용입니다.
< Psuedo Code >
DataStream source = [1,2,3,4,5,6,7,8,9,10, ...] KeyedStream keyedStream = source.keyBy(num -> num % 2) DataStrea doubleMapper = keyedStream.map(num -> num * 2)
Source 의 데이터가 홀수인지 짝수인지에 따라서 어떤 Downstream SubTask 로 데이터가 전달될지 결정되는 구조입니다.
홀수 데이터는 Mapper1 으로 짝수 데이터는 Mapper2 로 전달됩니다.
이렇게 함으로써 Mapper1 은 홀수 데이터에 관한 State 만을 저장하면 되고,
Mapper2 는 짝수 데이터에 관한 State 만을 저장하면 됩니다.
그렇지 않으면 모든 SubTask 가 Global 한 State 를 공유해야하기 때문에 Overhead 가 발생할 수 있죠.
Hash Partition 을 통해서 SubTask 는 동일한 데이터만을 결정론적으로 제공받을 수 있습니다.
만약 rebalance 과 같이 무작위로 데이터가 Downstream 으로 흘러갈 경우에는
아래와 같이 매번 다른 방식으로 데이터가 Downstream SubTask 로 전달됩니다.
이때에 Checkpoint 를 통해서 Restore 를 하더라도 제대로된 처리가 어렵습니다.
그래서 Flink State 는 KeyedStream 에서만 State 를 허용하며, State 의 Locality 를 구현합니다.
다시 한번 말씀드리면,
Global State 를 사용하게 되면 위 문제는 해결되지만, 이는 비효율적이며
Flink 가 지향하는 방향이 아니라서 해당 기능은 제공되지 않습니다 .
ValueState.
ValueState 는 Scalar Value 를 저장하는 용도입니다.
아래 예시는 Map Operator 로 유입되는 Event 의 누적 카운트를 저장하는 State 예시입니다.
그리고 누적된 수치가 100개가 넘어가면 Downstream 으로 Event 를 흘려보내지 않습니다.
class ValueStateMapper extends RichMapFunction<Long, Long> { transient ValueState<Long> state; transient ValueStateDescriptor<Long> descriptor; @Override public void open(Configuration parameters) throws Exception { super.open(parameters); this.descriptor = new ValueStateDescriptor<Long>("total-event-count", Long.class); this.state = getRuntimeContext().getState(descriptor); } @Override public Long map(Long value) throws Exception { long accumulatedCount = Optional.ofNullable(this.state.value()).orElse(0L); if (accumulatedCount > 100) return null; this.state.update(accumulatedCount + 1); return value; } @Override public void close() throws Exception { super.close(); } }
public class TestStreamJob { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration()); env.setRuntimeMode(RuntimeExecutionMode.STREAMING); env.setParallelism(2); env.enableCheckpointing(10 * 1000); DataStream<Long> source = env.addSource(new SourceFunction<>() { @Override public void run(SourceContext<Long> ctx) throws Exception { while (true) {ctx.collect(Instant.now().toEpochMilli());Thread.sleep(10);} } @Override public void cancel() {} }); source.keyBy(n -> n % 2).map(new ValueStateMapper()).filter(Objects::nonNull).rebalance().print(); env.execute(); } }
Value State 가 적용된 Flink Data Stream Program 을 아래와 같이 동작합니다.
2개의 SubTask 가 존재하며, 각 SubTask 가 101 개의 Event 를 Downstream 으로 흘려보냅니다.
따라서 아래 이미지처럼 "Sink: Print to Std. Out" 로 유입되는 Event 의 갯수는 202 가 됩니다.
ListState.
ListState 는 List 형태로 State 를 기록합니다.
저는 마지막 Event 5개를 List State 에 저장하는 예시 코드를 작성해보려고 합니다.
class ListStateMapper extends RichMapFunction<Long, Long> { transient ListState<Long> state; transient ListStateDescriptor<Long> descriptor; @Override public void open(Configuration parameters) throws Exception { super.open(parameters); this.descriptor = new ListStateDescriptor<Long>("last-five-events", Long.class); this.state = getRuntimeContext().getListState(descriptor); } @Override public Long map(Long value) throws Exception { Queue<Long> queue = new LinkedList<>(); this.state.get().forEach(queue::add); while (queue.size() > 4) queue.poll(); queue.add(value); this.state.clear(); while (!queue.isEmpty()) this.state.add(queue.poll()); return value; } @Override public void close() throws Exception { super.close(); } }
MapState.
MapState 는 Dictionary 타입의 데이터를 저장할 수 있습니다.
분단위로 몇개의 Event 가 유입되었는지 저장하는 Dictionary 를 저장하는 MapState 를 만들어보겠습니다.
class MapStateMapper extends RichMapFunction<Long, Long> { transient MapState<Integer, Long> state; transient MapStateDescriptor<Integer, Long> descriptor; @Override public void open(Configuration parameters) throws Exception { super.open(parameters); this.descriptor = new MapStateDescriptor<Integer, Long>("count-per-minute", Integer.class, Long.class); this.state = getRuntimeContext().getMapState(descriptor); } @Override public Long map(Long value) throws Exception { int minute = (int) ((value / 60000) % 60); long countPerMinute = Optional.ofNullable(this.state.get(minute)).orElse(0L); this.state.put(minute, countPerMinute + 1L); return value; } @Override public void close() throws Exception { super.close(); } }
Checkpoint.
Stateless 한 Operator 인 Map Operator 에서 State 를 사용함으로써
Checkpoint 시에 Map Operator 의 State 가 저장됩니다.
아래 두 이미지는 Checkpoint History 와 Checkpoint 의 내용인데요.
"Map -> Filter" 에 해당하는 Task 의 Checkpointed Data Size 가 3.87KB 로 보여집니다.
ValueState, ListState, MapState 에서 다루는 데이터의 크기만큼 용량이 결정됩니다.
State 를 사용하지 않는 Source 와 Sink 의 경우에는 Checkpointed Data Size 가 0 입니다.
마무리 .
위에서 설명하였던 Map Operator 와 State 의 조합은 Stateful Stream Processing 을 설명하기 위한 내용이었습니다.
사실상 Map, Filter, FlatMap 과 같은 Operator 는 Stateless 한 처리가 더 어울립니다.
State 를 설명하기 위해서 설명한 내용임을 알아주시면 좋을 것 같구요.
다음에 Window Operator 를 설명하는 글에서
State 의 자연스러운 적용 예시를 한번 더 말씀드리겠습니다.
반응형'Flink' 카테고리의 다른 글
Flink Window 이해하기 (0) 2024.01.13 Flink StreamGraph 알아보기 (0) 2024.01.11 Flink Watermark 알아보기 (0) 2024.01.10 Flink Checkpoint 알아보기 (0) 2024.01.10 [Flink] 바이너리 파일 실행하기 (Binary Execution File) (0) 2023.12.29