ABOUT ME

와주셔서 감사합니다. 좋은 글을 많이 쓰겠습니다.

Today
Yesterday
Total
  • [Spark] SortMergeJoin 알아보기
    Spark 2024. 8. 17. 08:24
    반응형

    - 목차

     

    SortMergeJoin 이란 ?

    SortMergeJoin 은 SparkSQL 의 여러가지 Join 방식 중의 하나입니다.

    가장 대표적인 Join 방식의 하나로써 이름처럼 Sort 과 Merge 가 주된 알고리즘으로 사용됩니다.

    우선 간단한 예시와 실행 계획 그리고 DAG Visualization 을 확인해보겠습니다.

     

    import spark.implicits._
    import org.apache.spark.sql.types._
    
    spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
    
    case class User(id: Int, name: String)
    case class Flag(name: String, bool: Boolean)
    
    val df1 = Seq(User(1, "a"), User(2, "b")).toDF()
    
    val df2 = Seq(Flag("a", true), Flag("b", false)).toDF()
    
    val joined = df1.join(df2, Seq("name"))
    
    joined.queryExecution.executedPlan
    
    // joined.queryExecution.debug.codegen()
    AdaptiveSparkPlan isFinalPlan=false
    +- Project [name#24, id#23, bool#33]
       +- SortMergeJoin [name#24], [name#32], Inner
          :- Sort [name#24 ASC NULLS FIRST], false, 0
          :  +- Exchange hashpartitioning(name#24, 200), ENSURE_REQUIREMENTS, [plan_id=36]
          :     +- LocalTableScan [id#23, name#24]
          +- Sort [name#32 ASC NULLS FIRST], false, 0
             +- Exchange hashpartitioning(name#32, 200), ENSURE_REQUIREMENTS, [plan_id=37]
                +- LocalTableScan [name#32, bool#33]

     

     

    실행 계획을 살펴보면 위와 같이 Relation -> Exchange -> Sort -> Join 의 순서로 이어집니다.

    Exchange 는 Hash Partitioning 을 사용하며, Join Key 를 기준으로 파티셔닝을 하게 됩니다.

    그리고 이를 경계로 새로운 Reduce Stage 가 생성됩니다.

    이 Reduce Stage 에서 ShuffledRowDD -> MapPartitionsRDD -> ZippedPartitionsRDD 등으로 Task 가 연산을 처리합니다.

    여기서 각 부분에 대해서 자세히 알아보겠습니다.

     

    Reduce Stage 의 ShuffledRowRDD.

     

    ShuffledRowRDD 는 단순히 Map Task 에서 Join Key 를 기준으로 파티셔닝한 Shuffled Block 들을 가져오는 역할을 합니다.

    일종의 ShuffleReader, BlockStoreShuffleReader 로 표현되는 클래스들의 기능을 통해서 Shuffled Block 을 네트워크적으로 가져오게 되며,

    이를 InternalRow 형식으로 변형하여 다음 단계로 넘기게 됩니다.

    이 과정에서 다음 단계는 Sort 연산에 해당합니다.

     

    Reduce Stage 의 MapPartitionsRDD.

     

    MapPartitionsRDD 로 이름이 표현되긴 했지만, 이 부분에서 실질적인 정렬이 수행됩니다.

    SortExec.scala 의 doExecute 함수를 살펴보면 아래와 같이 MapPartitionsRDD 를 반환하게 됩니다.

      protected override def doExecute(): RDD[InternalRow] = {
        val peakMemory = longMetric("peakMemory")
        val spillSize = longMetric("spillSize")
        val sortTime = longMetric("sortTime")
    
        child.execute().mapPartitionsInternal { iter =>
          val sorter = createSorter()
    
          val metrics = TaskContext.get().taskMetrics()
          // Remember spill data size of this task before execute this operator so that we can
          // figure out how many bytes we spilled for this operator.
          val spillSizeBefore = metrics.memoryBytesSpilled
          val sortedIterator = sorter.sort(iter.asInstanceOf[Iterator[UnsafeRow]])
          sortTime += NANOSECONDS.toMillis(sorter.getSortTimeNanos)
          peakMemory += sorter.getPeakMemoryUsage
          spillSize += metrics.memoryBytesSpilled - spillSizeBefore
          metrics.incPeakExecutionMemory(sorter.getPeakMemoryUsage)
    
          sortedIterator
        }
      }

     

     

    그리고 이 과정에서 실질적인 정렬이 적용됩니다.

    UnsafeExternalRowSorter 라는 클래스를 통해서 안정적인 정렬 연산이 수행되구요.

    그 내부적으로 Disk Spill, File 기반의 Sort, k-ways merge 등의 기능이 수행됩니다.

     

    이렇게 정렬을 마친 후에 정렬된 데이터들은 다음 Node 로 이동합니다.

    다음 Node 는 SortMergeJoinExec 으로부터 생성된 ZippedPartitionsRDD 와 MapPartitionsRDD 에 해당합니다.

     

    k-ways merge.

    k-way merge는 여러 개의 정렬된 데이터를 하나로 합치는 정렬 방식입니다.

    예를 들어, 정렬된 여러 개의 작은 파일이 있다고 가정해 봅시다.
    각 파일에서 하나씩 데이터를 꺼내고, 이 중 가장 작은 값을 선택해 최종 결과 배열에 추가합니다.
    그 다음, 값을 꺼냈던 파일에서 다음 데이터를 가져와 다시 비교하는 과정을 반복합니다.

    이 방식은 정렬된 다수의 데이터 소스를 효율적으로 병합할 수 있어,
    외부 정렬(external sort) 이나 병렬 정렬 결과를 합칠 때 자주 사용됩니다.

     

    특히, MySQL 과 같은 데이터베이스에서 정렬 시에 File Sorting 이라는 이름으로 사용되곤합니다.

     

     

    SortMergeJoinExec.

     

    SortMergeJoinExec는 정렬된 두 입력 RDD를 받아, 내부에서 ZippedPartitionsRDDMapPartitionsRDD 를 통해 Join 연산을 수행합니다.

     

    ZippedPartitionsRDD 는 N 개의 RDD 를 입력으로 받아서 이를 List(rdd1, rdd2) 형식으로 변환합니다.

    ZippedPartitionsBaseRDD[V](sc, List(rdd1, rdd2), preservesPartitioning)

     

    그리고 이러한 데이터들이 MapPartitionsRDD 로 넘어가게 됩니다. 

    MapPartitionsRDD 는 RDD1 과 RDD2 의 Partition 데이터들을 Iterable[InternalRow] 형태로 전달받게 됩니다.

    그리고 이들은 아래와 같은 이중 반복문을 순회하면서 동일한 Key 를 가지는 Row 들끼리 Join 연산을 적용합니다.

     

    SortMergeJoinExec.scala 의 def doExecute 함수

      while (rightMatchesIterator != null) {
        if (!rightMatchesIterator.hasNext) {
          if (smjScanner.findNextInnerJoinRows()) {
            currentRightMatches = smjScanner.getBufferedMatches
            currentLeftRow = smjScanner.getStreamedRow
            rightMatchesIterator = currentRightMatches.generateIterator()
          } else {
            currentRightMatches = null
            currentLeftRow = null
            rightMatchesIterator = null
            return false
          }
        }
        joinRow(currentLeftRow, rightMatchesIterator.next())
        if (boundCondition(joinRow)) {
          numOutputRows += 1
          return true
        }
      }

     

    결론적으로

    1. Joining Key 를 기준으로 Sorting 을 수행한다.
    2. ZippedPartitionRDD 에서 2개의 RDD 가 하나의 Tuple2 형태로 묶이게 된다.
    3. SortMergeJoinExec 의 MapPartitionsRDD 연산에서 이중 반복문을 통해 Join 이 수행된다.

     

     

    https://westlife0615.tistory.com/42

     

    [Spark] Broadcast Join 연산 알아보기 ( spark.sql.autoBroadcastJoinThreshold )

    - 목차 들어가며.SparkSQL 에서 2개의 DataFrame 을 기준으로 Join 연산을 수행할 수 있습니다.이때에 BroadcastJoin, SortMergeJoin, ShuffledHashJoin 등의 Join 전략을 채택하여 실행 계획을 구성하며,특히 autoBroadca

    westlife0615.tistory.com

     

    반응형
Designed by Tistory.