ABOUT ME

와주셔서 감사합니다. 좋은 글을 많이 쓰겠습니다.

Today
Yesterday
Total
  • [Catalyst Optimizer] CombineFilters 알아보기
    Spark/Catalyst Optimizer 2024. 8. 9. 22:42
    반응형

    - 목차

     

    들어가며.

    CombineFilters 는 Catalyst Optimizer 에서 여러개의 Filter 들을 하나의 Filter 로 묶기 위한 최적화 Rule 입니다.

    CombineFilters 는 object 로 구현되어 있구요.

    org.apache.spark.sql.catalyst.optimizer.Optimizer.scala 파일에 정의됩니다.

     

    object CombineFilters extends Rule[LogicalPlan] with PredicateHelper {
      def apply(plan: LogicalPlan): LogicalPlan = plan.transformWithPruning(
        _.containsPattern(FILTER), ruleId)(applyLocally)
    
      val applyLocally: PartialFunction[LogicalPlan, LogicalPlan] = {
        // The query execution/optimization does not guarantee the expressions are evaluated in order.
        // We only can combine them if and only if both are deterministic.
        case Filter(fc, nf @ Filter(nc, grandChild)) if nc.deterministic =>
          val (combineCandidates, nonDeterministic) =
            splitConjunctivePredicates(fc).partition(_.deterministic)
          val mergedFilter = (ExpressionSet(combineCandidates) --
            ExpressionSet(splitConjunctivePredicates(nc))).reduceOption(And) match {
            case Some(ac) =>
              Filter(And(nc, ac), grandChild)
            case None =>
              nf
          }
          nonDeterministic.reduceOption(And).map(c => Filter(c, mergedFilter)).getOrElse(mergedFilter)
      }
    }

     

    applyLocally 함수가 실질적인 CombineFilters Rule 의 동작 방식이구요.

    이는 case Filter(fc, nf @ Filter(nc, grandChild)) 의 패턴 매칭 조건을 따릅니다.

    case Filter(fc, nf @ Filter(nc, grandChild)) 는 Filter LogicalPlan 의 Child 가 Filter 인 상태를 의미합니다.

    즉, df.filter(col("a") > 1).filter(col("b") > 4) 와 같이 Filter 연산이 연속적으로 발생한 상황에서 적용됩니다.

     

    연속된 Filter 들은 하나의 Filter 로 합쳐진다.

    아래와 같이 연속된 Filter 연산이 연속적으로 적용된 DataFrame 은 CombineFilters 최적화의 대상이 됩니다.

    따라서 아래의 Optimized Logical Plan 을 확인해보면 세개의 조건이 하나의 Filter Plan 의 Expression 으로 병합됩니다.

     

    val df = spark.range(100)
    val df_1 = df.filter(col("id") > 10)
    val df_2 = df_1.filter(col("id") < 90)
    val df_3 = df_2.filter(col("id") % 2 === 0)
      
      
    df_3.explain("extended")
    
    df_3.collect()
    == Parsed Logical Plan ==
    'Filter (('id % 2) = 0)
    +- Filter (id#60L < cast(90 as bigint))
       +- Filter (id#60L > cast(10 as bigint))
          +- Range (0, 100, step=1, splits=Some(10))
    
    == Analyzed Logical Plan ==
    id: bigint
    Filter ((id#60L % cast(2 as bigint)) = cast(0 as bigint))
    +- Filter (id#60L < cast(90 as bigint))
       +- Filter (id#60L > cast(10 as bigint))
          +- Range (0, 100, step=1, splits=Some(10))
    
    == Optimized Logical Plan ==
    Filter ((id#60L > 10) AND ((id#60L < 90) AND ((id#60L % 2) = 0)))
    +- Range (0, 100, step=1, splits=Some(10))
    
    == Physical Plan ==
    *(1) Filter ((id#60L > 10) AND ((id#60L < 90) AND ((id#60L % 2) = 0)))
    +- *(1) Range (0, 100, step=1, splits=10)

     

     

    PushdownPredicates ?

    만약 Filter 연산이 연속적이지 않더라도 PushdownPredicates Rule 에 의해서 CombineFilters 는 적용이 가능합니다. 

     

    Filter 와 Project.

    아래와 같이 filter 와 withColumn 연산이 교차하며 수행되어도 Filter 최적화는 적용됩니다.

    이는 PushDownPredicates 와 관련된 최적화이며, Filter 와 Project LogicalPlan 의 관계에서 적용됩니다.

     

    아래의 예시는 Filter 와 Project 논리계획이 교차 적용된 연산입니다.

    그리고 Optimized Logical Plan 에서 확인할 수 있듯이, Project 와 Filter 의 Expression 는 모두 하나로 통합되게 됩니다.

    val df = spark.range(100)
    val df_1 = df.filter(col("id") > 10)
    var df_2 = df_1.withColumn("id_plus_1", col("id") + 1)
    val df_3 = df_2.filter(col("id") < 90)
    val df_4 = df_3.withColumn("id_plus_2", col("id") + 2)
    val df_5 = df_4.filter(col("id") % 2 === 0)
      
    df_5.explain("extended")
    
    df_5.collect()
    == Parsed Logical Plan ==
    'Filter (('id % 2) = 0)
    +- Project [id#120L, id_plus_1#124L, (id#120L + cast(2 as bigint)) AS id_plus_2#127L]
       +- Filter (id#120L < cast(90 as bigint))
          +- Project [id#120L, (id#120L + cast(1 as bigint)) AS id_plus_1#124L]
             +- Filter (id#120L > cast(10 as bigint))
                +- Range (0, 100, step=1, splits=Some(10))
    
    == Analyzed Logical Plan ==
    id: bigint, id_plus_1: bigint, id_plus_2: bigint
    Filter ((id#120L % cast(2 as bigint)) = cast(0 as bigint))
    +- Project [id#120L, id_plus_1#124L, (id#120L + cast(2 as bigint)) AS id_plus_2#127L]
       +- Filter (id#120L < cast(90 as bigint))
          +- Project [id#120L, (id#120L + cast(1 as bigint)) AS id_plus_1#124L]
             +- Filter (id#120L > cast(10 as bigint))
                +- Range (0, 100, step=1, splits=Some(10))
    
    == Optimized Logical Plan ==
    Project [id#120L, (id#120L + 1) AS id_plus_1#124L, (id#120L + 2) AS id_plus_2#127L]
    +- Filter ((id#120L > 10) AND ((id#120L < 90) AND ((id#120L % 2) = 0)))
       +- Range (0, 100, step=1, splits=Some(10))
    
    == Physical Plan ==
    *(1) Project [id#120L, (id#120L + 1) AS id_plus_1#124L, (id#120L + 2) AS id_plus_2#127L]
    +- *(1) Filter ((id#120L > 10) AND ((id#120L < 90) AND ((id#120L % 2) = 0)))
       +- *(1) Range (0, 100, step=1, splits=10)

     

    이는 PushdownPredicates 최적화 룰과 관련있으며, Filter 와 Project 가 연결된 경우에 Project 를 상위로, Filter 를 하위 플랜으로 이동시킬 수 있습니다.

    case Filter(condition, project @ Project(fields, grandChild))
      if fields.forall(_.deterministic) && canPushThroughCondition(grandChild, condition) =>
      val aliasMap = getAliasMap(project)
      project.copy(child = Filter(replaceAlias(condition, aliasMap), grandChild))

     

     

    Filter 와 Union.

    Filter 와 Union 도 마찬가지입니다.

    Filter LogicalPlan 의 Child 가 Union 인 경우에 Filter 는 하위로 그리고 Union 는 상위 Plan 으로 위치를 이동할 수 있습니다.

    즉, Filter 의 Pushdown 이 가능하죠.

    따라서 아래와 같은 Optimized Logical Plan 이 생성됩니다.

     

    val df = spark.range(100)
    val other_df = spark.range(100)
    
    val output_df = df.filter(col("id") > 10).union(other_df).filter(col("id") < 90)
      
    output_df.explain("extended")
    
    output_df.collect()
    == Parsed Logical Plan ==
    'Filter ('id < 90)
    +- Union false, false
       :- Filter (id#120L > cast(10 as bigint))
       :  +- Range (0, 100, step=1, splits=Some(10))
       +- Range (0, 100, step=1, splits=Some(10))
    
    == Analyzed Logical Plan ==
    id: bigint
    Filter (id#120L < cast(90 as bigint))
    +- Union false, false
       :- Filter (id#120L > cast(10 as bigint))
       :  +- Range (0, 100, step=1, splits=Some(10))
       +- Range (0, 100, step=1, splits=Some(10))
    
    == Optimized Logical Plan ==
    Union false, false
    :- Filter ((id#120L > 10) AND (id#120L < 90))
    :  +- Range (0, 100, step=1, splits=Some(10))
    +- Filter (id#131L < 90)
       +- Range (0, 100, step=1, splits=Some(10))
    
    == Physical Plan ==
    Union
    :- *(1) Filter ((id#120L > 10) AND (id#120L < 90))
    :  +- *(1) Range (0, 100, step=1, splits=10)
    +- *(2) Filter (id#131L < 90)
       +- *(2) Range (0, 100, step=1, splits=10)

     

     

    비결정론적인 (Non-deterministic) 연산의 경우에는 CombineFilters 가 적용되지 않는다.

    아래와 같이 rand, uuid, unix_timestamp 등과 같은 결정론적이지 않은 함수들은 CombineFilters 최적화에서 제외됩니다.

     

    import org.apache.spark.sql.functions._
    
    val df = spark.range(100)
    val df_1 = df.filter(col("id") > rand())
    val df_2 = df_1.filter(col("id") !== unix_timestamp())
      
    df_2.explain("extended")
    
    df_2.collect()

     

    == Parsed Logical Plan ==
    'Filter NOT ('id = unix_timestamp(current_timestamp(), yyyy-MM-dd HH:mm:ss, None, false))
    +- Filter (cast(id#198L as double) > rand(1575882618332736568))
       +- Range (0, 100, step=1, splits=Some(10))
    
    == Analyzed Logical Plan ==
    id: bigint
    Filter NOT (id#198L = unix_timestamp(current_timestamp(), yyyy-MM-dd HH:mm:ss, Some(Etc/UTC), false))
    +- Filter (cast(id#198L as double) > rand(1575882618332736568))
       +- Range (0, 100, step=1, splits=Some(10))
    
    == Optimized Logical Plan ==
    Filter NOT (id#198L = 1743320825)
    +- Filter (cast(id#198L as double) > rand(1575882618332736568))
       +- Range (0, 100, step=1, splits=Some(10))
    
    == Physical Plan ==
    *(1) Filter NOT (id#198L = 1743320825)
    +- *(1) Filter (cast(id#198L as double) > rand(1575882618332736568))
       +- *(1) Range (0, 100, step=1, splits=10)

     

     

     

    반응형
Designed by Tistory.