-
[Spark] RDD GroupBy 와 Shuffle 알아보기Spark 2024. 7. 9. 06:23반응형
- 목차
GroupBy 와 Wide Transformation.
Spark 연산은 크게 Transformation 과 Action 으로 나뉩니다.
Transformation 은 Filter, Map, FlatMap 처럼 데이터를 직접적으로 변형하는 연산을 의미하며,
Action 은 Collect, Count, Reduce 와 같이 연산을 마무리짓고, 결과를 Driver 에게 응답하는 동작을 의미합니다.
더 나아가서 Transformation 은 Narrow Transformation 과 Wide Transformation 으로 분류되는데요.
Group By 는 대표적인 Wide Transformation 의 한 종류입니다.
Group By 의 연산의 동작을 시각적으로 표현하면 아래와 같습니다.
Group By 는 후속 Stage 를 만들고, 후속 Stage 의 Task 들에게 데이터를 전달합니다.
이때에 동일한 Key 를 가지는 데이터를 동일한 후속 Stage 의 Task 에게 전달해야만 합니다.
이러한 방식으로 공통의 타입 또는 Key 를 가지는 데이터들은 동일한 후속 Stage 의 Task 에서 처리되게 됩니다.
이렇듯 Group By 는 반드시 Shuffle 을 동반하는 연산 중의 하나이며, 이를 Wide Transformation 이라고 합니다.
그리고 이러한 연산은 후속되는 Stage 를 필요로 합니다.
Shuffle Data 알아보기.
Wide Transformation 은 다음 Stage 로 데이터를 전달하기 위해서 Shuffle File 또는 Shuffle Block 을 생성합니다.
이게 무슨 의미냐하면, Spark 는 이전 Stage 의 모든 Task 들의 실행이 종료되어야 다음 Stage 의 Task 들이 실행됩니다.
왜냐하면 실행 순서에 종속되기 때문입니다.
그리고 서로 다른 Task 들은 Shuffle Block File 을 통해서 데이터를 전달하게 됩니다.
Driver 의 실행 시에 spark.log.dir 이라는 설정을 통해서 Shuffle Data 가 저장되는 위치를 지정할 수 있습니다.
일반적으로 /tmp 디렉토리 하위에 Shuffle Block 들이 저장됩니다.
간단하게 이를 확인해보면 아래와 같은 네이밍을 가지는 Shuffle Data File 들이 생성되게 됩니다.
range_rdd = spark.sparkContext.parallelize(range(10000), numSlices=2) even_or_odd_rdd = range_rdd.map(lambda x: 1 if x % 2 == 0 else 0) group_by_rdd = even_or_odd_rdd.groupBy(lambda x: x) count_rdd = group_by_rdd.mapValues(lambda v: len(v)) output = count_rdd.collect()
shuffle_0_0_0.checksum.ADLER32 shuffle_0_0_0.data shuffle_0_0_0.index shuffle_0_1_0.checksum.ADLER32 shuffle_0_1_0.data shuffle_0_1_0.index
위 상황을 정리해보면, 저는 Partition 이 2개인 RDD 를 대상으로 Group By 연산을 수행하였습니다.
그리고 Worker Node 의 /tmp/ Directory 하위에서 위와 같은 Shuffle Data File 들이 생성됩니다.
만약에 아래의 스크립트처럼 groupBy Task 에 Partition 을 7개로 늘린다면, 아래와 같이 7개의 Shuffle Block File 들이 생성되게 됩니다.
range_rdd = spark.sparkContext.parallelize(range(10000), numSlices=7) even_or_odd_rdd = range_rdd.map(lambda x: 1 if x % 2 == 0 else 0) group_by_rdd = even_or_odd_rdd.groupBy(lambda x: x) count_rdd = group_by_rdd.mapValues(lambda v: len(v)) output = count_rdd.collect()
shuffle_0_0_0.data shuffle_0_1_0.data shuffle_0_2_0.data shuffle_0_3_0.data shuffle_0_4_0.data shuffle_0_5_0.data shuffle_0_6_0.data
반응형'Spark' 카테고리의 다른 글
[Spark] RDD treeReduce 알아보기 (0) 2024.07.09 [Spark] RDD Reduce 내부 동작 알아보기 (0) 2024.07.09 [Spark] RegisterApplication RPC 알아보기 (0) 2024.07.06 [Spark] Spark Cluster 를 구축하기 ( Docker ) (0) 2024.05.17 [Spark] Window 알아보기 ( lag, lead, sum ) (0) 2024.05.15