ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • [Spark] Union 알아보기
    Spark 2024. 1. 26. 23:30
    728x90
    반응형

    - 목차

     

    들어가며.

    Union Transformation 은 두개 이상의 DataFrame 을 하나로 합칩니다.

    DataFrame 을 합친다는 관점에서 Union 은 Join Transformation 과 유사합니다.

    하지만 Join Transformation 과의 차이점은 Row 기반으로 Combining 을 수행하는지 , Colum 기반으로 수행하는지에 대한 차이가 있습니다.

     

    Union Transformation 의 중요한 점은 Row 관점에서의 Dataset 변형이라는 점입니다.

    일반적으로 데이터의 변형은 Schema 가 변경된다던지, Data 의 value 의 Update 등의 뜻합니다.

    Union 은 데이터 그 자체의 변형은 없지만, Dataset 관점에서 바라볼 때, Dataset 의 크기가 증가하게 되고,

    이를 처리하는 Worker 의 갯수나 리소스에 영향을 끼칩니다.

    즉, Union 자체도 하나의 Transformation 이라는 관점을 가지는 것이 중요합니다.

     

    Spark 의 DataFrame 은 Immutable 한 객체입니다.

    여러 DataFrame 들을 하나의 DataFrame 으로 합치기 위해서는 Row 를 추가하는 방식이 사용되진 않습니다.

    아래의 예시와 같이 df1 와 df2 를 Union 하여 새로운 df3 을 생성하는 방식이 사용됩니다.

    package job
    
    import org.apache.spark.SparkConf
    import org.apache.spark.sql._
    
    object TestUnion {
      val sparkConf = new SparkConf()
        .setAppName("test-union")
        .setMaster("local[*]")
        .set("spark.driver.bindAddress", "localhost")
    
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().config(conf = sparkConf).getOrCreate()
        import spark.implicits._
        val df1 = Seq(("Andy", 32), ("Brad", 45)).toDF("name", "age")
        val df2 = Seq(("Chris", 12), ("Dennis", 64)).toDF("name", "age")
        var df3 = df1.union(df2)
        df3.show()
        spark.stop()
    
    
      }
    }

     

    Join Operation.

    Join Operation 은 서로 다른 두 DataFrame 들이 name 이라는 공통된 Column 을 기준으로 병합됩니다.

    그래서 생성되는 새로운 DataFrame 은 기존의 DataFrame Schema 와 공통된 Schema 를 가지지 않습니다.

    이러한 관점에서 Join 은 Schema 를 변형시키는 Transformation 으로 볼 수 있습니다.

    코드 관점에서 볼때, 새로운 Schema, Serialization 의 내용이 필요해집니다.

     

     

     

    Union Operation.

    Union Operation 은 Join 과 달리 결이 비슷한 두 DataFrame 이 Schema 를 유지한 채로 결합됩니다.

    Schema 가 유지되므로 Column 이 아닌 두 DataFrame 의 모든 Row 들이 합쳐집니다.

     

     

    Union Transformation 구현하기.

    아래 예시 코드는 10만개의 Record 들을 가지는 두 DataFrame 을 Union 하는 연산을 구현한 코드입니다.

    ( usersDF1 와 usersDF2 인 두 DataFrame 들을 Union 합니다. )

    package test.spark.transformation
    
    import org.apache.spark.sql.catalyst.encoders.RowEncoder
    import org.apache.spark.sql.functions.col
    import org.apache.spark.sql.types.StructType
    import org.apache.spark.sql.{Row, SparkSession}
    
    import scala.util.Random
    
    object TestUnion {
      def main(args: Array[String]): Unit = {
    
        val spark = SparkSession.builder.appName("test").master("local[*]").config("spark.driver.bindAddress", "localhost").getOrCreate
    
        val usersList: Seq[Row] = for {id <- 0 until 200000} yield {
          val name: String = Random.alphanumeric.filter(_.isValidChar).take(10).mkString
          val age: Int = Random.alphanumeric.filter(_.isDigit).take(2).mkString.toInt
          Row(id, name, age)
        }
    
        val schema = StructType.fromDDL("id INTEGER, name STRING, age INTEGER")
        implicit val encoder = RowEncoder(schema)
    
        val usersDF1 = spark.createDataset(usersList.take(100000))
        val usersDF2 = spark.createDataset(usersList.takeRight(100000))
    
        val unionDF = usersDF1.union(usersDF2)
        unionDF.explain(true)
        val firstRecord = unionDF.select("id", "name", "age").sort(col("id").asc).take(1)
        val lastRecord = unionDF.select("id", "name", "age").sort(col("id").desc).take(1)
        println("### firstRecord : " + firstRecord.apply(0).toString)
        println("### lastRecord : " + lastRecord.apply(0).toString)
        spark.close
      }
    
    }
    ### firstRecord : [0,VKipTrYw9x,78]
    ### lastRecord : [199999,497L0djpgQ,58]

     

    실행계획은 다음과 같습니다.

     

    Execution Plan.

    Logical Plan 에서는 각 DataFrame 의 Relation 을 뜻하는 LocalRelation 이 2개로 구성됩니다.

    그리고 DataSource 가 Runtime 에서 구현된 Seq[Row] 이므로 실제 실행 계획은 TableScan 으로 설정됩니다.

     

    == Parsed Logical Plan ==
    Union false, false
    :- LocalRelation [id#3, name#4, age#5]
    +- LocalRelation [id#6, name#7, age#8]
    
    == Analyzed Logical Plan ==
    id: int, name: string, age: int
    Union false, false
    :- LocalRelation [id#3, name#4, age#5]
    +- LocalRelation [id#6, name#7, age#8]
    
    == Optimized Logical Plan ==
    Union false, false
    :- LocalRelation [id#3, name#4, age#5]
    +- LocalRelation [id#6, name#7, age#8]
    
    == Physical Plan ==
    Union
    :- LocalTableScan [id#3, name#4, age#5]
    +- LocalTableScan [id#6, name#7, age#8]

     

     

    반응형
Designed by Tistory.