-
[Catalyst Optimizer] PruneFilters 알아보기Spark/Catalyst Optimizer 2024. 8. 9. 22:42반응형
- 목차
- 들어가며.
- PruneFilters Object 살펴보기.
- 항상 참인 Filter 제거.
- 항상 부정인 조건 Filter.
- 중복 조건 제거.
- 포함 관계의 Filter 는 제거되지 않는다.
- 관련된 명령어.
들어가며.
Catalyst Optimizer 는 Rule Based Optimization 을 수행합니다.
그 중 한가지 방식인 PruneFilters 는 실행이 무의미한 Filter Expression 들을 제거하는 역할을 합니다.
이어지는 글에서 다양한 PruneFilters Rule 의 적용 예시를 살펴보도록 하겠습니다.
PruneFilters Object 살펴보기.
org.apache.spark.sql.catalyst.optimizer 패키지의 Optimizer.scala 파일 내부에 PruneFilters 클래스가 존재합니다.
내용은 아래와 같습니다.
- filter(true) 와 같은 연산은 제거합니다.
- 그래서 child 에 해당하는 LocalRelation 이나 Join, Subquery 등이 반환됩니다.
- filter(false) 나 filter(null) 와 같은 연산은 빈 LocalRelation 을 반환.
- LocalRelation 은 range 나 parallelize 와 같이 메모리에 로드된 데이터들을 의미합니다.
- filter false 나 null 과 같은 상태는 항상 부정 결과를 만들어내므로 Empty 상태의 LocalRelation 이 반환됩니다.
- filter(col('a') == 4).filter(col('a') == 4) 와 같이 중복된 Filter 는 제거됩니다.
object PruneFilters extends Rule[LogicalPlan] with PredicateHelper { def apply(plan: LogicalPlan): LogicalPlan = plan.transformWithPruning( _.containsPattern(FILTER), ruleId) { // If the filter condition always evaluate to true, remove the filter. case Filter(Literal(true, BooleanType), child) => child // If the filter condition always evaluate to null or false, // replace the input with an empty relation. case Filter(Literal(null, _), child) => LocalRelation(child.output, data = Seq.empty, isStreaming = plan.isStreaming) case Filter(Literal(false, BooleanType), child) => LocalRelation(child.output, data = Seq.empty, isStreaming = plan.isStreaming) // If any deterministic condition is guaranteed to be true given the constraints on the child's // output, remove the condition case f @ Filter(fc, p: LogicalPlan) => val (prunedPredicates, remainingPredicates) = splitConjunctivePredicates(fc).partition { cond => cond.deterministic && p.constraints.contains(cond) } if (prunedPredicates.isEmpty) { f } else if (remainingPredicates.isEmpty) { p } else { val newCond = remainingPredicates.reduce(And) Filter(newCond, p) } } }
항상 참인 Filter 제거.
아래와 같이 항상 참인 결과를 반환하는 Filter 는 PruneFilters Rule 의 제거 대상이 됩니다.
case Filter(Literal(true, BooleanType), child) => child
아래는 DataFrame 에 filter(lit(true)) 인 필터를 적용한 예시입니다.
실행 계획을 살펴보면 Optimized Logical Plan 에서 Filter Logical Plan 이 제거된 모습을 확인할 수 있습니다.
val df1 = spark.range(10).filter(lit(true)) df1.explain("extended")
== Parsed Logical Plan == Filter true +- Range (0, 10, step=1, splits=Some(10)) == Analyzed Logical Plan == id: bigint Filter true +- Range (0, 10, step=1, splits=Some(10)) == Optimized Logical Plan == Range (0, 10, step=1, splits=Some(10)) == Physical Plan == *(1) Range (0, 10, step=1, splits=10)
항상 부정인 조건 Filter.
아래와 같이 항상 부정인 결과를 반환하는 Filter 는 PruneFilters Rule 의 최적화 대상이 됩니다.
ex. where false, where null
case Filter(Literal(null, _), child) => LocalRelation(child.output, data = Seq.empty, isStreaming = plan.isStreaming)
아래와 같이 코드를 실행합니다.
그리고 Optimized Logical Plan 을 확인하게 되면 LocalRelation <empty> 를 확인할 수 있습니다.
LocalRelation 은 모든 데이터를 메모리에서 유지 중인 상태의 LogicalPlan 입니다.
즉, Empty Table 이라고 이해하셔도 될 것 같구요.
Physical Plan 또한 LocalTableScan <empty> 상태로 이어집니다.
val df1 = spark.range(10).filter(lit(false)) df1.explain("extended") df1.collect()
== Parsed Logical Plan == Filter false +- Range (0, 10, step=1, splits=Some(10)) == Analyzed Logical Plan == id: bigint Filter false +- Range (0, 10, step=1, splits=Some(10)) == Optimized Logical Plan == LocalRelation <empty>, [id#12L] == Physical Plan == LocalTableScan <empty>, [id#12L]
그리고 collect 와 같은 Action 이 호출되어도 Task 가 실행되지 않습니다.
중복 조건 제거.
마지막으로 PruneFilters Rule 은 중복되는 Filter Expression 을 제거합니다.
case f @ Filter(fc, p: LogicalPlan)
예를 들어, 아래와 같이 id 칼럼이 1보다 큰 조건을 중복 적용한 예시가 있습니다.
그리고 출력되는 Optimized Logical Plan 에서는 Filter Expression 의 중복이 제거됨을 확인할 수 있습니다.
val df1 = spark.range(10).filter(col("id") > lit(1)).filter(col("id") > lit(1)) df1.explain("extended") df1.collect()
== Parsed Logical Plan == 'Filter ('id > 1) +- Filter (id#20L > cast(1 as bigint)) +- Range (0, 10, step=1, splits=Some(10)) == Analyzed Logical Plan == id: bigint Filter (id#20L > cast(1 as bigint)) +- Filter (id#20L > cast(1 as bigint)) +- Range (0, 10, step=1, splits=Some(10)) == Optimized Logical Plan == Filter (id#20L > 1) +- Range (0, 10, step=1, splits=Some(10)) == Physical Plan == *(1) Filter (id#20L > 1) +- *(1) Range (0, 10, step=1, splits=10)
포함 관계의 Filter 는 제거되지 않는다.
아쉽게도 아래와 같이 포함 관계의 Filter
val df1 = spark.range(10).filter(col("id") > lit(1)).filter(col("id") > lit(2)).filter(col("id") > lit(3)) df1.explain("extended") df1.collect()
== Parsed Logical Plan == 'Filter ('id > 3) +- Filter (id#34L > cast(2 as bigint)) +- Filter (id#34L > cast(1 as bigint)) +- Range (0, 10, step=1, splits=Some(10)) == Analyzed Logical Plan == id: bigint Filter (id#34L > cast(3 as bigint)) +- Filter (id#34L > cast(2 as bigint)) +- Filter (id#34L > cast(1 as bigint)) +- Range (0, 10, step=1, splits=Some(10)) == Optimized Logical Plan == Filter ((id#34L > 1) AND ((id#34L > 2) AND (id#34L > 3))) +- Range (0, 10, step=1, splits=Some(10)) == Physical Plan == *(1) Filter ((id#34L > 1) AND ((id#34L > 2) AND (id#34L > 3))) +- *(1) Range (0, 10, step=1, splits=10)
관련된 명령어.
docker network create spark docker run -d --name master --hostname master --network spark -p 8080:8080 -p 7077:7077 bitnami/spark:3.2.3 start-master.sh docker run -d --name worker1 --hostname worker1 --network spark bitnami/spark:3.2.3 start-worker.sh spark://master:7077 docker run -it --rm --name client --network spark -p 4040:4040 bitnami/spark:3.2.3 spark-shell --master spark://master:7077 --conf spark.ui.port=4040
반응형'Spark > Catalyst Optimizer' 카테고리의 다른 글
[Catalyst Optimizer] SimplifyCasts Rule 알아보기 (0) 2024.08.12 [Catalyst Optimier] ConstantPropagation 알아보기 (0) 2024.08.10 [Catalyst Optimizer] CombineFilters 알아보기 (0) 2024.08.09