ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • Flink State 알아보기
    Flink 2024. 1. 10. 23:15
    728x90
    반응형

     

     

    - 목차

     

    들어가며.

    Flink 의 DataStream API 는 Stateful Stream Processing 을 지원합니다.

    그럼 Stateful Stream Processing 은 무엇일까요 ?

    Stateful 과 Stream Processing 을 나눠서 생각해보겠습니다.

    Stream Processing 은 끊임없이 유입되는 데이터를 하나씩 처리하는 방식을 의미합니다.

    예를 들기 위해서 간단한 psuedo code 를 작성해보았습니다.

    var source = [1,2,3,4,5,6,7,8,9, ...]
    source.filter(x -> x % 2)
    source.map(x -> x * 2)

     

    위 예시에서 Filter 에 의해 홀수 자연수는 제외됩니다.

    그리고 Mapper 에 의해서 필터링된 자연수들은 x2 가 됩니다.

    적용된 Filter 와 Mapper 는 데이터 하나 하나에 대한 Transformation 을 수행하게 됩니다.

    이러한 방식을 Stream Processing 이라고 부를 수 있습니다.

     

    여기서 중요한 점은 Filter 와 Mapper 가 순수함수로 작성되었다는 점입니다.

    어떤 Input 에 대해 상응하는 Output 을 생산하고, 어떠한 Side-Effect 가 없습니다.

    그리고 이렇게 순수함수로 구현된 Stream Processing 은 Stateless Stream Processing 이라고 합니다.

    즉, 데이터를 처리하기 위해서 어떠한 상태나 조건이 필요하지 않습니다.

    어떤 입력 데이터가 주어지더라도 항상 동일한 결과를 도출합니다.

     

    그럼 Stateful 은 무엇일까요 ?

    Stream Processing 에서 Stateful 의 의미는 지금까지 처리한 Event 들이 다음에 처리한 Event 에 영향을 주는 구조입니다.

    예를 들어, Event 의 총 갯수를 셈하는 Stream Processing 은 아래와 같이 나타낼 수 있습니다.

    Count State 라는 State 를 둠으로써 Window Operator 로 유입되는 Event 의 갯수를 누적하게 셈하게 됩니다.

     

    즉, 누적된 Count State 를 활용함으로써 41 이라는 한개의 Event 의 유입되더라도 유지하고 있던 Count State : 40 에 의해서

    41 개라는 결과를 얻을 수 있습니다.

    이러한 방식으로 Stateful Stream Processing Application 이 구현됩니다.

     

    Flink 의 State 의 종류.

    Flink 의 각 State 들을 살펴보고 간단한 구현을 해보도록 하겠습니다.

    먼저 State 는 사용하기 위해서는 Hash Partition 이 선행되어야합니다.

    이게 무슨 뜻인가하면 State 의 Locality 를 구현하기 위함인데요.

    State 의 Locality 에 대해서 설명드리도록 하겠습니다.

     

    State Locality.

     

    아래의 이미지는 keyBy Function 을 사용하여 KeyedStream 을 구현한 내용입니다.

     

    < Psuedo Code >

    DataStream source = [1,2,3,4,5,6,7,8,9,10, ...]
    KeyedStream keyedStream = source.keyBy(num -> num % 2)
    DataStrea doubleMapper = keyedStream.map(num -> num * 2)

     

    Source 의 데이터가 홀수인지 짝수인지에 따라서 어떤 Downstream SubTask 로 데이터가 전달될지 결정되는 구조입니다.

    홀수 데이터는 Mapper1 으로 짝수 데이터는 Mapper2 로 전달됩니다.

    이렇게 함으로써 Mapper1 은 홀수 데이터에 관한 State 만을 저장하면 되고,

    Mapper2 는 짝수 데이터에 관한 State 만을 저장하면 됩니다.

    그렇지 않으면 모든 SubTask 가 Global 한 State 를 공유해야하기 때문에 Overhead 가 발생할 수 있죠.

    Hash Partition 을 통해서 SubTask 는 동일한 데이터만을 결정론적으로 제공받을 수 있습니다.

     

    만약 rebalance 과 같이 무작위로 데이터가 Downstream 으로 흘러갈 경우에는

    아래와 같이 매번 다른 방식으로 데이터가 Downstream SubTask 로 전달됩니다.

    이때에 Checkpoint 를 통해서 Restore 를 하더라도 제대로된 처리가 어렵습니다.

    그래서 Flink State 는 KeyedStream 에서만 State 를 허용하며, State 의 Locality 를 구현합니다.

     

    다시 한번 말씀드리면,

    Global State 를 사용하게 되면 위 문제는 해결되지만, 이는 비효율적이며

    Flink 가 지향하는 방향이 아니라서 해당 기능은 제공되지 않습니다 .

     

    ValueState.

    ValueState 는 Scalar Value 를 저장하는 용도입니다.

    아래 예시는 Map Operator 로 유입되는 Event 의 누적 카운트를 저장하는 State 예시입니다.

    그리고 누적된 수치가 100개가 넘어가면 Downstream 으로 Event 를 흘려보내지 않습니다.

    class ValueStateMapper extends RichMapFunction<Long, Long> {
      transient ValueState<Long> state;
      transient ValueStateDescriptor<Long> descriptor;
    
      @Override
      public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        this.descriptor = new ValueStateDescriptor<Long>("total-event-count", Long.class);
        this.state = getRuntimeContext().getState(descriptor);
      }
    
      @Override
      public Long map(Long value) throws Exception {
        long accumulatedCount = Optional.ofNullable(this.state.value()).orElse(0L);
        if (accumulatedCount > 100) return null;
        this.state.update(accumulatedCount + 1);
        return value;
      }
    
      @Override
      public void close() throws Exception {
        super.close();
      }
    }
    public class TestStreamJob {
      public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
        env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
        env.setParallelism(2);
        env.enableCheckpointing(10 * 1000);
        DataStream<Long> source = env.addSource(new SourceFunction<>() {
          @Override
          public void run(SourceContext<Long> ctx) throws Exception {
            while (true) {ctx.collect(Instant.now().toEpochMilli());Thread.sleep(10);}
          }
    
          @Override
          public void cancel() {}
        });
    
        source.keyBy(n -> n % 2).map(new ValueStateMapper()).filter(Objects::nonNull).rebalance().print();
        env.execute();
      }
    }

     

    Value State 가 적용된 Flink Data Stream Program 을 아래와 같이 동작합니다.

    2개의 SubTask 가 존재하며, 각 SubTask 가 101 개의 Event 를 Downstream 으로 흘려보냅니다.

    따라서 아래 이미지처럼 "Sink: Print to Std. Out" 로 유입되는 Event 의 갯수는 202 가 됩니다.

     

     

     

    ListState.

    ListState 는 List 형태로 State 를 기록합니다.

    저는 마지막 Event 5개를 List State 에 저장하는 예시 코드를 작성해보려고 합니다.

     

    class ListStateMapper extends RichMapFunction<Long, Long> {
      transient ListState<Long> state;
      transient ListStateDescriptor<Long> descriptor;
    
      @Override
      public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        this.descriptor = new ListStateDescriptor<Long>("last-five-events", Long.class);
        this.state = getRuntimeContext().getListState(descriptor);
      }
    
      @Override
      public Long map(Long value) throws Exception {
        Queue<Long> queue = new LinkedList<>();
        this.state.get().forEach(queue::add);
    
        while (queue.size() > 4) queue.poll();
        queue.add(value);
    
        this.state.clear();
        while (!queue.isEmpty()) this.state.add(queue.poll());
    
        return value;
      }
    
      @Override
      public void close() throws Exception {
        super.close();
      }
    }

     

    MapState.

    MapState 는 Dictionary 타입의 데이터를 저장할 수 있습니다.

    분단위로 몇개의 Event 가 유입되었는지 저장하는 Dictionary 를 저장하는 MapState 를 만들어보겠습니다.

     

    class MapStateMapper extends RichMapFunction<Long, Long> {
      transient MapState<Integer, Long> state;
      transient MapStateDescriptor<Integer, Long> descriptor;
    
      @Override
      public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        this.descriptor = new MapStateDescriptor<Integer, Long>("count-per-minute", Integer.class, Long.class);
        this.state = getRuntimeContext().getMapState(descriptor);
      }
    
      @Override
      public Long map(Long value) throws Exception {
        int minute = (int) ((value / 60000) % 60);
        long countPerMinute = Optional.ofNullable(this.state.get(minute)).orElse(0L);
        this.state.put(minute, countPerMinute + 1L);
        return value;
      }
    
      @Override
      public void close() throws Exception {
        super.close();
      }
    }

     

     

    Checkpoint.

    Stateless 한 Operator 인 Map Operator 에서 State 를 사용함으로써

    Checkpoint 시에 Map Operator 의 State 가 저장됩니다.

    아래 두 이미지는 Checkpoint History 와 Checkpoint 의 내용인데요.

    "Map -> Filter" 에 해당하는 Task 의 Checkpointed Data Size 가 3.87KB 로 보여집니다.

    ValueState, ListState, MapState 에서 다루는 데이터의 크기만큼 용량이 결정됩니다.

    State 를 사용하지 않는 Source 와 Sink 의 경우에는 Checkpointed Data Size 가 0 입니다.

     

     

    마무리 .

    위에서 설명하였던 Map Operator 와 State 의 조합은 Stateful Stream Processing 을 설명하기 위한 내용이었습니다.

    사실상 Map, Filter, FlatMap 과 같은 Operator 는 Stateless 한 처리가 더 어울립니다.

    State 를 설명하기 위해서 설명한 내용임을 알아주시면 좋을 것 같구요.

    다음에 Window Operator 를 설명하는 글에서

    State 의 자연스러운 적용 예시를 한번 더 말씀드리겠습니다.

     

     

    반응형

    'Flink' 카테고리의 다른 글

    Flink Window 이해하기  (0) 2024.01.13
    Flink StreamGraph 알아보기  (0) 2024.01.11
    Flink Watermark 알아보기  (0) 2024.01.10
    Flink Checkpoint 알아보기  (0) 2024.01.10
    [Flink] 바이너리 파일 실행하기 (Binary Execution File)  (0) 2023.12.29
Designed by Tistory.