ABOUT ME

와주셔서 감사합니다. 좋은 글을 많이 쓰겠습니다.

Today
Yesterday
Total
  • [Spark] RangePartitioner 알아보기
    Spark 2024. 7. 29. 05:38
    반응형

    - 목차

     

    Partitioner 란 ? 

    Spark 는 Wide Dependency Transformation 을 경계로 Stage 가 분리됩니다.

    GroupByKey, Join, PartitionBy, CoGroup 등과 같은 연산은 데이터의 셔플링을 유발하며, 이를 경계로 Stage 가 분리되게 됩니다.

    이때 이전 Stage 와 후속 Stage 로 나뉘며, 아래와 같은 모습의 DAG 시각화가 이루어집니다.

     

     

     

    위 과정에서 첫번째 Stage 를 Map-Side Task 그리고 두번째 Stage 는 Reduce-Side Task 또는 Mapper Task, Reduer Task 라고 부르기도 합니다.

    Partitioner 는 Mapper Task 에서 데이터들을 어떻게 새로운 Partition 으로 분류할지를 결정하는 방식이 됩니다.

    예를 들어, Mapper Task 에서 아래와 같은 방식으로 동일한 Key 를 그룹지어 파티셔닝을 시도합니다.

     

    이 과정에서 보통 2개의 Partitioner 를 사용합니다.

    그리고 2개의 파티셔너는 각각 HashPartitioner 와 RangePartitioner 입니다.

    이번 글에서는 RangePartitioner 의 동작에 대해서 자세히 알아보는 시간을 가지도록 하겠습니다.

     

    RangePartitioner 란 ?

    우선 RangePartitioner 가 어떻게 동작하는지 간단한 예시와 함께 살펴보도록 하겠습니다.

    아래 코드는 1 ,3 ,5, 7, 9 인 Key 를 가지는 5개의 데이터를 파티셔닝한 결과입니다.

     

    import org.apache.spark.{RangePartitioner, HashPartitioner}
    
    val rdd = sc.parallelize(Seq(
      (5, "e"), (1, "a"), (9, "i"), (3, "c"), (7, "g")
    ))
    
    // RangePartitioner
    val partitioned = rdd.partitionBy(new RangePartitioner(3, rdd))
    partitioned.mapPartitionsWithIndex {
      case (index, iter) => iter.map(x => s"Partition: $index, $x")
    }.collect().foreach(println)
    
    // Partition: 0, (1,a) Partition: 0, (3,c)
    // Partition: 1, (5,e) Partition: 1, (7,g) Partition: 2, (9,i)
    
    // HashPartitioner
    val partitioned = rdd.partitionBy(new HashPartitioner(3))
    partitioned.mapPartitionsWithIndex {
      case (index, iter) => iter.map(x => s"Partition: $index, $x")
    }.collect().foreach(println)
    
    // Partition: 0, (9,i) Partition: 0, (3,c)
    // Partition: 1, (1,a) Partition: 1, (7,g) Partition: 2, (5,e)

     

    RangePartitioner 은 Key 의 순서대로 파티셔닝이 이루어집니다.

    만약 Key 의 자료형이 숫자 기반인 경우에는 숫자값의 오름차순을 기준으로 파티셔닝이 됩니다.

    ( Key 가 문자타입이라면 사전순서대로 a-z 파티셔닝이 적용됩니다. )

    총 3개의 파티션이 있다면 오름차순 순서로 파티셔닝됩니다.

    반면 HashPartitioner 는 단순한 Modulo 또는 해싱 연산을 통해서 적절한 파티션에 배치되게 됩니다.

     

    Sampling 과 Range 경계 찾기.

    HashPartitioner 는 간단하게 동작합니다.

    파티션의 갯수가 3 개라면 ( Key % 3 ) 와 같은 연산을 통해서 배치될 파티션을 결정할 수 있습니다.

    1   → 1 % 3 = 1 → 파티션 1
    2   → 2 % 3 = 2 → 파티션 2
    3   → 3 % 3 = 0 → 파티션 0
    4   → 4 % 3 = 1 → 파티션 1
    5   → 5 % 3 = 2 → 파티션 2
    6   → 6 % 3 = 0 → 파티션 0
    ...
    98  → 98 % 3 = 2 → 파티션 2
    99  → 99 % 3 = 0 → 파티션 0
    100 → 100 % 3 = 1 → 파티션 1

     

     

    HashPartitioner와는 달리, RangePartitioner는 key의 정렬 순서에 기반한 범위 분할 방식을 사용합니다.
    즉, 특정 key가 어느 파티션에 배정될지는 사전에 정의된 범위(boundary)에 따라 결정됩니다.

    1 ~ 4    → 파티션 0
    5 ~ 7    → 파티션 1
    8 이상  → 파티션 2

     

    그리고 이러한 범위 경계(boundary)**를 지정하기 위해, RangePartitioner는 입력 RDD의 일부 데이터를 샘플링합니다.
    이 샘플 데이터를 정렬한 후, 전체 key의 분포를 추정하여 각 파티션이 담당할 key 범위를 결정하게 됩니다.

     

    아래의 코드 예시는 RangePartitioner 를 객체화시키는 예시입니다.

    Spark Application 에서 RangePartitioner 를 객체화시키는 행위만으로도 개별적인 Job 이 실행됩니다.

    그리고 이 Job 은 내부적으로 일부 데이터를 샘플링하여 Partitioning 의 범위를 결정하게 됩니다.

     

    import org.apache.spark.{RangePartitioner}
    
    val rdd = sc.parallelize(Seq(
      (5, "e"), (1, "a"), (9, "i"), (3, "c"), (7, "g")
    ))
    
    // RangePartitioner
    new RangePartitioner(3, rdd)

     

     

     

    다음은 RangePartitioner 객체가 생성되는 과정에서 실행되는 주요 함수들입니다.

    sketch 함수는 내부적으로 rdd.mapPartitionsWithIndex와 action인 collect() 를 호출하여,
    입력 RDD로부터 일부 데이터를 샘플링한 후, key의 분포를 분석하여 범위 경계(boundary) 를 계산합니다.
    이 경계 정보는 이후 각 key가 어느 파티션에 속할지를 결정하는 기준이 됩니다.

    실제로 각 key를 파티션에 배정할 때는,RangePartitioner 내부의 getPartition 함수가 호출되며,
    앞서 계산된 경계(boundaries)를 기반으로 해당 key가 속할 파티션 번호를 찾아냅니다.

     

      def sketch[K : ClassTag](
          rdd: RDD[K],
          sampleSizePerPartition: Int): (Long, Array[(Int, Long, Array[K])]) = {
        val shift = rdd.id
        // val classTagK = classTag[K] // to avoid serializing the entire partitioner object
        val sketched = rdd.mapPartitionsWithIndex { (idx, iter) =>
          val seed = byteswap32(idx ^ (shift << 16))
          val (sample, n) = SamplingUtils.reservoirSampleAndCount(
            iter, sampleSizePerPartition, seed)
          Iterator((idx, n, sample))
        }.collect()
        val numItems = sketched.map(_._2).sum
        (numItems, sketched)
      }
      
      def getPartition(key: Any): Int = {
        val k = key.asInstanceOf[K]
        var partition = 0
        if (rangeBounds.length <= 128) {
          // If we have less than 128 partitions naive search
          while (partition < rangeBounds.length && ordering.gt(k, rangeBounds(partition))) {
            partition += 1
          }
        } else {
          // Determine which binary search method to use only once.
          partition = binarySearch(rangeBounds, k)
          // binarySearch either returns the match location or -[insertion point]-1
          if (partition < 0) {
            partition = -partition-1
          }
          if (partition > rangeBounds.length) {
            partition = rangeBounds.length
          }
        }
        if (ascending) {
          partition
        } else {
          rangeBounds.length - partition
        }
      }

     

    sortByKey.

    sortByKey 는 RangePartitioner 를 사용하는 대표적인 연산입니다.

    이는 내부적으로 RangePartitioner 를 활용합니다.

    val rdd = sc.parallelize(Seq(
      (5, "e"), (1, "a"), (9, "i"), (3, "c"), (7, "g")
    ))
    
    val sortedRdd = rdd.sortByKey()
    
    sortedRdd.collect().foreach(println)

     

    위 코드가 실행되면 총 2개의 Job 과 3개의 Stage 가 생성됩니다. 

     

     

    첫번째 Job 은 RangePartitioner 의 파티셔닝 경계 결정을 위한 Sampling 과정에 해당합니다.

    데이터 샘플링 이후에 Partitioning Range Boundary 를 결정하게 됩니다.

     

     

    그리고 두번째 Job 은 생성된 RangePartitioner 를 기준으로 SortByKey 연산을 수행합니다.

     

     

     

    관련된 명령어들.

    docker network create spark
    
    docker run -d --name master --hostname master --network spark -p 8080:8080 -p 7077:7077 bitnami/spark:3.2.3 start-master.sh
    
    docker run -d --name worker1 --hostname worker1 --network spark bitnami/spark:3.2.3 start-worker.sh spark://master:7077
    
    docker run -it --rm --name client --network spark -p 4040:4040 bitnami/spark:3.2.3 spark-shell --master spark://master:7077 --conf spark.ui.port=4040

     

     

     

    반응형
Designed by Tistory.