-
[Spark] RangePartitioner 알아보기Spark 2024. 7. 29. 05:38반응형
- 목차
Partitioner 란 ?
Spark 는 Wide Dependency Transformation 을 경계로 Stage 가 분리됩니다.
GroupByKey, Join, PartitionBy, CoGroup 등과 같은 연산은 데이터의 셔플링을 유발하며, 이를 경계로 Stage 가 분리되게 됩니다.
이때 이전 Stage 와 후속 Stage 로 나뉘며, 아래와 같은 모습의 DAG 시각화가 이루어집니다.
위 과정에서 첫번째 Stage 를 Map-Side Task 그리고 두번째 Stage 는 Reduce-Side Task 또는 Mapper Task, Reduer Task 라고 부르기도 합니다.
Partitioner 는 Mapper Task 에서 데이터들을 어떻게 새로운 Partition 으로 분류할지를 결정하는 방식이 됩니다.
예를 들어, Mapper Task 에서 아래와 같은 방식으로 동일한 Key 를 그룹지어 파티셔닝을 시도합니다.
이 과정에서 보통 2개의 Partitioner 를 사용합니다.
그리고 2개의 파티셔너는 각각 HashPartitioner 와 RangePartitioner 입니다.
이번 글에서는 RangePartitioner 의 동작에 대해서 자세히 알아보는 시간을 가지도록 하겠습니다.
RangePartitioner 란 ?
우선 RangePartitioner 가 어떻게 동작하는지 간단한 예시와 함께 살펴보도록 하겠습니다.
아래 코드는 1 ,3 ,5, 7, 9 인 Key 를 가지는 5개의 데이터를 파티셔닝한 결과입니다.
import org.apache.spark.{RangePartitioner, HashPartitioner} val rdd = sc.parallelize(Seq( (5, "e"), (1, "a"), (9, "i"), (3, "c"), (7, "g") )) // RangePartitioner val partitioned = rdd.partitionBy(new RangePartitioner(3, rdd)) partitioned.mapPartitionsWithIndex { case (index, iter) => iter.map(x => s"Partition: $index, $x") }.collect().foreach(println) // Partition: 0, (1,a) Partition: 0, (3,c) // Partition: 1, (5,e) Partition: 1, (7,g) Partition: 2, (9,i) // HashPartitioner val partitioned = rdd.partitionBy(new HashPartitioner(3)) partitioned.mapPartitionsWithIndex { case (index, iter) => iter.map(x => s"Partition: $index, $x") }.collect().foreach(println) // Partition: 0, (9,i) Partition: 0, (3,c) // Partition: 1, (1,a) Partition: 1, (7,g) Partition: 2, (5,e)
RangePartitioner 은 Key 의 순서대로 파티셔닝이 이루어집니다.
만약 Key 의 자료형이 숫자 기반인 경우에는 숫자값의 오름차순을 기준으로 파티셔닝이 됩니다.
( Key 가 문자타입이라면 사전순서대로 a-z 파티셔닝이 적용됩니다. )
총 3개의 파티션이 있다면 오름차순 순서로 파티셔닝됩니다.
반면 HashPartitioner 는 단순한 Modulo 또는 해싱 연산을 통해서 적절한 파티션에 배치되게 됩니다.
Sampling 과 Range 경계 찾기.
HashPartitioner 는 간단하게 동작합니다.
파티션의 갯수가 3 개라면 ( Key % 3 ) 와 같은 연산을 통해서 배치될 파티션을 결정할 수 있습니다.
1 → 1 % 3 = 1 → 파티션 1 2 → 2 % 3 = 2 → 파티션 2 3 → 3 % 3 = 0 → 파티션 0 4 → 4 % 3 = 1 → 파티션 1 5 → 5 % 3 = 2 → 파티션 2 6 → 6 % 3 = 0 → 파티션 0 ... 98 → 98 % 3 = 2 → 파티션 2 99 → 99 % 3 = 0 → 파티션 0 100 → 100 % 3 = 1 → 파티션 1
HashPartitioner와는 달리, RangePartitioner는 key의 정렬 순서에 기반한 범위 분할 방식을 사용합니다.
즉, 특정 key가 어느 파티션에 배정될지는 사전에 정의된 범위(boundary)에 따라 결정됩니다.1 ~ 4 → 파티션 0 5 ~ 7 → 파티션 1 8 이상 → 파티션 2
그리고 이러한 범위 경계(boundary)**를 지정하기 위해, RangePartitioner는 입력 RDD의 일부 데이터를 샘플링합니다.
이 샘플 데이터를 정렬한 후, 전체 key의 분포를 추정하여 각 파티션이 담당할 key 범위를 결정하게 됩니다.아래의 코드 예시는 RangePartitioner 를 객체화시키는 예시입니다.
Spark Application 에서 RangePartitioner 를 객체화시키는 행위만으로도 개별적인 Job 이 실행됩니다.
그리고 이 Job 은 내부적으로 일부 데이터를 샘플링하여 Partitioning 의 범위를 결정하게 됩니다.
import org.apache.spark.{RangePartitioner} val rdd = sc.parallelize(Seq( (5, "e"), (1, "a"), (9, "i"), (3, "c"), (7, "g") )) // RangePartitioner new RangePartitioner(3, rdd)
다음은 RangePartitioner 객체가 생성되는 과정에서 실행되는 주요 함수들입니다.
sketch 함수는 내부적으로 rdd.mapPartitionsWithIndex와 action인 collect() 를 호출하여,
입력 RDD로부터 일부 데이터를 샘플링한 후, key의 분포를 분석하여 범위 경계(boundary) 를 계산합니다.
이 경계 정보는 이후 각 key가 어느 파티션에 속할지를 결정하는 기준이 됩니다.실제로 각 key를 파티션에 배정할 때는,RangePartitioner 내부의 getPartition 함수가 호출되며,
앞서 계산된 경계(boundaries)를 기반으로 해당 key가 속할 파티션 번호를 찾아냅니다.def sketch[K : ClassTag]( rdd: RDD[K], sampleSizePerPartition: Int): (Long, Array[(Int, Long, Array[K])]) = { val shift = rdd.id // val classTagK = classTag[K] // to avoid serializing the entire partitioner object val sketched = rdd.mapPartitionsWithIndex { (idx, iter) => val seed = byteswap32(idx ^ (shift << 16)) val (sample, n) = SamplingUtils.reservoirSampleAndCount( iter, sampleSizePerPartition, seed) Iterator((idx, n, sample)) }.collect() val numItems = sketched.map(_._2).sum (numItems, sketched) } def getPartition(key: Any): Int = { val k = key.asInstanceOf[K] var partition = 0 if (rangeBounds.length <= 128) { // If we have less than 128 partitions naive search while (partition < rangeBounds.length && ordering.gt(k, rangeBounds(partition))) { partition += 1 } } else { // Determine which binary search method to use only once. partition = binarySearch(rangeBounds, k) // binarySearch either returns the match location or -[insertion point]-1 if (partition < 0) { partition = -partition-1 } if (partition > rangeBounds.length) { partition = rangeBounds.length } } if (ascending) { partition } else { rangeBounds.length - partition } }
sortByKey.
sortByKey 는 RangePartitioner 를 사용하는 대표적인 연산입니다.
이는 내부적으로 RangePartitioner 를 활용합니다.
val rdd = sc.parallelize(Seq( (5, "e"), (1, "a"), (9, "i"), (3, "c"), (7, "g") )) val sortedRdd = rdd.sortByKey() sortedRdd.collect().foreach(println)
위 코드가 실행되면 총 2개의 Job 과 3개의 Stage 가 생성됩니다.
첫번째 Job 은 RangePartitioner 의 파티셔닝 경계 결정을 위한 Sampling 과정에 해당합니다.
데이터 샘플링 이후에 Partitioning Range Boundary 를 결정하게 됩니다.
그리고 두번째 Job 은 생성된 RangePartitioner 를 기준으로 SortByKey 연산을 수행합니다.
관련된 명령어들.
docker network create spark docker run -d --name master --hostname master --network spark -p 8080:8080 -p 7077:7077 bitnami/spark:3.2.3 start-master.sh docker run -d --name worker1 --hostname worker1 --network spark bitnami/spark:3.2.3 start-worker.sh spark://master:7077 docker run -it --rm --name client --network spark -p 4040:4040 bitnami/spark:3.2.3 spark-shell --master spark://master:7077 --conf spark.ui.port=4040
반응형'Spark' 카테고리의 다른 글
[Spark] DataFrameWriter PartitionBy 알아보기 (0) 2024.08.12 [Spark] RDD Cogroup 연산 알아보기 (0) 2024.07.29 [Spark] RDD combineByKey 알아보기 (0) 2024.07.29 [Spark] MapOutputTracker 알아보기 (0) 2024.07.29 [Spark] Block 알아보기 (0) 2024.07.26