전체 글
-
[Catalyst Optimizer] SimplifyCasts Rule 알아보기Spark/Catalyst Optimizer 2024. 8. 12. 05:50
- 목차 들어가며.SimplifyCasts Rule 은 이름 그대로 불필요한 Cast 연산을 제거하는 최적화 룰입니다.SimplifyCasts Rule 의 구현 내용은 아래와 같습니다. case Cast(e, dataType, _, _) if e.dataType == dataType같은 타입으로 캐스팅한다면 캐스팅 연산 제거case (ArrayType(from, false), ArrayType(to, true))Null 을 포함하지 않는 Array 를 Null 을 포함하는 Array 로 캐스팅한다면 캐스팅 연산 제거.case (MapType(fromKey, fromValue, false), MapType(toKey, toValue, true))Null 을 포함하지 않는 Map 을 Null 을 포함하는 M..
-
[Spark] DataFrameWriter PartitionBy 알아보기Spark 2024. 8. 12. 05:50
- 목차 partitionBy 이란 ?Spark DataFrame API 에서 write 명령어를 사용하면 손쉽게 DataFrameWriter 객체를 생성할 수 있습니다.이름에서 알 수 있듯이 DataFrameWriter 는 데이터를 외부 데이터소스에 추가하거나 파일을 생성하는 작업을 수행합니다.여기서 partitionBy 라는 함수가 사용되면, 생성하고자하는 파일을 물리적인 레벨에서 디렉토리를 세분화할 수 있습니다. 예를 들어, DataFrame 이 country 라는 칼럼을 가지고 이 칼럼은 KR 또는 US 라는 값을 가진다고 가정합니다.그리고 DataFrame.partitionBy("country").parquet("/data/output") 와 같이 명령을 실행해게 되면, 아래와 같이 Parti..
-
[Catalyst Optimier] ConstantPropagation 알아보기Spark/Catalyst Optimizer 2024. 8. 10. 16:27
- 목차 들어가며.Catalyst Optimizer 의 ConstantPropagation 는 "where a = 1 and b = a" 와 같이 상수가 적용된 Filter Expression 을 최적화합니다. 예를 들어, "where a = 1 and b = a" 와 같은 조건은 ConstantPropagation Rule 의 최적화에 의해서 "where a = 1 and b = 1" 와같이 최적화되며, Constant 인 1이 다른 Filter 로 전파되게 됩니다. ConstantPropagation Rule 은 아래와 같은 구조를 취합니다. Attribute 와 Literal 로 구성된 Filter 를 대상으로 ConstantPropagation 이 적용됩니다. def apply(plan: ..
-
[Catalyst Optimizer] CombineFilters 알아보기Spark/Catalyst Optimizer 2024. 8. 9. 22:42
- 목차 들어가며.CombineFilters 는 Catalyst Optimizer 에서 여러개의 Filter 들을 하나의 Filter 로 묶기 위한 최적화 Rule 입니다. CombineFilters 는 object 로 구현되어 있구요. org.apache.spark.sql.catalyst.optimizer.Optimizer.scala 파일에 정의됩니다. object CombineFilters extends Rule[LogicalPlan] with PredicateHelper { def apply(plan: LogicalPlan): LogicalPlan = plan.transformWithPruning( _.containsPattern(FILTER), ruleId)(applyLocally) v..
-
[Catalyst Optimizer] PruneFilters 알아보기Spark/Catalyst Optimizer 2024. 8. 9. 22:42
- 목차 들어가며.Catalyst Optimizer 는 Rule Based Optimization 을 수행합니다.그 중 한가지 방식인 PruneFilters 는 실행이 무의미한 Filter Expression 들을 제거하는 역할을 합니다.이어지는 글에서 다양한 PruneFilters Rule 의 적용 예시를 살펴보도록 하겠습니다. PruneFilters Object 살펴보기.org.apache.spark.sql.catalyst.optimizer 패키지의 Optimizer.scala 파일 내부에 PruneFilters 클래스가 존재합니다.내용은 아래와 같습니다. filter(true) 와 같은 연산은 제거합니다. 그래서 child 에 해당하는 LocalRelation 이나 Join, Subquery 등이 ..
-
[Spark] RDD Cogroup 연산 알아보기Spark 2024. 7. 29. 05:38
- 목차 Cogroup 이란 ?Cogroup 연산은 여러 RDD 를 key 기준으로 그룹핑하여, 동일한 key에 매핑된 value 들을 각각의 RDD에서 모아오는 연산입니다.Cogroup 연산의 Input 과 Output 은 아래와 같습니다.// 2개의 InputRDD1: RDD[(K, V1)]RDD2: RDD[(K, V2)]// OutputRDD[(K, (Iterable[V1], Iterable[V2]))] 그리하여 아래와 같은 연산은 동일한 Key 를 가지는 각 Input RDD 의 Value 들이 Iterable 형식으로 표현됩니다.val rdd1 = sc.parallelize(Seq( ("a", 1), ("b", 2), ("a", 3)))val rdd2 = sc.parallelize(Seq( ..
-
[Spark] RangePartitioner 알아보기Spark 2024. 7. 29. 05:38
- 목차 Partitioner 란 ? Spark 는 Wide Dependency Transformation 을 경계로 Stage 가 분리됩니다. GroupByKey, Join, PartitionBy, CoGroup 등과 같은 연산은 데이터의 셔플링을 유발하며, 이를 경계로 Stage 가 분리되게 됩니다. 이때 이전 Stage 와 후속 Stage 로 나뉘며, 아래와 같은 모습의 DAG 시각화가 이루어집니다. 위 과정에서 첫번째 Stage 를 Map-Side Task 그리고 두번째 Stage 는 Reduce-Side Task 또는 Mapper Task, Reduer Task 라고 부르기도 합니다. Partitioner 는 Mapper Task 에서 데이터들을 어떻게 새로운 Partition 으로 분류할..
-
[Spark] RDD combineByKey 알아보기Spark 2024. 7. 29. 05:38
- 목차 들어가며.Spark RDD 연산에서 대부분의 셔플링 연산은 combineByKey 로 변형됩니다.예를 들어서 reduceByKey, aggregateByKey, countByValue, join 연산 등은 내부적으로 combineByKey 를 활용하게 됩니다.즉, Wide Transformation 을 유발하는 셔플링 연산은 내부적으로 반드시 combineByKey 를 활용합니다.이번 글에서는 combineByKey 의 기본적인 사용법과 다른 Wide Transformation 연산과의 관계에 대해서 알아봅니다. combineByKey 사용하기.아래의 예시는 1 부터 1000 까지의 숫자들 중에서 홀수와 짝수의 총 갯수를 파악하는 간단한 예시 코드입니다.그리고 combineByKey 를 사용하여..
-
[Spark] MapOutputTracker 알아보기Spark 2024. 7. 29. 05:37
- 목차 MapOutputTracker 란 ?스파크는 하둡의 MapReduce 처럼 Map 과 Reduce 라는 표현을 사용합니다.셔플링이 발생하는 Wide Transformation 을 경계로 이전 Stage 를 Map Task, 후속 Stage 에 Reduce Task 를 배치합니다.그리고 아래의 이미지처럼 Map Task 는 Reduce Task 에게 Shuffle Block 이라는 데이터들을 전달하게 됩니다. Shuffle Block 은 Map Task 가 처리한 데이터의 결과물이며, BlockManager 에 의해서 관리됩니다.MapOutputTracker 는 셔플링 과정에서 Map Task 가 생성한 결과물을 추적/관리하는 스파크의 구성요소이며,이는 Driver 와 Executor 에 각각 ..
-
[Spark] Block 알아보기Spark 2024. 7. 26. 17:21
- 목차 RDD Cache Block.먼저 알아볼 Block 의 종류는 RDD Cache Block 입니다.RDD 의 cache 나 persist 함수를 통해서 RDD 를 캐싱할 수 있게 됩니다. import org.apache.spark.storage.StorageLevelval baseRdd = sc.parallelize(1 to 10, 4).map { x => x * 2}val cachedRdd = baseRdd.persist(StorageLevel.DISK_ONLY)val sum1 = cachedRdd.sum()println(s"First sum: $sum1") Disk 에 저장된 Cache Block.캐싱된 RDD 는 Block 이라는 파일 형태로 Disk 에 저장됩니다.저장되는 File P..