ABOUT ME

와주셔서 감사합니다. 좋은 글을 많이 쓰겠습니다.

Today
Yesterday
Total
  • [Catalyst Optimizer] SimplifyCasts Rule 알아보기
    Spark/Catalyst Optimizer 2024. 8. 12. 05:50
    반응형

    - 목차

     

    들어가며.

    SimplifyCasts Rule 은 이름 그대로 불필요한 Cast 연산을 제거하는 최적화 룰입니다.

    SimplifyCasts Rule 의 구현 내용은 아래와 같습니다.

     

    1. case Cast(e, dataType, _, _) if e.dataType == dataType
      • 같은 타입으로 캐스팅한다면 캐스팅 연산 제거
    2. case (ArrayType(from, false), ArrayType(to, true))
      1. Null 을 포함하지 않는 Array 를 Null 을 포함하는 Array 로 캐스팅한다면 캐스팅 연산 제거.
    3. case (MapType(fromKey, fromValue, false), MapType(toKey, toValue, true))
      1. Null 을 포함하지 않는 Map 을 Null 을 포함하는 Map 으로 캐스팅한다면 캐스팅 연산 제거.

     

    object SimplifyCasts extends Rule[LogicalPlan] {
      def apply(plan: LogicalPlan): LogicalPlan = plan.transformAllExpressionsWithPruning(
        _.containsPattern(CAST), ruleId) {
        case Cast(e, dataType, _, _) if e.dataType == dataType => e
        case c @ Cast(e, dataType, _, _) => (e.dataType, dataType) match {
          case (ArrayType(from, false), ArrayType(to, true)) if from == to => e
          case (MapType(fromKey, fromValue, false), MapType(toKey, toValue, true))
            if fromKey == toKey && fromValue == toValue => e
          case _ => c
          }
      }
    }

     

    동일한 타입으로 캐스팅.

    Scala Spark 에서 Range 를 기반으로 DataFrame 을 생성하게 되면 IntegerType 으로 변형됩니다.

    (1 to 100).toDF("value").schema
    res60: org.apache.spark.sql.types.StructType = StructType(StructField(value,IntegerType,false))

     

    따라서 아래의 예시에선 IntegerType 을 IntegerType 으로 변형하는 코드이므로 SimplifyCasts Rule 에 의해서 캐스팅이 제거됩니다.

    import org.apache.spark.sql.types._
    import spark.implicits._
    
    val df = (1 to 100).toDF("value").withColumn("value", col("value").cast(IntegerType))
    
    df.explain("extended")
    == Parsed Logical Plan ==
    'Project [cast('value as int) AS value#29]
    +- Project [value#24 AS value#27]
       +- LocalRelation [value#24]
    
    == Analyzed Logical Plan ==
    value: int
    Project [cast(value#27 as int) AS value#29]
    +- Project [value#24 AS value#27]
       +- LocalRelation [value#24]
    
    == Optimized Logical Plan ==
    LocalRelation [value#29]
    
    == Physical Plan ==
    LocalTableScan [value#29]

     

     

    반면 일반적인 CSV 파일을 읽게 되는 경우에는 지정된 타입이 없기 때문에 아래와 같이 StringType 으로 설정됩니다.

    val df = (spark.read
    	.format("csv")
    	.load("hdfs://namenode:8020/user/spark/num")
    	.withColumnRenamed("_c0", "value")
    )
    df.schema
    res61: org.apache.spark.sql.types.StructType = StructType(StructField(value,StringType,true))

     

    이러한 경우에는 아래처럼 SimplifyCasts Rule 에 의한 최적화가 되지 않습니다.

    왜냐하면 StringType 은 IntegerType 과 동일한 데이터 타입이 아니기 때문입니다.

     

    == Parsed Logical Plan ==
    'Project [cast('value as int) AS value#187]
    +- Project [_c0#183 AS value#185]
       +- Relation [_c0#183] csv
    
    == Analyzed Logical Plan ==
    value: int
    Project [cast(value#185 as int) AS value#187]
    +- Project [_c0#183 AS value#185]
       +- Relation [_c0#183] csv
    
    == Optimized Logical Plan ==
    Project [cast(_c0#183 as int) AS value#187]
    +- Relation [_c0#183] csv
    
    == Physical Plan ==
    *(1) Project [cast(_c0#183 as int) AS value#187]
    +- FileScan csv [_c0#183] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[hdfs://namenode:8020/user/spark/num], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<_c0:string>

     

     

    Complex Type 과 ContainsNull Casts.

    ArrayType 과 MapType 은 containsNull 을 기준으로 SimplifyCasts 최적화 여부가 결정됩니다.

    아래와 같이 Null 을 포함하지 않는 ArrayType Column 을 Null 을 포함하는 ArrayType Column 으로 변경하게 되면,

    SImplifyCasts Rule 이 적용되어 Cast 연산이 제거됩니다.

     

    import org.apache.spark.sql.{Row, SparkSession}
    import org.apache.spark.sql.types._
    import org.apache.spark.sql.functions._
    
    import spark.implicits._
    
    val data = Seq(
      Row(1, Seq(10, null)),
      Row(2, Seq(20, 30))
    )
    
    val arraySchema = StructType(Seq(
      StructField("id", IntegerType),
      StructField("array_col", ArrayType(IntegerType, containsNull = false))
    ))
    
    val df = spark.createDataFrame(spark.sparkContext.parallelize(data), arraySchema)
    
    val castedDF = df.withColumn("array_casted", col("array_col").cast(ArrayType(IntegerType, containsNull = true)))
    
    castedDF.explain("extended")
    == Parsed Logical Plan ==
    'Project [id#435, array_col#436, cast('array_col as array<int>) AS array_casted#439]
    +- LogicalRDD [id#435, array_col#436], false
    
    == Analyzed Logical Plan ==
    id: int, array_col: array<int>, array_casted: array<int>
    Project [id#435, array_col#436, cast(array_col#436 as array<int>) AS array_casted#439]
    +- LogicalRDD [id#435, array_col#436], false
    
    == Optimized Logical Plan ==
    Project [id#435, array_col#436, array_col#436 AS array_casted#439]
    +- LogicalRDD [id#435, array_col#436], false
    
    == Physical Plan ==
    *(1) Project [id#435, array_col#436, array_col#436 AS array_casted#439]
    +- *(1) Scan ExistingRDD[id#435,array_col#436]

     

     

    MapType 또한 ArrayType 과 동일합니다.

    MapType 이 valueContainsNull = False 에서 True 로 캐스팅되는 경우에 Cast 연산이 제거됩니다.

     

    import org.apache.spark.sql.{Row, SparkSession}
    import org.apache.spark.sql.types._
    import org.apache.spark.sql.functions._
    
    import spark.implicits._
    
    val data = Seq(
      Row(1, Seq(10, 40)),
      Row(2, Seq(20, 30))
    )
    
    val mapSchema = StructType(Seq(
      StructField("id", IntegerType),
      StructField("map_col", MapType(IntegerType, IntegerType, valueContainsNull = false))
    ))
    
    val df = spark.createDataFrame(spark.sparkContext.parallelize(data), mapSchema)
    
    val castedDF = df.withColumn("map_casted", col("map_col").cast(MapType(IntegerType, IntegerType, valueContainsNull = true)))
    
    castedDF.explain("extended")
    == Parsed Logical Plan ==
    'Project [id#479, map_col#480, cast('map_col as map<int,int>) AS map_casted#483]
    +- LogicalRDD [id#479, map_col#480], false
    
    == Analyzed Logical Plan ==
    id: int, map_col: map<int,int>, map_casted: map<int,int>
    Project [id#479, map_col#480, cast(map_col#480 as map<int,int>) AS map_casted#483]
    +- LogicalRDD [id#479, map_col#480], false
    
    == Optimized Logical Plan ==
    Project [id#479, map_col#480, map_col#480 AS map_casted#483]
    +- LogicalRDD [id#479, map_col#480], false
    
    == Physical Plan ==
    *(1) Project [id#479, map_col#480, map_col#480 AS map_casted#483]
    +- *(1) Scan ExistingRDD[id#479,map_col#480]

     

     

    유용한 명령어들.

    docker network create spark
    docker run -d --name master --hostname master --network spark -p 8080:8080 -p 7077:7077 bitnami/spark:3.2.3 start-master.sh
    docker run -d --name worker1 --hostname worker1 --network spark bitnami/spark:3.2.3 start-worker.sh spark://master:7077
    docker run -it --rm --name client --network spark -p 4040:4040 bitnami/spark:3.2.3 spark-shell --master spark://master:7077 --conf spark.ui.port=4040

     

     

    반응형
Designed by Tistory.