-
[Spark] RDD combineByKey 알아보기Spark 2024. 7. 29. 05:38반응형
- 목차
- 들어가며.
- combineByKey 사용하기.
- createCombiner.
- mergeValue.
- mergeCombiners.
- reduceByKey 와 combineByKey 의 관계.
- 관련된 명령어들.
들어가며.
Spark RDD 연산에서 대부분의 셔플링 연산은 combineByKey 로 변형됩니다.
예를 들어서 reduceByKey, aggregateByKey, countByValue, join 연산 등은 내부적으로 combineByKey 를 활용하게 됩니다.
즉, Wide Transformation 을 유발하는 셔플링 연산은 내부적으로 반드시 combineByKey 를 활용합니다.
이번 글에서는 combineByKey 의 기본적인 사용법과 다른 Wide Transformation 연산과의 관계에 대해서 알아봅니다.
combineByKey 사용하기.
아래의 예시는 1 부터 1000 까지의 숫자들 중에서 홀수와 짝수의 총 갯수를 파악하는 간단한 예시 코드입니다.
그리고 combineByKey 를 사용하여 그 결과를 계산합니다.
val rdd = sc.parallelize(1 to 1000, 10).map {value => val key = value % 2 (key, value) } val count = rdd.combineByKey( (value: Int) => 1, // createCombiner (acc: Int, value: Int) => acc + 1, // mergeValue (valueA: Int, valueB: Int) => valueA + valueB // mergeCombiners ) val output = count.collect()
createCombiner.
combineByKey 는 총 세개의 함수를 통해서 셔플링 연산을 수행합니다.
첫번째 함수의 이름은 createCombiner 입니다.
이름처럼 combiner 를 생성합니다.
combiner 는 Key 별로 하나씩 생성되는 Accumulator 라고 생각하시면 됩니다.
만약 아래와 같은 PairRDD 가 있다고 가정할 때에 총 Key 는 A 부터 G 까지 7개가 존재합니다.
val data = Seq( ("A", 1), ("B", 2), ("C", 3), ("A", 4), ("B", 5), ("D", 6), ("E", 7), ("C", 8), ("F", 9), ("G", 10) )
그리고 createCombiner 함수가 (value: Int) => 1 이라고 한다면,
아래와 같이 각 Key 에 대해서 값이 1 인 Combiner 목록이 생성되게 됩니다.
( ("A", 1), ("B", 1), ("C", 1), ("D", 1), ("E", 1), ("F", 1), ("G", 1) )
mergeValue.
combineByKey 에서 사용되는 두번째 함수는 mergeValue 함수입니다.
이 함수는 Map Side 에서 사용되는 Combiner 함수인데요.
Map Side Task 에서 RDD Partition 의 모든 데이터를 순회하며 각 데이터의 값을 mergeValue 의 정의에 따라 연산을 합니다.
예를 들어, 아래의 경우에서 Key "A" 를 가지는 데이터는 총 2개이죠 ?
val data = Seq( ("A", 1), ("B", 2), ("C", 3), ("A", 4), ("B", 5), ("D", 6), ("E", 7), ("C", 8), ("F", 9), ("G", 10) )
그럼 "A" 를 Key 로 가지는 Data 들은 2번의 mergeValue 연산이 수행되어서 ("A", 2) 의 결과를 만들어냅니다.
따라서 아래와 같이 각 Key 별 Count 를 계산할 수 있게 됩니다.
( ("A", 2), ("B", 2), ("C", 2), ("D", 1), ("E", 1), ("F", 1), ("G", 1) )
여기서 중요한 점은 Map Side 의 각 Partition 별로 연산이 수행된다는 점입니다.
아래의 이미지와 같이 각 Partition 별로 createCombiner 와 mergeValue 함수가 수행되게 됩니다.
mergeCombiners.
마지막으로 mergeCombiners 함수가 적용됩니다.
이는 Reduce Task 에서 수행되는 연산으로 Map Side Task 에서 생성된 Accumulator 들을 병합합니다.
아래의 이미지와 같이 Partition 별로 생성된 Accumulator 들을 Reduce Task 에서 마무리 병합 과정을 수행하게 됩니다.
reduceByKey 와 combineByKey 의 관계.
reduceByKey 는 내부적으로 combineByKey 로 대체됩니다.
reduceByKey 는 combineByKey 의 mergeValue 와 mergeCombiners 함수가 동일한 특징을 가집니다.
예를 들어, Key 별 합계를 구하는 간단한 RDD 연산을 수행합니다.
아래와 같이 reduceByKey 는 combineByKey 로 대체 가능하며, 동일한 mergeValue 와 mergeCombiners 함수를 사용하는 것이 특징입니다.
val fruits = Seq( ("apple", 1000), ("banana", 1500), ("apple", 1200), ("orange", 1300), ("banana", 1400), ("kiwi", 1800), ("apple", 1100), ("mango", 2000), ("kiwi", 1700), ("grape", 1600) ) val fruitRdd = sc.parallelize(fruits) val sum_by_key = fruitRdd.reduceByKey((v1, v2) => v1 + v2) sum_by_key.collect() // Array[(String, Int)] = Array((orange,1300), (apple,3300), (mango,2000), (banana,2900), (kiwi,3500), (grape,1600)) val sum_by_key_2 = fruitRdd.combineByKey( (value: Int) => value, (acc: Int, value: Int) => acc + value, (v1: Int, v2:Int) => v1 + v2 ) sum_by_key_2.collect() // Array[(String, Int)] = Array((orange,1300), (apple,3300), (mango,2000), (banana,2900), (kiwi,3500), (grape,1600))
또한 foldByKey 와 reduceByKey 의 차이점은 초기값의 유무입니다.
따라서 foldByKey 또한 combineByKey 로 대체 가능합니다.
관련된 명령어들.
docker network create spark docker run -d --name master --hostname master --network spark -p 8080:8080 -p 7077:7077 bitnami/spark:3.2.3 start-master.sh docker run -d --name worker1 --hostname worker1 --network spark bitnami/spark:3.2.3 start-worker.sh spark://master:7077 docker run -it --rm --name client --network spark -p 4040:4040 bitnami/spark:3.2.3 spark-shell --master spark://master:7077 --conf spark.ui.port=4040
반응형'Spark' 카테고리의 다른 글
[Spark] RDD Cogroup 연산 알아보기 (0) 2024.07.29 [Spark] RangePartitioner 알아보기 (0) 2024.07.29 [Spark] MapOutputTracker 알아보기 (0) 2024.07.29 [Spark] Block 알아보기 (0) 2024.07.26 [Spark] s3a Protocol 에서 Driver 가 파일 존재를 확인하는 과정 (0) 2024.07.23