-
[Spark] Union 알아보기Spark 2024. 1. 26. 23:30728x90반응형
- 목차
들어가며.
Union Transformation 은 두개 이상의 DataFrame 을 하나로 합칩니다.
DataFrame 을 합친다는 관점에서 Union 은 Join Transformation 과 유사합니다.
하지만 Join Transformation 과의 차이점은 Row 기반으로 Combining 을 수행하는지 , Colum 기반으로 수행하는지에 대한 차이가 있습니다.
Union Transformation 의 중요한 점은 Row 관점에서의 Dataset 변형이라는 점입니다.
일반적으로 데이터의 변형은 Schema 가 변경된다던지, Data 의 value 의 Update 등의 뜻합니다.
Union 은 데이터 그 자체의 변형은 없지만, Dataset 관점에서 바라볼 때, Dataset 의 크기가 증가하게 되고,
이를 처리하는 Worker 의 갯수나 리소스에 영향을 끼칩니다.
즉, Union 자체도 하나의 Transformation 이라는 관점을 가지는 것이 중요합니다.
Spark 의 DataFrame 은 Immutable 한 객체입니다.
여러 DataFrame 들을 하나의 DataFrame 으로 합치기 위해서는 Row 를 추가하는 방식이 사용되진 않습니다.
아래의 예시와 같이 df1 와 df2 를 Union 하여 새로운 df3 을 생성하는 방식이 사용됩니다.
package job import org.apache.spark.SparkConf import org.apache.spark.sql._ object TestUnion { val sparkConf = new SparkConf() .setAppName("test-union") .setMaster("local[*]") .set("spark.driver.bindAddress", "localhost") def main(args: Array[String]): Unit = { val spark = SparkSession.builder().config(conf = sparkConf).getOrCreate() import spark.implicits._ val df1 = Seq(("Andy", 32), ("Brad", 45)).toDF("name", "age") val df2 = Seq(("Chris", 12), ("Dennis", 64)).toDF("name", "age") var df3 = df1.union(df2) df3.show() spark.stop() } }
Join Operation.
Join Operation 은 서로 다른 두 DataFrame 들이 name 이라는 공통된 Column 을 기준으로 병합됩니다.
그래서 생성되는 새로운 DataFrame 은 기존의 DataFrame Schema 와 공통된 Schema 를 가지지 않습니다.
이러한 관점에서 Join 은 Schema 를 변형시키는 Transformation 으로 볼 수 있습니다.
코드 관점에서 볼때, 새로운 Schema, Serialization 의 내용이 필요해집니다.
Union Operation.
Union Operation 은 Join 과 달리 결이 비슷한 두 DataFrame 이 Schema 를 유지한 채로 결합됩니다.
Schema 가 유지되므로 Column 이 아닌 두 DataFrame 의 모든 Row 들이 합쳐집니다.
Union Transformation 구현하기.
아래 예시 코드는 10만개의 Record 들을 가지는 두 DataFrame 을 Union 하는 연산을 구현한 코드입니다.
( usersDF1 와 usersDF2 인 두 DataFrame 들을 Union 합니다. )
package test.spark.transformation import org.apache.spark.sql.catalyst.encoders.RowEncoder import org.apache.spark.sql.functions.col import org.apache.spark.sql.types.StructType import org.apache.spark.sql.{Row, SparkSession} import scala.util.Random object TestUnion { def main(args: Array[String]): Unit = { val spark = SparkSession.builder.appName("test").master("local[*]").config("spark.driver.bindAddress", "localhost").getOrCreate val usersList: Seq[Row] = for {id <- 0 until 200000} yield { val name: String = Random.alphanumeric.filter(_.isValidChar).take(10).mkString val age: Int = Random.alphanumeric.filter(_.isDigit).take(2).mkString.toInt Row(id, name, age) } val schema = StructType.fromDDL("id INTEGER, name STRING, age INTEGER") implicit val encoder = RowEncoder(schema) val usersDF1 = spark.createDataset(usersList.take(100000)) val usersDF2 = spark.createDataset(usersList.takeRight(100000)) val unionDF = usersDF1.union(usersDF2) unionDF.explain(true) val firstRecord = unionDF.select("id", "name", "age").sort(col("id").asc).take(1) val lastRecord = unionDF.select("id", "name", "age").sort(col("id").desc).take(1) println("### firstRecord : " + firstRecord.apply(0).toString) println("### lastRecord : " + lastRecord.apply(0).toString) spark.close } }
### firstRecord : [0,VKipTrYw9x,78] ### lastRecord : [199999,497L0djpgQ,58]
실행계획은 다음과 같습니다.
Execution Plan.
Logical Plan 에서는 각 DataFrame 의 Relation 을 뜻하는 LocalRelation 이 2개로 구성됩니다.
그리고 DataSource 가 Runtime 에서 구현된 Seq[Row] 이므로 실제 실행 계획은 TableScan 으로 설정됩니다.
== Parsed Logical Plan == Union false, false :- LocalRelation [id#3, name#4, age#5] +- LocalRelation [id#6, name#7, age#8] == Analyzed Logical Plan == id: int, name: string, age: int Union false, false :- LocalRelation [id#3, name#4, age#5] +- LocalRelation [id#6, name#7, age#8] == Optimized Logical Plan == Union false, false :- LocalRelation [id#3, name#4, age#5] +- LocalRelation [id#6, name#7, age#8] == Physical Plan == Union :- LocalTableScan [id#3, name#4, age#5] +- LocalTableScan [id#6, name#7, age#8]
반응형'Spark' 카테고리의 다른 글
[Spark] S3 DataFrameReader 구현하기 (s3a) (0) 2024.01.28 [Spark] groupBy, RelationalGroupedDataset 알아보기 (0) 2024.01.26 [Spark] fold, reduce 알아보기 ( RDD, Action, Aggregation ) (0) 2024.01.19 [SparkSQL] CSV DataFrameReader 알아보기 (0) 2024.01.15 [Spark] Spark lit 알아보기 (0) 2023.12.24