-
[Spark] RDD Reduce 내부 동작 알아보기Spark 2024. 7. 9. 06:23반응형
- 목차
Reduce Action 은 Local Aggregation 을 수행한다.
reduce 는 RDD 의 대표적은 Action 연산 중 하나입니다.
Reduce Action 이 실행되면서 Driver 는 Task 들을 생성하여 Worker Node 의 Executor 에게 전달합니다.
아래의 스크립트는 0 ~ 9999 에 해당하는 숫자들 중에서 홀수와 짝수의 갯수를 Reduction 하는 Spark 스크립트입니다.
from pyspark.sql import SparkSession if __name__ == '__main__': spark = (SparkSession.builder .master("spark://127.0.0.1:7077") .appName("WordCount") .getOrCreate()) range_rdd = spark.sparkContext.parallelize(range(10000), numSlices=5) count_tuple_rdd = range_rdd.map(lambda x: (1, 0) if x % 2 == 0 else (0, 1)) final_value = count_tuple_rdd.reduce(lambda a, b: (a[0] + b[0], a[1] + b[1])) print(final_value) spark.stop()
위 스크립트의 RDD 를 분석해보면 0 부터 9999 까지의 숫자 데이터를 포함하는 ParallelCollectionRDD 로부터 Reduce 연산이 시작됩니다.
이 ParallelCollectionRDD 는 총 5개의 Partition 을 생성하고, 각 Partition 은 하나의 Task 가 됩니다.
즉, Partition 과 Task 는 1 대 1로 대응되며, Partition 은 Task 가 실행할 데이터의 부분 집합으로 생각하셔도 됩니다.
RDD 의 소스 데이터들이 어떤 방식으로 분배되는지 확실히 모르지만, Range 기반의 분배된다고 할 때에 각 Task 는 아래와 같은 구조를 취하게 됩니다.
Reduce 연산은 Task 내부에서 병렬적으로 처리됩니다.
그리고 이러한 Reduce 연산 방식을 Local Aggregation 이라고 부릅니다.
Wide Transformation 처럼 서로 다른 Executor 또는 Task 끼리 연산 결과를 공유하지 않습니다.
각 Task 의 Reduction 결과를 Driver 에게 전달.
Reduce 연산은 최종적으로 연산이 마무리된 스칼라 데이터를 Driver 에게 전달하지는 않습니다.
대신 각 Task 가 연산을 마친 결과를 개별적으로 Driver 에게 전달합니다.
즉, Partition 이 10개라면 10개의 Task 의 Reduction 결과들이 각각 Driver 에게 제공됩니다.
그리고 Reduction 연산의 마무리 작업을 Driver 가 수행합니다.
1. 각 Task 의 Reduction 연산 결과를 Driver 에게 전달
각 Task 는 Worker Node 의 Executor 내부에서 Thread 형식으로 실행됩니다.
실제로 Worker Node 내부에서 5개의 Task 가 아래와 같이 Thread 형식으로 실행됨을 확인할 수 있습니다.
"Executor task launch worker for task 0.0 in stage 0.0 (TID 0)" #46 daemon prio=5 os_prio=0 cpu=94.30ms elapsed=23.16s tid=0x0000fffe780d9800 nid=0x6416 waiting on condition [0x0000ffff3d3fd000] "Executor task launch worker for task 1.0 in stage 0.0 (TID 1)" #47 daemon prio=5 os_prio=0 cpu=67.81ms elapsed=23.16s tid=0x0000fffe780de000 nid=0x6417 waiting on condition [0x0000ffff3d1fd000] "Executor task launch worker for task 2.0 in stage 0.0 (TID 2)" #48 daemon prio=5 os_prio=0 cpu=184.07ms elapsed=23.16s tid=0x0000fffe780df800 nid=0x6418 waiting on condition [0x0000ffff3cffd000] "Executor task launch worker for task 3.0 in stage 0.0 (TID 3)" #49 daemon prio=5 os_prio=0 cpu=73.95ms elapsed=23.16s tid=0x0000fffe780e1800 nid=0x6419 waiting on condition [0x0000ffff3cdfd000] "Executor task launch worker for task 4.0 in stage 0.0 (TID 4)" #50 daemon prio=5 os_prio=0 cpu=68.30ms elapsed=23.16s tid=0x0000fffe780e6800 nid=0x641a waiting on condition [0x0000ffff3cbfd000]
그리고 각 Task 는 자신의 Reduction 결과를 Driver 에게 전달하게 되고, Driver 가 연산의 마무리 작업을 완료합니다.
따라서 많은 수의 Partition 이 사용되는 Reduction 연산의 경우에는 Driver 에 부담을 줄 수 있는 작업이 됩니다.
이러한 단점을 극복하고자 TreeReduce 와 같은 Action 이 존재합니다.
반응형'Spark' 카테고리의 다른 글
[Spark] RDD GroupBy 와 Shuffle 알아보기 (0) 2024.07.09 [Spark] RDD treeReduce 알아보기 (0) 2024.07.09 [Spark] RegisterApplication RPC 알아보기 (0) 2024.07.06 [Spark] Spark Cluster 를 구축하기 ( Docker ) (0) 2024.05.17 [Spark] Window 알아보기 ( lag, lead, sum ) (0) 2024.05.15