-
[Spark] Unresolved Logical Plan 알아보기Spark 2023. 12. 20. 07:31728x90반응형
- 목차
들어가며.
Spark 는 RDD 가 아닌 DataFrame 또는 SQL 를 활용하여 연산을 수행할 때에 자동적으로 Query Optimization 이 수행됩니다.
흔히 Predicate Pushdown, Column Pruning, Constant Folding 등의 유명한 최적화 방식들이 있는데요.
관련한 최적화 방식들은 다른 글에서 설명할 수 있도록 해보겠습니다.
돌아와서 Spark 는 Query Optimization 이 적용되며, 이를 책임지는 Spark 의 내부 요소를 Catalyst Optimizer 라고 부릅니다.
즉, Catalyst Optimizer 에 의해서 SQL Query 또는 DataFrame 연산은 최적화됩니다.
아래 이미지는 Catalyst Optimizer 에 의해서 수행되는 최적화 과정입니다.
이미지에서 알 수 있듯이, 최적화 과정이 매우 길고 복잡하죠 ?
이번 글에서는 Unresolved Logical Plan 에 대해서 깊이 있는 내용을 다루어보려고 합니다.
Unresolved 란 ?
Unresolved 란 프로그래밍적인 관점에서 적절하게 등록이 되지 않은 상태를 의미합니다.
먼저 Resolve, Unresolve 의 의미 또는 뉘앙스가 와닿아야한다고 생각하기 때문에 프로그래밍 영역에서 사용되는 여러 사례들을 먼저 설명드리겠습니다.
DNS.
Domain Name Service 의 약자인 DNS 에서 Resolve, Unresolve 라는 용어가 사용됩니다.
www.tistory.com 이라는 도메인이 어떤 IP 를 가지고 있는지 DNS 를 통해서 확인할 수 있습니다.
이때, tistory 가 DNS 에 등록되어 있어서 IP 를 제공받을 수 있다면 이는 Resolved 상태입니다.
반면 그렇지 않은 경우는 Unresolved 가 되겠죠 ?
Application 과 Dependency.
Spring 이나 FastAPI 로 Web API Server Application 구축하던지,
Spark 나 Flink Framework 를 통해서 Data Processing Application 을 만들던지 이러한 과정에서 외부 Dependency 가 사용됩니
다.
이러한 개발 과정에서 흔히 겪는 이슈 중의 하나가 존재하지 않는 라이브러리 또는 모듈에 의해서 Dependency 가 Injected 되지 않는 경험을 할 수 있습니다.
이때에도 Resolved, Unresolved 의 용어가 사용됩니다.
돌아와서 이처럼 Spark 또한 Resolved, Unresolved 라는 표현을 사용합니다.
SparkSession 을 통해서 하나의 DataFrame 이 생성되었고,
이 DataFrame 은 name, age, height 라는 세개의 칼럼이 존재한다고 가정하겠습니다.
아래의 pyspark 예시와 같이 name, age, height 를 조회하는 경우에는 실제로 존재하는 칼럼을 조회하고 있습니다.
이는 In-memory 나 On-disk 영역에 실제로 생성된 DataFrame 과 그 Column 을 조회하는 행위이기 때문에 매우 정상적입니다.
반면 id, weight, region 과 같이 존재하지 않는 칼럼을 조회하게 된다면 이는 에러를 유발할 수 있습니다.
data = [('Andy', 35, 184), ('Bruce', 25, 181), ('Chris', 45, 178), ('Daniel', 15, 154), ] schema = StructType([ StructField("name", StringType(), False), StructField("age", IntegerType(), False), StructField("height", IntegerType(), False), ]) df = spark.createDataFrame(data=data, schema=schema) // Resolved df = df.select(["name", "age", "height"]) // Unresolved df = df.select(["id", "weight", "region"])
Unresolved Logical Plan.
Unresolved Logical Plan 은 Catalyst Optimizer 에 의해서 생성되는 첫번째 Plan 입니다.
이 과정을 Parsing 단계라고 합니다.
단순히 DataFrame 의 연산이나 SQL Query 를 파싱하여 Plan 을 생성합니다.
그리고 이 상태를 Unresolved 하다고 표현합니다.
그 이유는 아직까지 Resolving 과정이 적용되지 않았고, Resolving 은 다음 단계에서 진행됩니다.
그래서 Unresolved Logical Plan 상태에서는
- Plan 에서 조회하는 Column 들이 실제 존재하는 칼럼인지
- Plan 에서 조회하는 Table 이 실제 존재하는 테이블인지
- Plan 에서 사용하는 Function 들이 실제 존재하는 함수인지 알 수 없습니다.
즉, 미지의 요소들이 많은 불안정한 상태라고 여길 수 있습니다.
아래의 예시는 Unresolved Logical Plan 에서 발생할 수 있는 AnalysisException 의 예시들을 작성하였습니다.
발생 가능한 케이스들은 Column, Table, Function 이 존재하지 않는 경우입니다.
from pyspark import SparkConf from pyspark.errors import AnalysisException from pyspark.sql import SparkSession from pyspark.sql.types import StructType, StructField, StringType, IntegerType, BooleanType from pyspark.sql.functions import udf, lit conf = SparkConf().setAppName("test-unresolved-plan").setMaster("local[*]") conf = conf.set("spark.driver.bindAddress", "localhost") spark = SparkSession.builder.config(conf=conf).getOrCreate() data = [('Andy', 35, 184), ('Bruce', 25, 181), ('Chris', 45, 178), ('Daniel', 15, 154), ] schema = StructType([ StructField("name", StringType(), False), StructField("age", IntegerType(), False), StructField("height", IntegerType(), False), ]) df = spark.createDataFrame(data=data, schema=schema) try: # 존재하지 않는 칼럼 조회 df = df.select(["name", "non-exist-column"]).filter(df['height'] > 100) except AnalysisException as e: print(e) try: # 존재하지 않는 테이블 조회 df.createOrReplaceTempView('users') df = spark.sql("select name, age, height from non_exist_table") except AnalysisException as e: print(e) try: # 존재하지 않는 함수 조회 def filter_low_number(num, criterion): return num < criterion spark.udf.register("filter_numeric", filter_low_number, BooleanType()) df.createOrReplaceTempView('users') df = spark.sql("select name, age, height from users where non_exist_function(height, 160)") except AnalysisException as e: print(e) spark.stop()
explain 를 통해서 Plan 확인하기.
DataFrame 의 explain 함수를 통해서 Unresolved Logical Plan 을 확인할 수 있습니다.
Logical Plan 을 확인하는 방법은 아래와 같습니다.
from pyspark import SparkConf from pyspark.sql import SparkSession from pyspark.sql.types import StructType, StructField, StringType, IntegerType, BooleanType conf = SparkConf().setAppName("test-unresolved-plan").setMaster("local[*]") conf = conf.set("spark.driver.bindAddress", "localhost") spark = SparkSession.builder.config(conf=conf).getOrCreate() data = [('Andy', 35, 184), ('Bruce', 25, 181), ('Chris', 45, 178), ('Daniel', 15, 154), ] schema = StructType([ StructField("name", StringType(), False), StructField("age", IntegerType(), False), StructField("height", IntegerType(), False), ]) df = spark.createDataFrame(data=data, schema=schema) df = df.select(["name", "height"]).filter(df['height'] > 100) df.createOrReplaceTempView('users') df = spark.sql("select name, height from users") def filter_low_number(num, criterion): return num < criterion spark.udf.register("filter_numeric", filter_low_number, BooleanType()) df.createOrReplaceTempView('users') df = spark.sql("select name, height from users where filter_numeric(height, 160)") df.explain(extended=True) spark.stop()
아래의 출력 결과는 explain 함수에 의해서 출력되는 Plan 들입니다.
explain 함수가 extended=True 인자와 함께 호출되면, 아래와 같이 모든 Logical, Physical Plan 들이 출력되게 됩니다.
Parsed Logical Plan 이라고 적힌 영역의 출력 내용이 Unresolved Logical Plan 에 해당합니다.
Unresolved Logical Plan 은 Parsing 단계에서 생성되는 실행 계획이기 때문에 Parsed Logical Plan 이라고도 불립니다.
== Parsed Logical Plan == 'Project ['name, 'height] +- 'Filter 'filter_numeric('height, 160) +- 'UnresolvedRelation [users], [], false == Analyzed Logical Plan == name: string, height: int Project [name#0, height#2] +- Filter filter_numeric(height#2, 160)#10 +- SubqueryAlias users +- View (`users`, [name#0,height#2]) +- Project [name#0, height#2] +- SubqueryAlias users +- View (`users`, [name#0,height#2]) +- Filter (height#2 > 100) +- Project [name#0, height#2] +- LogicalRDD [name#0, age#1, height#2], false == Optimized Logical Plan == Project [name#0, height#2] +- Filter pythonUDF0#13: boolean +- BatchEvalPython [filter_numeric(height#2, 160)#10], [pythonUDF0#13] +- Project [name#0, height#2] +- Filter (height#2 > 100) +- LogicalRDD [name#0, age#1, height#2], false == Physical Plan == *(2) Project [name#0, height#2] +- *(2) Filter pythonUDF0#13: boolean +- BatchEvalPython [filter_numeric(height#2, 160)#10], [pythonUDF0#13] +- *(1) Project [name#0, height#2] +- *(1) Filter (height#2 > 100) +- *(1) Scan ExistingRDD[name#0,age#1,height#2]
반응형'Spark' 카테고리의 다른 글
[Spark] Spark lit 알아보기 (0) 2023.12.24 Spark Stage 알아보기 (0) 2023.12.23 Spark RDD Lineage 알아보기 (0) 2023.12.16 Spark RDD Storage 알아보기 (Persist, Cache) (0) 2023.12.15 [Spark] parallelize 알아보기 (0) 2023.12.15