분류 전체보기
-
[Spark] Pivot 내부 동작 원리 알아보기Spark 2024. 8. 17. 08:24
- 목차 Pivot 은 GroupBy 연산과 함께 사용된다.Spark에서 pivot()은 단독으로 호출할 수 없습니다.즉, 아래와 같은 코드는 컴파일조차되지 않습니다.val df = spark.read.table("sales")df.pivot("product") // 오류 발생 pivot() 은 DataFrame 이 아닌 RelationalGroupedDataset 에서만 호출할 수 있으며,RelationalGroupedDataset 은 groupBy() 연산을 수행한 결과로 만들어지는 형태의 객체입니다. 이러한 OOP 구조적인 관점 뿐만 아니라 Pivot 의 의미적인 관점에서도 groupBy 와 함께 사용되는 것이 타당합니다. 아래의 DataFrame API 연산과 SQL 코드는 동일한 기능을 수행..
-
[Spark] SortMergeJoin 알아보기Spark 2024. 8. 17. 08:24
- 목차 SortMergeJoin 이란 ?SortMergeJoin 은 SparkSQL 의 여러가지 Join 방식 중의 하나입니다.가장 대표적인 Join 방식의 하나로써 이름처럼 Sort 과 Merge 가 주된 알고리즘으로 사용됩니다.우선 간단한 예시와 실행 계획 그리고 DAG Visualization 을 확인해보겠습니다. import spark.implicits._import org.apache.spark.sql.types._spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)case class User(id: Int, name: String)case class Flag(name: String, bool: Boolean)val df1 = Seq(User..
-
[Spark] DataFrameReader 와 Parquet 최적화 알아보기 (Column Pruning, Predicate Pushdown)Spark 2024. 8. 15. 10:07
- 목차 들어가며.Parquet 파일과 Spark 의 DataFrameReader 사이의 PredicatePushdown 에 대해서 알아봅니다. Parquet File 구조.웹상에 존재하는 여러가지 Parquet File 의 구조 이미지를 기반으로 설명을 진행합니다. Parquet File 은 위 이미지처럼 RowGroup, Column, Page 그리고 Footer 로 구성됩니다.이러한 레이아웃의 각 구조를 알아보겠습니다. Magic Number.Parquet 파일의 시작과 끝에는 Magic Number 라는 정보가 위치합니다.사실상 이는 Parquet 임을 의미하는 정보이고, 4 bytes 를 차지한다고 하네요.( Magic Number 는PAR1 으로 저장됨. ) RowGroup.Parquet 파..
-
[Spark] explode 함수와 Generate 연산자 알아보기Spark 2024. 8. 15. 10:07
- 목차 explode 함수란 ?Spark 의 explode 함수는 ArrayType 또는 MapType 의 타입의 칼럼의 값을 여러개의 Row 로 Flatten 하게 됩니다.즉, 1개의 Row 가 0개 또는 N개의 Row 들로 Flatten 될 수 있습니다.만약 ArrayType 의 칼럼의 값이 하나도 없다면, 변환되는 Row 의 갯수는 0개가 됩니다.그리고 3개의 값을 가지는 ArrayType 의 칼럼은 3개의 Row 들로 변환됩니다. 이를 직접 코드로 살펴보면 아래와 같습니다.import org.apache.spark.sql.types._import org.apache.spark.sql.Rowval rdd = spark.sparkContext.parallelize(Seq( Row("Andy", 4..
-
[NodeJS] Libuv ThreadPool 과 비동기 처리 관계 알아보기 ( UV_THREADPOOL_SIZE )Language/Nodejs 2024. 8. 13. 06:26
- 목차 Libuv Thread Pool 이란 ?Node.js 는 하나의 Thread 에서 동작하는 구조로 유명하지만 실제로는 libuv 의 Thread Pool 에 의해서 여러 쓰레드들이 함께 실행됩니다.특히 Node.js 에 내장된 비동기 함수들이 여러 Thread Pool 에서 멀티 쓰레드 구조로 실행됩니다.실제로 Node.js 프로세스를 실행하게 되면 아래와 같이 여러개의 Thread 가 확인됩니다. root@2153f4656997:/usr/src/app# ps -T -p 29 PID SPID TTY TIME CMD 29 29 pts/0 00:00:00 node 29 30 pts/0 00:00:00 node 29 31 pts/0 00:..
-
[Catalyst Optimizer] SimplifyCasts Rule 알아보기Spark/Catalyst Optimizer 2024. 8. 12. 05:50
- 목차 들어가며.SimplifyCasts Rule 은 이름 그대로 불필요한 Cast 연산을 제거하는 최적화 룰입니다.SimplifyCasts Rule 의 구현 내용은 아래와 같습니다. case Cast(e, dataType, _, _) if e.dataType == dataType같은 타입으로 캐스팅한다면 캐스팅 연산 제거case (ArrayType(from, false), ArrayType(to, true))Null 을 포함하지 않는 Array 를 Null 을 포함하는 Array 로 캐스팅한다면 캐스팅 연산 제거.case (MapType(fromKey, fromValue, false), MapType(toKey, toValue, true))Null 을 포함하지 않는 Map 을 Null 을 포함하는 M..
-
[Spark] DataFrameWriter PartitionBy 알아보기Spark 2024. 8. 12. 05:50
- 목차 partitionBy 이란 ?Spark DataFrame API 에서 write 명령어를 사용하면 손쉽게 DataFrameWriter 객체를 생성할 수 있습니다.이름에서 알 수 있듯이 DataFrameWriter 는 데이터를 외부 데이터소스에 추가하거나 파일을 생성하는 작업을 수행합니다.여기서 partitionBy 라는 함수가 사용되면, 생성하고자하는 파일을 물리적인 레벨에서 디렉토리를 세분화할 수 있습니다. 예를 들어, DataFrame 이 country 라는 칼럼을 가지고 이 칼럼은 KR 또는 US 라는 값을 가진다고 가정합니다.그리고 DataFrame.partitionBy("country").parquet("/data/output") 와 같이 명령을 실행해게 되면, 아래와 같이 Parti..
-
[Catalyst Optimier] ConstantPropagation 알아보기Spark/Catalyst Optimizer 2024. 8. 10. 16:27
- 목차 들어가며.Catalyst Optimizer 의 ConstantPropagation 는 "where a = 1 and b = a" 와 같이 상수가 적용된 Filter Expression 을 최적화합니다. 예를 들어, "where a = 1 and b = a" 와 같은 조건은 ConstantPropagation Rule 의 최적화에 의해서 "where a = 1 and b = 1" 와같이 최적화되며, Constant 인 1이 다른 Filter 로 전파되게 됩니다. ConstantPropagation Rule 은 아래와 같은 구조를 취합니다. Attribute 와 Literal 로 구성된 Filter 를 대상으로 ConstantPropagation 이 적용됩니다. def apply(plan: ..
-
[Catalyst Optimizer] CombineFilters 알아보기Spark/Catalyst Optimizer 2024. 8. 9. 22:42
- 목차 들어가며.CombineFilters 는 Catalyst Optimizer 에서 여러개의 Filter 들을 하나의 Filter 로 묶기 위한 최적화 Rule 입니다. CombineFilters 는 object 로 구현되어 있구요. org.apache.spark.sql.catalyst.optimizer.Optimizer.scala 파일에 정의됩니다. object CombineFilters extends Rule[LogicalPlan] with PredicateHelper { def apply(plan: LogicalPlan): LogicalPlan = plan.transformWithPruning( _.containsPattern(FILTER), ruleId)(applyLocally) v..
-
[Catalyst Optimizer] PruneFilters 알아보기Spark/Catalyst Optimizer 2024. 8. 9. 22:42
- 목차 들어가며.Catalyst Optimizer 는 Rule Based Optimization 을 수행합니다.그 중 한가지 방식인 PruneFilters 는 실행이 무의미한 Filter Expression 들을 제거하는 역할을 합니다.이어지는 글에서 다양한 PruneFilters Rule 의 적용 예시를 살펴보도록 하겠습니다. PruneFilters Object 살펴보기.org.apache.spark.sql.catalyst.optimizer 패키지의 Optimizer.scala 파일 내부에 PruneFilters 클래스가 존재합니다.내용은 아래와 같습니다. filter(true) 와 같은 연산은 제거합니다. 그래서 child 에 해당하는 LocalRelation 이나 Join, Subquery 등이 ..