ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • [Spark] RDD treeReduce 알아보기
    Spark 2024. 7. 9. 06:23
    반응형

    - 목차

     

    일반적인 Reduce 연산.

    일반적인 Reduce 연산은 리스트를 스칼라 값으로 변환합니다.

    대표적으로 sum, count, min, max 등의 Aggregation 연산들은 리스트를 하나의 스칼라 값을 출력하게 됩니다.

    Spark 의 RDD Reduce 연산 또한 하나의 리스트를 분산시켜 Aggregation 연산을 수행한다는 점에서 이하동문입니다.

     

    하나의 리스트를 분할하여 Task 또는 Partition 을 만든 후 여러 Executor 로 전달합니다.

    각 Executor 는 Driver 로부터 전달받은 Task 를 처리합니다.

    처리된 Task 는 Driver 로 반환됩니다.

    이때 Task 의 갯수가 N 개라면 Driver 로 전달되는 결과는 N 개의 결과값을 받게 됩니다.

    그리고 Driver 는 N 개의 연산을 수행하여 최종적인 Reduce 연산을 마무리합니다.

     

     

     

    그래서 일반적인 Spark RDD 의 Reduce 연산은 Driver 에게 연산의 부담을 주게 됩니다.

    하지만 treeReduce 를 사용하면 Driver 에서 수행되는 마지막 연산의 부담을 줄어들게 할 수 있습니다.

     

    https://westlife0615.tistory.com/975

     

    [Spark] RDD Reduce 내부 동작 알아보기

    - 목차 Reduce Action 은 Local Aggregation 을 수행한다.reduce 는 RDD 의 대표적은 Action 연산 중 하나입니다.Reduce Action 이 실행되면서 Driver 는 Task 들을 생성하여 Worker Node 의 Executor 에게 전달합니다.아래의

    westlife0615.tistory.com

     

     

    treeReduce 의 동작 방식.

    Spark RDD 의 treeReduce 연산은 추가적인 Stage 를 만들어서 Reduction 연산을 수행합니다.

    아래 이미지와 같이 Worker Node 의 Executor 에서 최대한 많은 양의 Reduction 연산을 수행하기 위해서 새로운 Stage 를 추가하여 연산을 수행합니다.

    각 Stage 에 따른 Reduce 연산의 연결고리가 트리구조를 이루기 때문에 treeReduce 라고 불립니다.

     

     

     

    treeReduce 연산의 기본적인 사용 방법은 아래와 같습니다.

     

    from pyspark.sql import SparkSession
    
    if __name__ == '__main__':
        spark = (SparkSession.builder
                 .master("spark://127.0.0.1:7077")
                 .appName("WordCount")
                 .getOrCreate())
    
        range_rdd = spark.sparkContext.parallelize(range(10000), numSlices=5)
        count_tuple_rdd = range_rdd.map(lambda x: (1, 0) if x % 2 == 0 else (0, 1))
        final_value = count_tuple_rdd.treeReduce(lambda a, b: (a[0] + b[0], a[1] + b[1]))
        print(final_value)
    
        spark.stop()

     

     

     

    depth 와 Stage.

     

    treeReduce 는 depth 를 설정할 수 있습니다.

    depth 는 Tree Reduction 을 수행할 Stage 의 갯수를 지정하는 옵션입니다.

    기본적으로 depth 는 2 로 설정되어 있습니다.

     

    Partition 10 & Depth 2.

    아래는 10개의 Partition 과 Depth 가 2인 Tree Reduction 입니다.

    range_rdd = spark.sparkContext.parallelize(range(10000), numSlices=10)
    count_tuple_rdd = range_rdd.map(lambda x: (1, 0) if x % 2 == 0 else (0, 1))
    final_value = count_tuple_rdd.treeReduce(lambda a, b: (a[0] + b[0], a[1] + b[1]), 2)

     

     

    아래의 이미지와 같이 Stage 0 과 Stage 1 이 생성됩니다.

    그리고 Stage 0 는 10개의 Task 를 실행하고, Stage 1 은 2개의 Task 들을 실행합니다.

    이 경우에는 Driver 가 Stage 1 의 2개 Task 로부터 Reduction 결과값을 제공받습니다.

     

     

     

    Partition 50 & Depth 3.

    아래는 50개의 Partition 과 Depth 가 3인 Tree Reduction 에 대한 Job Graph 입니다.

    Depth 가 3이므로 총 3개의 Stage 가 생성됩니다.

    그리고 각각 50 -> 12 -> 3 관계의 Task 들이 실행됩니다.

     

     

     

    반응형
Designed by Tistory.