ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • [Spark] Unresolved Logical Plan 알아보기
    Spark 2023. 12. 20. 07:31
    728x90
    반응형

    - 목차

     

    들어가며.

    Spark 는 RDD 가 아닌 DataFrame 또는 SQL 를 활용하여 연산을 수행할 때에 자동적으로 Query Optimization 이 수행됩니다.

    흔히 Predicate Pushdown, Column Pruning, Constant Folding 등의 유명한 최적화 방식들이 있는데요.

    관련한 최적화 방식들은 다른 글에서 설명할 수 있도록 해보겠습니다.

    돌아와서 Spark 는 Query Optimization 이 적용되며, 이를 책임지는 Spark 의 내부 요소를 Catalyst Optimizer 라고 부릅니다.

    즉, Catalyst Optimizer 에 의해서 SQL Query 또는 DataFrame 연산은 최적화됩니다.

    아래 이미지는 Catalyst Optimizer 에 의해서 수행되는 최적화 과정입니다.

    출처 : https://www.databricks.com/glossary/catalyst-optimizer

     

    이미지에서 알 수 있듯이, 최적화 과정이 매우 길고 복잡하죠 ?

    이번 글에서는 Unresolved Logical Plan 에 대해서 깊이 있는 내용을 다루어보려고 합니다.

     

    Unresolved 란 ?

    Unresolved 란 프로그래밍적인 관점에서 적절하게 등록이 되지 않은 상태를 의미합니다.

    먼저 Resolve, Unresolve 의 의미 또는 뉘앙스가 와닿아야한다고 생각하기 때문에 프로그래밍 영역에서 사용되는 여러 사례들을 먼저 설명드리겠습니다.

     

    DNS.

    Domain Name Service 의 약자인 DNS 에서 Resolve, Unresolve 라는 용어가 사용됩니다.

    www.tistory.com 이라는 도메인이 어떤 IP 를 가지고 있는지 DNS 를 통해서 확인할 수 있습니다.

    이때, tistory 가 DNS 에 등록되어 있어서 IP 를 제공받을 수 있다면 이는 Resolved 상태입니다.

    반면 그렇지 않은 경우는 Unresolved 가 되겠죠 ?

     

    Application 과 Dependency.

    Spring 이나 FastAPI 로 Web API Server Application 구축하던지,

    Spark 나 Flink Framework 를 통해서 Data Processing Application 을 만들던지 이러한 과정에서 외부 Dependency 가 사용됩니

    다.

    이러한 개발 과정에서 흔히 겪는 이슈 중의 하나가 존재하지 않는 라이브러리 또는 모듈에 의해서 Dependency 가 Injected 되지 않는 경험을 할 수 있습니다.

    이때에도 Resolved, Unresolved 의 용어가 사용됩니다.

     

    돌아와서 이처럼 Spark 또한 Resolved, Unresolved 라는 표현을 사용합니다.

    SparkSession 을 통해서 하나의 DataFrame 이 생성되었고,

    이 DataFrame 은 name, age, height 라는 세개의 칼럼이 존재한다고 가정하겠습니다.

    아래의 pyspark 예시와 같이 name, age, height 를 조회하는 경우에는 실제로 존재하는 칼럼을 조회하고 있습니다.

    이는 In-memory 나 On-disk 영역에 실제로 생성된 DataFrame 과 그 Column 을 조회하는 행위이기 때문에 매우 정상적입니다.

    반면 id, weight, region 과 같이 존재하지 않는 칼럼을 조회하게 된다면 이는 에러를 유발할 수 있습니다.

    data = [('Andy', 35, 184), ('Bruce', 25, 181), ('Chris', 45, 178), ('Daniel', 15, 154), ]
    schema = StructType([
        StructField("name", StringType(), False),
        StructField("age", IntegerType(), False),
        StructField("height", IntegerType(), False),
    ])
    
    df = spark.createDataFrame(data=data, schema=schema)
    
    // Resolved
    df = df.select(["name", "age", "height"])
    
    // Unresolved
    df = df.select(["id", "weight", "region"])

     

     

    Unresolved Logical Plan.

    Unresolved Logical Plan 은 Catalyst Optimizer 에 의해서 생성되는 첫번째 Plan 입니다.

    이 과정을 Parsing 단계라고 합니다.

    단순히 DataFrame 의 연산이나 SQL Query 를 파싱하여 Plan 을 생성합니다.

    그리고 이 상태를 Unresolved 하다고 표현합니다.

    그 이유는 아직까지 Resolving 과정이 적용되지 않았고, Resolving 은 다음 단계에서 진행됩니다.

    그래서 Unresolved Logical Plan 상태에서는

    - Plan 에서 조회하는 Column 들이 실제 존재하는 칼럼인지

    - Plan 에서 조회하는 Table 이 실제 존재하는 테이블인지

    - Plan 에서 사용하는 Function 들이 실제 존재하는 함수인지 알 수 없습니다.

    즉, 미지의 요소들이 많은 불안정한 상태라고 여길 수 있습니다.

     

    아래의 예시는 Unresolved Logical Plan 에서 발생할 수 있는 AnalysisException 의 예시들을 작성하였습니다.

    발생 가능한 케이스들은 Column, Table, Function 이 존재하지 않는 경우입니다.

    from pyspark import SparkConf
    from pyspark.errors import AnalysisException
    from pyspark.sql import SparkSession
    from pyspark.sql.types import StructType, StructField, StringType, IntegerType, BooleanType
    from pyspark.sql.functions import udf, lit
    
    conf = SparkConf().setAppName("test-unresolved-plan").setMaster("local[*]")
    conf = conf.set("spark.driver.bindAddress", "localhost")
    spark = SparkSession.builder.config(conf=conf).getOrCreate()
    
    data = [('Andy', 35, 184), ('Bruce', 25, 181), ('Chris', 45, 178), ('Daniel', 15, 154), ]
    schema = StructType([
        StructField("name", StringType(), False),
        StructField("age", IntegerType(), False),
        StructField("height", IntegerType(), False),
    ])
    
    df = spark.createDataFrame(data=data, schema=schema)
    
    try:
        # 존재하지 않는 칼럼 조회
        df = df.select(["name", "non-exist-column"]).filter(df['height'] > 100)
    except AnalysisException as e:
        print(e)
    
    try:
        # 존재하지 않는 테이블 조회
        df.createOrReplaceTempView('users')
        df = spark.sql("select name, age, height from non_exist_table")
    except AnalysisException as e:
        print(e)
    
    try:
    
        # 존재하지 않는 함수 조회
        def filter_low_number(num, criterion):
            return num < criterion
        spark.udf.register("filter_numeric", filter_low_number, BooleanType())
        df.createOrReplaceTempView('users')
        df = spark.sql("select name, age, height from users where non_exist_function(height, 160)")
    except AnalysisException as e:
        print(e)
    
    spark.stop()

     

     

    explain 를 통해서 Plan 확인하기.

    DataFrame 의 explain 함수를 통해서 Unresolved Logical Plan 을 확인할 수 있습니다.

    Logical Plan 을 확인하는 방법은 아래와 같습니다.

     

    from pyspark import SparkConf
    from pyspark.sql import SparkSession
    from pyspark.sql.types import StructType, StructField, StringType, IntegerType, BooleanType
    
    conf = SparkConf().setAppName("test-unresolved-plan").setMaster("local[*]")
    conf = conf.set("spark.driver.bindAddress", "localhost")
    spark = SparkSession.builder.config(conf=conf).getOrCreate()
    
    data = [('Andy', 35, 184), ('Bruce', 25, 181), ('Chris', 45, 178), ('Daniel', 15, 154), ]
    schema = StructType([
        StructField("name", StringType(), False),
        StructField("age", IntegerType(), False),
        StructField("height", IntegerType(), False),
    ])
    
    df = spark.createDataFrame(data=data, schema=schema)
    
    df = df.select(["name", "height"]).filter(df['height'] > 100)
    df.createOrReplaceTempView('users')
    df = spark.sql("select name, height from users")
    def filter_low_number(num, criterion):
        return num < criterion
    spark.udf.register("filter_numeric", filter_low_number, BooleanType())
    df.createOrReplaceTempView('users')
    df = spark.sql("select name, height from users where filter_numeric(height, 160)")
    
    df.explain(extended=True)
    spark.stop()

     

    아래의 출력 결과는 explain 함수에 의해서 출력되는 Plan 들입니다.

    explain 함수가 extended=True 인자와 함께 호출되면, 아래와 같이 모든 Logical, Physical Plan 들이 출력되게 됩니다.

    Parsed Logical Plan 이라고 적힌 영역의 출력 내용이 Unresolved Logical Plan 에 해당합니다.

    Unresolved Logical Plan 은 Parsing 단계에서 생성되는 실행 계획이기 때문에 Parsed Logical Plan 이라고도 불립니다.

     

    == Parsed Logical Plan ==
    'Project ['name, 'height]
    +- 'Filter 'filter_numeric('height, 160)
       +- 'UnresolvedRelation [users], [], false
    
    == Analyzed Logical Plan ==
    name: string, height: int
    Project [name#0, height#2]
    +- Filter filter_numeric(height#2, 160)#10
       +- SubqueryAlias users
          +- View (`users`, [name#0,height#2])
             +- Project [name#0, height#2]
                +- SubqueryAlias users
                   +- View (`users`, [name#0,height#2])
                      +- Filter (height#2 > 100)
                         +- Project [name#0, height#2]
                            +- LogicalRDD [name#0, age#1, height#2], false
    
    == Optimized Logical Plan ==
    Project [name#0, height#2]
    +- Filter pythonUDF0#13: boolean
       +- BatchEvalPython [filter_numeric(height#2, 160)#10], [pythonUDF0#13]
          +- Project [name#0, height#2]
             +- Filter (height#2 > 100)
                +- LogicalRDD [name#0, age#1, height#2], false
    
    == Physical Plan ==
    *(2) Project [name#0, height#2]
    +- *(2) Filter pythonUDF0#13: boolean
       +- BatchEvalPython [filter_numeric(height#2, 160)#10], [pythonUDF0#13]
          +- *(1) Project [name#0, height#2]
             +- *(1) Filter (height#2 > 100)
                +- *(1) Scan ExistingRDD[name#0,age#1,height#2]

     

     

    반응형

    'Spark' 카테고리의 다른 글

    [Spark] Spark lit 알아보기  (0) 2023.12.24
    Spark Stage 알아보기  (0) 2023.12.23
    Spark RDD Lineage 알아보기  (0) 2023.12.16
    Spark RDD Storage 알아보기 (Persist, Cache)  (0) 2023.12.15
    [Spark] parallelize 알아보기  (0) 2023.12.15
Designed by Tistory.