ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • [Spark] Logical Plan 알아보기 1 (Catalyst Optimizer)
    Spark 2024. 1. 28. 10:32
    728x90
    반응형

    - 목차

     

    들어가며.

    이번 글에서는 SparkSQL API 로 구성된 SQL 쿼리와 DataFrame 의 Transformation 들이 어떠한 방식으로 최적화되는지를 살펴보려고 합니다.

    SparkSQL API 로 구성한 일련의 코드들은 Action 에 의해서 실행이 될때에 아래와 같은 단계를 거쳐 최적화됩니다.

    출처 : https://www.databricks.com/kr/glossary/catalyst-optimizer

     

    이 과정에서 큰 역할을 수행하는 구성요소가 바로 Catalyst Optimizer 이구요.

    Unresolved Logical Plan 부터 Physical Plans 를 생성하는 과정에 관여합니다.

     

    이번 글에서는 Logical Plan 과 관련된 용어들과 최적화 과정에 대해서 이야기해보려고 합니다.

     

    JSON, Parquet 파일 생성하기.

    먼저 Data Source 로 사용할 JSON, Parquet 파일을 생성합니다.

    Python Pandas 로 구성된 코드이구요.

    id, name, age, gender, birth 로 구성된 사용자 데이터를 생성합니다.

     

    < requirements.txt >

    pandas==1.5.0
    pyarrow==14.0.2

     

    < users.json & users.parquet Generator >

    import pandas as pd
    import string, random, os
    from datetime import datetime, timedelta
    
    json_output_path = os.path.abspath(os.path.join("../users.json"))
    parquet_output_path = os.path.abspath(os.path.join("../users.parquet"))
    user_list = []
    
    for _ in range(0, 100):
        for index in range(0, 100):
            random_list = random.sample(string.ascii_letters, 10)
            id = index
            name = ''.join(random_list)
            age = random.randint(1, 100)
            gender = random.choice(["Male", "Female"])
            birth = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0) - timedelta(days=random.randint(10, 100) * 365)
            birth_ymd = birth.strftime("%Y-%m-%d")
            user_list.append((id, name, age, gender, birth_ymd))
    
        df = pd.DataFrame(user_list, columns=["id", "name", "age", "gender", "birth"])
        json_file = open(json_output_path, mode="a")
        json_file.write(df.to_json(orient="records", lines=True))
        json_file.close()
        df.to_parquet(parquet_output_path, engine="pyarrow")
    
        print("finish iteration")
    print("finish totally")

     

     

    여담이지만 1만건의 레코드를 생성한 파일의 사이즈는 Json 파일이 37MB, Paruqet 파일이 159KB 이네요.

     37M Jan 28 09:40 users.json
    159K Jan 28 09:40 users.parquet

     

     

    Unresolved Logical Plan.

    Catalyst Optimizer 에 의한 첫번째 최적화의 결과로 Unresolved Logical Plan 이 생성됩니다.

    Unresolved Logical Plan 은 아직까지 Resolved 되지 않음을 뜻하는데요.

    Resolve 단계에 대해서 먼저 알아보도록 하겠습니다.

     

    Resolve.

    Resolve 라는 용어는 DNS 에서 사용합니다.

    DNS 서버를 통하여 Domain Name 으로부터 IP 주소를 획득하는 과정을 의미하는데요.

    예를 들어, DNS 서버를 통해서 "www.xxxx.com" 인 도메인의 IP 인 6.43.1.34 를 찾을 수 있습니다.

     

    이처럼 SparkSQL Optimization 과정에서 Resolve 는 DataFrame 의 쿼리와 실제 DataSource 를 연결하는 과정입니다.

    SparkSQL 쿼리에서 사용된 name, age, gender 이라는 Column 은 users.json 파일의 스키마와 일치합니다.

    SparkSQL 쿼리에서 사용된 users 라는 Table 또한 users.json 의 파일명과 일치하죠.

    이처럼 Resolve 과정을 통해서 SparkSQL 쿼리와 DataSource 인 파일을 연결지을 수 있습니다.

     

     

    Unresolved.

    Unresolved 단계는 실제 DataSource 를 고려하지 않고, SparkSQL 의 쿼리나 DataFrame 의 상태를 파싱하는 단계입니다.

    Syntax 에러는 없는지 체크하고, 사용된 Column 과 Table, 그리고 Filter 조건들을 파싱합니다.

    아래의 코드는 Unresolved Logical Plan 을 출력합니다.

     

    package test.spark.source
    
    import org.apache.spark.sql.SparkSession
    
    object TestLogicalPlan {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .appName("json-test")
          .master("local[*]")
          .config("spark.driver.bindAddress", "localhost")
          .getOrCreate()
    
        val usersJsonFilePath = this.getClass.getResource("/files/users.json")
        val userSchema = "id LONG, name STRING, age INTEGER, gender STRING, birth DATE"
        val df = spark.read.format("json").schema(userSchema).json(usersJsonFilePath.getPath)
          .select("id", "name")
          .filter("gender = 'Male'")
    
        df.explain(extended=true)
        spark.close()
      }
    
    }
    == Parsed Logical Plan ==
    'Filter ('gender = Male)
    +- Project [id#0L, name#1]
       +- Relation [id#0L,name#1,age#2,gender#3,birth#4] json

     

    Parsed Logical Plan 이라는 이름으로 Unresolved Logical Plan 이 출력되구요.

    Filter, Project, Relation 과 같은 요소들로 구성됩니다.

     

    Relation.

    Relation 은 DataSource 와 DataFrame 사이의 관계되는 Column 들을 의미합니다.

    아래의 경우처럼 Schema 가 id, name, age, gender, birth 로 정의될 때에 Relation 은 Schema 의 모든 Column 을 내포합니다.

     

    val userSchema = "id LONG, name STRING, age INTEGER, gender STRING, birth DATE"
    val df = spark.read.format("json").schema(userSchema).json(usersJsonFilePath.getPath)
    == Parsed Logical Plan ==
    Relation [id#0L,name#1,age#2,gender#3,birth#4] json

     

    반면, Schema 를 id 와 name 으로 제한하게 되면, Relation 역시 id 와 name 으로만 구성됩니다.

    val userSchema = "id LONG, name STRING"
    val df = spark.read.format("json").schema(userSchema).json(usersJsonFilePath.getPath)
    == Parsed Logical Plan ==
    Relation [id#0L,name#1] json

     

     

    Project.

    Project 는 Select 와 관련됩니다.

    Project 의 의미가 "투영하다" 인 뜻을 가지는데, DataFrame Reader 의 관점에서 Project 는 DataSource 의 특정 Column 들만 Extraction 하겠다는 의미입니다.

    그래서 Select Transformation 으로 지정한 Column 들이 곧 Projection 의 대상이 됩니다.

     

    예를 들어, select("id", "name") Transformation 으로 id 와 name 칼럼을 조회하는 코드를 작성해보겠습니다.

    이 경우에 아래와 같이 Relation 은 모든 Column 들을 지칭하지만, Project 는 id 와 name Column 만을 사용합니다.

     

    < Projection >

        val userSchema = "id LONG, name STRING, age INTEGER, gender STRING, birth DATE"
        val df = spark.read.format("json").schema(userSchema).json(usersJsonFilePath.getPath)
          .select("id", "name")
    == Parsed Logical Plan ==
    'Project ['id, 'name]
    +- Relation [id#0L,name#1,age#2,gender#3,birth#4] json

     

    Filter.

    Filter Transformation 을 사용하게 되면 Unresolved Logical Plan 은 Filter 정보가 추가됩니다.

    val userSchema = "id LONG, name STRING, age INTEGER, gender STRING, birth DATE"
    val df = spark.read.format("json").schema(userSchema).load(usersFilePath.getPath)
      .select("id", "name")
      .filter("gender = 'Male'")
    == Parsed Logical Plan ==
    'Filter ('gender = Male)
    +- Project [id#0L, name#1]
       +- Relation [id#0L,name#1,age#2,gender#3,birth#4] json

     

    LocalLimit & GlobalLimit.

    DataFrame 또는 SQL 에 Limit Transformation 을 적용하게 된다면, 아래와 같은 Logical Plan 을 확인하실 수 있습니다.

    df.limit(5) 가 적용된 Logical Plan 의 결과는 Global Limit 와 Local Limit 으로 나타나게 됩니다.

     

    val usersJsonFilePath = this.getClass.getResource("/files/users.json")
    val userSchema = "id LONG, name STRING, age INTEGER, gender STRING, birth DATE"
    
    val df = spark.read.format("json").schema(userSchema).load(usersJsonFilePath.getPath)
      .select("id", "name")
      .filter("gender = 'Male'")
      .limit(5)
    
    df.explain(extended=true)
    == Parsed Logical Plan ==
    GlobalLimit 5
    +- LocalLimit 5
       +- Project [id#0L, name#1]
          +- Filter (gender#3 = Male)
             +- Project [id#0L, name#1, gender#3]
                +- Relation [id#0L,name#1,age#2,gender#3,birth#4] json

     

    LocalLimit 의 적용 범위는 하나의 DataFrame 으로 제한됩니다.

    만약 Union 또는 Join 과 같이 두개 이상의 DataFrame 이 합쳐지는 경우에는 각각의 DataFrame 에 LocalLimit 이 적용되고,

    최종적인 결과에 GlobalLimit 이 적용됩니다.

    그리고 이러한 현상을 Limit Pushdown 이라고 합니다.

    아래의 예시는 Union 와 Limit 이 적용된 코드 예시와 Logical Plan 입니다.

     

    val usersJsonFilePath = this.getClass.getResource("/files/users.json")
    val userSchema = "id LONG, name STRING, age INTEGER, gender STRING, birth DATE"
    
    val df = spark.read.format("json").schema(userSchema).load(usersJsonFilePath.getPath)
      .select("id", "name")
      .filter("gender = 'Male'")
    
    val df2 = spark.read.format("json").schema(userSchema).load(usersJsonFilePath.getPath)
      .select("id", "name")
      .filter("gender = 'Male'")
    
    val df3 = df.union(df2).limit(10)
    df3.explain(extended=true)
    df3.show()
    == Parsed Logical Plan ==
    GlobalLimit 10
    +- LocalLimit 10
       +- Union false, false
          :- Project [id#0L, name#1]
          :  +- Filter (gender#3 = Male)
          :     +- Project [id#0L, name#1, gender#3]
          :        +- Relation [id#0L,name#1,age#2,gender#3,birth#4] json
          +- Project [id#12L, name#13]
             +- Filter (gender#15 = Male)
                +- Project [id#12L, name#13, gender#15]
                   +- Relation [id#12L,name#13,age#14,gender#15,birth#16] json
                   
                   
    == Optimized Logical Plan ==
    GlobalLimit 10
    +- LocalLimit 10
       +- Union false, false
          :- LocalLimit 10
          :  +- Project [id#0L, name#1]
          :     +- Filter (isnotnull(gender#3) AND (gender#3 = Male))
          :        +- Relation [id#0L,name#1,age#2,gender#3,birth#4] json
          +- LocalLimit 10
             +- Project [id#12L, name#13]
                +- Filter (isnotnull(gender#15) AND (gender#15 = Male))
                   +- Relation [id#12L,name#13,age#14,gender#15,birth#16] json

     

    위와 같이 LocalLimit 은 Limit Pushdonw 되어 Union 이 적용될 각각의 DataFrame 에 각각 적용됩니다.

     

     

    마치며.

    이어지는 글에서 SparkSQL 의 Logical Plan 에 대해서 추가적으로 알아보도록 하겠습니다.

    읽어주셔서 감사합니다.

     

    반응형
Designed by Tistory.