ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • [Spark] groupBy, RelationalGroupedDataset 알아보기
    Spark 2024. 1. 26. 23:30
    728x90
    반응형

    - 목차

     

    들어가며.

    이번 글에서는 SparkSQL 의 Group By Transformation 에 대해서 상세히 알아보려고 합니다.

    사용할 데이터는 Kaggle 의 Bank Customer Chunk Prediction 데이터를 사용하며,

    Dataset 의 다운로드 링크는 아래와 같습니다.

    https://www.kaggle.com/datasets/shubhammeshram579/bank-customer-churn-prediction

     

    Bank Customer Churn Prediction

    Predicting customer churn in banking industry using machine learning.

    www.kaggle.com

     

    데이터 살펴보기.

    이번에 다루어 볼 데이터셋은 은행 이탈 고객에 대한 데이터셋입니다.

    Gender, Geography, Age 와 같은 고객의 메타데이터

    Balance, Tenure, NumOfProducrs, HasCrCard 와 같은 금융 정보가 포함됩니다.

    그리고 이탈 여부를 의미하는 Exited 인 데이터도 존재하죠.

    즉, 고객의 개인 정보와 금융 정보에 해당하는 Feature 와 이탈 여부인 Label 로 구성됩니다.

     

    < Schema >

    root
     |-- RowNumber: integer (nullable = true)
     |-- CustomerId: integer (nullable = true)
     |-- Surname: string (nullable = true)
     |-- CreditScore: integer (nullable = true)
     |-- Geography: string (nullable = true)
     |-- Gender: string (nullable = true)
     |-- Age: double (nullable = true)
     |-- Tenure: integer (nullable = true)
     |-- Balance: double (nullable = true)
     |-- NumOfProducts: integer (nullable = true)
     |-- HasCrCard: integer (nullable = true)
     |-- IsActiveMember: integer (nullable = true)
     |-- EstimatedSalary: double (nullable = true)
     |-- Exited: integer (nullable = true)

     

    < show(5) >

    +---------+----------+--------+-----------+---------+------+----+------+---------+-------------+---------+--------------+---------------+------+
    |RowNumber|CustomerId| Surname|CreditScore|Geography|Gender| Age|Tenure|  Balance|NumOfProducts|HasCrCard|IsActiveMember|EstimatedSalary|Exited|
    +---------+----------+--------+-----------+---------+------+----+------+---------+-------------+---------+--------------+---------------+------+
    |        1|  15634602|Hargrave|        619|   France|Female|42.0|     2|      0.0|            1|        1|             1|      101348.88|     1|
    |        2|  15647311|    Hill|        608|    Spain|Female|41.0|     1| 83807.86|            1|        0|             1|      112542.58|     0|
    |        3|  15619304|    Onio|        502|   France|Female|42.0|     8| 159660.8|            3|        1|             0|      113931.57|     1|
    |        4|  15701354|    Boni|        699|   France|Female|39.0|     1|      0.0|            2|        0|             0|       93826.63|     0|
    |        5|  15737888|Mitchell|        850|    Spain|Female|43.0|     2|125510.82|            1|     null|             1|        79084.1|     0|
    +---------+----------+--------+-----------+---------+------+----+------+---------+-------------+---------+--------------+---------------+------+

     

     

    Group By Transformation.

    Group By 동작은 기본적으로 여러 Column 들에 적용할 수 있습니다.

    1개의 Column 의 Group By 와 Column 별 Count 를 집계해보도록 하겠습니다.

     

    Geography Group By and Count.

    Kaggle 의 Bank Churn 데이터는 resources/files/Churn_Modelling.csv 위치에 저장하였습니다.

    아래의 코드는 Geography 칼럼을 기준으로 Count 를 집계하는 예시코드입니다.

     

    package test.spark.transformation
    
    import org.apache.spark.sql.SparkSession
    
    object TestGroupBy {
      def main (args: Array[String]): Unit = {
        val spark = SparkSession.builder.appName("").master("local[*]")
          .config("spark.driver.bindAddress", "localhost").getOrCreate()
        val fileLocation = this.getClass.getResource("/files/Churn_Modelling.csv").getFile
        val df = spark.read.format("csv").option("inferSchema", true).option("header", true).load(fileLocation)
        val groupByGeography = df.groupBy("Geography").count()
    
        groupByGeography.explain(true)
        groupByGeography.show(10)
        groupByGeography.printSchema()
        spark.close()
      }
    
    }

     

    Group By Transformtaion 의 실행 계획은 아래와 같습니다.

    먼저 Unresolved Logical Plan 을 확인하면 모든 칼럼에 대한 Relation 과 Geography 칼럼의 Aggregate 계획을 확인할 수 있습니다.

    == Parsed Logical Plan ==
    Aggregate [Geography#20], [Geography#20, count(1) AS count#59L]
    +- Relation [RowNumber#16,CustomerId#17,Surname#18,CreditScore#19,Geography#20,Gender#21,Age#22,Tenure#23,Balance#24,NumOfProducts#25,HasCrCard#26,IsActiveMember#27,EstimatedSalary#28,Exited#29] csv

     

     

    그리고 Optimization 의 결과로써 Column 의 Projection 이 추가됩니다.

    == Optimized Logical Plan ==
    Aggregate [Geography#20], [Geography#20, count(1) AS count#59L]
    +- Project [Geography#20]
       +- Relation [RowNumber#16,CustomerId#17,Surname#18,CreditScore#19,Geography#20,Gender#21,Age#22,Tenure#23,Balance#24,NumOfProducts#25,HasCrCard#26,IsActiveMember#27,EstimatedSalary#28,Exited#29] csv

     

    Physical Plan 은 아래와 같습니다.

    File Scan 동작의 결과로써 File 의 데이터는 메모리로 로드됩니다.

    그리고 Aggregation 과 Partitioning 을 통해서 Geography 칼럼의 카운트 집계가 수행됩니다.

    Exchange hashpartitioning(Geography#20, 200) 에서 알 수 있듯이 200개의 Partition 으로 Task 가 나뉘어집니다.

    == Physical Plan ==
    AdaptiveSparkPlan isFinalPlan=false
    +- HashAggregate(keys=[Geography#20], functions=[count(1)], output=[Geography#20, count#59L])
       +- Exchange hashpartitioning(Geography#20, 200), ENSURE_REQUIREMENTS, [plan_id=30]
          +- HashAggregate(keys=[Geography#20], functions=[partial_count(1)], output=[Geography#20, count#63L])
             +- FileScan csv [Geography#20] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/Users/ihyeong-ug/chak/scala/spark/target/scala-2.13/classes/file..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<Geography:string>

     

    RelationalGroupedDataset.

    RelationalGroupedDataset 는 groupBy Transformation 이 적용된 DataFrame 의 상태를 의미합니다.

    위 예시처럼 Group By Transformation 이 적용되면 DataFrame 은 RelationalGroupedDataset 상태로 변경되구요.

    이 상태에서 agg, sum, count, avg 와 같은 집계 연산을 적용할 수 있습니다.

     

    package test.spark.transformation
    
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.{col, floor}
    
    object TestGroupBy {
      def main (args: Array[String]): Unit = {
        val spark = SparkSession.builder.appName("").master("local[*]")
          .config("spark.driver.bindAddress", "localhost").getOrCreate()
        val fileLocation = this.getClass.getResource("/files/Churn_Modelling.csv").getFile
    
        val churnDF = spark.read.format("csv").option("inferSchema", true).option("header", true).load(fileLocation)
        val groupByGeography = churnDF.groupBy("Geography").count()
        val groupByGender = churnDF.groupBy("Gender").count()
        val groupByAge = churnDF.withColumn("Age", floor(col("Age") / 10) * 10).groupBy("Age").count()
        val groupByBalance = churnDF.groupBy("Exited").avg("Balance")
    
        println("groupByGeography")
        for (row <- groupByGeography.collect()) println(row)
        println("groupByGender")
        for (row <- groupByGender.collect()) println(row)
        println("groupByAge")
        for (row <- groupByAge.collect()) println(row)
        println("groupByBalance")
        for (row <- groupByBalance.collect()) println(row)
        spark.close()
      }
    
    }
    ### groupByGeography
    ### [Germany,2510]
    ### [France,5014]
    ### [null,1]
    ### [Spain,2477]
    
    ### groupByGender
    ### [Female,4544]
    ### [Male,5458]
    
    ### groupByAge
    ### [50,869]
    ### [null,1]
    ### [10,49]
    ### [80,13]
    ### [30,4346]
    ### [90,2]
    ### [20,1592]
    ### [70,136]
    ### [60,375]
    ### [40,2619]
    
    ### groupByBalance
    ### [1,91100.67219823351]
    ### [0,72752.50389753867]

     

    반응형
Designed by Tistory.