ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • [Spark] Window 알아보기 ( lag, lead, sum )
    Spark 2024. 5. 15. 07:51
    728x90
    반응형

     

    - 목차

     

    들어가며.

    Spark 의 Window 기능을 통해서 부분적인 Aggregation 연산을 수행할 수 있습니다.

    일반적인 Aggregation 연산은 전체 데이터셋을 대상으로 수행됩니다.

    일반적으로 사용하는 sum, avg, var, min/max 등은 모든 데이터들을 대상으로 합계, 평균, 분산, 최소/최대값을 계산합니다.

    그래서 N 개의 데이터를 대상으로 N 개보다 작은 수의 결과가 출력됩니다.

    예를 들어, 아래의 연산처럼 100 개의 데이터를 1개의 결과로 출력됩니다.

    spark = SparkSession.builder.config(conf=conf).getOrCreate()
    df = spark.range(100)
    print(f"""sum : {df.select(F.sum(F.col("id"))).collect()[0][0]}""")
    print(f"""count : {df.select(F.count(F.col("id"))).collect()[0][0]}""")
    print(f"""avg : {df.select(F.avg(F.col("id"))).collect()[0][0]}""")
    print(f"""var_pop : {df.select(F.var_pop(F.col("id"))).collect()[0][0]}""")
    print(f"""min : {df.select(F.min(F.col("id"))).collect()[0][0]}""")
    print(f"""max : {df.select(F.max(F.col("id"))).collect()[0][0]}""")
    spark.stop()
    sum : 4950
    count : 100
    avg : 49.5
    var_pop : 833.25
    min : 0
    max : 99

     

    그리고 GroupBy 연산이 적용된다면 Categorical Data 의 데이터 종류의 갯수만큼의 출력이 발생합니다.

    아래의 예시는 5개의 종류를 가지는 name 이라는 Categorical Data 를 대상으로 Aggregation 을 수행합니다.

    이 경우에는 Aggregation 연산은 N 개의 데이터를 기준으로 5개의 출력을 생산합니다.

    spark = SparkSession.builder.config(conf=conf).getOrCreate()
    id_array = np.arange(1, 100 + 1, 1)
    nominal_array = np.random.choice(a=["Andy", "Bob", "Chris", "Daniel", "Fabian"], size=100)
    
    rows = [(int(id), str(name)) for id, name in zip(id_array, nominal_array)]
    schema = types.StructType([
        types.StructField("id", types.IntegerType(), False),
        types.StructField("name", types.StringType(), False)
    ])
    df = spark.createDataFrame(data=rows, schema=schema)
    print(f"""sum : {df.groupby(F.col("name")).agg(F.sum(F.col("id"))).collect()[0:2]}""")
    print(f"""count : {df.groupby(F.col("name")).agg(F.count(F.col("id"))).collect()[0:2]}""")
    print(f"""avg : {df.groupby(F.col("name")).agg(F.avg(F.col("id"))).collect()[0:2]}""")
    print(f"""var_pop : {df.groupby(F.col("name")).agg(F.var_pop(F.col("id"))).collect()[0:2]}""")
    print(f"""min : {df.groupby(F.col("name")).agg(F.min(F.col("id"))).collect()[0:2]}""")
    print(f"""max : {df.groupby(F.col("name")).agg(F.max(F.col("id"))).collect()[0:2]}""")
    spark.stop()
    sum : [Row(name='Chris', sum(id)=888), Row(name='Fabian', sum(id)=1300)]
    count : [Row(name='Chris', count(id)=17), Row(name='Fabian', count(id)=29)]
    avg : [Row(name='Chris', avg(id)=52.23529411764706), Row(name='Fabian', avg(id)=44.827586206896555)]
    var_pop : [Row(name='Chris', var_pop(id)=1032.2975778546713), Row(name='Fabian', var_pop(id)=970.5564803804994)]
    min : [Row(name='Chris', min(id)=2), Row(name='Fabian', min(id)=4)]
    max : [Row(name='Chris', max(id)=100), Row(name='Fabian', max(id)=97)]

     

     

    Window 연산은 일반적인 Aggregation 연산과 다릅니다.

    Aggregation 은 전체 데이터셋을 대상으로 수행하는 반면 Window 연산은 조건에 맞는 부분 집합을 대상으로 연산을 수행합니다.

    그리고 N 개의 Input 데이터를 기준으로 N 개의 출력을 생성합니다.

    즉, N:N 의 매핑을 보장합니다.

     

    Window vs Aggregation.

    1. Aggregation 은 전체 데이터셋을 대상으로 집계 연산을 수행합니다.

    반면 Window 연산은 부분 집합을 기준으로 집계 연산을 수행합니다.

     

    2. Aggregation 연산은 N:M 비율로 Input -> Output 데이터를 변형합니다.

    M 은 N 보다 작거나 같습니다.

    반면, Window 연산은 N : N 비율로 Output 데이터가 생성됩니다.

     

     

    Window Operations.

    Window 연산 중 Lag, Lead, Sum 에 대해서 알아보도록 하겠습니다.

    Lag.

    Lag 는 어떤 레코드의 바로 이전 레코드를 획득할 수 있도록 하는 함수입니다.

    예를 들어, id 라는 이름의 칼럼을 가지는 DataFrame 이 존재하고 id 칼럼은 0부터 9까지 Integer 값을 가집니다.

    그리고 lag 함수를 사용하여 바로 직전의 id 값을 DataFrame 에 추가한다면 아래와 같은 결과가 출력됩니다.

    spark = SparkSession.builder.config(conf=conf).getOrCreate()
    df = spark.range(10)
    df = df.withColumn("prev_id", lag(col("id")).over(Window.orderBy(col("id").asc())))
    df.show()
    +---+-------+
    | id|prev_id|
    +---+-------+
    |  0|   null|
    |  1|      0|
    |  2|      1|
    |  3|      2|
    |  4|      3|
    |  5|      4|
    |  6|      5|
    |  7|      6|
    |  8|      7|
    |  9|      8|
    +---+-------+

     

    위와 같이 1번 id 를 가지는 레코드는 prev_id 로써 0 을 가집니다.

    즉, N번 id 를 가지는 레코드는 N-1 번의 prev_id 를 가지게 되죠.

    이러한 방식으로 현재 레코드는 이전 레코드의 값을 가질 수 있게 됩니다.

     

    위 예시에서 확인할 수 있는 over(Window.orderBy(col("id").asc()) 이라는 문장은

    id 를 기준으로 정렬된 부분 집합을 대상으로 Aggregation 을 수행함을 뜻합니다.

    그래서 3번 id 를 가지는 레코드는 0 ~ 2번까지의 부분 집합을 대상으로 Aggregation 을 수행하고,

    3번 id 를 가지는 레코드의 직전의 레코드 ( lagged ) 인 id 2 를 prev_id 로써 가지게 됩니다.

     

    Lead.

    Lag 와 마찬가지로 Lead 함수는 이후의 레코드의 정보를 획득할 수 있습니다.

    spark = SparkSession.builder.config(conf=conf).getOrCreate()
    df = spark.range(10)
    df = df.withColumn("prev_id", lag(col("id")).over(Window.orderBy(col("id").asc())))
    df = df.withColumn("next_id", lead(col("id")).over(Window.orderBy(col("id").asc())))
    df.show()
    +---+-------+-------+
    | id|prev_id|next_id|
    +---+-------+-------+
    |  0|   null|      1|
    |  1|      0|      2|
    |  2|      1|      3|
    |  3|      2|      4|
    |  4|      3|      5|
    |  5|      4|      6|
    |  6|      5|      7|
    |  7|      6|      8|
    |  8|      7|      9|
    |  9|      8|   null|
    +---+-------+-------+

     

     

    sum.

    sum 연산을 통해서 Accumulative Sum 값을 구할 수 있습니다.

    이 경우에는 id 가 N 인 레코드의 Accumulative Sum 를 획득하기 위해서

    0 ~ N - 1 까지의 id 를 가지는 레코드가 집계 연산을 위한 부분 집합으로써 활용됩니다.

    spark = SparkSession.builder.config(conf=conf).getOrCreate()
    df = spark.range(10)
    df = df.withColumn("prev_id", lag(col("id")).over(Window.orderBy(col("id").asc())))
    df = df.withColumn("next_id", lead(col("id")).over(Window.orderBy(col("id").asc())))
    df = df.withColumn("accumulative_id", sum(col("id")).over(Window.orderBy(col("id").asc())))
    df.show()
    +---+-------+-------+---------------+
    | id|prev_id|next_id|accumulative_id|
    +---+-------+-------+---------------+
    |  0|   null|      1|              0|
    |  1|      0|      2|              1|
    |  2|      1|      3|              3|
    |  3|      2|      4|              6|
    |  4|      3|      5|             10|
    |  5|      4|      6|             15|
    |  6|      5|      7|             21|
    |  7|      6|      8|             28|
    |  8|      7|      9|             36|
    |  9|      8|   null|             45|
    +---+-------+-------+---------------+

     

    응용 사례.

    아래 예시는 흔히 활용되는 응용 사례입니다.

    고객들의 Daily 행동 패턴을 파악하거나, 데이터 변경 추이를 파악하기 위해서 lag, lead 등이 활용됩니다.

    from datetime import datetime
    
    from pyspark import SparkConf
    from pyspark.sql import SparkSession, Window
    from pyspark.sql.functions import col, lag, lead, sum
    from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType
    
    conf = SparkConf()
    conf.setMaster("local[*]")
    conf.setAppName("test-window-function")
    conf.set("spark.driver.bindAddress", "localhost")
    
    spark = SparkSession.builder.config(conf=conf).getOrCreate()
    rows = [
        ("Andy", datetime.strptime("2024-01-01", "%Y-%m-%d"), 1000),
        ("Andy", datetime.strptime("2024-01-02", "%Y-%m-%d"), -100),
        ("Andy", datetime.strptime("2024-01-03", "%Y-%m-%d"), 900),
        ("Andy", datetime.strptime("2024-01-04", "%Y-%m-%d"), 400),
        ("Bob", datetime.strptime("2024-01-01", "%Y-%m-%d"), 600),
        ("Bob", datetime.strptime("2024-01-02", "%Y-%m-%d"), -500),
        ("Bob", datetime.strptime("2024-01-03", "%Y-%m-%d"), 900),
        ("Bob", datetime.strptime("2024-01-04", "%Y-%m-%d"), -400),
    ]
    schema = StructType([
        StructField("name", StringType(), True),
        StructField("date", DateType(), True),
        StructField("amount", IntegerType(), True),
    ])
    df = spark.createDataFrame(rows, schema)
    df.show()
    
    win = Window.partitionBy(col("name")).orderBy(col("date").asc())
    df = df.withColumn("yesterday_amount", lag(col("amount")).over(win))
    df = df.withColumn("tomorrow_amount", lead(col("amount")).over(win))
    df = df.withColumn("accumulative_amount", sum(col("amount")).over(win))
    df.show()
    spark.stop()
    +----+----------+------+
    |name|      date|amount|
    +----+----------+------+
    |Andy|2024-01-01|  1000|
    |Andy|2024-01-02|  -100|
    |Andy|2024-01-03|   900|
    |Andy|2024-01-04|   400|
    | Bob|2024-01-01|   600|
    | Bob|2024-01-02|  -500|
    | Bob|2024-01-03|   900|
    | Bob|2024-01-04|  -400|
    +----+----------+------+
    
    +----+----------+------+----------------+---------------+-------------------+
    |name|      date|amount|yesterday_amount|tomorrow_amount|accumulative_amount|
    +----+----------+------+----------------+---------------+-------------------+
    |Andy|2024-01-01|  1000|            null|           -100|               1000|
    |Andy|2024-01-02|  -100|            1000|            900|                900|
    |Andy|2024-01-03|   900|            -100|            400|               1800|
    |Andy|2024-01-04|   400|             900|           null|               2200|
    | Bob|2024-01-01|   600|            null|           -500|                600|
    | Bob|2024-01-02|  -500|             600|            900|                100|
    | Bob|2024-01-03|   900|            -500|           -400|               1000|
    | Bob|2024-01-04|  -400|             900|           null|                600|
    +----+----------+------+----------------+---------------+-------------------+

     

    반응형
Designed by Tistory.