-
[Catalyst Optimizer] SimplifyCasts Rule 알아보기Spark/Catalyst Optimizer 2024. 8. 12. 05:50반응형
- 목차
들어가며.
SimplifyCasts Rule 은 이름 그대로 불필요한 Cast 연산을 제거하는 최적화 룰입니다.
SimplifyCasts Rule 의 구현 내용은 아래와 같습니다.
- case Cast(e, dataType, _, _) if e.dataType == dataType
- 같은 타입으로 캐스팅한다면 캐스팅 연산 제거
- case (ArrayType(from, false), ArrayType(to, true))
- Null 을 포함하지 않는 Array 를 Null 을 포함하는 Array 로 캐스팅한다면 캐스팅 연산 제거.
- case (MapType(fromKey, fromValue, false), MapType(toKey, toValue, true))
- Null 을 포함하지 않는 Map 을 Null 을 포함하는 Map 으로 캐스팅한다면 캐스팅 연산 제거.
object SimplifyCasts extends Rule[LogicalPlan] { def apply(plan: LogicalPlan): LogicalPlan = plan.transformAllExpressionsWithPruning( _.containsPattern(CAST), ruleId) { case Cast(e, dataType, _, _) if e.dataType == dataType => e case c @ Cast(e, dataType, _, _) => (e.dataType, dataType) match { case (ArrayType(from, false), ArrayType(to, true)) if from == to => e case (MapType(fromKey, fromValue, false), MapType(toKey, toValue, true)) if fromKey == toKey && fromValue == toValue => e case _ => c } } }
동일한 타입으로 캐스팅.
Scala Spark 에서 Range 를 기반으로 DataFrame 을 생성하게 되면 IntegerType 으로 변형됩니다.
(1 to 100).toDF("value").schema res60: org.apache.spark.sql.types.StructType = StructType(StructField(value,IntegerType,false))
따라서 아래의 예시에선 IntegerType 을 IntegerType 으로 변형하는 코드이므로 SimplifyCasts Rule 에 의해서 캐스팅이 제거됩니다.
import org.apache.spark.sql.types._ import spark.implicits._ val df = (1 to 100).toDF("value").withColumn("value", col("value").cast(IntegerType)) df.explain("extended")
== Parsed Logical Plan == 'Project [cast('value as int) AS value#29] +- Project [value#24 AS value#27] +- LocalRelation [value#24] == Analyzed Logical Plan == value: int Project [cast(value#27 as int) AS value#29] +- Project [value#24 AS value#27] +- LocalRelation [value#24] == Optimized Logical Plan == LocalRelation [value#29] == Physical Plan == LocalTableScan [value#29]
반면 일반적인 CSV 파일을 읽게 되는 경우에는 지정된 타입이 없기 때문에 아래와 같이 StringType 으로 설정됩니다.
val df = (spark.read .format("csv") .load("hdfs://namenode:8020/user/spark/num") .withColumnRenamed("_c0", "value") )
df.schema res61: org.apache.spark.sql.types.StructType = StructType(StructField(value,StringType,true))
이러한 경우에는 아래처럼 SimplifyCasts Rule 에 의한 최적화가 되지 않습니다.
왜냐하면 StringType 은 IntegerType 과 동일한 데이터 타입이 아니기 때문입니다.
== Parsed Logical Plan == 'Project [cast('value as int) AS value#187] +- Project [_c0#183 AS value#185] +- Relation [_c0#183] csv == Analyzed Logical Plan == value: int Project [cast(value#185 as int) AS value#187] +- Project [_c0#183 AS value#185] +- Relation [_c0#183] csv == Optimized Logical Plan == Project [cast(_c0#183 as int) AS value#187] +- Relation [_c0#183] csv == Physical Plan == *(1) Project [cast(_c0#183 as int) AS value#187] +- FileScan csv [_c0#183] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[hdfs://namenode:8020/user/spark/num], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<_c0:string>
Complex Type 과 ContainsNull Casts.
ArrayType 과 MapType 은 containsNull 을 기준으로 SimplifyCasts 최적화 여부가 결정됩니다.
아래와 같이 Null 을 포함하지 않는 ArrayType Column 을 Null 을 포함하는 ArrayType Column 으로 변경하게 되면,
SImplifyCasts Rule 이 적용되어 Cast 연산이 제거됩니다.
import org.apache.spark.sql.{Row, SparkSession} import org.apache.spark.sql.types._ import org.apache.spark.sql.functions._ import spark.implicits._ val data = Seq( Row(1, Seq(10, null)), Row(2, Seq(20, 30)) ) val arraySchema = StructType(Seq( StructField("id", IntegerType), StructField("array_col", ArrayType(IntegerType, containsNull = false)) )) val df = spark.createDataFrame(spark.sparkContext.parallelize(data), arraySchema) val castedDF = df.withColumn("array_casted", col("array_col").cast(ArrayType(IntegerType, containsNull = true))) castedDF.explain("extended")
== Parsed Logical Plan == 'Project [id#435, array_col#436, cast('array_col as array<int>) AS array_casted#439] +- LogicalRDD [id#435, array_col#436], false == Analyzed Logical Plan == id: int, array_col: array<int>, array_casted: array<int> Project [id#435, array_col#436, cast(array_col#436 as array<int>) AS array_casted#439] +- LogicalRDD [id#435, array_col#436], false == Optimized Logical Plan == Project [id#435, array_col#436, array_col#436 AS array_casted#439] +- LogicalRDD [id#435, array_col#436], false == Physical Plan == *(1) Project [id#435, array_col#436, array_col#436 AS array_casted#439] +- *(1) Scan ExistingRDD[id#435,array_col#436]
MapType 또한 ArrayType 과 동일합니다.
MapType 이 valueContainsNull = False 에서 True 로 캐스팅되는 경우에 Cast 연산이 제거됩니다.
import org.apache.spark.sql.{Row, SparkSession} import org.apache.spark.sql.types._ import org.apache.spark.sql.functions._ import spark.implicits._ val data = Seq( Row(1, Seq(10, 40)), Row(2, Seq(20, 30)) ) val mapSchema = StructType(Seq( StructField("id", IntegerType), StructField("map_col", MapType(IntegerType, IntegerType, valueContainsNull = false)) )) val df = spark.createDataFrame(spark.sparkContext.parallelize(data), mapSchema) val castedDF = df.withColumn("map_casted", col("map_col").cast(MapType(IntegerType, IntegerType, valueContainsNull = true))) castedDF.explain("extended")
== Parsed Logical Plan == 'Project [id#479, map_col#480, cast('map_col as map<int,int>) AS map_casted#483] +- LogicalRDD [id#479, map_col#480], false == Analyzed Logical Plan == id: int, map_col: map<int,int>, map_casted: map<int,int> Project [id#479, map_col#480, cast(map_col#480 as map<int,int>) AS map_casted#483] +- LogicalRDD [id#479, map_col#480], false == Optimized Logical Plan == Project [id#479, map_col#480, map_col#480 AS map_casted#483] +- LogicalRDD [id#479, map_col#480], false == Physical Plan == *(1) Project [id#479, map_col#480, map_col#480 AS map_casted#483] +- *(1) Scan ExistingRDD[id#479,map_col#480]
유용한 명령어들.
docker network create spark docker run -d --name master --hostname master --network spark -p 8080:8080 -p 7077:7077 bitnami/spark:3.2.3 start-master.sh docker run -d --name worker1 --hostname worker1 --network spark bitnami/spark:3.2.3 start-worker.sh spark://master:7077 docker run -it --rm --name client --network spark -p 4040:4040 bitnami/spark:3.2.3 spark-shell --master spark://master:7077 --conf spark.ui.port=4040
반응형'Spark > Catalyst Optimizer' 카테고리의 다른 글
[Catalyst Optimier] ConstantPropagation 알아보기 (0) 2024.08.10 [Catalyst Optimizer] CombineFilters 알아보기 (0) 2024.08.09 [Catalyst Optimizer] PruneFilters 알아보기 (0) 2024.08.09 - case Cast(e, dataType, _, _) if e.dataType == dataType