ABOUT ME

와주셔서 감사합니다. 좋은 글을 많이 쓰겠습니다.

Today
Yesterday
Total
  • [Catalyst Optimizer] PruneFilters 알아보기
    Spark/Catalyst Optimizer 2024. 8. 9. 22:42
    반응형

    - 목차

     

    들어가며.

    Catalyst Optimizer 는 Rule Based Optimization 을 수행합니다.

    그 중 한가지 방식인 PruneFilters 는 실행이 무의미한 Filter Expression 들을 제거하는 역할을 합니다.

    이어지는 글에서 다양한 PruneFilters Rule 의 적용 예시를 살펴보도록 하겠습니다.

     

    PruneFilters Object 살펴보기.

    org.apache.spark.sql.catalyst.optimizer 패키지의 Optimizer.scala 파일 내부에 PruneFilters 클래스가 존재합니다.

    내용은 아래와 같습니다. 

    1. filter(true) 와 같은 연산은 제거합니다.
      • 그래서 child 에 해당하는 LocalRelation 이나 Join, Subquery 등이 반환됩니다. 
    2. filter(false) 나 filter(null) 와 같은 연산은 빈 LocalRelation 을 반환. 
      1. LocalRelation 은 range 나 parallelize 와 같이 메모리에 로드된 데이터들을 의미합니다.
      2. filter false 나 null 과 같은 상태는 항상 부정 결과를 만들어내므로 Empty 상태의 LocalRelation 이 반환됩니다. 
    3. filter(col('a') == 4).filter(col('a') == 4) 와 같이 중복된 Filter 는 제거됩니다.

     

    object PruneFilters extends Rule[LogicalPlan] with PredicateHelper {
      def apply(plan: LogicalPlan): LogicalPlan = plan.transformWithPruning(
        _.containsPattern(FILTER), ruleId) {
        // If the filter condition always evaluate to true, remove the filter.
        case Filter(Literal(true, BooleanType), child) => child
        // If the filter condition always evaluate to null or false,
        // replace the input with an empty relation.
        case Filter(Literal(null, _), child) =>
          LocalRelation(child.output, data = Seq.empty, isStreaming = plan.isStreaming)
        case Filter(Literal(false, BooleanType), child) =>
          LocalRelation(child.output, data = Seq.empty, isStreaming = plan.isStreaming)
        // If any deterministic condition is guaranteed to be true given the constraints on the child's
        // output, remove the condition
        case f @ Filter(fc, p: LogicalPlan) =>
          val (prunedPredicates, remainingPredicates) =
            splitConjunctivePredicates(fc).partition { cond =>
              cond.deterministic && p.constraints.contains(cond)
            }
          if (prunedPredicates.isEmpty) {
            f
          } else if (remainingPredicates.isEmpty) {
            p
          } else {
            val newCond = remainingPredicates.reduce(And)
            Filter(newCond, p)
          }
      }
    }

     

     

    항상 참인 Filter 제거.

    아래와 같이 항상 참인 결과를 반환하는 Filter 는 PruneFilters Rule 의 제거 대상이 됩니다.

    case Filter(Literal(true, BooleanType), child) => child

     

    아래는 DataFrame 에 filter(lit(true)) 인 필터를 적용한 예시입니다.

    실행 계획을 살펴보면 Optimized Logical Plan 에서 Filter Logical Plan 이 제거된 모습을 확인할 수 있습니다.

     

    val df1 = spark.range(10).filter(lit(true))
    df1.explain("extended")
    == Parsed Logical Plan ==
    Filter true
    +- Range (0, 10, step=1, splits=Some(10))
    
    == Analyzed Logical Plan ==
    id: bigint
    Filter true
    +- Range (0, 10, step=1, splits=Some(10))
    
    == Optimized Logical Plan ==
    Range (0, 10, step=1, splits=Some(10))
    
    == Physical Plan ==
    *(1) Range (0, 10, step=1, splits=10)

     

     

    항상 부정인 조건 Filter.

    아래와 같이 항상 부정인 결과를 반환하는 Filter 는 PruneFilters Rule 의 최적화 대상이 됩니다.

    ex. where false, where null

    case Filter(Literal(null, _), child) =>
      LocalRelation(child.output, data = Seq.empty, isStreaming = plan.isStreaming)

     

    아래와 같이 코드를 실행합니다.

    그리고 Optimized Logical Plan 을 확인하게 되면 LocalRelation <empty> 를 확인할 수 있습니다.

    LocalRelation 은 모든 데이터를 메모리에서 유지 중인 상태의 LogicalPlan 입니다.

    즉, Empty Table 이라고 이해하셔도 될 것 같구요.

    Physical Plan 또한 LocalTableScan <empty> 상태로 이어집니다.

     

    val df1 = spark.range(10).filter(lit(false))
    df1.explain("extended")
    df1.collect()
    == Parsed Logical Plan ==
    Filter false
    +- Range (0, 10, step=1, splits=Some(10))
    
    == Analyzed Logical Plan ==
    id: bigint
    Filter false
    +- Range (0, 10, step=1, splits=Some(10))
    
    == Optimized Logical Plan ==
    LocalRelation <empty>, [id#12L]
    
    == Physical Plan ==
    LocalTableScan <empty>, [id#12L]

     

    그리고 collect 와 같은 Action 이 호출되어도 Task 가 실행되지 않습니다.

     

    중복 조건 제거.

    마지막으로 PruneFilters Rule 은 중복되는 Filter Expression 을 제거합니다.

     

    case f @ Filter(fc, p: LogicalPlan)

     

     

    예를 들어, 아래와 같이 id 칼럼이 1보다 큰 조건을 중복 적용한 예시가 있습니다.

    그리고 출력되는 Optimized Logical Plan 에서는 Filter Expression 의 중복이 제거됨을 확인할 수 있습니다.

    val df1 = spark.range(10).filter(col("id") > lit(1)).filter(col("id") > lit(1))
    df1.explain("extended")
    df1.collect()
    == Parsed Logical Plan ==
    'Filter ('id > 1)
    +- Filter (id#20L > cast(1 as bigint))
       +- Range (0, 10, step=1, splits=Some(10))
    
    == Analyzed Logical Plan ==
    id: bigint
    Filter (id#20L > cast(1 as bigint))
    +- Filter (id#20L > cast(1 as bigint))
       +- Range (0, 10, step=1, splits=Some(10))
    
    == Optimized Logical Plan ==
    Filter (id#20L > 1)
    +- Range (0, 10, step=1, splits=Some(10))
    
    == Physical Plan ==
    *(1) Filter (id#20L > 1)
    +- *(1) Range (0, 10, step=1, splits=10)

     

     

    포함 관계의 Filter 는 제거되지 않는다.

    아쉽게도 아래와 같이 포함 관계의 Filter

    val df1 = spark.range(10).filter(col("id") > lit(1)).filter(col("id") > lit(2)).filter(col("id") > lit(3))
        
    df1.explain("extended")
    df1.collect()
    == Parsed Logical Plan ==
    'Filter ('id > 3)
    +- Filter (id#34L > cast(2 as bigint))
       +- Filter (id#34L > cast(1 as bigint))
          +- Range (0, 10, step=1, splits=Some(10))
    
    == Analyzed Logical Plan ==
    id: bigint
    Filter (id#34L > cast(3 as bigint))
    +- Filter (id#34L > cast(2 as bigint))
       +- Filter (id#34L > cast(1 as bigint))
          +- Range (0, 10, step=1, splits=Some(10))
    
    == Optimized Logical Plan ==
    Filter ((id#34L > 1) AND ((id#34L > 2) AND (id#34L > 3)))
    +- Range (0, 10, step=1, splits=Some(10))
    
    == Physical Plan ==
    *(1) Filter ((id#34L > 1) AND ((id#34L > 2) AND (id#34L > 3)))
    +- *(1) Range (0, 10, step=1, splits=10)

     

    관련된 명령어.

    docker network create spark
    
    docker run -d --name master --hostname master --network spark -p 8080:8080 -p 7077:7077 bitnami/spark:3.2.3 start-master.sh
    
    docker run -d --name worker1 --hostname worker1 --network spark bitnami/spark:3.2.3 start-worker.sh spark://master:7077
    
    docker run -it --rm --name client --network spark -p 4040:4040 bitnami/spark:3.2.3 spark-shell --master spark://master:7077 --conf spark.ui.port=4040

     

    반응형
Designed by Tistory.