[Catalyst Optimizer] Understanding ConstantPropagation
Introduction.
Catalyst Optimizer's ConstantPropagation rule optimizes Filter expressions that contain constants, such as "where a = 1 and b = a".
For example, a condition like "where a = 1 and b = a" is optimized by the ConstantPropagation rule into "where a = 1 and b = 1": the constant 1 from the first predicate is propagated into the other filter.
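To see this rewrite in action, here is a minimal sketch (the temp view name t and the sample rows are chosen just for illustration, assuming a spark-shell session):

import spark.implicits._

// Register a tiny in-memory table so the optimized plan can be inspected.
Seq((1, 1), (2, 3)).toDF("a", "b").createOrReplaceTempView("t")

// In the optimized plan, "b = a" should appear as "b = 1" because the
// constant from "a = 1" is propagated through the AND.
spark.sql("SELECT * FROM t WHERE a = 1 AND b = a").explain(true)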
The ConstantPropagation rule has the following structure.
ConstantPropagation is applied to Filters composed of an Attribute and a Literal.
def apply(plan: LogicalPlan): LogicalPlan = plan.transformUpWithPruning(
  _.containsAllPatterns(LITERAL, FILTER), ruleId) {
  case f: Filter =>
    val (newCondition, _) = traverse(f.condition, replaceChildren = true, nullIsFalse = true)
    if (newCondition.isDefined) {
      f.copy(condition = newCondition.get)
    } else {
      f
    }
}

private def traverse(condition: Expression, replaceChildren: Boolean, nullIsFalse: Boolean)
  : (Option[Expression], EqualityPredicates) =
  condition match {
    case e @ EqualTo(left: AttributeReference, right: Literal)
        if safeToReplace(left, nullIsFalse) =>
      (None, Seq(((left, right), e)))
    case e @ EqualTo(left: Literal, right: AttributeReference)
        if safeToReplace(right, nullIsFalse) =>
      (None, Seq(((right, left), e)))
    case e @ EqualNullSafe(left: AttributeReference, right: Literal)
        if safeToReplace(left, nullIsFalse) =>
      (None, Seq(((left, right), e)))
    case e @ EqualNullSafe(left: Literal, right: AttributeReference)
        if safeToReplace(right, nullIsFalse) =>
      (None, Seq(((right, left), e)))
    case a: And =>
      val (newLeft, equalityPredicatesLeft) =
        traverse(a.left, replaceChildren = false, nullIsFalse)
      val (newRight, equalityPredicatesRight) =
        traverse(a.right, replaceChildren = false, nullIsFalse)
      val equalityPredicates = equalityPredicatesLeft ++ equalityPredicatesRight
      val newSelf = if (equalityPredicates.nonEmpty && replaceChildren) {
        Some(And(replaceConstants(newLeft.getOrElse(a.left), equalityPredicates),
          replaceConstants(newRight.getOrElse(a.right), equalityPredicates)))
      } else {
        if (newLeft.isDefined || newRight.isDefined) {
          Some(And(newLeft.getOrElse(a.left), newRight.getOrElse(a.right)))
        } else {
          None
        }
      }
      (newSelf, equalityPredicates)
    case o: Or =>
      // Ignore the EqualityPredicates from children since they are only propagated through And.
      val (newLeft, _) = traverse(o.left, replaceChildren = true, nullIsFalse)
      val (newRight, _) = traverse(o.right, replaceChildren = true, nullIsFalse)
      val newSelf = if (newLeft.isDefined || newRight.isDefined) {
        Some(Or(left = newLeft.getOrElse(o.left), right = newRight.getOrElse(o.right)))
      } else {
        None
      }
      (newSelf, Seq.empty)
    case n: Not =>
      // Ignore the EqualityPredicates from children since they are only propagated through And.
      val (newChild, _) = traverse(n.child, replaceChildren = true, nullIsFalse = false)
      (newChild.map(Not), Seq.empty)
    case _ => (None, Seq.empty)
  }
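Before walking through the examples, one way to isolate what this rule contributes (a sketch, reusing the temp view t from above) is to exclude it via Spark's spark.sql.optimizer.excludedRules configuration and compare the optimized plans:

// Sketch: exclude ConstantPropagation, then compare against the default plan.
// spark.sql.optimizer.excludedRules is an existing Spark SQL configuration.
spark.conf.set("spark.sql.optimizer.excludedRules",
  "org.apache.spark.sql.catalyst.optimizer.ConstantPropagation")

// With the rule excluded, "b = a" should survive in the optimized plan.
spark.sql("SELECT * FROM t WHERE a = 1 AND b = a").explain(true)

// Clear the exclusion so the examples below run with the rule enabled.
spark.conf.unset("spark.sql.optimizer.excludedRules")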
The function above can be hard to follow at a glance, so let's explore the ConstantPropagation rule through a few concrete examples.
Creating a CSV file.
touch /tmp/file.csv
echo 'a,b' > /tmp/file.csv
for i in {1..100}; do
  a=$((RANDOM % 5 + 1))
  b=$((RANDOM % 5 + 1))
  echo "$a,$b" >> /tmp/file.csv
done
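To sanity-check the generated file, a quick peek from the spark-shell session used below:

// Sketch: confirm the file has an a,b header row and two columns of small integers.
spark.read.option("header", "true").csv("/tmp/file.csv").show(5)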
where a = 1 and a = b.
The example below uses two filter predicates: one of the form Attribute = Literal and the other of the form Attribute = Attribute.
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types._

val schema = StructType(Seq(
  StructField("a", IntegerType, nullable = true),
  StructField("b", IntegerType, nullable = true)
))

val df = spark.read
  .option("header", "true")
  .option("inferSchema", "false")
  .schema(schema)
  .csv("/tmp/file.csv")

val filtered = df.filter(col("a") === 1 && col("a") === col("b"))
filtered.explain("extended")
== Parsed Logical Plan ==
'Filter (('a = 1) AND ('a = 'b))
+- Relation [a#52,b#53] csv

== Analyzed Logical Plan ==
a: int, b: int
Filter ((a#52 = 1) AND (a#52 = b#53))
+- Relation [a#52,b#53] csv

== Optimized Logical Plan ==
Filter ((isnotnull(a#52) AND isnotnull(b#53)) AND ((a#52 = 1) AND (1 = b#53)))
+- Relation [a#52,b#53] csv

== Physical Plan ==
*(1) Filter (((isnotnull(a#52) AND isnotnull(b#53)) AND (a#52 = 1)) AND (1 = b#53))
+- FileScan csv [a#52,b#53] Batched: false, DataFilters: [isnotnull(a#52), isnotnull(b#53), (a#52 = 1), (1 = b#53)], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/tmp/file.csv], PartitionFilters: [], PushedFilters: [IsNotNull(a), IsNotNull(b), EqualTo(a,1), EqualTo(b,1)], ReadSchema: struct<a:int,b:int>
Looking at the plans above, the Parsed Logical Plan's Filter (('a = 1) AND ('a = 'b)) is optimized into
Filter ((isnotnull(a#52) AND isnotnull(b#53)) AND ((a#52 = 1) AND (1 = b#53))).
In other words, the condition is rewritten into an expression that filters for rows where both a and b equal 1.
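Note that, as the Or case of traverse above spells out, equality predicates are only propagated through And. A sketch (reusing df and col from the example above) to confirm that a disjunction is left alone:

// Sketch: constants are not propagated across OR, because the Or case in
// traverse discards the EqualityPredicates collected from its children.
val orFiltered = df.filter(col("a") === 1 || col("a") === col("b"))

// Expect the optimized plan to retain (a = b) instead of rewriting it to (1 = b).
orFiltered.explain("extended")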
where a = b and a = 1.
ConstantPropagation is applied in the same way even when the order of the conditions is reversed.
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types._

val schema = StructType(Seq(
  StructField("a", IntegerType, nullable = true),
  StructField("b", IntegerType, nullable = true)
))

val df = spark.read
  .option("header", "true")
  .option("inferSchema", "false")
  .schema(schema)
  .csv("/tmp/file.csv")

val filtered = df.filter(col("a") === col("b") && col("a") === 1)
filtered.explain("extended")
== Optimized Logical Plan ==
Filter ((isnotnull(b#57) AND isnotnull(a#56)) AND ((1 = b#57) AND (a#56 = 1)))
+- Relation [a#56,b#57] csv
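One more detail worth probing: the Not case of traverse recurses with nullIsFalse = false, which makes safeToReplace (not shown in the excerpt above) stricter about nullable attributes. A sketch to observe this, reusing df from the example above:

// Sketch: under NOT, traverse runs with nullIsFalse = false, so equality
// predicates on nullable attributes may no longer be considered safe to
// replace; whether propagation happens here depends on safeToReplace.
val notFiltered = df.filter(!(col("a") === 1 && col("a") === col("b")))
notFiltered.explain("extended")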
Related commands.
docker network create spark

docker run -d --name master --hostname master --network spark \
  -p 8080:8080 -p 7077:7077 \
  bitnami/spark:3.2.3 start-master.sh

docker run -d --name worker1 --hostname worker1 --network spark \
  bitnami/spark:3.2.3 start-worker.sh spark://master:7077

docker run -it --rm --name client --network spark -p 4040:4040 \
  bitnami/spark:3.2.3 spark-shell --master spark://master:7077 --conf spark.ui.port=4040
반응형'Spark > Catalyst Optimizer' 카테고리의 다른 글
[Catalyst Optimizer] SimplifyCasts Rule 알아보기 (0) 2024.08.12 [Catalyst Optimizer] CombineFilters 알아보기 (0) 2024.08.09 [Catalyst Optimizer] PruneFilters 알아보기 (0) 2024.08.09