ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • Flink Watermark 알아보기
    Flink 2024. 1. 10. 22:18
    728x90
    반응형

    - 목차

     

    들어가며.

    Watermark 는 Checkpoint Barrier 와 같이 Flink Data Stream 을 흐르는 하나의 레코드입니다.

    Watermark 가 표현하는 정보는 오직 시간 밖에 없는데요.

    특정 시각 정보를 담고 있는 Watermark 는 Data Stream 을 흐르게 됩니다.

    아래의 이미지는 4개의 Event 가 Data Stream 을 흐르고 있구요.

    마지막으로 Watermark 가 생성됩니다.

    Watermark in Data Stream

     

    Watermark 가 흐르게 되면서 Source 와 여러 Transformation Task 들을 거치고 마지막으로 Sink 에 도달하게 됩니다.

    이 과정에서 각 Task 들은 도달한 Watermark 를 통하여 Watermark 시간까지의 Event 들은 모두 처리되었음을 알게됩니다.

    다른 표현으로 앞으로 유입되는 Event 들은 모두 Watermark 보다 새로운 또는 최신의 Event 임을 알게 됩니다.

     

     

    Watermark 는 이러한 방식으로 Data Stream Processing 에서 처리가 완료된 Event 의 시각 정보를 모든 Task 에게 제공하게 됩니다.

     

    Watermark 는 왜 필요할까 ?

    Watermark 는 단순히 시각 정보만을 담고 있는 데이터입니다.

    "2023-10-23 10:04:01" 와 같은 시각 정보만을 가지고 있을 뿐입니다.

    그래서 Watermark 라는 데이터의 모양은 큰 임팩트가 없지만,

    Watermark 가 Flink Data Stream 을 흐르게 되면서,

    Flink Data Stream 을 구성하는 모든 Task 들에게 중요한 정보를 전달합니다.

     

    - Watermark 에 명시된 시간 이전의 Event 들은 모두 처리가 되었다.

    - Watermark 에 명시된 시간보다 과거의 Event 는 더 이상 Data Stream 으로 유입되지 않는다.

    - 지금부터는 Watermark 에 명시된 시간보다 미래의 Event, 최신의 Event 들이 유입될 것이다.

     

    의 메시지를 의미합니다.

    Stream Processing Application 들은 시계열의 데이터 처리를 수행하기 때문에 시간의 순서를 정렬하는 것이 중요한 경우가 많습니다.

    그리고 Sorting 과 같은 Aggregation 을 수행하기 위해서는 당연히 정렬 대상인 Event 들이 모여야겠죠.

    이 과정에서 어디부터 어디까지의 Event 를 집계할 것인지에 대한 기준이 중요합니다.

    그 기준이 Watermark 입니다.

     

    아래 이미지와 같이 시간이 뒤섞인 Event 들이 유입되고 있습니다.

    그리고 이 Event 들을 Window Operator 내부로 흘러들어갑니다.

    Watermark 는 Event 의 Timestamp 보다 2분 정도 과거로 설정하였습니다.

    저는 Network Delay 가 2분 이상 발생하지 않는다는 가정을 하였습니다.

    따라서 Watermark 는 1h 15m 보다 2분 과거인 1h 13m 으로 설정됩니다.

     

    window, watermark and purge

     

    Window Operator 에 Watermark 가 유입되고 나면 Window Operator 는 더 이상 1h 13m 보다 과거의 Event 는 유입되지 않는다고 판단합니다.

    그래서 Watermark 가 명시한 시간보다 과거의 Event 들은 모두 Purge 됩니다.

    Purge 는 Downstream Task 로 Event 를 전달하는 기능입니다.

     

    이러한 관점에서 시간의 순서가 100% 보장되지 않는 현실에서 Watermark 를 통해 안정적인 시계열의 Stream Processing 을 수행할 수 있습니다.

     

    Allowed Lateness.

    현실 세계에서 Flink Data Stream 으로 유입되는 Event 들의 순서는 보장되지 않는다고 말씀드렸습니다.

    그리고 Watermark 또한 Network Delay 를 고려한다고 하더라도 항상 예외는 발생하기 마련이죠.

    그래서 Flink Window Operator 는 Allowed Lateness 라는 기능을 제공합니다.

    Allowed Latest 는 이미 Watermark 가 흐른 뒤에

    "Watermark 가 명시한 시각보다 과거의 Event 가 유입되더라도 특정 시간만큼만 Delay 를 허용을 해주자"

    라는 느낌의 Latency 를 허용하는 범위입니다.

     

    아래와 같은 형식으로 Window Operator 와 함께 사용되구요.

    allowedLateness(Time.hour(1)) 는 Watermark 보다 1시간까지만 Delay 를 허용하겠다는 의미입니다.

    inputStream
        .keyBy(...)
        .window(TumblingEventTimeWindows.of(Time.minutes(10)))
        .allowedLateness(Time.hour(1))  // Set allowed lateness to 1 minute
        .apply(new MyWindowFunction());

     

    아래의 상황을 예시로 들어보겠습니다.

    Allowed Latest 는 1시간입니다.

    그리고 어쩌다보니 Watermark (2시 15분) 가 이미 흐른 뒤로 5개의 과거 Event 가 뒤늦게 유입됩니다.

    1시 10분, 1시 11분, 1시 12분, 1시 15분, 1시 16분 의 Event 들입니다.

    사실상 Watermark 보다 대략 1시간 정도 늦은 Event 들이기 때문에 비정상적인 Event 들입니다.

    하지만 Window Operator 의 Allowed Lateness 를 1시간으로 설정하였기 때문에

    2시 15분의 Watermark 보다 1시간 이내의 Event dls 1시 15분, 1시 16분 Event 는 Window Operator 에서 허용을 합니다.

    그렇지만 Allowed Lateness 의 허용범위보다도 늦은 Event 들은 전부 Drop 되게 됩니다.

     

     

     

    Watermark 코드로 구현해보기.

    먼저 저는 java 11 과 Flink 1.13.2 버전으로 테스트를 진행합니다.

     

    assignTimestampsAndWatermarks.

    Watermark 는 Source 로부터 생성됩니다.

    assignTimestampsAndWatermarks 함수를 적용하여 Watermark 를 Flink Data Stream 으로 흘려보낼 수 있습니다.

     

    Event 라는 이름의 pojo 타입의 클래스를 만들었습니다.

    Flink Data Stream 을 흐르게 될 데이터의 모양입니다.

    class Event {
      Long timestamp;
      String data;
    
      public Event(Long timestamp,String data) {
        this.timestamp = timestamp;
        this.data = data;
      }
    }

     

    Flink DataStream Program 에서 Source 와 Watermark 의 설정은 아래와 같습니다.

        DataStream<Event> source = env.fromElements(
                new Event(Instant.now().toEpochMilli(), "1"),
                new Event(Instant.now().toEpochMilli(), "2"),
                new Event(Instant.now().toEpochMilli(), "3"),
                new Event(Instant.now().toEpochMilli(), "4")
        );
    
        SerializableTimestampAssigner<Event> timestampAssigner = 
                (element, recordTimestamp) -> element.timestamp;
    
        WatermarkStrategy<Event> watermarkStrategy = WatermarkStrategy
                .<Event>forBoundedOutOfOrderness(Duration.ofMinutes(2))
                .withTimestampAssigner(timestampAssigner);
        
        SingleOutputStreamOperator<Event> sourceWithWatermark = source
                .assignTimestampsAndWatermarks(watermarkStrategy);

     

    WatermarkStrategy.

    WatermarkStrategy 는 어떤 방식으로 Watermark 를 생성할 것인지 결정할 수 있습니다.

    저의 경우에는 크게 두가지 설정을 하였습니다.

     

    forBoundedOutOfOrderness(Duration.forMinutes(2)) 를 사용하여,

    현재까지 발생한 Event 의 Timestamp 중에서 가장 최신의 값에서 2분을 뺀 Timestamp 를 Watermark 로 설정하겠다는 의미입니다.

     

    그림으로 표현하게 되면 아래와 같습니다.

    유입된 Event 들 중에서 가장 최신의 Event 는 1시 16분이므로 2분을 뺀 1시 14분이 Watermark 가 됩니다.

    이러한 방식으로 2분 정보의 시간을 빼는 이유는 Network Delay 와 같은 지연을 고려하기 위함입니다.

     

     

    "지금 유입된 이벤트 중에서 가장 최근 시각이 1시 16분이니깐 네트워크 지연이 2분 정보라고 가정하면 1시 14분까지의 이벤트들은 전부 유입되었겠군."

    라고 판단하는 느낌입니다.

    사실 더 많은 시간이 지연될 수도 있겠죠.

    따라서 Watermark 를 설정하는 시간에 대한 정답을 없으며, 서비스의 허용 범위를 찾는 것이 중요할 것 같습니다.

     

    TimestampAssigner.

    TimestampAssigner 는 Event 데이터 중에서 어떤 Field 가 시각 정보를 가지는지 선택하는 함수입니다.

    제가 만든 Event 라는 클래스는 단순히 timestamp 와 data 만을 가집니다.

    그래서 Event.timestamp 가 시간 정보라는 것을 Watermark Generator 에게 알려줍니다.

     

     

    위 코드로 구현한 Flink DataStream API Program 은 실제 Flink Dashbaord UI 에서 아래와 같이 표현됩니다.

    "Source: Custom Source" -> "Timestamps/Watermarks"

    Watermark Generator 는 Source Operator 에서 생성한 Event 를 받아 Watermark 를 생성하는 구조입니다.

     

    그리고 아래 이미지는 Watermark 를 전달받은 Downstream Operator 의 Metric 도표입니다.

    꾸준히 증가하는게 보이시죠 ?

     

     

     

     

    마치며.

    다음에 Flink Window 에 대한 글을 작성하면서 Watermark 의 추가적인 내용에 대해서 심도있게 다뤄보도록 하겠습니다.

    감사합니다.

     

     

    반응형

    'Flink' 카테고리의 다른 글

    Flink StreamGraph 알아보기  (0) 2024.01.11
    Flink State 알아보기  (0) 2024.01.10
    Flink Checkpoint 알아보기  (0) 2024.01.10
    [Flink] 바이너리 파일 실행하기 (Binary Execution File)  (0) 2023.12.29
    Flink KeyedStream 알아보기  (0) 2023.10.11
Designed by Tistory.