-
[Spark] RDD Cogroup 연산 알아보기Spark 2024. 7. 29. 05:38반응형
- 목차
Cogroup 이란 ?
Cogroup 연산은 여러 RDD 를 key 기준으로 그룹핑하여, 동일한 key에 매핑된 value 들을 각각의 RDD에서 모아오는 연산입니다.
Cogroup 연산의 Input 과 Output 은 아래와 같습니다.
// 2개의 Input RDD1: RDD[(K, V1)] RDD2: RDD[(K, V2)] // Output RDD[(K, (Iterable[V1], Iterable[V2]))]
그리하여 아래와 같은 연산은 동일한 Key 를 가지는 각 Input RDD 의 Value 들이 Iterable 형식으로 표현됩니다.
val rdd1 = sc.parallelize(Seq( ("a", 1), ("b", 2), ("a", 3) )) val rdd2 = sc.parallelize(Seq( ("a", 100), ("b", 200), ("c", 300) )) val cogrouped = rdd1.cogroup(rdd2) cogrouped.collect().foreach(println)
(a, (CompactBuffer(1, 3), CompactBuffer(100))) (b, (CompactBuffer(2), CompactBuffer(200))) (c, (CompactBuffer(), CompactBuffer(300)))
Cogroup 과 Shuffle Block.
먼저 Cogroup 연산은 Input RDD 의 Partition 의 갯수가 다르다면 셔플링을 발생시킵니다.
아래의 예시는 2개의 RDD 에 대해서 CoGroup 연산을 적용하며,
rdd1 은 10개의 Partition, rdd2 는 5개의 Partition 을 가집니다.
그리고 CoGroupRDD 의 파티션 갯수는 4개로 설정하였습니다.
val rdd1 = sc.parallelize(1 until 1000, 10).map { value => val key = value % 10 (key, value) } val rdd2 = sc.parallelize(1001 until 2000, 5).map { value => val key = value % 10 (key, value) } val cogrouped = rdd1.cogroup(rdd2, 4) cogrouped.collect().foreach(println)
위 코드가 실행되면 아래와 같이 총 15개의 Shuffle Block 파일들이 생성됩니다.
위 10개는 rdd1 으로부터 생성된 10개의 Shuffle Block 이구요.
아래의 5개 파일은 rdd2 로부터 생성된 5개의 Shuffle Block 입니다.
| `-- shuffle_0_8_0.data | `-- shuffle_0_7_0.data | `-- shuffle_0_9_0.data | |-- shuffle_0_5_0.data | `-- shuffle_0_11_0.data | `-- shuffle_0_14_0.data | `-- shuffle_0_13_0.data | `-- shuffle_0_12_0.data | `-- shuffle_0_10_0.data | `-- shuffle_0_6_0.data | `-- shuffle_1_3_0.data | `-- shuffle_1_1_0.data | `-- shuffle_1_2_0.data | `-- shuffle_1_4_0.data | `-- shuffle_1_0_0.data
아래의 DAG Visualization 과 같이 rdd1 과 rdd2 는 Stage 0, Stage 1 에서 데이터 처리 후에 각각 Shuffle Block 들을 생성하게 됩니다.
Stage 0 의 구체적은 RDD Lineage 는 아래와 같습니다.
ParallelCollectionRDD 에서 1 ~ 1000 까지의 데이터를 생성하며, MapPartitionsRDD 에서 PairRDD 로 변형하는 과정을 취합니다.
CoGroupedRDD .
2개의 RDD 들을 대상으로 cogroup 연산을 수행하게 되면 총 3개의 Stage 들이 생성되게 됩니다.
Stage 0 과 Stage 1 는 각각의 RDD 에 해당하는 Stage 이구요.
Stage 2 는 CoGroupedRDD 가 생성됩니다.
이는 ShuffledRDD 처럼 Wide Transformation 이후에 셔플링된 결과를 취합하는 대표적인 RDD 입니다.
위에서 만들어진 총 15개의 Shuffle Block 을 가져와 실질적인 CoGroup 연산을 수행하게 됩니다.
아래의 함수는 CoGroupedRDD 의 compute 함수의 내용입니다.
( 다소 생략된 내용입니다. )
아래 함수에서 중요한 포인트는 아래와 같습니다.
- getReader(shuffleDependency) 를 통해서 다른 Executor 또는 Worker Node 에 있는 원격의 Shuffle Block 들을 불러옵니다.
- map.insertAll(it.map(pair => (pair._1, new CoGroupValue(pair._2, depNum))))
연산을 통하여 Key, (Iterable[V1], Iterable[V2]) 형식의 데이터를 만듭니다.
override def compute(s: Partition, context: TaskContext): Iterator[(K, Array[Iterable[_]])] = { val split = s.asInstanceOf[CoGroupPartition] val numRdds = dependencies.length // .. 중략 val rddIterators = new ArrayBuffer[(Iterator[Product2[K, Any]], Int)] for ((dep, depNum) <- dependencies.zipWithIndex) dep match { case shuffleDependency: ShuffleDependency[_, _, _] => val metrics = context.taskMetrics().createTempShuffleReadMetrics() val it = SparkEnv.get.shuffleManager .getReader( shuffleDependency.shuffleHandle, split.index, split.index + 1, context, metrics) .read() rddIterators += ((it, depNum)) } // .. 중략 val map = createExternalMap(numRdds) for ((it, depNum) <- rddIterators) { map.insertAll(it.map(pair => (pair._1, new CoGroupValue(pair._2, depNum)))) } }
Join 연산과 CoGroup 연산의 관계 알아보기.
실제로 CoGroup 연산은 자주 활용되진 않습니다.
하지만 Join 연산은 내부적으로 CoGroup 을 활용합니다.
아래의 간단한 Join 연산을 수행해보도록 하겠습니다.
val rdd1 = sc.parallelize(Seq( (1, "apple"), (2, "banana"), (3, "orange") )) val rdd2 = sc.parallelize(Seq( (1, 100), (2, 200), (4, 400) )) val joined = rdd1.join(rdd2) joined.collect().foreach(println)
그럼 아래의 이미지와 같이 간단한 DAG Visualization 을 확인할 수 있습니다.
Stage 5 인 Join Stage 를 좀 더 자세히 살펴봅니다.
Join 연산은 내부적으로 CoGroupedRDD 를 활용하구여.
실제 join 이라는 RDD 함수 또한 아래와 같이 cogroup 연산을 수행합니다.
그리고 flatMapValues 를 통하여 InnerJoin, LeftOuterJoin, FullOuterJoin 등의 실질적인 필터링 과정을 취하게 됩니다.
따라서 위 DAG Visualization 에서 MapPartitionsRDD 는 flatMapValues 에 설정된 Closure Function 을 실행하는 Mapper 단계라고 생각하시면 됩니다.
def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] = self.withScope { this.cogroup(other, partitioner).flatMapValues( pair => for (v <- pair._1.iterator; w <- pair._2.iterator) yield (v, w) ) } def leftOuterJoin[W]( other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, Option[W]))] = self.withScope { this.cogroup(other, partitioner).flatMapValues { pair => if (pair._2.isEmpty) { pair._1.iterator.map(v => (v, None)) } else { for (v <- pair._1.iterator; w <- pair._2.iterator) yield (v, Some(w)) } } } def rightOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner) : RDD[(K, (Option[V], W))] = self.withScope { this.cogroup(other, partitioner).flatMapValues { pair => if (pair._1.isEmpty) { pair._2.iterator.map(w => (None, w)) } else { for (v <- pair._1.iterator; w <- pair._2.iterator) yield (Some(v), w) } } }
관련 명령어들.
docker network create spark docker run -d --name master --hostname master --network spark -p 8080:8080 -p 7077:7077 bitnami/spark:3.2.3 start-master.sh docker run -d --name worker1 --hostname worker1 --network spark bitnami/spark:3.2.3 start-worker.sh spark://master:7077 docker run -it --rm --name client --network spark -p 4040:4040 bitnami/spark:3.2.3 spark-shell --master spark://master:7077 --conf spark.ui.port=4040 docker exec -it --user root --privileged worker1 bash apt update && apt install tree -y
반응형'Spark' 카테고리의 다른 글
[Spark] DataFrameWriter PartitionBy 알아보기 (0) 2024.08.12 [Spark] RangePartitioner 알아보기 (0) 2024.07.29 [Spark] RDD combineByKey 알아보기 (0) 2024.07.29 [Spark] MapOutputTracker 알아보기 (0) 2024.07.29 [Spark] Block 알아보기 (0) 2024.07.26