-
Flink Window 이해하기Flink 2024. 1. 13. 10:18728x90반응형
- 목차
들어가며.
Stateful 이란 ?
먼저 Flink DataStream 의 Window 에 대해서 알아보기 이전에 State 에 대한 이야기를 먼저 하도록 하겠습니다.
State 를 활용함으로써 Stateless Stream Processing 과 Stateful Stream Processing 의 차별점이 발생합니다.
Stateless Stream Processing 을 아래와 같습니다.
stream.map(x -> x * 2) .filter(x -> x > 100) .flatMap((x, out) -> out.collect(NumberRange(0, x)))
Stateless Stream Processing 의 대표적인 Transformation 은 Map, Filter, FlatMap 등이 존재합니다.
이들은 모두 순수함수 형태로 구현되구요.
입력에 대한 Transformation Output 은 항상 동일합니다.
언제, 어느 환경에서 수행되더라도 동일한 결과가 발생합니다.
반면, Stateful Stream Processing 의 예시는 아래와 같습니다.
아래 예시의 Map Operator 는 100 번째 이상의 데이터에 한하여 x2 연산을 수행합니다.
Filter Operator 역시 100번째 데이터를 기준으로 필터링을 수행하며,
FlatMap Operator 는 1000번째 데이터를 기준으로 Ignore 또는 Enrichment 를 수행합니다.
Map<String, Integer> stateStore = new HashMap(); stream.map(x -> { int countPerKey = stateStore.get(x); stateStore.put(x, countPerKey + 1); if (countPerKey > 100) return x; return x * 2; }) .filter(x -> { int countPerKey = stateStore.get(x); if (countPerKey > 100) return true; return false; }) .flatMap((x, out)-> { int countPerKey = stateStore.get(x); if (countPerKey > 1000) out.collect(NumberRange(0, x)); else out.empty(); })
Stateful 이란 과거부터 현재까지의 데이터의 상태들이 현재의 데이터 처리에 영향을 주는 상태입니다.
이는 StateStore 라는 외부 저장소에 의해서 Side-Effect 가 발생하기 때문에 더 이상 순수함수가 아닙니다.
State 를 사용하는 케이스는 어떤 상황이 존재할까요 ?
구매하지 않는 사용자에게 쿠폰 전달해주기.
이커머스 데이터의 스트림이 존재하며, 특정 사용자가 장바구니에 상품을 추가하는 행동을 반복합니다.
그리고 구매를 망설이고 있죠.
이 경우에 State 에 최근 10개의 이벤트 중에서 "장바구니 추가" 행동 데이터가 많고, "구매" 행동 데이터가 부재한 경우에
쿠폰을 제공할 수 있는 스트림 처리가 가능합니다.
Map<UserID, List<Event>> StateStore = Map(); userBehaviorStream .filter( behavior -> { userID = behavior.getUserId() cartCount = StateStore.get(userID) .filter(event.getName().notEquals("purchase")) .filter(event.getName().notEquals("addCart")) .count() if (cartCount > 7) return true; else return false; }) .sinkTo( IssueCouponToUser() )
주식 거래 데이터의 상승, 하강 파악하기.
1시간 이내로 발생하는 특정 회사의 주식 변동을 파악할 수 있습니다.
StateStore 는 회사별 & 시간별로 주식 가격의 총합을 관리합니다.
그래서 시간별 주식 가격의 변동 추이를 확인할 수 있는 Stateful Stream Processing 이 가능합니다.
Map<CompanyNameAndTime, SumPrice> StateStore = Map() stockTradeStream .filter( storeTrade -> { companyName = storeTrade.getCompanyName(); timestamp = storeTrade.getTimestamp().truncate(HOUR) nowSumPrice = StateStore.get(companyName + timestamp) last_1_hour_price = StateStore.get(companyName + timestamp - oneHour) if (nowSumPrice > last_1_hour_price) return true; else return false; }) .sinkTo( NotifyToCustomers() )
Window 와 State 란?
위에서 설명한 것처럼 Stateful Stream Processing 을 StateStore 를 통해서 유의미한 결론을 도출할 수 있습니다.
그리고 Window 는 대표적인 Stateful Operator 입니다.
시간 기준 또는 이벤트의 갯수를 기준으로 상태를 저장할 수 있습니다.
이어지는 내용에서 Window 에 대한 이야기를 상세히 진행해보겠습니다.
Window 란 무엇일까 ?
Stream Processing 을 끊임없이 흐르는 데이터의 흐름입니다.
Window 는 끊임없이 흐르는 데이터들을 작은 부분집합으로 나눌 수 있습니다.
아래의 이미지처럼 1 부터 시작하는 무한한 자연수들이 순서대로 Window 에 진입합니다.
그리고 Window 는 데이터를 작은 Chunk 들로 구분짓습니다.
저의 Window 의 데이터를 구분짓는 기준은 Count 이며, 5개씩 한 Chunk 를 구성하도록 설정하였습니다.
시간 단위로 Window 의 기준을 설정할 수도 있습니다.
아래의 이미지는 2분 단위로 Window 의 Chunk 를 구성합니다.
Window Operator 알아보기.
Window 는 여러 Chunk 단위로 State를 저장한다고 말씀드렸습니다.
그리고 Chunk 에 저장된 State 를 기준으로 Aggregation 을 수행합니다.
keyBy.
keyBy 를 Chunk 를 나누는 첫번째 기준입니다.
keyBy 는 KeyedStream 을 만드는 Hash Partitioning 방식인데요.
아래 이미지처럼 KeyBy Operator 에 의해서 Hash Partitioning 되는 결과를 기준으로 Chunk 가 생성됩니다.
새롭게 "Daniel" 이라는 Event 가 DataSource 에서 발생한다면, Daniel 는 새로운 Chunk 를 생성할 것입니다.
WindowAssigner.
WindowAssigner 는 Chunk 를 관리하는 두번째 기준입니다.
WindowAssigner 는 Chunk 를 생성하고, 삭제하는 역할을 수행합니다.
WindowAssigner 의 대표적인 종류는 TumblingWindow, SlidingWindow, SessionWindow 가 존재합니다.
WindowAssigner 는 시간을 기준으로 Chunk 를 생성하고, 삭제할 수 있습니다.
Count 기반의 WindowAssigner 도 존재하지만, Time 기반의 WindowAssigner 를 중점적으로 설명하고자 합니다.
TumblingWindow.
시간을 기준으로 Chunk 를 생성합니다.
만약 Tumbling Window 를 5분 단위로 설정하게 되면, Data Stream 에서 발생하는 Event 를 5분 단위로 구분짓게 됩니다.
아래 이미지는 Tumbling Window 를 표현하는 Flink Documentation 의 이미지 예시입니다.
y 축의 user 1, 2, 3 는 keyBy 에 의해서 Partitioning 된 Chunk 의 첫번째 기준이구요.
x 축의 시간에 따라서 Chunk 가 나뉘어지게 됩니다.
아래 예시는 Andy 과 Bob 과 시간 정보를 담은 데이터 스트림입니다.
Andy 는 1시 1분, 1시 5분, 1시 9분, Bob 은 1시 2분, 1시 6분, 1시 10분에 해당하는 데이터가 존재합니다.
Tumbling Window 의 기준은 5분으로 설정하였습니다.
이렇게 되면, (Andy, 1시 1분) 데이터를 시작으로 하나의 Chunk 가 생성됩니다.
그리고 (Andy, 1시 5분) 데이터는 아직 5분이 경과하지 않았기 때문에 (Andy, 1시 1분) 과 같은 Chunk 에 묶이게 됩니다.
이러한 원리로 4개의 Chunk 가 생성됩니다.
그 외의 SessionWindow 와 SlidingWindow 은 다음 컨텐츠에서 다루도록 하겠습니다.
내용이 너무 길어지네요.
Watermark.
그럼 생성된 Window 의 Chunk 는 언제 종료될까요 ?
이 과정에서 Watermark 가 사용됩니다.
Watermark 의 의미는 Watermark 가 명시하는 시각 이후로는 과거의 Event 가 유입되지 않을 것을 의미합니다.
말이 어려운데요.
DataSource 가 주기적으로 Watermark 라는 레코드를 데이트 스트림으로 전송하게 됩니다.
그럼 각 Operator / Task 들은 Watermark 레코드를 처리하게 되는데요.
Window Operator 가 Watermark 레코드를 전달받게 되면, Window Operator 는 이렇게 해석합니다.
" 아! 이제 Watermark 가 명시한 시간보다 과거의 Event 는 유입되지 않겠군.
그럼 내가 관리하는 Chunk 중에서 Watermark 이전의 Chunk 는 종료시키고 Downstream 으로 보내야겠어 ! "
이 과정에서 State 를 정리하고 (Clear), Downstream Task 로 데이터를 전송합니다. (Flush)
아래 이미지는 1시 9분 Watermark 가 유입되는 모습입니다.
그럼 이제 1시 9분보다 과거인 1시 8분, 1시 1분 같은 Event 는 더 이상 유입되지 않는다는 것은 Window Operator 에게 알리게 됩니다.
그리고 Window Operator 는 과감하게 Chunk 를 Clear & Flush 하게 됩니다.
이렇게 되면, (Bob, 1시 10분) 데이터와 Chunk 는 Window 내에 머물게 됩니다.
1시 10분은 Watermark 1시 9분보다 미래의 시간이니까요.
마치며.
다음 컨텐츠에서 Window Operator 의 구체적인 사용법에 대해서 작성해보려고 합니다.
감사합니다.
반응형'Flink' 카테고리의 다른 글
[Flink] KafkaSource Connector 알아보기 (0) 2024.02.04 [Flink] Stateless Transform Operator 알아보기 (Map, Filter, FlatMap) (0) 2024.01.25 Flink StreamGraph 알아보기 (0) 2024.01.11 Flink State 알아보기 (0) 2024.01.10 Flink Watermark 알아보기 (0) 2024.01.10