ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • [Flink] Window AllowedLateness 알아보기 (Watermark)
    Flink 2024. 2. 12. 08:33
    728x90
    반응형

     

     

    - 목차

     

    들어가며.

    AllowedLateness 은 Window Operation 에서 사용되는 옵션입니다.

    AllowedLateness 는 이름 그대로 지연됨을 허용한다는 의미인데요.

    Flink 와 같은 이벤트 스트리밍 처리 도메인에서는 모든 이벤트들이 뒤섞인 순서로 유입된다는 대전제가 존재합니다.

    네트워크 지연이 가장 큰 원인인데요.

    데이터 소스에서 데이터가 생성된 순서대로 이벤트가 유입된다는 보장이 없습니다.

    그래서 Window Operation 의 AllowedLateness 설정을 통해서 순서가 뒤섞인 이벤트들을 순서대로 처리할 수 있습니다.

     

     

    위 이미지처럼 1, 2, 3, 4 의 순서대로 데이터는 생성됩니다.

    하지만 복잡한 네트워크를 통하게 되면, Flink Application 으로 유입되는 데이터들의 순서는 보장되지 않습니다.

     

    이번 글에서는 Flink Window Operator 의 AllowedLateness 설정이 어떻게 지연된 메시지를 처리하는지에 대한 설명을 작성해보려고 합니다.

     

    Watermark.

    https://westlife0615.tistory.com/573

     

    Flink Watermark 알아보기

    - 목차 들어가며. Watermark 는 Checkpoint Barrier 와 같이 Flink Data Stream 을 흐르는 하나의 레코드입니다. Watermark 가 표현하는 정보는 오직 시간 밖에 없는데요. 특정 시각 정보를 담고 있는 Watermark 는 Dat

    westlife0615.tistory.com

     

    먼저 Watermark 에 대한 간단한 설명을 하려고 합니다.

    위 링크는 제가 이전에 작성한 Watermark 에 대한 글이며, 한번 읽어보시는 것을 추천합니다.

     

    Watermark 는 Flink 데이터 파이프라인이 얼마 만큼의 이벤트 시간을 처리하였는지 나타내는 지표입니다.

    간단히 그림으로 설명을 드리면 아래와 같습니다.

    아래 그림과 같이 1분 단위로 생성되는 이벤트들이 존재합니다.

    2023.01.01 14:44:00 부터 1분 단위로 이벤트들이 생성됩니다.

     

    그리고 이벤트들은 아래 이미지와 같이 복잡한 데이터 처리 파이프라인을 흐르게 됩니다.

    이때 Window Operator 같은 Stateful Operator 는 시간을 기준으로 여러가지 동작을 수행하게 되는데요.

    보통 1시간 간격으로 Downstream 으로 Flush 하는 것과 같은 동작을 수행하게 됩니다.

    이때, 이러한 Flush 같은 Action 을 Trigger 하는 주체가 바로 Watermark 입니다.

     

     

    그리고 Watermark 가 흐르게 되면서 이제 Flush 를 Trigger 하게 되는데,

    아래 예시는 3분 간격의 Tumbling Window 를 표현한 예시입니다.

    Window 는 2023.01.01 14:44:00 부터 2023.01.01 14:47:00 까지의 데이터를 처리하였고,

    2023.01.01 14:47:00 인 Watermark 가 흐르게 되면서 Window Operator 는 2023.01.01 14:47:00 까지의 이벤트가 다 처리되었다고 판단하고 Downstream 으로 Flush 하게 됩니다.

     

     

    즉, 요약하자면 Watermark 는 Timestamp 정보를 가집니다.

    그리고 Watermark 의 상징적인 의미는 Watermark 의 timestamp 까지의 이벤트들은 모두 유입( 또는 처리) 되었다는 의미를 내포합니다.

    그래서 Window Operator 와 같이 시간에 의존적인 Stateful Operator 들은 Watermark 를 기준으로 동작하게 됩니다.

     

    Window.

    AllowedLateness 를 설명하기 이전에 간단한 Tumbling Window 예시를 작성해보도록 하겠습니다.

     

    < TestEvent.java >

    아래 클래스는 Flink Application 에서 사용할 간단한 TestEvent class 입니다.

    Field 는 timestamp 와 data 두개로 구성됩니다.

    Long timestamp 는 Event Time Semantics 를 적용한 시각 정보를 의미하구요.

    String data Field 는 이름 그대로 데이터를 저장할 용도로 사용됩니다.

    package com.westlife.model;
    
    import java.time.Instant;
    
    public class TestEvent {
      public Long timestamp;
      public String data;
    
      public TestEvent() {}
    
      public TestEvent(Long timestamp, String data) {
        this.timestamp = timestamp;
        this.data = data;
      }
    
      public String toString() {
        return String.format("%s %s", Instant.ofEpochMilli(timestamp), data);
      }
    }

     

    < TestEventAggregation.java >

    그리고 아래의 class 는 Window Operator 에 의해서 Aggregation 된 이후의 상태를 표현하는 class 입니다.

    Integer count 는 누적된 TestEvent 의 갯수를 뜻하구요.

    List<Long> timestamps Field 로 TestEvent 의 timestamp 들이 추가됩니다.

    package com.westlife.model;
    
    import java.time.Instant;
    import java.util.*;
    
    public class TestEventAggregation {
      public Integer count;
      public List<Long> timestamps;
    
      public TestEventAggregation() {
        this.count = 0;
        timestamps = new ArrayList<>();
      }
    
      public void append(TestEvent testEvent) {
        this.count++;
        this.timestamps.add(testEvent.timestamp);
      }
    
      public void append(Long timestamp) {
        this.count++;
        this.timestamps.add(timestamp);
      }
    
      public String toString() {
        long minTimestamp = this.timestamps.stream().sorted().findFirst().get();
        long maxTimestamp = this.timestamps.stream().sorted((Long a, Long b) -> (int)(b - a)).findFirst().get();
        return String.format("count : %s %s ~ %s", count, Instant.ofEpochMilli(minTimestamp), Instant.ofEpochMilli(maxTimestamp));
      }
    }

     

     

    < Main.class >

    아래 예시는 Tumbling Window 를 구현한 간단한 예시 프로그램입니다.

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        DataStream<TestEvent> source = env.addSource(new SourceFunction<>() {
          private final Instant now = Instant.parse("2023-01-01T00:00:00.000Z");
    
          @Override
          public void run(SourceContext<TestEvent> ctx) throws Exception {
            int incremental = 0;
            while (true) {
              Thread.sleep(100);
              incremental++;
              ctx.collect(new TestEvent(now.plus(incremental, ChronoUnit.HOURS).toEpochMilli(), String.valueOf(incremental)));
            }
          }
    
          @Override
          public void cancel() {
          }
        });
    
        DataStream<TestEventAggregation> stream = source
                .assignTimestampsAndWatermarks(WatermarkStrategy.<TestEvent>forBoundedOutOfOrderness(Duration.ofHours(0))
                        .withTimestampAssigner((element, recordTimestamp) -> element.timestamp)
                )
                .keyBy(event -> "_")
                .window(TumblingEventTimeWindows.of(Time.hours(24)))
                .process(new ProcessWindowFunction<>() {
                  @Override
                  public void process(String s, ProcessWindowFunction<TestEvent, TestEventAggregation, String, TimeWindow>.Context context, Iterable<TestEvent> elements, Collector<TestEventAggregation> out) {
                    TestEventAggregation aggregation = new TestEventAggregation();
                    for (TestEvent testEvent : elements) {
                      aggregation.append(testEvent);
                    }
                    out.collect(aggregation);
                  }
                });
    
        stream.print();
        env.execute();
      }

     

    위 프로그램의 그래프는 아래처럼 표현이 되구요.

    Source -> Watermark Generator -> Window Operator -> Sink 의 데이터 처리 흐름을 갖습니다.

     

    Source.

    Source 는 "2023-01-01 00:00:00" 시간부터 1시간 간격으로 이벤트를 생성합니다.

    데이터 타입은 TestEvent class 를 사용하구요.

    TestEvent 의 timestamp Field 에 시각 정보를 저장합니다.

     

    Watermark Generator.

    TestEvent class 의 timestamp Field 를 기준으로 Event Time 과 Watermark 를 설정하였습니다.

    직접 구현한 SourceFunction 으로부터 생성되는 데이터는 순서가 보장되기 때문에

    forBoundedOutOfOrderness 를 0 으로 설정하였습니다.

     

    Tumbling Window.

    24시간을 주기로 Tumbling Window 를 설정합니다.

    그리고 TestEvent class 는 TestEventAggregation class 로 Aggregation 됩니다.

     

     

    출력 결과는 아래와 같습니다.

    Tumbling Window 의 기준 시간이 24시간입니다.

    따라서 출력되는 TestEventAggregation class 의 count 는 24개, 그리고 timestamp 범위는 00시 ~ 23시 를 가집니다.

    9> count : 24 2023-01-01T00:00:00Z ~ 2023-01-01T23:00:00Z
    10> count : 24 2023-01-02T00:00:00Z ~ 2023-01-02T23:00:00Z
    1> count : 24 2023-01-03T00:00:00Z ~ 2023-01-03T23:00:00Z
    2> count : 24 2023-01-04T00:00:00Z ~ 2023-01-04T23:00:00Z
    3> count : 24 2023-01-05T00:00:00Z ~ 2023-01-05T23:00:00Z
    4> count : 24 2023-01-06T00:00:00Z ~ 2023-01-06T23:00:00Z
    5> count : 24 2023-01-07T00:00:00Z ~ 2023-01-07T23:00:00Z
    6> count : 24 2023-01-08T00:00:00Z ~ 2023-01-08T23:00:00Z
    7> count : 24 2023-01-09T00:00:00Z ~ 2023-01-09T23:00:00Z
    8> count : 24 2023-01-10T00:00:00Z ~ 2023-01-10T23:00:00Z
    9> count : 24 2023-01-11T00:00:00Z ~ 2023-01-11T23:00:00Z

     

    지연된 이벤트 처리하기.

    위 예시에서 Source Function 부분은 약간 수정하겠습니다.

    SourceFunction 의 익명클래스에서 run 함수 내부에 지연 이벤트를 추가하는 코드를 작성하였습니다.

    24시간을 주기로 3일이 지연된 이벤트가 유입됩니다.

    DataStream<TestEvent> source = env.addSource(new SourceFunction<>() {
          private final Instant now = Instant.parse("2023-01-01T00:00:00.000Z");
    
          @Override
          public void run(SourceContext<TestEvent> ctx) throws Exception {
            int incremental = -1;
            while (true) {
              Thread.sleep(100);
              incremental++;
              ctx.collect(new TestEvent(now.plus(incremental, ChronoUnit.HOURS).toEpochMilli(), String.valueOf(incremental)));
    
              if (incremental % 24 == 0) {
                TestEvent late3DaysEvent = new TestEvent(now.plus(incremental, ChronoUnit.HOURS)
                        .minus(72, ChronoUnit.HOURS)
                        .toEpochMilli(), String.valueOf(incremental));
    
                ctx.collect(late3DaysEvent);
              }
            }
          }
    
          @Override
          public void cancel() {
          }
        });

     

    아래 이미지와 같이 0시부터 23시까지 이벤트가 생성된 이후에 3일 이전의 데이터가 생성되는 구조입니다.

     

    아래 출력처럼 2022-12-29 를 제외한 모든 지연 이벤트들은 무시됩니다.

    2> count : 1 2022-12-29T00:00:00Z ~ 2022-12-29T00:00:00Z
    3> count : 24 2023-01-01T00:00:00Z ~ 2023-01-01T23:00:00Z
    4> count : 24 2023-01-02T00:00:00Z ~ 2023-01-02T23:00:00Z
    5> count : 24 2023-01-03T00:00:00Z ~ 2023-01-03T23:00:00Z
    6> count : 24 2023-01-04T00:00:00Z ~ 2023-01-04T23:00:00Z
    7> count : 24 2023-01-05T00:00:00Z ~ 2023-01-05T23:00:00Z
    8> count : 24 2023-01-06T00:00:00Z ~ 2023-01-06T23:00:00Z
    9> count : 24 2023-01-07T00:00:00Z ~ 2023-01-07T23:00:00Z

     

     

    forBoundedOutOfOrderness 적용.

    forBoundedOutOfOrderness 를 적용함으로써 지연된 이벤트에 대한 처리를 수행할 수 있습니다.

    forBoundedOutOfOrderness 의 값으로 72시간을 적용하여 72시간 지연된 이벤트를 처리할 수 있습니다.

    이렇게 되면 현재 Event 의 timestamp 보다 72 시간이 지난 이후에 Watermark 가 생성되기 때문에 과거 이벤트를 누락시키지 않고 처리할 수 있습니다.

     

    아래의 구조처럼 Event 가 생성되고, Event 의 Timestamp 보다 72 시간 이전의 값을 가지는 Watermark 가 생성됩니다.

    이렇게 함으로써 72시간 지연된 이벤트를 누락시키지 않고 적절히 처리할 수 있습니다.

    대신 그만큼 Window 에서 처리되는 시간이 더 길어지게 됩니다.

     

    데이터 처리의 출력 결과는 아래와 같습니다.

    72시간 지연된 이벤트가 처리되어 TestEventAggregation 의 Count 는 25 개입니다.

    9> count : 1 2022-12-29T00:00:00Z ~ 2022-12-29T00:00:00Z
    10> count : 1 2022-12-30T00:00:00Z ~ 2022-12-30T00:00:00Z
    1> count : 1 2022-12-31T00:00:00Z ~ 2022-12-31T00:00:00Z
    2> count : 25 2023-01-01T00:00:00Z ~ 2023-01-01T23:00:00Z
    3> count : 25 2023-01-02T00:00:00Z ~ 2023-01-02T23:00:00Z
    4> count : 25 2023-01-03T00:00:00Z ~ 2023-01-03T23:00:00Z
    5> count : 25 2023-01-04T00:00:00Z ~ 2023-01-04T23:00:00Z
    6> count : 25 2023-01-05T00:00:00Z ~ 2023-01-05T23:00:00Z
    7> count : 25 2023-01-06T00:00:00Z ~ 2023-01-06T23:00:00Z

     

    AllowedLateness.

     

    AllowedLateness 는 Watermark 의 OutOfOrderness 의 범위를 넘어서는 지연 이벤트를 처리하는 방식입니다.

    아래의 예시처럼 WindowedDataStream 에 allowedLateness 를 적용할 수 있구요.

    저는 100 시간의 지연을 처리하기 위해서 아래와 같이 Window Operation 을 구현하였습니다.

     

    DataStream<TestEventAggregation> stream = source
            .rebalance()
            .assignTimestampsAndWatermarks(
                    WatermarkStrategy.<TestEvent>forBoundedOutOfOrderness(Duration.ofHours(72))
                    .withTimestampAssigner((element, recordTimestamp) -> element.timestamp)
            )
            .keyBy(event -> "_")
            .window(TumblingEventTimeWindows.of(Time.hours(24)))
            .allowedLateness(Time.hours(100))
            .process(new ProcessWindowFunction<>() {
              @Override
              public void process(String s, ProcessWindowFunction<TestEvent, TestEventAggregation, String, TimeWindow>.Context context, Iterable<TestEvent> elements, Collector<TestEventAggregation> out) {
                TestEventAggregation aggregation = new TestEventAggregation();
                for (TestEvent testEvent : elements) {
                  aggregation.append(testEvent);
                }
                out.collect(aggregation);
              }
            });

     

    처리된 출력 결과는 아래와 같습니다.

    유심히 보시면 2023-01-01 날짜의 처리 결과가 2번이 보여집니다.

    4> count : 24 2023-01-01T00:00:00Z ~ 2023-01-01T23:00:00Z

    7> count : 25 2023-01-01T00:00:00Z ~ 2023-01-01T23:00:00Z

    2> count : 1 2022-12-29T00:00:00Z ~ 2022-12-29T00:00:00Z
    3> count : 1 2022-12-30T00:00:00Z ~ 2022-12-30T00:00:00Z
    4> count : 24 2023-01-01T00:00:00Z ~ 2023-01-01T23:00:00Z
    5> count : 1 2022-12-31T00:00:00Z ~ 2022-12-31T00:00:00Z
    6> count : 24 2023-01-02T00:00:00Z ~ 2023-01-02T23:00:00Z
    7> count : 25 2023-01-01T00:00:00Z ~ 2023-01-01T23:00:00Z
    8> count : 24 2023-01-03T00:00:00Z ~ 2023-01-03T23:00:00Z
    9> count : 25 2023-01-02T00:00:00Z ~ 2023-01-02T23:00:00Z
    10> count : 24 2023-01-04T00:00:00Z ~ 2023-01-04T23:00:00Z
    1> count : 25 2023-01-03T00:00:00Z ~ 2023-01-03T23:00:00Z
    2> count : 24 2023-01-05T00:00:00Z ~ 2023-01-05T23:00:00Z
    3> count : 25 2023-01-04T00:00:00Z ~ 2023-01-04T23:00:00Z
    4> count : 24 2023-01-06T00:00:00Z ~ 2023-01-06T23:00:00Z
    5> count : 25 2023-01-05T00:00:00Z ~ 2023-01-05T23:00:00Z
    6> count : 24 2023-01-07T00:00:00Z ~ 2023-01-07T23:00:00Z

     

    각 날짜별로 2번씩 처리된 결과를 확인할 수 있는데요.

    그 이유는 Watermark 에 의해서 Window 가 Flush 될 때에 정상적인 동작과

    AllowedLateness 에 의해서 지연된 이벤트를 처리하는 동작이 각각 수행됩니다.

    그래서 AllowedLateness 는 지연된 이벤트를 처리할 수 있는 방법을 제공해주시면

    데이터를 중복처리하는 문제가 발생합니다.

     

    따라서 AllowedLateness 를 사용할 때에는 반드시 Idempotence 한 구조를 취해야합니다.

    예를 들어, 위와 같은 데이터 처리의 최종적인 목적지가 SQL DB 의 Table 이라면 Upsert 구조를 위하여

    지연된 이벤트를 처리된 결과가 앞서 처리된 결과를 Update 하는 구조를 취하는 것이 좋습니다.

     

    마무리.

    Watermark, Window, AllowedLateness 에 대해서 글을 작성해보았습니다.

    혹시 잘못된 내용을 발견하신다면 좋은 피드백을 주시면 좋을 것 같습니다.

    감사합니다.

     

    반응형
Designed by Tistory.