-
[Spark] groupBy, RelationalGroupedDataset 알아보기Spark 2024. 1. 26. 23:30728x90반응형
- 목차
들어가며.
이번 글에서는 SparkSQL 의 Group By Transformation 에 대해서 상세히 알아보려고 합니다.
사용할 데이터는 Kaggle 의 Bank Customer Chunk Prediction 데이터를 사용하며,
Dataset 의 다운로드 링크는 아래와 같습니다.
https://www.kaggle.com/datasets/shubhammeshram579/bank-customer-churn-prediction
데이터 살펴보기.
이번에 다루어 볼 데이터셋은 은행 이탈 고객에 대한 데이터셋입니다.
Gender, Geography, Age 와 같은 고객의 메타데이터
Balance, Tenure, NumOfProducrs, HasCrCard 와 같은 금융 정보가 포함됩니다.
그리고 이탈 여부를 의미하는 Exited 인 데이터도 존재하죠.
즉, 고객의 개인 정보와 금융 정보에 해당하는 Feature 와 이탈 여부인 Label 로 구성됩니다.
< Schema >
root |-- RowNumber: integer (nullable = true) |-- CustomerId: integer (nullable = true) |-- Surname: string (nullable = true) |-- CreditScore: integer (nullable = true) |-- Geography: string (nullable = true) |-- Gender: string (nullable = true) |-- Age: double (nullable = true) |-- Tenure: integer (nullable = true) |-- Balance: double (nullable = true) |-- NumOfProducts: integer (nullable = true) |-- HasCrCard: integer (nullable = true) |-- IsActiveMember: integer (nullable = true) |-- EstimatedSalary: double (nullable = true) |-- Exited: integer (nullable = true)
< show(5) >
+---------+----------+--------+-----------+---------+------+----+------+---------+-------------+---------+--------------+---------------+------+ |RowNumber|CustomerId| Surname|CreditScore|Geography|Gender| Age|Tenure| Balance|NumOfProducts|HasCrCard|IsActiveMember|EstimatedSalary|Exited| +---------+----------+--------+-----------+---------+------+----+------+---------+-------------+---------+--------------+---------------+------+ | 1| 15634602|Hargrave| 619| France|Female|42.0| 2| 0.0| 1| 1| 1| 101348.88| 1| | 2| 15647311| Hill| 608| Spain|Female|41.0| 1| 83807.86| 1| 0| 1| 112542.58| 0| | 3| 15619304| Onio| 502| France|Female|42.0| 8| 159660.8| 3| 1| 0| 113931.57| 1| | 4| 15701354| Boni| 699| France|Female|39.0| 1| 0.0| 2| 0| 0| 93826.63| 0| | 5| 15737888|Mitchell| 850| Spain|Female|43.0| 2|125510.82| 1| null| 1| 79084.1| 0| +---------+----------+--------+-----------+---------+------+----+------+---------+-------------+---------+--------------+---------------+------+
Group By Transformation.
Group By 동작은 기본적으로 여러 Column 들에 적용할 수 있습니다.
1개의 Column 의 Group By 와 Column 별 Count 를 집계해보도록 하겠습니다.
Geography Group By and Count.
Kaggle 의 Bank Churn 데이터는 resources/files/Churn_Modelling.csv 위치에 저장하였습니다.
아래의 코드는 Geography 칼럼을 기준으로 Count 를 집계하는 예시코드입니다.
package test.spark.transformation import org.apache.spark.sql.SparkSession object TestGroupBy { def main (args: Array[String]): Unit = { val spark = SparkSession.builder.appName("").master("local[*]") .config("spark.driver.bindAddress", "localhost").getOrCreate() val fileLocation = this.getClass.getResource("/files/Churn_Modelling.csv").getFile val df = spark.read.format("csv").option("inferSchema", true).option("header", true).load(fileLocation) val groupByGeography = df.groupBy("Geography").count() groupByGeography.explain(true) groupByGeography.show(10) groupByGeography.printSchema() spark.close() } }
Group By Transformtaion 의 실행 계획은 아래와 같습니다.
먼저 Unresolved Logical Plan 을 확인하면 모든 칼럼에 대한 Relation 과 Geography 칼럼의 Aggregate 계획을 확인할 수 있습니다.
== Parsed Logical Plan == Aggregate [Geography#20], [Geography#20, count(1) AS count#59L] +- Relation [RowNumber#16,CustomerId#17,Surname#18,CreditScore#19,Geography#20,Gender#21,Age#22,Tenure#23,Balance#24,NumOfProducts#25,HasCrCard#26,IsActiveMember#27,EstimatedSalary#28,Exited#29] csv
그리고 Optimization 의 결과로써 Column 의 Projection 이 추가됩니다.
== Optimized Logical Plan == Aggregate [Geography#20], [Geography#20, count(1) AS count#59L] +- Project [Geography#20] +- Relation [RowNumber#16,CustomerId#17,Surname#18,CreditScore#19,Geography#20,Gender#21,Age#22,Tenure#23,Balance#24,NumOfProducts#25,HasCrCard#26,IsActiveMember#27,EstimatedSalary#28,Exited#29] csv
Physical Plan 은 아래와 같습니다.
File Scan 동작의 결과로써 File 의 데이터는 메모리로 로드됩니다.
그리고 Aggregation 과 Partitioning 을 통해서 Geography 칼럼의 카운트 집계가 수행됩니다.
Exchange hashpartitioning(Geography#20, 200) 에서 알 수 있듯이 200개의 Partition 으로 Task 가 나뉘어집니다.
== Physical Plan == AdaptiveSparkPlan isFinalPlan=false +- HashAggregate(keys=[Geography#20], functions=[count(1)], output=[Geography#20, count#59L]) +- Exchange hashpartitioning(Geography#20, 200), ENSURE_REQUIREMENTS, [plan_id=30] +- HashAggregate(keys=[Geography#20], functions=[partial_count(1)], output=[Geography#20, count#63L]) +- FileScan csv [Geography#20] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/Users/ihyeong-ug/chak/scala/spark/target/scala-2.13/classes/file..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<Geography:string>
RelationalGroupedDataset.
RelationalGroupedDataset 는 groupBy Transformation 이 적용된 DataFrame 의 상태를 의미합니다.
위 예시처럼 Group By Transformation 이 적용되면 DataFrame 은 RelationalGroupedDataset 상태로 변경되구요.
이 상태에서 agg, sum, count, avg 와 같은 집계 연산을 적용할 수 있습니다.
package test.spark.transformation import org.apache.spark.sql.SparkSession import org.apache.spark.sql.functions.{col, floor} object TestGroupBy { def main (args: Array[String]): Unit = { val spark = SparkSession.builder.appName("").master("local[*]") .config("spark.driver.bindAddress", "localhost").getOrCreate() val fileLocation = this.getClass.getResource("/files/Churn_Modelling.csv").getFile val churnDF = spark.read.format("csv").option("inferSchema", true).option("header", true).load(fileLocation) val groupByGeography = churnDF.groupBy("Geography").count() val groupByGender = churnDF.groupBy("Gender").count() val groupByAge = churnDF.withColumn("Age", floor(col("Age") / 10) * 10).groupBy("Age").count() val groupByBalance = churnDF.groupBy("Exited").avg("Balance") println("groupByGeography") for (row <- groupByGeography.collect()) println(row) println("groupByGender") for (row <- groupByGender.collect()) println(row) println("groupByAge") for (row <- groupByAge.collect()) println(row) println("groupByBalance") for (row <- groupByBalance.collect()) println(row) spark.close() } }
### groupByGeography ### [Germany,2510] ### [France,5014] ### [null,1] ### [Spain,2477] ### groupByGender ### [Female,4544] ### [Male,5458] ### groupByAge ### [50,869] ### [null,1] ### [10,49] ### [80,13] ### [30,4346] ### [90,2] ### [20,1592] ### [70,136] ### [60,375] ### [40,2619] ### groupByBalance ### [1,91100.67219823351] ### [0,72752.50389753867]
반응형'Spark' 카테고리의 다른 글
[Spark] JDBC DataFrameReader 알아보기 (MySQL) (0) 2024.01.28 [Spark] S3 DataFrameReader 구현하기 (s3a) (0) 2024.01.28 [Spark] Union 알아보기 (0) 2024.01.26 [Spark] fold, reduce 알아보기 ( RDD, Action, Aggregation ) (0) 2024.01.19 [SparkSQL] CSV DataFrameReader 알아보기 (0) 2024.01.15