ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • Flink Window 이해하기
    Flink 2024. 1. 13. 10:18
    728x90
    반응형

    - 목차

     

    들어가며.

    Stateful 이란 ?

     

    먼저 Flink DataStream 의 Window 에 대해서 알아보기 이전에 State 에 대한 이야기를 먼저 하도록 하겠습니다.

    State 를 활용함으로써 Stateless Stream Processing 과 Stateful Stream Processing 의 차별점이 발생합니다.

    Stateless Stream Processing 을 아래와 같습니다.

    stream.map(x -> x * 2)
        .filter(x -> x > 100)
        .flatMap((x, out) -> out.collect(NumberRange(0, x)))

     

    Stateless Stream Processing 의 대표적인 Transformation 은 Map, Filter, FlatMap 등이 존재합니다.

    이들은 모두 순수함수 형태로 구현되구요.

    입력에 대한 Transformation Output 은 항상 동일합니다.

    언제, 어느 환경에서 수행되더라도 동일한 결과가 발생합니다.

     

    반면, Stateful Stream Processing 의 예시는 아래와 같습니다.

    아래 예시의 Map Operator 는 100 번째 이상의 데이터에 한하여 x2 연산을 수행합니다.

    Filter Operator 역시 100번째 데이터를 기준으로 필터링을 수행하며,

    FlatMap Operator 는 1000번째 데이터를 기준으로 Ignore 또는 Enrichment 를 수행합니다.

    Map<String, Integer> stateStore = new HashMap();
    
    stream.map(x -> {
            int countPerKey = stateStore.get(x);
            stateStore.put(x, countPerKey + 1);
        
            if (countPerKey > 100) return x;
            return x * 2;
        })
        .filter(x -> {
            int countPerKey = stateStore.get(x);
            
            if (countPerKey > 100) return true;
            return false;        
        })
        .flatMap((x, out)-> {
            int countPerKey = stateStore.get(x);
            
            if (countPerKey > 1000) out.collect(NumberRange(0, x));
            else out.empty();
        })

     

    Stateful 이란 과거부터 현재까지의 데이터의 상태들이 현재의 데이터 처리에 영향을 주는 상태입니다.

    이는 StateStore 라는 외부 저장소에 의해서 Side-Effect 가 발생하기 때문에 더 이상 순수함수가 아닙니다.

     

    State 를 사용하는 케이스는 어떤 상황이 존재할까요 ?

    구매하지 않는 사용자에게 쿠폰 전달해주기.

    이커머스 데이터의 스트림이 존재하며, 특정 사용자가 장바구니에 상품을 추가하는 행동을 반복합니다.

    그리고 구매를 망설이고 있죠.

    이 경우에 State 에 최근 10개의 이벤트 중에서 "장바구니 추가" 행동 데이터가 많고, "구매" 행동 데이터가 부재한 경우에

    쿠폰을 제공할 수 있는 스트림 처리가 가능합니다.

    Map<UserID, List<Event>> StateStore = Map();
    
    userBehaviorStream
        .filter( behavior -> {
            
            userID = behavior.getUserId()
            cartCount = StateStore.get(userID)
                .filter(event.getName().notEquals("purchase"))
                .filter(event.getName().notEquals("addCart"))
                .count()
            
            if (cartCount > 7) return true;
            else return false;
        })
        .sinkTo( IssueCouponToUser() )

     

     

    주식 거래 데이터의 상승, 하강 파악하기.

    1시간 이내로 발생하는 특정 회사의 주식 변동을 파악할 수 있습니다.

    StateStore 는 회사별 & 시간별로 주식 가격의 총합을 관리합니다.

    그래서 시간별 주식 가격의 변동 추이를 확인할 수 있는 Stateful Stream Processing 이 가능합니다.

     

    Map<CompanyNameAndTime, SumPrice> StateStore = Map()
    
    stockTradeStream
        .filter( storeTrade -> {
            
            companyName = storeTrade.getCompanyName();
            timestamp = storeTrade.getTimestamp().truncate(HOUR)
            
            nowSumPrice = StateStore.get(companyName + timestamp)
            last_1_hour_price = StateStore.get(companyName + timestamp - oneHour)        
            
            if (nowSumPrice > last_1_hour_price) return true;
            else return false;
        })
        .sinkTo( NotifyToCustomers() )

     

     

    Window 와 State 란?

    위에서 설명한 것처럼 Stateful Stream Processing 을 StateStore 를 통해서 유의미한 결론을 도출할 수 있습니다.

    그리고 Window 는 대표적인 Stateful Operator 입니다.

    시간 기준 또는 이벤트의 갯수를 기준으로 상태를 저장할 수 있습니다.

    이어지는 내용에서 Window 에 대한 이야기를 상세히 진행해보겠습니다.

     

     

    Window 란 무엇일까 ?

    Stream Processing 을 끊임없이 흐르는 데이터의 흐름입니다.

    Window 는 끊임없이 흐르는 데이터들을 작은 부분집합으로 나눌 수 있습니다.

    아래의 이미지처럼 1 부터 시작하는 무한한 자연수들이 순서대로 Window 에 진입합니다.

    그리고 Window 는 데이터를 작은 Chunk 들로 구분짓습니다.

    저의 Window 의 데이터를 구분짓는 기준은 Count 이며, 5개씩 한 Chunk 를 구성하도록 설정하였습니다.

     

     

     

    시간 단위로 Window 의 기준을 설정할 수도 있습니다.

    아래의 이미지는 2분 단위로 Window 의 Chunk 를 구성합니다.

     

     

     

    Window Operator 알아보기.

    Window 는 여러 Chunk 단위로 State를 저장한다고 말씀드렸습니다.

    그리고 Chunk 에 저장된 State 를 기준으로 Aggregation 을 수행합니다.

    keyBy.

    keyBy 를 Chunk 를 나누는 첫번째 기준입니다.

    keyBy 는 KeyedStream 을 만드는 Hash Partitioning 방식인데요.

    아래 이미지처럼 KeyBy Operator 에 의해서 Hash Partitioning 되는 결과를 기준으로 Chunk 가 생성됩니다.

    새롭게 "Daniel" 이라는 Event 가 DataSource 에서 발생한다면, Daniel 는 새로운 Chunk 를 생성할 것입니다.

     

     

     

    WindowAssigner.

    WindowAssigner 는 Chunk 를 관리하는 두번째 기준입니다.

    WindowAssigner 는 Chunk 를 생성하고, 삭제하는 역할을 수행합니다.

    WindowAssigner 의 대표적인 종류는 TumblingWindow, SlidingWindow, SessionWindow 가 존재합니다.

    WindowAssigner 는 시간을 기준으로 Chunk 를 생성하고, 삭제할 수 있습니다.

    Count 기반의 WindowAssigner 도 존재하지만, Time 기반의 WindowAssigner 를 중점적으로 설명하고자 합니다.

     

    TumblingWindow.

    시간을 기준으로 Chunk 를 생성합니다.

    만약 Tumbling Window 를 5분 단위로 설정하게 되면, Data Stream 에서 발생하는 Event 를 5분 단위로 구분짓게 됩니다.

    아래 이미지는 Tumbling Window 를 표현하는 Flink Documentation 의 이미지 예시입니다.

    y 축의 user 1, 2, 3 는 keyBy 에 의해서 Partitioning 된 Chunk 의 첫번째 기준이구요.

    x 축의 시간에 따라서 Chunk 가 나뉘어지게 됩니다.

     

    아래 예시는 Andy 과 Bob 과 시간 정보를 담은 데이터 스트림입니다.

    Andy 는 1시 1분, 1시 5분, 1시 9분, Bob 은 1시 2분, 1시 6분, 1시 10분에 해당하는 데이터가 존재합니다.

    Tumbling Window 의 기준은 5분으로 설정하였습니다.

    이렇게 되면, (Andy, 1시 1분) 데이터를 시작으로 하나의 Chunk 가 생성됩니다.

    그리고 (Andy, 1시 5분) 데이터는 아직 5분이 경과하지 않았기 때문에 (Andy, 1시 1분) 과 같은 Chunk 에 묶이게 됩니다.

    이러한 원리로 4개의 Chunk 가 생성됩니다.

     

     

    그 외의 SessionWindow 와 SlidingWindow 은 다음 컨텐츠에서 다루도록 하겠습니다.

    내용이 너무 길어지네요.

     

    Watermark.

    그럼 생성된 Window 의 Chunk 는 언제 종료될까요 ?

    이 과정에서 Watermark 가 사용됩니다.

    Watermark 의 의미는 Watermark 가 명시하는 시각 이후로는 과거의 Event 가 유입되지 않을 것을 의미합니다.

    말이 어려운데요.

    DataSource 가 주기적으로 Watermark 라는 레코드를 데이트 스트림으로 전송하게 됩니다.

    그럼 각 Operator / Task 들은 Watermark 레코드를 처리하게 되는데요.

    Window Operator 가 Watermark 레코드를 전달받게 되면, Window Operator 는 이렇게 해석합니다.

     

    " 아! 이제 Watermark 가 명시한 시간보다 과거의 Event 는 유입되지 않겠군.

    그럼 내가 관리하는 Chunk 중에서 Watermark 이전의 Chunk 는 종료시키고 Downstream 으로 보내야겠어 ! "

    이 과정에서 State 를 정리하고 (Clear), Downstream Task 로 데이터를 전송합니다. (Flush)

     

    아래 이미지는 1시 9분 Watermark 가 유입되는 모습입니다.

    그럼 이제 1시 9분보다 과거인 1시 8분, 1시 1분 같은 Event 는 더 이상 유입되지 않는다는 것은 Window Operator 에게 알리게 됩니다.

    그리고 Window Operator 는 과감하게 Chunk 를 Clear & Flush 하게 됩니다.

    이렇게 되면, (Bob, 1시 10분) 데이터와 Chunk 는 Window 내에 머물게 됩니다.

    1시 10분은 Watermark 1시 9분보다 미래의 시간이니까요.

     

     

     

     

    마치며.

    다음 컨텐츠에서 Window Operator 의 구체적인 사용법에 대해서 작성해보려고 합니다.

    감사합니다.

     

     

    반응형
Designed by Tistory.