-
[Flink] Window AllowedLateness 알아보기 (Watermark)Flink 2024. 2. 12. 08:33728x90반응형
- 목차
들어가며.
AllowedLateness 은 Window Operation 에서 사용되는 옵션입니다.
AllowedLateness 는 이름 그대로 지연됨을 허용한다는 의미인데요.
Flink 와 같은 이벤트 스트리밍 처리 도메인에서는 모든 이벤트들이 뒤섞인 순서로 유입된다는 대전제가 존재합니다.
네트워크 지연이 가장 큰 원인인데요.
데이터 소스에서 데이터가 생성된 순서대로 이벤트가 유입된다는 보장이 없습니다.
그래서 Window Operation 의 AllowedLateness 설정을 통해서 순서가 뒤섞인 이벤트들을 순서대로 처리할 수 있습니다.
위 이미지처럼 1, 2, 3, 4 의 순서대로 데이터는 생성됩니다.
하지만 복잡한 네트워크를 통하게 되면, Flink Application 으로 유입되는 데이터들의 순서는 보장되지 않습니다.
이번 글에서는 Flink Window Operator 의 AllowedLateness 설정이 어떻게 지연된 메시지를 처리하는지에 대한 설명을 작성해보려고 합니다.
Watermark.
https://westlife0615.tistory.com/573
먼저 Watermark 에 대한 간단한 설명을 하려고 합니다.
위 링크는 제가 이전에 작성한 Watermark 에 대한 글이며, 한번 읽어보시는 것을 추천합니다.
Watermark 는 Flink 데이터 파이프라인이 얼마 만큼의 이벤트 시간을 처리하였는지 나타내는 지표입니다.
간단히 그림으로 설명을 드리면 아래와 같습니다.
아래 그림과 같이 1분 단위로 생성되는 이벤트들이 존재합니다.
2023.01.01 14:44:00 부터 1분 단위로 이벤트들이 생성됩니다.
그리고 이벤트들은 아래 이미지와 같이 복잡한 데이터 처리 파이프라인을 흐르게 됩니다.
이때 Window Operator 같은 Stateful Operator 는 시간을 기준으로 여러가지 동작을 수행하게 되는데요.
보통 1시간 간격으로 Downstream 으로 Flush 하는 것과 같은 동작을 수행하게 됩니다.
이때, 이러한 Flush 같은 Action 을 Trigger 하는 주체가 바로 Watermark 입니다.
그리고 Watermark 가 흐르게 되면서 이제 Flush 를 Trigger 하게 되는데,
아래 예시는 3분 간격의 Tumbling Window 를 표현한 예시입니다.
Window 는 2023.01.01 14:44:00 부터 2023.01.01 14:47:00 까지의 데이터를 처리하였고,
2023.01.01 14:47:00 인 Watermark 가 흐르게 되면서 Window Operator 는 2023.01.01 14:47:00 까지의 이벤트가 다 처리되었다고 판단하고 Downstream 으로 Flush 하게 됩니다.
즉, 요약하자면 Watermark 는 Timestamp 정보를 가집니다.
그리고 Watermark 의 상징적인 의미는 Watermark 의 timestamp 까지의 이벤트들은 모두 유입( 또는 처리) 되었다는 의미를 내포합니다.
그래서 Window Operator 와 같이 시간에 의존적인 Stateful Operator 들은 Watermark 를 기준으로 동작하게 됩니다.
Window.
AllowedLateness 를 설명하기 이전에 간단한 Tumbling Window 예시를 작성해보도록 하겠습니다.
< TestEvent.java >
아래 클래스는 Flink Application 에서 사용할 간단한 TestEvent class 입니다.
Field 는 timestamp 와 data 두개로 구성됩니다.
Long timestamp 는 Event Time Semantics 를 적용한 시각 정보를 의미하구요.
String data Field 는 이름 그대로 데이터를 저장할 용도로 사용됩니다.
package com.westlife.model; import java.time.Instant; public class TestEvent { public Long timestamp; public String data; public TestEvent() {} public TestEvent(Long timestamp, String data) { this.timestamp = timestamp; this.data = data; } public String toString() { return String.format("%s %s", Instant.ofEpochMilli(timestamp), data); } }
< TestEventAggregation.java >
그리고 아래의 class 는 Window Operator 에 의해서 Aggregation 된 이후의 상태를 표현하는 class 입니다.
Integer count 는 누적된 TestEvent 의 갯수를 뜻하구요.
List<Long> timestamps Field 로 TestEvent 의 timestamp 들이 추가됩니다.
package com.westlife.model; import java.time.Instant; import java.util.*; public class TestEventAggregation { public Integer count; public List<Long> timestamps; public TestEventAggregation() { this.count = 0; timestamps = new ArrayList<>(); } public void append(TestEvent testEvent) { this.count++; this.timestamps.add(testEvent.timestamp); } public void append(Long timestamp) { this.count++; this.timestamps.add(timestamp); } public String toString() { long minTimestamp = this.timestamps.stream().sorted().findFirst().get(); long maxTimestamp = this.timestamps.stream().sorted((Long a, Long b) -> (int)(b - a)).findFirst().get(); return String.format("count : %s %s ~ %s", count, Instant.ofEpochMilli(minTimestamp), Instant.ofEpochMilli(maxTimestamp)); } }
< Main.class >
아래 예시는 Tumbling Window 를 구현한 간단한 예시 프로그램입니다.
public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); DataStream<TestEvent> source = env.addSource(new SourceFunction<>() { private final Instant now = Instant.parse("2023-01-01T00:00:00.000Z"); @Override public void run(SourceContext<TestEvent> ctx) throws Exception { int incremental = 0; while (true) { Thread.sleep(100); incremental++; ctx.collect(new TestEvent(now.plus(incremental, ChronoUnit.HOURS).toEpochMilli(), String.valueOf(incremental))); } } @Override public void cancel() { } }); DataStream<TestEventAggregation> stream = source .assignTimestampsAndWatermarks(WatermarkStrategy.<TestEvent>forBoundedOutOfOrderness(Duration.ofHours(0)) .withTimestampAssigner((element, recordTimestamp) -> element.timestamp) ) .keyBy(event -> "_") .window(TumblingEventTimeWindows.of(Time.hours(24))) .process(new ProcessWindowFunction<>() { @Override public void process(String s, ProcessWindowFunction<TestEvent, TestEventAggregation, String, TimeWindow>.Context context, Iterable<TestEvent> elements, Collector<TestEventAggregation> out) { TestEventAggregation aggregation = new TestEventAggregation(); for (TestEvent testEvent : elements) { aggregation.append(testEvent); } out.collect(aggregation); } }); stream.print(); env.execute(); }
위 프로그램의 그래프는 아래처럼 표현이 되구요.
Source -> Watermark Generator -> Window Operator -> Sink 의 데이터 처리 흐름을 갖습니다.
Source.
Source 는 "2023-01-01 00:00:00" 시간부터 1시간 간격으로 이벤트를 생성합니다.
데이터 타입은 TestEvent class 를 사용하구요.
TestEvent 의 timestamp Field 에 시각 정보를 저장합니다.
Watermark Generator.
TestEvent class 의 timestamp Field 를 기준으로 Event Time 과 Watermark 를 설정하였습니다.
직접 구현한 SourceFunction 으로부터 생성되는 데이터는 순서가 보장되기 때문에
forBoundedOutOfOrderness 를 0 으로 설정하였습니다.
Tumbling Window.
24시간을 주기로 Tumbling Window 를 설정합니다.
그리고 TestEvent class 는 TestEventAggregation class 로 Aggregation 됩니다.
출력 결과는 아래와 같습니다.
Tumbling Window 의 기준 시간이 24시간입니다.
따라서 출력되는 TestEventAggregation class 의 count 는 24개, 그리고 timestamp 범위는 00시 ~ 23시 를 가집니다.
9> count : 24 2023-01-01T00:00:00Z ~ 2023-01-01T23:00:00Z 10> count : 24 2023-01-02T00:00:00Z ~ 2023-01-02T23:00:00Z 1> count : 24 2023-01-03T00:00:00Z ~ 2023-01-03T23:00:00Z 2> count : 24 2023-01-04T00:00:00Z ~ 2023-01-04T23:00:00Z 3> count : 24 2023-01-05T00:00:00Z ~ 2023-01-05T23:00:00Z 4> count : 24 2023-01-06T00:00:00Z ~ 2023-01-06T23:00:00Z 5> count : 24 2023-01-07T00:00:00Z ~ 2023-01-07T23:00:00Z 6> count : 24 2023-01-08T00:00:00Z ~ 2023-01-08T23:00:00Z 7> count : 24 2023-01-09T00:00:00Z ~ 2023-01-09T23:00:00Z 8> count : 24 2023-01-10T00:00:00Z ~ 2023-01-10T23:00:00Z 9> count : 24 2023-01-11T00:00:00Z ~ 2023-01-11T23:00:00Z
지연된 이벤트 처리하기.
위 예시에서 Source Function 부분은 약간 수정하겠습니다.
SourceFunction 의 익명클래스에서 run 함수 내부에 지연 이벤트를 추가하는 코드를 작성하였습니다.
24시간을 주기로 3일이 지연된 이벤트가 유입됩니다.
DataStream<TestEvent> source = env.addSource(new SourceFunction<>() { private final Instant now = Instant.parse("2023-01-01T00:00:00.000Z"); @Override public void run(SourceContext<TestEvent> ctx) throws Exception { int incremental = -1; while (true) { Thread.sleep(100); incremental++; ctx.collect(new TestEvent(now.plus(incremental, ChronoUnit.HOURS).toEpochMilli(), String.valueOf(incremental))); if (incremental % 24 == 0) { TestEvent late3DaysEvent = new TestEvent(now.plus(incremental, ChronoUnit.HOURS) .minus(72, ChronoUnit.HOURS) .toEpochMilli(), String.valueOf(incremental)); ctx.collect(late3DaysEvent); } } } @Override public void cancel() { } });
아래 이미지와 같이 0시부터 23시까지 이벤트가 생성된 이후에 3일 이전의 데이터가 생성되는 구조입니다.
아래 출력처럼 2022-12-29 를 제외한 모든 지연 이벤트들은 무시됩니다.
2> count : 1 2022-12-29T00:00:00Z ~ 2022-12-29T00:00:00Z 3> count : 24 2023-01-01T00:00:00Z ~ 2023-01-01T23:00:00Z 4> count : 24 2023-01-02T00:00:00Z ~ 2023-01-02T23:00:00Z 5> count : 24 2023-01-03T00:00:00Z ~ 2023-01-03T23:00:00Z 6> count : 24 2023-01-04T00:00:00Z ~ 2023-01-04T23:00:00Z 7> count : 24 2023-01-05T00:00:00Z ~ 2023-01-05T23:00:00Z 8> count : 24 2023-01-06T00:00:00Z ~ 2023-01-06T23:00:00Z 9> count : 24 2023-01-07T00:00:00Z ~ 2023-01-07T23:00:00Z
forBoundedOutOfOrderness 적용.
forBoundedOutOfOrderness 를 적용함으로써 지연된 이벤트에 대한 처리를 수행할 수 있습니다.
forBoundedOutOfOrderness 의 값으로 72시간을 적용하여 72시간 지연된 이벤트를 처리할 수 있습니다.
이렇게 되면 현재 Event 의 timestamp 보다 72 시간이 지난 이후에 Watermark 가 생성되기 때문에 과거 이벤트를 누락시키지 않고 처리할 수 있습니다.
아래의 구조처럼 Event 가 생성되고, Event 의 Timestamp 보다 72 시간 이전의 값을 가지는 Watermark 가 생성됩니다.
이렇게 함으로써 72시간 지연된 이벤트를 누락시키지 않고 적절히 처리할 수 있습니다.
대신 그만큼 Window 에서 처리되는 시간이 더 길어지게 됩니다.
데이터 처리의 출력 결과는 아래와 같습니다.
72시간 지연된 이벤트가 처리되어 TestEventAggregation 의 Count 는 25 개입니다.
9> count : 1 2022-12-29T00:00:00Z ~ 2022-12-29T00:00:00Z 10> count : 1 2022-12-30T00:00:00Z ~ 2022-12-30T00:00:00Z 1> count : 1 2022-12-31T00:00:00Z ~ 2022-12-31T00:00:00Z 2> count : 25 2023-01-01T00:00:00Z ~ 2023-01-01T23:00:00Z 3> count : 25 2023-01-02T00:00:00Z ~ 2023-01-02T23:00:00Z 4> count : 25 2023-01-03T00:00:00Z ~ 2023-01-03T23:00:00Z 5> count : 25 2023-01-04T00:00:00Z ~ 2023-01-04T23:00:00Z 6> count : 25 2023-01-05T00:00:00Z ~ 2023-01-05T23:00:00Z 7> count : 25 2023-01-06T00:00:00Z ~ 2023-01-06T23:00:00Z
AllowedLateness.
AllowedLateness 는 Watermark 의 OutOfOrderness 의 범위를 넘어서는 지연 이벤트를 처리하는 방식입니다.
아래의 예시처럼 WindowedDataStream 에 allowedLateness 를 적용할 수 있구요.
저는 100 시간의 지연을 처리하기 위해서 아래와 같이 Window Operation 을 구현하였습니다.
DataStream<TestEventAggregation> stream = source .rebalance() .assignTimestampsAndWatermarks( WatermarkStrategy.<TestEvent>forBoundedOutOfOrderness(Duration.ofHours(72)) .withTimestampAssigner((element, recordTimestamp) -> element.timestamp) ) .keyBy(event -> "_") .window(TumblingEventTimeWindows.of(Time.hours(24))) .allowedLateness(Time.hours(100)) .process(new ProcessWindowFunction<>() { @Override public void process(String s, ProcessWindowFunction<TestEvent, TestEventAggregation, String, TimeWindow>.Context context, Iterable<TestEvent> elements, Collector<TestEventAggregation> out) { TestEventAggregation aggregation = new TestEventAggregation(); for (TestEvent testEvent : elements) { aggregation.append(testEvent); } out.collect(aggregation); } });
처리된 출력 결과는 아래와 같습니다.
유심히 보시면 2023-01-01 날짜의 처리 결과가 2번이 보여집니다.
4> count : 24 2023-01-01T00:00:00Z ~ 2023-01-01T23:00:00Z
7> count : 25 2023-01-01T00:00:00Z ~ 2023-01-01T23:00:00Z
2> count : 1 2022-12-29T00:00:00Z ~ 2022-12-29T00:00:00Z 3> count : 1 2022-12-30T00:00:00Z ~ 2022-12-30T00:00:00Z 4> count : 24 2023-01-01T00:00:00Z ~ 2023-01-01T23:00:00Z 5> count : 1 2022-12-31T00:00:00Z ~ 2022-12-31T00:00:00Z 6> count : 24 2023-01-02T00:00:00Z ~ 2023-01-02T23:00:00Z 7> count : 25 2023-01-01T00:00:00Z ~ 2023-01-01T23:00:00Z 8> count : 24 2023-01-03T00:00:00Z ~ 2023-01-03T23:00:00Z 9> count : 25 2023-01-02T00:00:00Z ~ 2023-01-02T23:00:00Z 10> count : 24 2023-01-04T00:00:00Z ~ 2023-01-04T23:00:00Z 1> count : 25 2023-01-03T00:00:00Z ~ 2023-01-03T23:00:00Z 2> count : 24 2023-01-05T00:00:00Z ~ 2023-01-05T23:00:00Z 3> count : 25 2023-01-04T00:00:00Z ~ 2023-01-04T23:00:00Z 4> count : 24 2023-01-06T00:00:00Z ~ 2023-01-06T23:00:00Z 5> count : 25 2023-01-05T00:00:00Z ~ 2023-01-05T23:00:00Z 6> count : 24 2023-01-07T00:00:00Z ~ 2023-01-07T23:00:00Z
각 날짜별로 2번씩 처리된 결과를 확인할 수 있는데요.
그 이유는 Watermark 에 의해서 Window 가 Flush 될 때에 정상적인 동작과
AllowedLateness 에 의해서 지연된 이벤트를 처리하는 동작이 각각 수행됩니다.
그래서 AllowedLateness 는 지연된 이벤트를 처리할 수 있는 방법을 제공해주시면
데이터를 중복처리하는 문제가 발생합니다.
따라서 AllowedLateness 를 사용할 때에는 반드시 Idempotence 한 구조를 취해야합니다.
예를 들어, 위와 같은 데이터 처리의 최종적인 목적지가 SQL DB 의 Table 이라면 Upsert 구조를 위하여
지연된 이벤트를 처리된 결과가 앞서 처리된 결과를 Update 하는 구조를 취하는 것이 좋습니다.
마무리.
Watermark, Window, AllowedLateness 에 대해서 글을 작성해보았습니다.
혹시 잘못된 내용을 발견하신다면 좋은 피드백을 주시면 좋을 것 같습니다.
감사합니다.
반응형'Flink' 카테고리의 다른 글
[Flink] JSON FileSink 만들어보기 (Row Format) (0) 2024.02.12 [Flink] Tumbling Time Window 알아보기 (TumblingEventTimeWindows) (0) 2024.02.12 [Kryo] Kyro Serialization 알아보기 (0) 2024.02.05 [Flink] FileSource 알아보기 (json, csv, parquet, avro) (0) 2024.02.04 [Flink] KafkaSource Connector 알아보기 (0) 2024.02.04