-
[Catalyst Optimizer] CombineFilters 알아보기Spark/Catalyst Optimizer 2024. 8. 9. 22:42반응형
- 목차
- 들어가며.
- 연속된 Filter 들은 하나의 Filter 로 합쳐진다.
- PushdownPredicates ?
- Filter 와 Project.
- Filter 와 Union.
- 비결정론적인 (Non-deterministic) 연산의 경우에는 CombineFilters 가 적용되지 않는다.
들어가며.
CombineFilters 는 Catalyst Optimizer 에서 여러개의 Filter 들을 하나의 Filter 로 묶기 위한 최적화 Rule 입니다.
CombineFilters 는 object 로 구현되어 있구요.
org.apache.spark.sql.catalyst.optimizer.Optimizer.scala 파일에 정의됩니다.
object CombineFilters extends Rule[LogicalPlan] with PredicateHelper { def apply(plan: LogicalPlan): LogicalPlan = plan.transformWithPruning( _.containsPattern(FILTER), ruleId)(applyLocally) val applyLocally: PartialFunction[LogicalPlan, LogicalPlan] = { // The query execution/optimization does not guarantee the expressions are evaluated in order. // We only can combine them if and only if both are deterministic. case Filter(fc, nf @ Filter(nc, grandChild)) if nc.deterministic => val (combineCandidates, nonDeterministic) = splitConjunctivePredicates(fc).partition(_.deterministic) val mergedFilter = (ExpressionSet(combineCandidates) -- ExpressionSet(splitConjunctivePredicates(nc))).reduceOption(And) match { case Some(ac) => Filter(And(nc, ac), grandChild) case None => nf } nonDeterministic.reduceOption(And).map(c => Filter(c, mergedFilter)).getOrElse(mergedFilter) } }
applyLocally 함수가 실질적인 CombineFilters Rule 의 동작 방식이구요.
이는 case Filter(fc, nf @ Filter(nc, grandChild)) 의 패턴 매칭 조건을 따릅니다.
case Filter(fc, nf @ Filter(nc, grandChild)) 는 Filter LogicalPlan 의 Child 가 Filter 인 상태를 의미합니다.
즉, df.filter(col("a") > 1).filter(col("b") > 4) 와 같이 Filter 연산이 연속적으로 발생한 상황에서 적용됩니다.
연속된 Filter 들은 하나의 Filter 로 합쳐진다.
아래와 같이 연속된 Filter 연산이 연속적으로 적용된 DataFrame 은 CombineFilters 최적화의 대상이 됩니다.
따라서 아래의 Optimized Logical Plan 을 확인해보면 세개의 조건이 하나의 Filter Plan 의 Expression 으로 병합됩니다.
val df = spark.range(100) val df_1 = df.filter(col("id") > 10) val df_2 = df_1.filter(col("id") < 90) val df_3 = df_2.filter(col("id") % 2 === 0) df_3.explain("extended") df_3.collect()
== Parsed Logical Plan == 'Filter (('id % 2) = 0) +- Filter (id#60L < cast(90 as bigint)) +- Filter (id#60L > cast(10 as bigint)) +- Range (0, 100, step=1, splits=Some(10)) == Analyzed Logical Plan == id: bigint Filter ((id#60L % cast(2 as bigint)) = cast(0 as bigint)) +- Filter (id#60L < cast(90 as bigint)) +- Filter (id#60L > cast(10 as bigint)) +- Range (0, 100, step=1, splits=Some(10)) == Optimized Logical Plan == Filter ((id#60L > 10) AND ((id#60L < 90) AND ((id#60L % 2) = 0))) +- Range (0, 100, step=1, splits=Some(10)) == Physical Plan == *(1) Filter ((id#60L > 10) AND ((id#60L < 90) AND ((id#60L % 2) = 0))) +- *(1) Range (0, 100, step=1, splits=10)
PushdownPredicates ?
만약 Filter 연산이 연속적이지 않더라도 PushdownPredicates Rule 에 의해서 CombineFilters 는 적용이 가능합니다.
Filter 와 Project.
아래와 같이 filter 와 withColumn 연산이 교차하며 수행되어도 Filter 최적화는 적용됩니다.
이는 PushDownPredicates 와 관련된 최적화이며, Filter 와 Project LogicalPlan 의 관계에서 적용됩니다.
아래의 예시는 Filter 와 Project 논리계획이 교차 적용된 연산입니다.
그리고 Optimized Logical Plan 에서 확인할 수 있듯이, Project 와 Filter 의 Expression 는 모두 하나로 통합되게 됩니다.
val df = spark.range(100) val df_1 = df.filter(col("id") > 10) var df_2 = df_1.withColumn("id_plus_1", col("id") + 1) val df_3 = df_2.filter(col("id") < 90) val df_4 = df_3.withColumn("id_plus_2", col("id") + 2) val df_5 = df_4.filter(col("id") % 2 === 0) df_5.explain("extended") df_5.collect()
== Parsed Logical Plan == 'Filter (('id % 2) = 0) +- Project [id#120L, id_plus_1#124L, (id#120L + cast(2 as bigint)) AS id_plus_2#127L] +- Filter (id#120L < cast(90 as bigint)) +- Project [id#120L, (id#120L + cast(1 as bigint)) AS id_plus_1#124L] +- Filter (id#120L > cast(10 as bigint)) +- Range (0, 100, step=1, splits=Some(10)) == Analyzed Logical Plan == id: bigint, id_plus_1: bigint, id_plus_2: bigint Filter ((id#120L % cast(2 as bigint)) = cast(0 as bigint)) +- Project [id#120L, id_plus_1#124L, (id#120L + cast(2 as bigint)) AS id_plus_2#127L] +- Filter (id#120L < cast(90 as bigint)) +- Project [id#120L, (id#120L + cast(1 as bigint)) AS id_plus_1#124L] +- Filter (id#120L > cast(10 as bigint)) +- Range (0, 100, step=1, splits=Some(10)) == Optimized Logical Plan == Project [id#120L, (id#120L + 1) AS id_plus_1#124L, (id#120L + 2) AS id_plus_2#127L] +- Filter ((id#120L > 10) AND ((id#120L < 90) AND ((id#120L % 2) = 0))) +- Range (0, 100, step=1, splits=Some(10)) == Physical Plan == *(1) Project [id#120L, (id#120L + 1) AS id_plus_1#124L, (id#120L + 2) AS id_plus_2#127L] +- *(1) Filter ((id#120L > 10) AND ((id#120L < 90) AND ((id#120L % 2) = 0))) +- *(1) Range (0, 100, step=1, splits=10)
이는 PushdownPredicates 최적화 룰과 관련있으며, Filter 와 Project 가 연결된 경우에 Project 를 상위로, Filter 를 하위 플랜으로 이동시킬 수 있습니다.
case Filter(condition, project @ Project(fields, grandChild)) if fields.forall(_.deterministic) && canPushThroughCondition(grandChild, condition) => val aliasMap = getAliasMap(project) project.copy(child = Filter(replaceAlias(condition, aliasMap), grandChild))
Filter 와 Union.
Filter 와 Union 도 마찬가지입니다.
Filter LogicalPlan 의 Child 가 Union 인 경우에 Filter 는 하위로 그리고 Union 는 상위 Plan 으로 위치를 이동할 수 있습니다.
즉, Filter 의 Pushdown 이 가능하죠.
따라서 아래와 같은 Optimized Logical Plan 이 생성됩니다.
val df = spark.range(100) val other_df = spark.range(100) val output_df = df.filter(col("id") > 10).union(other_df).filter(col("id") < 90) output_df.explain("extended") output_df.collect()
== Parsed Logical Plan == 'Filter ('id < 90) +- Union false, false :- Filter (id#120L > cast(10 as bigint)) : +- Range (0, 100, step=1, splits=Some(10)) +- Range (0, 100, step=1, splits=Some(10)) == Analyzed Logical Plan == id: bigint Filter (id#120L < cast(90 as bigint)) +- Union false, false :- Filter (id#120L > cast(10 as bigint)) : +- Range (0, 100, step=1, splits=Some(10)) +- Range (0, 100, step=1, splits=Some(10)) == Optimized Logical Plan == Union false, false :- Filter ((id#120L > 10) AND (id#120L < 90)) : +- Range (0, 100, step=1, splits=Some(10)) +- Filter (id#131L < 90) +- Range (0, 100, step=1, splits=Some(10)) == Physical Plan == Union :- *(1) Filter ((id#120L > 10) AND (id#120L < 90)) : +- *(1) Range (0, 100, step=1, splits=10) +- *(2) Filter (id#131L < 90) +- *(2) Range (0, 100, step=1, splits=10)
비결정론적인 (Non-deterministic) 연산의 경우에는 CombineFilters 가 적용되지 않는다.
아래와 같이 rand, uuid, unix_timestamp 등과 같은 결정론적이지 않은 함수들은 CombineFilters 최적화에서 제외됩니다.
import org.apache.spark.sql.functions._ val df = spark.range(100) val df_1 = df.filter(col("id") > rand()) val df_2 = df_1.filter(col("id") !== unix_timestamp()) df_2.explain("extended") df_2.collect()
== Parsed Logical Plan == 'Filter NOT ('id = unix_timestamp(current_timestamp(), yyyy-MM-dd HH:mm:ss, None, false)) +- Filter (cast(id#198L as double) > rand(1575882618332736568)) +- Range (0, 100, step=1, splits=Some(10)) == Analyzed Logical Plan == id: bigint Filter NOT (id#198L = unix_timestamp(current_timestamp(), yyyy-MM-dd HH:mm:ss, Some(Etc/UTC), false)) +- Filter (cast(id#198L as double) > rand(1575882618332736568)) +- Range (0, 100, step=1, splits=Some(10)) == Optimized Logical Plan == Filter NOT (id#198L = 1743320825) +- Filter (cast(id#198L as double) > rand(1575882618332736568)) +- Range (0, 100, step=1, splits=Some(10)) == Physical Plan == *(1) Filter NOT (id#198L = 1743320825) +- *(1) Filter (cast(id#198L as double) > rand(1575882618332736568)) +- *(1) Range (0, 100, step=1, splits=10)
반응형'Spark > Catalyst Optimizer' 카테고리의 다른 글
[Catalyst Optimizer] SimplifyCasts Rule 알아보기 (0) 2024.08.12 [Catalyst Optimier] ConstantPropagation 알아보기 (0) 2024.08.10 [Catalyst Optimizer] PruneFilters 알아보기 (0) 2024.08.09