-
Flink Watermark 알아보기Flink 2024. 1. 10. 22:18728x90반응형
- 목차
들어가며.
Watermark 는 Checkpoint Barrier 와 같이 Flink Data Stream 을 흐르는 하나의 레코드입니다.
Watermark 가 표현하는 정보는 오직 시간 밖에 없는데요.
특정 시각 정보를 담고 있는 Watermark 는 Data Stream 을 흐르게 됩니다.
아래의 이미지는 4개의 Event 가 Data Stream 을 흐르고 있구요.
마지막으로 Watermark 가 생성됩니다.
Watermark 가 흐르게 되면서 Source 와 여러 Transformation Task 들을 거치고 마지막으로 Sink 에 도달하게 됩니다.
이 과정에서 각 Task 들은 도달한 Watermark 를 통하여 Watermark 시간까지의 Event 들은 모두 처리되었음을 알게됩니다.
다른 표현으로 앞으로 유입되는 Event 들은 모두 Watermark 보다 새로운 또는 최신의 Event 임을 알게 됩니다.
Watermark 는 이러한 방식으로 Data Stream Processing 에서 처리가 완료된 Event 의 시각 정보를 모든 Task 에게 제공하게 됩니다.
Watermark 는 왜 필요할까 ?
Watermark 는 단순히 시각 정보만을 담고 있는 데이터입니다.
"2023-10-23 10:04:01" 와 같은 시각 정보만을 가지고 있을 뿐입니다.
그래서 Watermark 라는 데이터의 모양은 큰 임팩트가 없지만,
Watermark 가 Flink Data Stream 을 흐르게 되면서,
Flink Data Stream 을 구성하는 모든 Task 들에게 중요한 정보를 전달합니다.
- Watermark 에 명시된 시간 이전의 Event 들은 모두 처리가 되었다.
- Watermark 에 명시된 시간보다 과거의 Event 는 더 이상 Data Stream 으로 유입되지 않는다.
- 지금부터는 Watermark 에 명시된 시간보다 미래의 Event, 최신의 Event 들이 유입될 것이다.
의 메시지를 의미합니다.
Stream Processing Application 들은 시계열의 데이터 처리를 수행하기 때문에 시간의 순서를 정렬하는 것이 중요한 경우가 많습니다.
그리고 Sorting 과 같은 Aggregation 을 수행하기 위해서는 당연히 정렬 대상인 Event 들이 모여야겠죠.
이 과정에서 어디부터 어디까지의 Event 를 집계할 것인지에 대한 기준이 중요합니다.
그 기준이 Watermark 입니다.
아래 이미지와 같이 시간이 뒤섞인 Event 들이 유입되고 있습니다.
그리고 이 Event 들을 Window Operator 내부로 흘러들어갑니다.
Watermark 는 Event 의 Timestamp 보다 2분 정도 과거로 설정하였습니다.
저는 Network Delay 가 2분 이상 발생하지 않는다는 가정을 하였습니다.
따라서 Watermark 는 1h 15m 보다 2분 과거인 1h 13m 으로 설정됩니다.
Window Operator 에 Watermark 가 유입되고 나면 Window Operator 는 더 이상 1h 13m 보다 과거의 Event 는 유입되지 않는다고 판단합니다.
그래서 Watermark 가 명시한 시간보다 과거의 Event 들은 모두 Purge 됩니다.
Purge 는 Downstream Task 로 Event 를 전달하는 기능입니다.
이러한 관점에서 시간의 순서가 100% 보장되지 않는 현실에서 Watermark 를 통해 안정적인 시계열의 Stream Processing 을 수행할 수 있습니다.
Allowed Lateness.
현실 세계에서 Flink Data Stream 으로 유입되는 Event 들의 순서는 보장되지 않는다고 말씀드렸습니다.
그리고 Watermark 또한 Network Delay 를 고려한다고 하더라도 항상 예외는 발생하기 마련이죠.
그래서 Flink Window Operator 는 Allowed Lateness 라는 기능을 제공합니다.
Allowed Latest 는 이미 Watermark 가 흐른 뒤에
"Watermark 가 명시한 시각보다 과거의 Event 가 유입되더라도 특정 시간만큼만 Delay 를 허용을 해주자"
라는 느낌의 Latency 를 허용하는 범위입니다.
아래와 같은 형식으로 Window Operator 와 함께 사용되구요.
allowedLateness(Time.hour(1)) 는 Watermark 보다 1시간까지만 Delay 를 허용하겠다는 의미입니다.
inputStream .keyBy(...) .window(TumblingEventTimeWindows.of(Time.minutes(10))) .allowedLateness(Time.hour(1)) // Set allowed lateness to 1 minute .apply(new MyWindowFunction());
아래의 상황을 예시로 들어보겠습니다.
Allowed Latest 는 1시간입니다.
그리고 어쩌다보니 Watermark (2시 15분) 가 이미 흐른 뒤로 5개의 과거 Event 가 뒤늦게 유입됩니다.
1시 10분, 1시 11분, 1시 12분, 1시 15분, 1시 16분 의 Event 들입니다.
사실상 Watermark 보다 대략 1시간 정도 늦은 Event 들이기 때문에 비정상적인 Event 들입니다.
하지만 Window Operator 의 Allowed Lateness 를 1시간으로 설정하였기 때문에
2시 15분의 Watermark 보다 1시간 이내의 Event dls 1시 15분, 1시 16분 Event 는 Window Operator 에서 허용을 합니다.
그렇지만 Allowed Lateness 의 허용범위보다도 늦은 Event 들은 전부 Drop 되게 됩니다.
Watermark 코드로 구현해보기.
먼저 저는 java 11 과 Flink 1.13.2 버전으로 테스트를 진행합니다.
assignTimestampsAndWatermarks.
Watermark 는 Source 로부터 생성됩니다.
assignTimestampsAndWatermarks 함수를 적용하여 Watermark 를 Flink Data Stream 으로 흘려보낼 수 있습니다.
Event 라는 이름의 pojo 타입의 클래스를 만들었습니다.
Flink Data Stream 을 흐르게 될 데이터의 모양입니다.
class Event { Long timestamp; String data; public Event(Long timestamp,String data) { this.timestamp = timestamp; this.data = data; } }
Flink DataStream Program 에서 Source 와 Watermark 의 설정은 아래와 같습니다.
DataStream<Event> source = env.fromElements( new Event(Instant.now().toEpochMilli(), "1"), new Event(Instant.now().toEpochMilli(), "2"), new Event(Instant.now().toEpochMilli(), "3"), new Event(Instant.now().toEpochMilli(), "4") ); SerializableTimestampAssigner<Event> timestampAssigner = (element, recordTimestamp) -> element.timestamp; WatermarkStrategy<Event> watermarkStrategy = WatermarkStrategy .<Event>forBoundedOutOfOrderness(Duration.ofMinutes(2)) .withTimestampAssigner(timestampAssigner); SingleOutputStreamOperator<Event> sourceWithWatermark = source .assignTimestampsAndWatermarks(watermarkStrategy);
WatermarkStrategy.
WatermarkStrategy 는 어떤 방식으로 Watermark 를 생성할 것인지 결정할 수 있습니다.
저의 경우에는 크게 두가지 설정을 하였습니다.
forBoundedOutOfOrderness(Duration.forMinutes(2)) 를 사용하여,
현재까지 발생한 Event 의 Timestamp 중에서 가장 최신의 값에서 2분을 뺀 Timestamp 를 Watermark 로 설정하겠다는 의미입니다.
그림으로 표현하게 되면 아래와 같습니다.
유입된 Event 들 중에서 가장 최신의 Event 는 1시 16분이므로 2분을 뺀 1시 14분이 Watermark 가 됩니다.
이러한 방식으로 2분 정보의 시간을 빼는 이유는 Network Delay 와 같은 지연을 고려하기 위함입니다.
"지금 유입된 이벤트 중에서 가장 최근 시각이 1시 16분이니깐 네트워크 지연이 2분 정보라고 가정하면 1시 14분까지의 이벤트들은 전부 유입되었겠군."
라고 판단하는 느낌입니다.
사실 더 많은 시간이 지연될 수도 있겠죠.
따라서 Watermark 를 설정하는 시간에 대한 정답을 없으며, 서비스의 허용 범위를 찾는 것이 중요할 것 같습니다.
TimestampAssigner.
TimestampAssigner 는 Event 데이터 중에서 어떤 Field 가 시각 정보를 가지는지 선택하는 함수입니다.
제가 만든 Event 라는 클래스는 단순히 timestamp 와 data 만을 가집니다.
그래서 Event.timestamp 가 시간 정보라는 것을 Watermark Generator 에게 알려줍니다.
위 코드로 구현한 Flink DataStream API Program 은 실제 Flink Dashbaord UI 에서 아래와 같이 표현됩니다.
"Source: Custom Source" -> "Timestamps/Watermarks"
Watermark Generator 는 Source Operator 에서 생성한 Event 를 받아 Watermark 를 생성하는 구조입니다.
그리고 아래 이미지는 Watermark 를 전달받은 Downstream Operator 의 Metric 도표입니다.
꾸준히 증가하는게 보이시죠 ?
마치며.
다음에 Flink Window 에 대한 글을 작성하면서 Watermark 의 추가적인 내용에 대해서 심도있게 다뤄보도록 하겠습니다.
감사합니다.
반응형'Flink' 카테고리의 다른 글
Flink StreamGraph 알아보기 (0) 2024.01.11 Flink State 알아보기 (0) 2024.01.10 Flink Checkpoint 알아보기 (0) 2024.01.10 [Flink] 바이너리 파일 실행하기 (Binary Execution File) (0) 2023.12.29 Flink KeyedStream 알아보기 (0) 2023.10.11