[Catalyst Optimizer] Exploring ConstantPropagation

    Introduction.

    The Catalyst Optimizer's ConstantPropagation rule optimizes Filter expressions that involve constants, such as "where a = 1 and b = a".

    For example, the ConstantPropagation rule rewrites a condition like "where a = 1 and b = a" into "where a = 1 and b = 1": the constant 1 bound to a is propagated into the other filter.
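
    This rewrite can be reproduced in isolation, in the style of Spark's own optimizer unit tests. Below is a minimal sketch using Catalyst's internal DSL; it assumes a Scala 2.12 environment (e.g., the spark-shell used later in this post) where the spark-catalyst internals are on the classpath.

    import org.apache.spark.sql.catalyst.dsl.expressions._
    import org.apache.spark.sql.catalyst.dsl.plans._
    import org.apache.spark.sql.catalyst.optimizer.ConstantPropagation
    import org.apache.spark.sql.catalyst.plans.logical.LocalRelation

    // Build and analyze a plan for: where a = 1 and b = a
    val relation = LocalRelation('a.int, 'b.int)
    val query = relation.where('a === 1 && 'b === 'a).analyze

    // Apply the rule directly; the Filter condition becomes (a = 1) AND (b = 1)
    val optimized = ConstantPropagation(query)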

     

    The ConstantPropagation rule is implemented as follows.

    It applies to Filter conditions whose equality predicates pair an Attribute with a Literal.

      def apply(plan: LogicalPlan): LogicalPlan = plan.transformUpWithPruning(
        _.containsAllPatterns(LITERAL, FILTER), ruleId) {
        case f: Filter =>
          val (newCondition, _) = traverse(f.condition, replaceChildren = true, nullIsFalse = true)
          if (newCondition.isDefined) {
            f.copy(condition = newCondition.get)
          } else {
            f
          }
      }
      
      private def traverse(condition: Expression, replaceChildren: Boolean, nullIsFalse: Boolean)
        : (Option[Expression], EqualityPredicates) =
        condition match {
          // Attribute = Literal (and the symmetric and null-safe variants below):
          // collect the equality predicate instead of rewriting it in place.
          case e @ EqualTo(left: AttributeReference, right: Literal)
            if safeToReplace(left, nullIsFalse) =>
            (None, Seq(((left, right), e)))
          case e @ EqualTo(left: Literal, right: AttributeReference)
            if safeToReplace(right, nullIsFalse) =>
            (None, Seq(((right, left), e)))
          case e @ EqualNullSafe(left: AttributeReference, right: Literal)
            if safeToReplace(left, nullIsFalse) =>
            (None, Seq(((left, right), e)))
          case e @ EqualNullSafe(left: Literal, right: AttributeReference)
            if safeToReplace(right, nullIsFalse) =>
            (None, Seq(((right, left), e)))
          // And: merge equality predicates from both sides; when replaceChildren is set,
          // substitute the collected constants into both children via replaceConstants.
          case a: And =>
            val (newLeft, equalityPredicatesLeft) =
              traverse(a.left, replaceChildren = false, nullIsFalse)
            val (newRight, equalityPredicatesRight) =
              traverse(a.right, replaceChildren = false, nullIsFalse)
            val equalityPredicates = equalityPredicatesLeft ++ equalityPredicatesRight
            val newSelf = if (equalityPredicates.nonEmpty && replaceChildren) {
              Some(And(replaceConstants(newLeft.getOrElse(a.left), equalityPredicates),
                replaceConstants(newRight.getOrElse(a.right), equalityPredicates)))
            } else {
              if (newLeft.isDefined || newRight.isDefined) {
                Some(And(newLeft.getOrElse(a.left), newRight.getOrElse(a.right)))
              } else {
                None
              }
            }
            (newSelf, equalityPredicates)
          case o: Or =>
            // Ignore the EqualityPredicates from children since they are only propagated through And.
            val (newLeft, _) = traverse(o.left, replaceChildren = true, nullIsFalse)
            val (newRight, _) = traverse(o.right, replaceChildren = true, nullIsFalse)
            val newSelf = if (newLeft.isDefined || newRight.isDefined) {
              Some(Or(left = newLeft.getOrElse(o.left), right = newRight.getOrElse(o.right)))
            } else {
              None
            }
            (newSelf, Seq.empty)
          case n: Not =>
            // Ignore the EqualityPredicates from children since they are only propagated through And.
            val (newChild, _) = traverse(n.child, replaceChildren = true, nullIsFalse = false)
            (newChild.map(Not), Seq.empty)
          case _ => (None, Seq.empty)
        }
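
    To make the And branch concrete, here is a hand-traced run of traverse over the introductory condition "a = 1 and b = a":

    // traverse(And(a = 1, b = a), replaceChildren = true, nullIsFalse = true)
    //   left : EqualTo(a, 1) -> (None, Seq(((a, 1), a = 1)))  equality collected, nothing rewritten
    //   right: EqualTo(b, a) -> (None, Seq.empty)             Attribute = Attribute matches no case
    //   equalityPredicates = Seq(((a, 1), a = 1)) is non-empty and replaceChildren = true,
    //   so replaceConstants rewrites the right child (b = a -> b = 1) while leaving the
    //   source predicate a = 1 untouched.
    // result: (Some(And(a = 1, b = 1)), Seq(((a, 1), a = 1)))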

     

     

    The function above can be hard to follow, so let's examine the ConstantPropagation rule through a variety of examples.

     

    Creating the CSV file.

    # create /tmp/file.csv with an a,b header; the > redirect creates the file
    echo 'a,b' > /tmp/file.csv

    # append 100 rows of random a,b values in 1..5
    for i in {1..100}; do
      a=$((RANDOM % 5 + 1))
      b=$((RANDOM % 5 + 1))
      echo "$a,$b" >> /tmp/file.csv
    done

     

    where a = 1 and a = b.

     

    The example below uses two filters: one of the form Attribute = Literal, the other of the form Attribute = Attribute.

    import org.apache.spark.sql.functions.col
    import org.apache.spark.sql.types._
    
    val schema = StructType(Seq(
      StructField("a", IntegerType, nullable = true),
      StructField("b", IntegerType, nullable = true)
    ))
    
    val df = spark.read
      .option("header", "true")
      .option("inferSchema", "false")
      .schema(schema)
      .csv("/tmp/file.csv")
    
    val filtered = df.filter(col("a") === 1 && col("a") === col("b"))
    
    filtered.explain("extended")

     

    == Parsed Logical Plan ==
    'Filter (('a = 1) AND ('a = 'b))
    +- Relation [a#52,b#53] csv
    
    == Analyzed Logical Plan ==
    a: int, b: int
    Filter ((a#52 = 1) AND (a#52 = b#53))
    +- Relation [a#52,b#53] csv
    
    == Optimized Logical Plan ==
    Filter ((isnotnull(a#52) AND isnotnull(b#53)) AND ((a#52 = 1) AND (1 = b#53)))
    +- Relation [a#52,b#53] csv
    
    == Physical Plan ==
    *(1) Filter (((isnotnull(a#52) AND isnotnull(b#53)) AND (a#52 = 1)) AND (1 = b#53))
    +- FileScan csv [a#52,b#53] Batched: false, DataFilters: [isnotnull(a#52), isnotnull(b#53), (a#52 = 1), (1 = b#53)], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/tmp/file.csv], PartitionFilters: [], PushedFilters: [IsNotNull(a), IsNotNull(b), EqualTo(a,1), EqualTo(b,1)], ReadSchema: struct<a:int,b:int>

     

    Looking at the plans above, the Parsed Logical Plan's Filter (('a = 1) AND ('a = 'b)) is optimized into

    Filter ((isnotnull(a#52) AND isnotnull(b#53)) AND ((a#52 = 1) AND (1 = b#53))).

    In other words, the condition is rewritten into one that filters for rows where both a and b equal 1. ConstantPropagation contributes the (1 = b#53) rewrite, which the scan can then push down (see EqualTo(b,1) in PushedFilters); the isnotnull predicates are inferred separately by the InferFiltersFromConstraints rule.
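
    To isolate the rule's contribution, ConstantPropagation can be turned off through the standard spark.sql.optimizer.excludedRules configuration and the plans compared. A quick sketch (attribute ids will differ per session):

    // Exclude the rule, re-run explain, then restore the default.
    spark.conf.set("spark.sql.optimizer.excludedRules",
      "org.apache.spark.sql.catalyst.optimizer.ConstantPropagation")

    // With the rule excluded, the optimized Filter keeps (a#.. = b#..) instead of (1 = b#..)
    df.filter(col("a") === 1 && col("a") === col("b")).explain("extended")

    spark.conf.unset("spark.sql.optimizer.excludedRules")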

     

    where a = b and a = 1.

    ConstantPropagation applies in exactly the same way when the order of the conditions is reversed.

    import org.apache.spark.sql.functions.col
    import org.apache.spark.sql.types._
    
    val schema = StructType(Seq(
      StructField("a", IntegerType, nullable = true),
      StructField("b", IntegerType, nullable = true)
    ))
    
    val df = spark.read
      .option("header", "true")
      .option("inferSchema", "false")
      .schema(schema)
      .csv("/tmp/file.csv")
    
    val filtered = df.filter(col("a") === col("b") && col("a") === 1)
    
    filtered.explain("extended")
    == Optimized Logical Plan ==
    Filter ((isnotnull(b#57) AND isnotnull(a#56)) AND ((1 = b#57) AND (a#56 = 1)))
    +- Relation [a#56,b#57] csv
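
    Note that equality predicates only propagate through And: the Or and Not branches of traverse discard the EqualityPredicates collected from their children. A quick sketch to observe this, reusing the df defined above (the comment shows the expected shape of the optimized condition; attribute ids and inferred isnotnull predicates will vary):

    val orFiltered = df.filter((col("a") === 1 && col("b") === col("a")) || col("b") === 5)
    orFiltered.explain("extended")
    // Constants still propagate within the left disjunct:
    //   ((a = 1) AND (b = 1)) OR (b = 5)
    // but nothing crosses the OR boundary.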

     

     

    Related commands.

    docker network create spark
    
    docker run -d --name master --hostname master --network spark -p 8080:8080 -p 7077:7077 bitnami/spark:3.2.3 start-master.sh
    
    docker run -d --name worker1 --hostname worker1 --network spark bitnami/spark:3.2.3 start-worker.sh spark://master:7077
    
    docker run -it --rm --name client --network spark -p 4040:4040 bitnami/spark:3.2.3 spark-shell --master spark://master:7077 --conf spark.ui.port=4040

     
