ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • [Spark] RDD GroupBy 와 Shuffle 알아보기
    Spark 2024. 7. 9. 06:23
    반응형

    - 목차

     

     

    GroupBy 와 Wide Transformation.

    Spark 연산은 크게 Transformation 과 Action 으로 나뉩니다.

    Transformation 은 Filter, Map, FlatMap 처럼 데이터를 직접적으로 변형하는 연산을 의미하며,

    Action 은 Collect, Count, Reduce 와 같이 연산을 마무리짓고, 결과를 Driver 에게 응답하는 동작을 의미합니다.

    더 나아가서 Transformation 은 Narrow Transformation 과 Wide Transformation 으로 분류되는데요.

    Group By 는 대표적인 Wide Transformation 의 한 종류입니다.

     

    Group By 의 연산의 동작을 시각적으로 표현하면 아래와 같습니다.

    Group By 는 후속 Stage 를 만들고, 후속 Stage 의 Task 들에게 데이터를 전달합니다.

    이때에 동일한 Key 를 가지는 데이터를 동일한 후속 Stage 의 Task 에게 전달해야만 합니다.

    이러한 방식으로 공통의 타입 또는 Key 를 가지는 데이터들은 동일한 후속 Stage 의 Task 에서 처리되게 됩니다.

     

     

    이렇듯 Group By 는 반드시 Shuffle 을 동반하는 연산 중의 하나이며, 이를 Wide Transformation 이라고 합니다.

    그리고 이러한 연산은 후속되는 Stage 를 필요로 합니다.

     

     

    Shuffle Data 알아보기.

    Wide Transformation 은 다음 Stage 로 데이터를 전달하기 위해서 Shuffle File 또는 Shuffle Block 을 생성합니다.

    이게 무슨 의미냐하면, Spark 는 이전 Stage 의 모든 Task 들의 실행이 종료되어야 다음 Stage 의 Task 들이 실행됩니다.

    왜냐하면 실행 순서에 종속되기 때문입니다.

    그리고 서로 다른 Task 들은 Shuffle Block File 을 통해서 데이터를 전달하게 됩니다.

     

    Driver 의 실행 시에 spark.log.dir 이라는 설정을 통해서 Shuffle Data 가 저장되는 위치를 지정할 수 있습니다.

    일반적으로 /tmp 디렉토리 하위에 Shuffle Block 들이 저장됩니다.

    간단하게 이를 확인해보면 아래와 같은 네이밍을 가지는 Shuffle Data File 들이 생성되게 됩니다.

     

    range_rdd = spark.sparkContext.parallelize(range(10000), numSlices=2)
    even_or_odd_rdd = range_rdd.map(lambda x: 1 if x % 2 == 0 else 0)
    group_by_rdd = even_or_odd_rdd.groupBy(lambda x: x)
    count_rdd = group_by_rdd.mapValues(lambda v: len(v))
    output = count_rdd.collect()
    shuffle_0_0_0.checksum.ADLER32
    shuffle_0_0_0.data
    shuffle_0_0_0.index
    shuffle_0_1_0.checksum.ADLER32
    shuffle_0_1_0.data
    shuffle_0_1_0.index

     

     

    위 상황을 정리해보면, 저는 Partition 이 2개인 RDD 를 대상으로 Group By 연산을 수행하였습니다.

    그리고 Worker Node 의 /tmp/ Directory 하위에서 위와 같은 Shuffle Data File 들이 생성됩니다. 

     

    만약에 아래의 스크립트처럼 groupBy Task 에 Partition 을 7개로 늘린다면, 아래와 같이 7개의 Shuffle Block File 들이 생성되게 됩니다.

    range_rdd = spark.sparkContext.parallelize(range(10000), numSlices=7)
    even_or_odd_rdd = range_rdd.map(lambda x: 1 if x % 2 == 0 else 0)
    group_by_rdd = even_or_odd_rdd.groupBy(lambda x: x)
    count_rdd = group_by_rdd.mapValues(lambda v: len(v))
    output = count_rdd.collect()
    shuffle_0_0_0.data
    shuffle_0_1_0.data
    shuffle_0_2_0.data
    shuffle_0_3_0.data
    shuffle_0_4_0.data
    shuffle_0_5_0.data
    shuffle_0_6_0.data

     

     

    반응형
Designed by Tistory.