    [Flink] Exploring the Tumbling Time Window (TumblingEventTimeWindows)
    Flink 2024. 2. 12. 08:33

     


     

    Introduction.

    A Tumbling Time Window is a windowing strategy that splits a DataStream into segments by time.

    Tumbling windows are fixed-size and non-overlapping.

    This can be pictured as follows.

    [Figure: tumbling windows assigned to a data stream]

    Source: https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/operators/windows/

     

    The window size of a Tumbling Time Window can be set freely, whether in minutes, hours, or days.

    The data stream is then divided into chunks of that fixed duration, and each chunk is processed as a unit.
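The assignment rule can be sketched in plain Java. This is an assumption-based sketch, not Flink code: the class name WindowAssignmentSketch and the method windowStart are hypothetical, and the arithmetic mirrors the remainder-based calculation in Flink's TimeWindow.getWindowStartWithOffset. Each timestamp falls into exactly one half-open window [start, start + size), which is what makes the windows fixed-size and non-overlapping.

```java
import java.time.Duration;
import java.time.Instant;

public class WindowAssignmentSketch {

    // Start of the tumbling window containing timestampMillis (hypothetical helper).
    // Windows are aligned to the Unix epoch and shifted by offsetMillis.
    static long windowStart(long timestampMillis, long offsetMillis, long sizeMillis) {
        long remainder = (timestampMillis - offsetMillis) % sizeMillis;
        // Java's % can return a negative value, so normalize into [0, sizeMillis)
        if (remainder < 0) {
            remainder += sizeMillis;
        }
        return timestampMillis - remainder;
    }

    public static void main(String[] args) {
        long size = Duration.ofHours(24).toMillis();
        long ts = Instant.parse("2023-01-03T13:00:00Z").toEpochMilli();
        long start = windowStart(ts, 0L, size);
        // The window is the half-open interval [start, start + size)
        System.out.println(Instant.ofEpochMilli(start) + " ~ " + Instant.ofEpochMilli(start + size));
        // prints "2023-01-03T00:00:00Z ~ 2023-01-04T00:00:00Z"
    }
}
```

Because the start depends only on the timestamp, the size, and the offset, every parallel window operator computes the same boundaries without coordination.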

     

    Tumbling Time Window example.

     

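    The example program below generates hourly timestamps for five days, starting at 2023-01-01 00:00:00 (UTC). Because fromSequence(0, 24 * 5) includes both ends, it produces 121 events, the last one at 2023-01-06 00:00:00.

    Each event is a java.time.Instant.

    We will use this example to run several tests with the Tumbling Time Window.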

    package com.westlife.jobs;
    
    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
    import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
    import org.apache.flink.util.Collector;
    
    import java.time.Duration;
    import java.time.Instant;
    import java.time.temporal.ChronoUnit;
    import java.util.ArrayList;
    import java.util.List;
    
    public class TestTumblingWindow {
      public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        final Instant now = Instant.parse("2023-01-01T00:00:00.000Z");
        // Event time is the default since Flink 1.12, so the deprecated
        // env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) call is not needed.
    
        // fromSequence is inclusive at both ends: 121 hourly events,
        // from 2023-01-01T00:00:00Z to 2023-01-06T00:00:00Z
        DataStream<Instant> source = env.fromSequence(0, 24 * 5)
                .map(i -> now.plus(i, ChronoUnit.HOURS));
    
        DataStream<List<Instant>> stream = source.assignTimestampsAndWatermarks(
                // Timestamps are assumed to arrive in order, so no out-of-orderness is allowed
                WatermarkStrategy.<Instant>forBoundedOutOfOrderness(Duration.ofHours(0))
                        // Use the Instant itself as the event-time timestamp
                        .withTimestampAssigner((element, recordTimestamp) -> element.toEpochMilli())
        ).keyBy(event -> "_") // one constant key, so all events share the same windows
                .window(TumblingEventTimeWindows.of(Time.hours(24)))
                .process(new ProcessWindowFunction<Instant, List<Instant>, String, TimeWindow>() {
                  @Override
                  public void process(String key, Context context, Iterable<Instant> elements, Collector<List<Instant>> out) {
                    // Collect every event of the fired window into one list
                    List<Instant> aggregation = new ArrayList<>();
                    for (Instant event : elements) {
                      aggregation.add(event);
                    }
                    out.collect(aggregation);
                  }
                });
    
        // Print each window's contents; this flatMap emits nothing and is used only for its side effect
        stream.flatMap((FlatMapFunction<List<Instant>, Object>) (value, out) -> System.out.println(String.format("count : %s, values : %s", value.size(), value))).returns(Object.class);
        env.execute();
      }
    }

     

    Tumbling Time Window, 24 hours.

     

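    The results below were produced with the TumblingEventTimeWindows size set to 24 hours.

    With a 24-hour window, each chunk covers the range from 00:00 (inclusive) to 24:00 (exclusive) UTC.

    As a result, the 24 events of each day form one Tumbling Time Window chunk, and the final event at 2023-01-06T00:00:00Z ends up alone in a sixth chunk. The events inside a chunk are not always in timestamp order: window contents carry no ordering guarantee, and the sequence is most likely generated by several parallel source subtasks whose outputs are interleaved.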

    count : 24, values : [2023-01-01T00:00:00Z, 2023-01-01T01:00:00Z, 2023-01-01T02:00:00Z, 2023-01-01T03:00:00Z, 2023-01-01T04:00:00Z, 2023-01-01T05:00:00Z, 2023-01-01T06:00:00Z, 2023-01-01T07:00:00Z, 2023-01-01T08:00:00Z, 2023-01-01T09:00:00Z, 2023-01-01T10:00:00Z, 2023-01-01T11:00:00Z, 2023-01-01T12:00:00Z, 2023-01-01T13:00:00Z, 2023-01-01T14:00:00Z, 2023-01-01T15:00:00Z, 2023-01-01T16:00:00Z, 2023-01-01T17:00:00Z, 2023-01-01T18:00:00Z, 2023-01-01T19:00:00Z, 2023-01-01T20:00:00Z, 2023-01-01T21:00:00Z, 2023-01-01T22:00:00Z, 2023-01-01T23:00:00Z]
    count : 24, values : [2023-01-02T01:00:00Z, 2023-01-02T02:00:00Z, 2023-01-02T03:00:00Z, 2023-01-02T04:00:00Z, 2023-01-02T05:00:00Z, 2023-01-02T06:00:00Z, 2023-01-02T07:00:00Z, 2023-01-02T08:00:00Z, 2023-01-02T09:00:00Z, 2023-01-02T10:00:00Z, 2023-01-02T11:00:00Z, 2023-01-02T12:00:00Z, 2023-01-02T00:00:00Z, 2023-01-02T13:00:00Z, 2023-01-02T14:00:00Z, 2023-01-02T15:00:00Z, 2023-01-02T16:00:00Z, 2023-01-02T17:00:00Z, 2023-01-02T18:00:00Z, 2023-01-02T19:00:00Z, 2023-01-02T20:00:00Z, 2023-01-02T21:00:00Z, 2023-01-02T22:00:00Z, 2023-01-02T23:00:00Z]
    count : 24, values : [2023-01-03T13:00:00Z, 2023-01-03T14:00:00Z, 2023-01-03T15:00:00Z, 2023-01-03T16:00:00Z, 2023-01-03T17:00:00Z, 2023-01-03T18:00:00Z, 2023-01-03T19:00:00Z, 2023-01-03T20:00:00Z, 2023-01-03T21:00:00Z, 2023-01-03T22:00:00Z, 2023-01-03T23:00:00Z, 2023-01-03T01:00:00Z, 2023-01-03T02:00:00Z, 2023-01-03T03:00:00Z, 2023-01-03T04:00:00Z, 2023-01-03T05:00:00Z, 2023-01-03T06:00:00Z, 2023-01-03T07:00:00Z, 2023-01-03T08:00:00Z, 2023-01-03T09:00:00Z, 2023-01-03T10:00:00Z, 2023-01-03T11:00:00Z, 2023-01-03T12:00:00Z, 2023-01-03T00:00:00Z]
    count : 24, values : [2023-01-04T00:00:00Z, 2023-01-04T13:00:00Z, 2023-01-04T14:00:00Z, 2023-01-04T15:00:00Z, 2023-01-04T16:00:00Z, 2023-01-04T17:00:00Z, 2023-01-04T18:00:00Z, 2023-01-04T19:00:00Z, 2023-01-04T20:00:00Z, 2023-01-04T21:00:00Z, 2023-01-04T22:00:00Z, 2023-01-04T23:00:00Z, 2023-01-04T01:00:00Z, 2023-01-04T02:00:00Z, 2023-01-04T03:00:00Z, 2023-01-04T04:00:00Z, 2023-01-04T05:00:00Z, 2023-01-04T06:00:00Z, 2023-01-04T07:00:00Z, 2023-01-04T08:00:00Z, 2023-01-04T09:00:00Z, 2023-01-04T10:00:00Z, 2023-01-04T11:00:00Z, 2023-01-04T12:00:00Z]
    count : 24, values : [2023-01-05T01:00:00Z, 2023-01-05T02:00:00Z, 2023-01-05T03:00:00Z, 2023-01-05T04:00:00Z, 2023-01-05T05:00:00Z, 2023-01-05T06:00:00Z, 2023-01-05T07:00:00Z, 2023-01-05T08:00:00Z, 2023-01-05T09:00:00Z, 2023-01-05T10:00:00Z, 2023-01-05T11:00:00Z, 2023-01-05T12:00:00Z, 2023-01-05T00:00:00Z, 2023-01-05T13:00:00Z, 2023-01-05T14:00:00Z, 2023-01-05T15:00:00Z, 2023-01-05T16:00:00Z, 2023-01-05T17:00:00Z, 2023-01-05T18:00:00Z, 2023-01-05T19:00:00Z, 2023-01-05T20:00:00Z, 2023-01-05T21:00:00Z, 2023-01-05T22:00:00Z, 2023-01-05T23:00:00Z]
    count : 1, values : [2023-01-06T00:00:00Z]
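These counts can be checked without a Flink cluster. The plain-Java sketch below (the class DailyWindowCounts and its method are hypothetical) generates the same 121 hourly timestamps, assigns each one to an epoch-aligned window with no offset, and counts the events per window. It reproduces only the grouping, not Flink's output order or watermark handling.

```java
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class DailyWindowCounts {

    // Counts how many of the article's hourly events fall into each tumbling window of the given size.
    static List<Integer> countsPerWindow(Duration size) {
        Instant first = Instant.parse("2023-01-01T00:00:00Z");
        long sizeMillis = size.toMillis();
        // TreeMap keeps the windows sorted by their start time
        Map<Long, Integer> counts = new TreeMap<>();
        for (long i = 0; i <= 24 * 5; i++) { // same inclusive range as env.fromSequence(0, 24 * 5)
            long ts = first.plus(i, ChronoUnit.HOURS).toEpochMilli();
            long start = ts - Math.floorMod(ts, sizeMillis); // epoch-aligned window start
            counts.merge(start, 1, Integer::sum);
        }
        return new ArrayList<>(counts.values());
    }

    public static void main(String[] args) {
        System.out.println(countsPerWindow(Duration.ofHours(24))); // [24, 24, 24, 24, 24, 1]
    }
}
```

Passing Duration.ofHours(10) instead yields 8, then eleven 10s, then 3, which matches the count pattern of the 10-hour run.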

     

     

    Tumbling Time Window, 8 hours.

    In a similar example, the window size is set to 8 hours.

    As the output below shows, chunks are created for the ranges 00:00 to 08:00, 08:00 to 16:00, and 16:00 to 24:00 (each end exclusive).

    count : 8, values : [2023-01-01T00:00:00Z, 2023-01-01T01:00:00Z, 2023-01-01T02:00:00Z, 2023-01-01T03:00:00Z, 2023-01-01T04:00:00Z, 2023-01-01T05:00:00Z, 2023-01-01T06:00:00Z, 2023-01-01T07:00:00Z]
    count : 8, values : [2023-01-01T08:00:00Z, 2023-01-01T09:00:00Z, 2023-01-01T10:00:00Z, 2023-01-01T11:00:00Z, 2023-01-01T12:00:00Z, 2023-01-01T13:00:00Z, 2023-01-01T14:00:00Z, 2023-01-01T15:00:00Z]
    count : 8, values : [2023-01-01T16:00:00Z, 2023-01-01T17:00:00Z, 2023-01-01T18:00:00Z, 2023-01-01T19:00:00Z, 2023-01-01T20:00:00Z, 2023-01-01T21:00:00Z, 2023-01-01T22:00:00Z, 2023-01-01T23:00:00Z]
    count : 8, values : [2023-01-02T01:00:00Z, 2023-01-02T02:00:00Z, 2023-01-02T03:00:00Z, 2023-01-02T04:00:00Z, 2023-01-02T05:00:00Z, 2023-01-02T06:00:00Z, 2023-01-02T07:00:00Z, 2023-01-02T00:00:00Z]
    count : 8, values : [2023-01-02T08:00:00Z, 2023-01-02T09:00:00Z, 2023-01-02T10:00:00Z, 2023-01-02T11:00:00Z, 2023-01-02T12:00:00Z, 2023-01-02T13:00:00Z, 2023-01-02T14:00:00Z, 2023-01-02T15:00:00Z]
    count : 8, values : [2023-01-02T16:00:00Z, 2023-01-02T17:00:00Z, 2023-01-02T18:00:00Z, 2023-01-02T19:00:00Z, 2023-01-02T20:00:00Z, 2023-01-02T21:00:00Z, 2023-01-02T22:00:00Z, 2023-01-02T23:00:00Z]
    count : 8, values : [2023-01-03T01:00:00Z, 2023-01-03T02:00:00Z, 2023-01-03T03:00:00Z, 2023-01-03T04:00:00Z, 2023-01-03T05:00:00Z, 2023-01-03T06:00:00Z, 2023-01-03T07:00:00Z, 2023-01-03T00:00:00Z]
    count : 8, values : [2023-01-03T08:00:00Z, 2023-01-03T09:00:00Z, 2023-01-03T10:00:00Z, 2023-01-03T11:00:00Z, 2023-01-03T12:00:00Z, 2023-01-03T13:00:00Z, 2023-01-03T14:00:00Z, 2023-01-03T15:00:00Z]
    count : 8, values : [2023-01-03T16:00:00Z, 2023-01-03T17:00:00Z, 2023-01-03T18:00:00Z, 2023-01-03T19:00:00Z, 2023-01-03T20:00:00Z, 2023-01-03T21:00:00Z, 2023-01-03T22:00:00Z, 2023-01-03T23:00:00Z]
    count : 8, values : [2023-01-04T01:00:00Z, 2023-01-04T02:00:00Z, 2023-01-04T03:00:00Z, 2023-01-04T04:00:00Z, 2023-01-04T05:00:00Z, 2023-01-04T06:00:00Z, 2023-01-04T07:00:00Z, 2023-01-04T00:00:00Z]
    count : 8, values : [2023-01-04T13:00:00Z, 2023-01-04T14:00:00Z, 2023-01-04T15:00:00Z, 2023-01-04T08:00:00Z, 2023-01-04T09:00:00Z, 2023-01-04T10:00:00Z, 2023-01-04T11:00:00Z, 2023-01-04T12:00:00Z]
    count : 8, values : [2023-01-04T16:00:00Z, 2023-01-04T17:00:00Z, 2023-01-04T18:00:00Z, 2023-01-04T19:00:00Z, 2023-01-04T20:00:00Z, 2023-01-04T21:00:00Z, 2023-01-04T22:00:00Z, 2023-01-04T23:00:00Z]
    count : 8, values : [2023-01-05T00:00:00Z, 2023-01-05T01:00:00Z, 2023-01-05T02:00:00Z, 2023-01-05T03:00:00Z, 2023-01-05T04:00:00Z, 2023-01-05T05:00:00Z, 2023-01-05T06:00:00Z, 2023-01-05T07:00:00Z]
    count : 8, values : [2023-01-05T13:00:00Z, 2023-01-05T14:00:00Z, 2023-01-05T15:00:00Z, 2023-01-05T08:00:00Z, 2023-01-05T09:00:00Z, 2023-01-05T10:00:00Z, 2023-01-05T11:00:00Z, 2023-01-05T12:00:00Z]
    count : 8, values : [2023-01-05T16:00:00Z, 2023-01-05T17:00:00Z, 2023-01-05T18:00:00Z, 2023-01-05T19:00:00Z, 2023-01-05T20:00:00Z, 2023-01-05T21:00:00Z, 2023-01-05T22:00:00Z, 2023-01-05T23:00:00Z]
    count : 1, values : [2023-01-06T00:00:00Z]
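Because 8 divides 24, every day is cut at the same three boundaries. The sketch below lists the windows that cover one UTC day, assuming epoch-aligned windows with no offset; the class EightHourBoundaries and its method are hypothetical names used only for illustration.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

public class EightHourBoundaries {

    // Start of every epoch-aligned window that overlaps [dayStart, dayStart + 24h)
    static List<Instant> windowStartsForDay(Instant dayStart, Duration size) {
        long sizeMillis = size.toMillis();
        long from = dayStart.toEpochMilli();
        long to = from + Duration.ofDays(1).toMillis();
        List<Instant> starts = new ArrayList<>();
        // Begin with the window that contains dayStart, then step by the window size
        for (long s = from - Math.floorMod(from, sizeMillis); s < to; s += sizeMillis) {
            starts.add(Instant.ofEpochMilli(s));
        }
        return starts;
    }

    public static void main(String[] args) {
        Duration size = Duration.ofHours(8);
        for (Instant start : windowStartsForDay(Instant.parse("2023-01-01T00:00:00Z"), size)) {
            System.out.println(start + " ~ " + start.plus(size));
        }
        // 2023-01-01T00:00:00Z ~ 2023-01-01T08:00:00Z
        // 2023-01-01T08:00:00Z ~ 2023-01-01T16:00:00Z
        // 2023-01-01T16:00:00Z ~ 2023-01-02T00:00:00Z
    }
}
```

With Duration.ofHours(10) the first start returned is 2022-12-31T22:00:00Z instead of midnight, because 10 does not divide 24.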

     

     

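    Tumbling Time Window, 10 hours.

    This time, the window size is set to 10 hours.

    The result is shown below.

    When the window size is not a divisor of 24 hours, the first chunk does not contain the 10 events you might expect; it contains only 8.

    This is because Flink aligns window boundaries to the Unix epoch (1970-01-01T00:00:00Z), not to the first event or to midnight. 2023-01-01T00:00:00Z is 464,592 hours after the epoch, and 464,592 divided by 10 leaves a remainder of 2, so the window containing it runs from 2022-12-31T22:00:00Z to 2023-01-01T08:00:00Z. Keep this in mind whenever the window size in hours is not a divisor of 24.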

     

    count : 8, values : [2023-01-01T00:00:00Z, 2023-01-01T01:00:00Z, 2023-01-01T02:00:00Z, 2023-01-01T03:00:00Z, 2023-01-01T04:00:00Z, 2023-01-01T05:00:00Z, 2023-01-01T06:00:00Z, 2023-01-01T07:00:00Z]
    count : 10, values : [2023-01-01T13:00:00Z, 2023-01-01T14:00:00Z, 2023-01-01T15:00:00Z, 2023-01-01T16:00:00Z, 2023-01-01T17:00:00Z, 2023-01-01T08:00:00Z, 2023-01-01T09:00:00Z, 2023-01-01T10:00:00Z, 2023-01-01T11:00:00Z, 2023-01-01T12:00:00Z]
    count : 10, values : [2023-01-02T01:00:00Z, 2023-01-02T02:00:00Z, 2023-01-02T03:00:00Z, 2023-01-01T18:00:00Z, 2023-01-01T19:00:00Z, 2023-01-01T20:00:00Z, 2023-01-01T21:00:00Z, 2023-01-01T22:00:00Z, 2023-01-01T23:00:00Z, 2023-01-02T00:00:00Z]
    count : 10, values : [2023-01-02T13:00:00Z, 2023-01-02T04:00:00Z, 2023-01-02T05:00:00Z, 2023-01-02T06:00:00Z, 2023-01-02T07:00:00Z, 2023-01-02T08:00:00Z, 2023-01-02T09:00:00Z, 2023-01-02T10:00:00Z, 2023-01-02T11:00:00Z, 2023-01-02T12:00:00Z]
    count : 10, values : [2023-01-02T14:00:00Z, 2023-01-02T15:00:00Z, 2023-01-02T16:00:00Z, 2023-01-02T17:00:00Z, 2023-01-02T18:00:00Z, 2023-01-02T19:00:00Z, 2023-01-02T20:00:00Z, 2023-01-02T21:00:00Z, 2023-01-02T22:00:00Z, 2023-01-02T23:00:00Z]
    count : 10, values : [2023-01-03T00:00:00Z, 2023-01-03T01:00:00Z, 2023-01-03T02:00:00Z, 2023-01-03T03:00:00Z, 2023-01-03T04:00:00Z, 2023-01-03T05:00:00Z, 2023-01-03T06:00:00Z, 2023-01-03T07:00:00Z, 2023-01-03T08:00:00Z, 2023-01-03T09:00:00Z]
    count : 10, values : [2023-01-03T13:00:00Z, 2023-01-03T14:00:00Z, 2023-01-03T15:00:00Z, 2023-01-03T16:00:00Z, 2023-01-03T17:00:00Z, 2023-01-03T18:00:00Z, 2023-01-03T19:00:00Z, 2023-01-03T10:00:00Z, 2023-01-03T11:00:00Z, 2023-01-03T12:00:00Z]
    count : 10, values : [2023-01-04T01:00:00Z, 2023-01-04T02:00:00Z, 2023-01-04T03:00:00Z, 2023-01-04T04:00:00Z, 2023-01-04T05:00:00Z, 2023-01-03T20:00:00Z, 2023-01-03T21:00:00Z, 2023-01-03T22:00:00Z, 2023-01-03T23:00:00Z, 2023-01-04T00:00:00Z]
    count : 10, values : [2023-01-04T13:00:00Z, 2023-01-04T14:00:00Z, 2023-01-04T15:00:00Z, 2023-01-04T06:00:00Z, 2023-01-04T07:00:00Z, 2023-01-04T08:00:00Z, 2023-01-04T09:00:00Z, 2023-01-04T10:00:00Z, 2023-01-04T11:00:00Z, 2023-01-04T12:00:00Z]
    count : 10, values : [2023-01-04T16:00:00Z, 2023-01-04T17:00:00Z, 2023-01-04T18:00:00Z, 2023-01-04T19:00:00Z, 2023-01-04T20:00:00Z, 2023-01-04T21:00:00Z, 2023-01-04T22:00:00Z, 2023-01-04T23:00:00Z, 2023-01-05T00:00:00Z, 2023-01-05T01:00:00Z]
    count : 10, values : [2023-01-05T02:00:00Z, 2023-01-05T03:00:00Z, 2023-01-05T04:00:00Z, 2023-01-05T05:00:00Z, 2023-01-05T06:00:00Z, 2023-01-05T07:00:00Z, 2023-01-05T08:00:00Z, 2023-01-05T09:00:00Z, 2023-01-05T10:00:00Z, 2023-01-05T11:00:00Z]
    count : 10, values : [2023-01-05T13:00:00Z, 2023-01-05T14:00:00Z, 2023-01-05T15:00:00Z, 2023-01-05T16:00:00Z, 2023-01-05T17:00:00Z, 2023-01-05T18:00:00Z, 2023-01-05T19:00:00Z, 2023-01-05T20:00:00Z, 2023-01-05T21:00:00Z, 2023-01-05T12:00:00Z]
    count : 3, values : [2023-01-05T22:00:00Z, 2023-01-05T23:00:00Z, 2023-01-06T00:00:00Z]
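The epoch arithmetic behind the short first chunk can be worked through in a few lines of plain Java. This sketch assumes whole-hour window sizes, no offset, and hourly events starting exactly at the given instant; the class TenHourAlignment and the method eventsInFirstWindow are hypothetical.

```java
import java.time.Duration;
import java.time.Instant;

public class TenHourAlignment {

    // Number of hourly events (starting at `first`) that land in the first epoch-aligned window.
    // Assumes `size` is a whole number of hours and there is no window offset.
    static long eventsInFirstWindow(Instant first, Duration size) {
        long hoursSinceEpoch = Duration.between(Instant.EPOCH, first).toHours();
        long remainder = hoursSinceEpoch % size.toHours();
        // The window containing `first` began `remainder` hours before it
        Instant windowEnd = first.minus(Duration.ofHours(remainder)).plus(size);
        return Duration.between(first, windowEnd).toHours();
    }

    public static void main(String[] args) {
        Instant first = Instant.parse("2023-01-01T00:00:00Z");
        Duration size = Duration.ofHours(10);
        long hoursSinceEpoch = Duration.between(Instant.EPOCH, first).toHours();
        Instant start = first.minus(Duration.ofHours(hoursSinceEpoch % size.toHours()));

        System.out.println(hoursSinceEpoch);                  // 464592
        System.out.println(start + " ~ " + start.plus(size)); // 2022-12-31T22:00:00Z ~ 2023-01-01T08:00:00Z
        System.out.println(eventsInFirstWindow(first, size)); // 8
    }
}
```

The same method returns 5 for a 7-hour window and 24 for a 24-hour window, matching the first chunk of each run in this post.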

     

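    Tumbling Time Window, 7 hours.

    With a 7-hour window, which also does not divide 24, the first chunk contains only 5 events, and the chunk boundaries fall at a different hour of the day from one day to the next.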

    count : 5, values : [2023-01-01T00:00:00Z, 2023-01-01T01:00:00Z, 2023-01-01T02:00:00Z, 2023-01-01T03:00:00Z, 2023-01-01T04:00:00Z]
    count : 7, values : [2023-01-01T05:00:00Z, 2023-01-01T06:00:00Z, 2023-01-01T07:00:00Z, 2023-01-01T08:00:00Z, 2023-01-01T09:00:00Z, 2023-01-01T10:00:00Z, 2023-01-01T11:00:00Z]
    count : 7, values : [2023-01-01T13:00:00Z, 2023-01-01T14:00:00Z, 2023-01-01T15:00:00Z, 2023-01-01T16:00:00Z, 2023-01-01T17:00:00Z, 2023-01-01T18:00:00Z, 2023-01-01T12:00:00Z]
    count : 7, values : [2023-01-02T01:00:00Z, 2023-01-01T19:00:00Z, 2023-01-01T20:00:00Z, 2023-01-01T21:00:00Z, 2023-01-01T22:00:00Z, 2023-01-01T23:00:00Z, 2023-01-02T00:00:00Z]
    count : 7, values : [2023-01-02T02:00:00Z, 2023-01-02T03:00:00Z, 2023-01-02T04:00:00Z, 2023-01-02T05:00:00Z, 2023-01-02T06:00:00Z, 2023-01-02T07:00:00Z, 2023-01-02T08:00:00Z]
    count : 7, values : [2023-01-02T09:00:00Z, 2023-01-02T10:00:00Z, 2023-01-02T11:00:00Z, 2023-01-02T12:00:00Z, 2023-01-02T13:00:00Z, 2023-01-02T14:00:00Z, 2023-01-02T15:00:00Z]
    count : 7, values : [2023-01-02T16:00:00Z, 2023-01-02T17:00:00Z, 2023-01-02T18:00:00Z, 2023-01-02T19:00:00Z, 2023-01-02T20:00:00Z, 2023-01-02T21:00:00Z, 2023-01-02T22:00:00Z]
    count : 7, values : [2023-01-02T23:00:00Z, 2023-01-03T00:00:00Z, 2023-01-03T01:00:00Z, 2023-01-03T02:00:00Z, 2023-01-03T03:00:00Z, 2023-01-03T04:00:00Z, 2023-01-03T05:00:00Z]
    count : 7, values : [2023-01-03T06:00:00Z, 2023-01-03T07:00:00Z, 2023-01-03T08:00:00Z, 2023-01-03T09:00:00Z, 2023-01-03T10:00:00Z, 2023-01-03T11:00:00Z, 2023-01-03T12:00:00Z]
    count : 7, values : [2023-01-03T13:00:00Z, 2023-01-03T14:00:00Z, 2023-01-03T15:00:00Z, 2023-01-03T16:00:00Z, 2023-01-03T17:00:00Z, 2023-01-03T18:00:00Z, 2023-01-03T19:00:00Z]
    count : 7, values : [2023-01-03T20:00:00Z, 2023-01-03T21:00:00Z, 2023-01-03T22:00:00Z, 2023-01-03T23:00:00Z, 2023-01-04T00:00:00Z, 2023-01-04T01:00:00Z, 2023-01-04T02:00:00Z]
    count : 7, values : [2023-01-04T03:00:00Z, 2023-01-04T04:00:00Z, 2023-01-04T05:00:00Z, 2023-01-04T06:00:00Z, 2023-01-04T07:00:00Z, 2023-01-04T08:00:00Z, 2023-01-04T09:00:00Z]
    count : 7, values : [2023-01-04T13:00:00Z, 2023-01-04T14:00:00Z, 2023-01-04T15:00:00Z, 2023-01-04T16:00:00Z, 2023-01-04T10:00:00Z, 2023-01-04T11:00:00Z, 2023-01-04T12:00:00Z]
    count : 7, values : [2023-01-04T17:00:00Z, 2023-01-04T18:00:00Z, 2023-01-04T19:00:00Z, 2023-01-04T20:00:00Z, 2023-01-04T21:00:00Z, 2023-01-04T22:00:00Z, 2023-01-04T23:00:00Z]
    count : 7, values : [2023-01-05T00:00:00Z, 2023-01-05T01:00:00Z, 2023-01-05T02:00:00Z, 2023-01-05T03:00:00Z, 2023-01-05T04:00:00Z, 2023-01-05T05:00:00Z, 2023-01-05T06:00:00Z]
    count : 7, values : [2023-01-05T13:00:00Z, 2023-01-05T07:00:00Z, 2023-01-05T08:00:00Z, 2023-01-05T09:00:00Z, 2023-01-05T10:00:00Z, 2023-01-05T11:00:00Z, 2023-01-05T12:00:00Z]
    count : 7, values : [2023-01-05T14:00:00Z, 2023-01-05T15:00:00Z, 2023-01-05T16:00:00Z, 2023-01-05T17:00:00Z, 2023-01-05T18:00:00Z, 2023-01-05T19:00:00Z, 2023-01-05T20:00:00Z]
    count : 4, values : [2023-01-05T21:00:00Z, 2023-01-05T22:00:00Z, 2023-01-05T23:00:00Z, 2023-01-06T00:00:00Z]
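Since 24 mod 7 is 3, the 7-hour grid shifts relative to midnight every day. The sketch below computes, for each day, how many hours after UTC midnight the first window boundary falls. It assumes epoch-aligned windows, no offset, and a whole-hour size; the class SevenHourDrift and its method are hypothetical.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

public class SevenHourDrift {

    // For each of `days` consecutive days starting at firstDay (a UTC midnight),
    // the number of hours after midnight at which the first window boundary falls.
    static List<Long> boundaryAfterMidnight(Instant firstDay, int days, long sizeHours) {
        long baseHour = Duration.between(Instant.EPOCH, firstDay).toHours();
        List<Long> result = new ArrayList<>();
        for (int d = 0; d < days; d++) {
            long remainder = (baseHour + 24L * d) % sizeHours;
            // remainder == 0 means a window starts exactly at midnight
            result.add(remainder == 0 ? 0L : sizeHours - remainder);
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(boundaryAfterMidnight(Instant.parse("2023-01-01T00:00:00Z"), 5, 7));
        // [5, 2, 6, 3, 0]
    }
}
```

These values correspond to the chunks above that start at 05:00 on January 1, 02:00 on January 2, 06:00 on January 3, 03:00 on January 4, and 00:00 on January 5. For an 8-hour size every entry is 0.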

     

     

    Applying an offset.

    TumblingEventTimeWindows also accepts an offset as a second argument.

    It is applied as follows.

    DataStream<List<Instant>> stream = source.assignTimestampsAndWatermarks(
                WatermarkStrategy.<Instant>forBoundedOutOfOrderness(Duration.ofHours(0))
                        .withTimestampAssigner((element, recordTimestamp) -> element.toEpochMilli())
        ).keyBy(event -> "_")
                // 24-hour windows shifted by -9 hours, so each window starts at 00:00 KST (UTC+9)
                .window(TumblingEventTimeWindows.of(Time.hours(24), Time.hours(-9)))
                .process(new ProcessWindowFunction<>() {
                  @Override
                  public void process(String s, ProcessWindowFunction<Instant, List<Instant>, String, TimeWindow>.Context context, Iterable<Instant> elements, Collector<List<Instant>> out) {
                    List<Instant> aggregation = new ArrayList<>();
                    for (Instant event : elements) {
                      aggregation.add(event);
                    }
                    out.collect(aggregation);
                  }
                });

     

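    The example above applies an offset of -9 hours so that windows follow Korean Standard Time (UTC+9) instead of UTC. Each window therefore starts at 15:00 UTC, which is 00:00 KST. The first window holds only the 15 events from 00:00 to 14:00 UTC (09:00 to 23:00 KST on January 1), and the last one holds the 10 events from 15:00 UTC on January 5 onward.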

    count : 15, values : [2023-01-01T00:00:00Z, 2023-01-01T01:00:00Z, 2023-01-01T02:00:00Z, 2023-01-01T03:00:00Z, 2023-01-01T04:00:00Z, 2023-01-01T05:00:00Z, 2023-01-01T06:00:00Z, 2023-01-01T07:00:00Z, 2023-01-01T08:00:00Z, 2023-01-01T09:00:00Z, 2023-01-01T10:00:00Z, 2023-01-01T11:00:00Z, 2023-01-01T12:00:00Z, 2023-01-01T13:00:00Z, 2023-01-01T14:00:00Z]
    count : 24, values : [2023-01-01T15:00:00Z, 2023-01-01T16:00:00Z, 2023-01-01T17:00:00Z, 2023-01-01T18:00:00Z, 2023-01-01T19:00:00Z, 2023-01-01T20:00:00Z, 2023-01-01T21:00:00Z, 2023-01-01T22:00:00Z, 2023-01-01T23:00:00Z, 2023-01-02T00:00:00Z, 2023-01-02T01:00:00Z, 2023-01-02T02:00:00Z, 2023-01-02T03:00:00Z, 2023-01-02T04:00:00Z, 2023-01-02T05:00:00Z, 2023-01-02T06:00:00Z, 2023-01-02T07:00:00Z, 2023-01-02T08:00:00Z, 2023-01-02T09:00:00Z, 2023-01-02T10:00:00Z, 2023-01-02T11:00:00Z, 2023-01-02T12:00:00Z, 2023-01-02T13:00:00Z, 2023-01-02T14:00:00Z]
    count : 24, values : [2023-01-02T15:00:00Z, 2023-01-02T16:00:00Z, 2023-01-02T17:00:00Z, 2023-01-02T18:00:00Z, 2023-01-02T19:00:00Z, 2023-01-02T20:00:00Z, 2023-01-02T21:00:00Z, 2023-01-02T22:00:00Z, 2023-01-02T23:00:00Z, 2023-01-03T00:00:00Z, 2023-01-03T01:00:00Z, 2023-01-03T02:00:00Z, 2023-01-03T03:00:00Z, 2023-01-03T04:00:00Z, 2023-01-03T05:00:00Z, 2023-01-03T06:00:00Z, 2023-01-03T07:00:00Z, 2023-01-03T08:00:00Z, 2023-01-03T09:00:00Z, 2023-01-03T10:00:00Z, 2023-01-03T11:00:00Z, 2023-01-03T12:00:00Z, 2023-01-03T13:00:00Z, 2023-01-03T14:00:00Z]
    count : 24, values : [2023-01-03T15:00:00Z, 2023-01-03T16:00:00Z, 2023-01-03T17:00:00Z, 2023-01-03T18:00:00Z, 2023-01-03T19:00:00Z, 2023-01-03T20:00:00Z, 2023-01-03T21:00:00Z, 2023-01-03T22:00:00Z, 2023-01-03T23:00:00Z, 2023-01-04T00:00:00Z, 2023-01-04T01:00:00Z, 2023-01-04T02:00:00Z, 2023-01-04T03:00:00Z, 2023-01-04T04:00:00Z, 2023-01-04T05:00:00Z, 2023-01-04T06:00:00Z, 2023-01-04T07:00:00Z, 2023-01-04T08:00:00Z, 2023-01-04T09:00:00Z, 2023-01-04T10:00:00Z, 2023-01-04T11:00:00Z, 2023-01-04T12:00:00Z, 2023-01-04T13:00:00Z, 2023-01-04T14:00:00Z]
    count : 24, values : [2023-01-04T15:00:00Z, 2023-01-04T16:00:00Z, 2023-01-04T17:00:00Z, 2023-01-04T18:00:00Z, 2023-01-04T19:00:00Z, 2023-01-04T20:00:00Z, 2023-01-04T21:00:00Z, 2023-01-04T22:00:00Z, 2023-01-04T23:00:00Z, 2023-01-05T00:00:00Z, 2023-01-05T01:00:00Z, 2023-01-05T02:00:00Z, 2023-01-05T03:00:00Z, 2023-01-05T04:00:00Z, 2023-01-05T05:00:00Z, 2023-01-05T06:00:00Z, 2023-01-05T07:00:00Z, 2023-01-05T08:00:00Z, 2023-01-05T09:00:00Z, 2023-01-05T10:00:00Z, 2023-01-05T11:00:00Z, 2023-01-05T12:00:00Z, 2023-01-05T13:00:00Z, 2023-01-05T14:00:00Z]
    count : 10, values : [2023-01-05T15:00:00Z, 2023-01-05T16:00:00Z, 2023-01-05T17:00:00Z, 2023-01-05T18:00:00Z, 2023-01-05T19:00:00Z, 2023-01-05T20:00:00Z, 2023-01-05T21:00:00Z, 2023-01-05T22:00:00Z, 2023-01-05T23:00:00Z, 2023-01-06T00:00:00Z]
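The effect of the offset can be sketched with the same epoch-aligned arithmetic, assuming the offset simply shifts the window grid as in the remainder formula used by Flink's TimeWindow.getWindowStartWithOffset. The class KstOffsetSketch is hypothetical, and deriving the offset from java.time.ZoneOffset is an illustrative choice, not something the Flink API requires.

```java
import java.time.Duration;
import java.time.Instant;
import java.time.ZoneOffset;

public class KstOffsetSketch {

    // Epoch-aligned tumbling window start, shifted by offsetMillis (hypothetical helper)
    static long windowStart(long ts, long offsetMillis, long sizeMillis) {
        return ts - Math.floorMod(ts - offsetMillis, sizeMillis);
    }

    public static void main(String[] args) {
        // KST is UTC+9, so the window offset is the negated zone offset: -9 hours
        long offsetMillis = -ZoneOffset.ofHours(9).getTotalSeconds() * 1000L;
        long size = Duration.ofHours(24).toMillis();

        long ts = Instant.parse("2023-01-01T00:00:00Z").toEpochMilli();
        Instant start = Instant.ofEpochMilli(windowStart(ts, offsetMillis, size));

        System.out.println(start);                                 // 2022-12-31T15:00:00Z
        System.out.println(start.atOffset(ZoneOffset.ofHours(9))); // 2023-01-01T00:00+09:00
    }
}
```

Deriving the offset from a fixed ZoneOffset only makes sense for zones without daylight saving time, such as KST; a zone with DST changes its UTC offset during the year, while a window offset is constant.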
