Spark
-
[Spark] RDD GroupBy 와 Shuffle 알아보기Spark 2024. 7. 9. 06:23
- 목차 GroupBy 와 Wide Transformation.Spark 연산은 크게 Transformation 과 Action 으로 나뉩니다.Transformation 은 Filter, Map, FlatMap 처럼 데이터를 직접적으로 변형하는 연산을 의미하며,Action 은 Collect, Count, Reduce 와 같이 연산을 마무리짓고, 결과를 Driver 에게 응답하는 동작을 의미합니다.더 나아가서 Transformation 은 Narrow Transformation 과 Wide Transformation 으로 분류되는데요.Group By 는 대표적인 Wide Transformation 의 한 종류입니다. Group By 의 연산의 동작을 시각적으로 표현하면 아래와 같습니다.Group By 는..
-
[Spark] RDD treeReduce 알아보기Spark 2024. 7. 9. 06:23
- 목차 일반적인 Reduce 연산.일반적인 Reduce 연산은 리스트를 스칼라 값으로 변환합니다. 대표적으로 sum, count, min, max 등의 Aggregation 연산들은 리스트를 하나의 스칼라 값을 출력하게 됩니다. Spark 의 RDD Reduce 연산 또한 하나의 리스트를 분산시켜 Aggregation 연산을 수행한다는 점에서 이하동문입니다. 하나의 리스트를 분할하여 Task 또는 Partition 을 만든 후 여러 Executor 로 전달합니다. 각 Executor 는 Driver 로부터 전달받은 Task 를 처리합니다. 처리된 Task 는 Driver 로 반환됩니다. 이때 Task 의 갯수가 N 개라면 Driver 로 전달되는 결과는 N 개의 결과값을 받게 됩니다. 그리고 Driv..
-
[Spark] RDD Reduce 내부 동작 알아보기Spark 2024. 7. 9. 06:23
- 목차 Reduce Action 은 Local Aggregation 을 수행한다.reduce 는 RDD 의 대표적은 Action 연산 중 하나입니다.Reduce Action 이 실행되면서 Driver 는 Task 들을 생성하여 Worker Node 의 Executor 에게 전달합니다.아래의 스크립트는 0 ~ 9999 에 해당하는 숫자들 중에서 홀수와 짝수의 갯수를 Reduction 하는 Spark 스크립트입니다. from pyspark.sql import SparkSessionif __name__ == '__main__': spark = (SparkSession.builder .master("spark://127.0.0.1:7077") .appName("..
-
[Spark] RegisterApplication RPC 알아보기Spark 2024. 7. 6. 07:05
- 목차 RegisterApplication RPC 란 ?RegisterApplication 이란 Driver 가 Master Node 나 Resource Manager 에게 어플리케이션을 등록하기 위한 RPC 요청입니다.Spark Driver가 실행되면, Spark Master 와 통신하여 "새로운 Application을 등록하고 싶다" 라는 메시지를 보냅니다.즉, Driver 가 곧 Application 과 일대일로 매칭되는 단위라고 생각하셔도 됩니다. RegisterApplication RPC 요청은 아래의 두가지 데이터를 Master 에게 전달합니다.ApplicationDescription 은 Executor 의 스펙을 정의합니다.몇 개의 Executor 를 사용하며, 각 Executor 의 C..
-
[Spark] Spark Cluster 를 구축하기 ( Docker )Spark 2024. 5. 17. 22:31
- 목차 들어가며.이번 글에서는 Spark Cluster 를 구현하는 방법에 대해서 알아보려고 합니다.기본적으로 Docker Container 를 활용하여 Spark Cluster 를 구현하며,Spark Cluster 를 실행하기 위한 Dockerfile 과 Docker Compose 등의 상세한 내용들을 다룰 예정입니다. Spark Cluster 구축하기.Dockerfile 구성하기.가능한 Docker Hub 에 존재하는 Spark Docker Image 를 활용하려고 검색해보았지만, 개인적으로 마음에 드는 이미지가 존재하지 않아서 아래와 같이 ubuntu Base Image 로부터 Docker Image 를 생성합니다.아래 Dockerfile 의 내용은 Ubuntu Base Image 와 Hadoo..
-
[Spark] Window 알아보기 ( lag, lead, sum )Spark 2024. 5. 15. 07:51
- 목차 들어가며.Spark 의 Window 기능을 통해서 부분적인 Aggregation 연산을 수행할 수 있습니다.일반적인 Aggregation 연산은 전체 데이터셋을 대상으로 수행됩니다.일반적으로 사용하는 sum, avg, var, min/max 등은 모든 데이터들을 대상으로 합계, 평균, 분산, 최소/최대값을 계산합니다.그래서 N 개의 데이터를 대상으로 N 개보다 작은 수의 결과가 출력됩니다.예를 들어, 아래의 연산처럼 100 개의 데이터를 1개의 결과로 출력됩니다.spark = SparkSession.builder.config(conf=conf).getOrCreate()df = spark.range(100)print(f"""sum : {df.select(F.sum(F.col("id"))).coll..
-
[Spark] Spark 로 Web File Reader 구현하기 ( SparkFiles )Spark 2024. 3. 18. 07:04
- 목차 들어가며.Spark 를 활용하여 http 프로토콜로 호스팅되는 웹 파일을 다운로드해야하는 경우가 존재합니다.일반적인 Spark 의 File Reader 를 통해서 웹파일을 다운로드하는 것을 불가능합니다.이 과정에서 SparkFiles 모듈을 사용하며, 이는 여러 이점이 존재합니다.첫번째 이점은 Spark Application 의 메모리보다 큰 용량의 파일을 손쉽게 읽어들일 수 있습니다.Http Streaming 방식으로 웹파일을 읽어들이는 수고로움을 덜 수 있습니다.두번째 이점은 기존의 Spark 프로그래밍 패턴을 사용할 수 있습니다.외부의 모듈을 활용하게 되면, 외부 모듈의 사용법에 따라 프로그래밍을 수행해야합니다.하지만 SparkFiles 는 Spark 의 내장 모듈로써 Spark 의 프로그..
-
[Spark] Row 알아보기Spark 2024. 3. 3. 12:33
- 목차 들어가며. 이번 글에서는 Spark 의 Row 에 대해서 알아보려고 합니다. Row 는 DataFrame 을 구성하는 개별적인 데이터 레코드를 의미합니다. 그래서 Row 들의 모음이 곧 DataFrame 이라고 볼 수 있죠. DataFrame 을 생성하는 형태는 아래와 같습니다. SparkSession 의 createDataFrame 함수를 사용하여 DataFrame 을 만들게 되는데요. 이때에 필수적인 Arguments 가 Row 와 Schema 입니다. # row 생성. rows = [Row(name="Andy", age=32), Row(name="Bob", age=43)] # schema 생성. schema = StructType([ StructField("name", StringType(..
-
[Spark] approxCountDistinct 알아보기Spark 2024. 2. 22. 20:39
- 목차 들어가며.이번 글에서는 SparkSQL 의 approxCountDistinct 함수에 대해 알아보려고 합니다.approxCountDistinct 은 특정 칼럼의 값들이 몇개의 범주로 구성되어있는지를 파악하는 함수입니다.즉, Categorical Data 의 종류 갯수를 파악합니다.approxCountDistinct 는 이름에서 유추할 수 있듯이, 대략적인 결과를 획득하기 위한 함수인데요.큰 규모의 데이터셋에 대해서 약간의 오차를 허용하고 결과 획득 시간을 단축하기 위한 목적을 가집니다. 사용할 데이터.사용할 데이터는 Kaggle 의 Bank Marketing 데이터를 사용합니다.아래 링크에서 다운로드받으실 수 있습니다.https://www.kaggle.com/datasets/henriqueyam..
-
[Spark] Logical Plan 알아보기 1 (Catalyst Optimizer)Spark 2024. 1. 28. 10:32
- 목차 들어가며. 이번 글에서는 SparkSQL API 로 구성된 SQL 쿼리와 DataFrame 의 Transformation 들이 어떠한 방식으로 최적화되는지를 살펴보려고 합니다. SparkSQL API 로 구성한 일련의 코드들은 Action 에 의해서 실행이 될때에 아래와 같은 단계를 거쳐 최적화됩니다. 이 과정에서 큰 역할을 수행하는 구성요소가 바로 Catalyst Optimizer 이구요. Unresolved Logical Plan 부터 Physical Plans 를 생성하는 과정에 관여합니다. 이번 글에서는 Logical Plan 과 관련된 용어들과 최적화 과정에 대해서 이야기해보려고 합니다. JSON, Parquet 파일 생성하기. 먼저 Data Source 로 사용할 JSON, Parqu..