-
[Spark] explode 함수와 Generate 연산자 알아보기Spark 2024. 8. 15. 10:07반응형
- 목차
explode 함수란 ?
Spark 의 explode 함수는 ArrayType 또는 MapType 의 타입의 칼럼의 값을 여러개의 Row 로 Flatten 하게 됩니다.
즉, 1개의 Row 가 0개 또는 N개의 Row 들로 Flatten 될 수 있습니다.
만약 ArrayType 의 칼럼의 값이 하나도 없다면, 변환되는 Row 의 갯수는 0개가 됩니다.
그리고 3개의 값을 가지는 ArrayType 의 칼럼은 3개의 Row 들로 변환됩니다.
이를 직접 코드로 살펴보면 아래와 같습니다.
import org.apache.spark.sql.types._ import org.apache.spark.sql.Row val rdd = spark.sparkContext.parallelize(Seq( Row("Andy", 45, Seq("Apple", "Banana")), Row("Bruce", 25, Seq("Candy", "Drink")), Row("Chris", 33, null), Row("Daniel", 21, Seq.empty[String]) )) val schema = StructType(Seq( StructField("name", StringType, nullable = false), StructField("age", IntegerType, nullable = false), StructField("eats", ArrayType(StringType), nullable = true) )) val df = spark.createDataFrame(rdd, schema) df.show(truncate = false)
+------+---+---------------+ |name |age|eats | +------+---+---------------+ |Andy |45 |[Apple, Banana]| |Bruce |25 |[Candy, Drink] | |Chris |33 |null | |Daniel|21 |[] | +------+---+---------------+
여기서 Chris 와 Daniel 에 해당하는 Row 는 eats 칼럼의 값이 null 또는 빈 리스트입니다.
만약 eats 칼럼을 대상으로 explode 함수를 사용하게 되면, 이 두가지 Row 는 사라지게 됩니다.
import org.apache.spark.sql.functions._ df.withColumn("food", explode(col("eats"))).show()
+-----+---+---------------+------+ | name|age| eats| food| +-----+---+---------------+------+ | Andy| 45|[Apple, Banana]| Apple| | Andy| 45|[Apple, Banana]|Banana| |Bruce| 25| [Candy, Drink]| Candy| |Bruce| 25| [Candy, Drink]| Drink| +-----+---+---------------+------+
explode_outer.
null 또는 빈 리스트의 존재를 보장하고 싶다면, explode_outer 함수를 사용합니다.
import org.apache.spark.sql.functions._ df.withColumn("food", explode_outer(col("eats"))).show()
+------+---+---------------+------+ | name|age| eats| food| +------+---+---------------+------+ | Andy| 45|[Apple, Banana]| Apple| | Andy| 45|[Apple, Banana]|Banana| | Bruce| 25| [Candy, Drink]| Candy| | Bruce| 25| [Candy, Drink]| Drink| | Chris| 33| null| null| |Daniel| 21| []| null| +------+---+---------------+------+
Generate 연산자란 ?
explode 함수는 내부적으로 Generate 연산자로 변환됩니다.
즉, Logical 또는 Physical Plan 관점에서 Generate 로써 표현되게 됩니다.
아래의 예시들은 explode 함수를 사용한 경우의 실행계획을 나타냅니다.
import org.apache.spark.sql.functions._ import org.apache.spark.sql.types._ import org.apache.spark.sql.Row val rdd = spark.sparkContext.parallelize(Seq( Row("Andy", 45, Seq("Apple", "Banana")), Row("Bruce", 25, Seq("Candy", "Drink")), Row("Chris", 33, null), Row("Daniel", 21, Seq.empty[String]) )) val schema = StructType(Seq( StructField("name", StringType, nullable = false), StructField("age", IntegerType, nullable = false), StructField("eats", ArrayType(StringType), nullable = true) )) val df = spark.createDataFrame(rdd, schema) val explodedDF = df.withColumn("food", explode_outer(col("eats"))) explodedDF.explain("extended")
== Parsed Logical Plan == 'Project [name#71, age#72, eats#73, generatorouter(explode('eats)) AS food#83] +- LogicalRDD [name#71, age#72, eats#73], false == Analyzed Logical Plan == name: string, age: int, eats: array<string>, food: string Project [name#71, age#72, eats#73, food#84] +- Generate explode(eats#73), true, [food#84] +- LogicalRDD [name#71, age#72, eats#73], false == Optimized Logical Plan == Generate explode(eats#73), true, [food#84] +- LogicalRDD [name#71, age#72, eats#73], false == Physical Plan == *(1) Generate explode(eats#73), [name#71, age#72, eats#73], true, [food#84] +- *(1) Scan ExistingRDD[name#71,age#72,eats#73]
위 실행계획을 보면, Optimized Logical Plan 와 Physical Plan 에서 Generate explode 라는 내용을 확인할 수 있습니다.
Generate 연산자는 Filter, Project, Sort, Limit 등과 같이 1개의 Child Node 를 가지는 UnaryNode 로 표현됩니다.
이것이 무슨 의미냐하면, LogicalRDD 라는 Child Node 로부터 데이터를 전달받아 연산을 수행하는 하나의 계획임을 뜻합니다.
따라서 Generate 는 Child Node 로부터 데이터를 전달받아 ( InternalRow 또는 UnsafeRow ) 를 전달받아서 explode 라는 Flatten 연산을 수행합니다.
그리고 아래와 같이 텅스텐 최적화에 의해서 WholeStageCodegen 처리된 결과도 확인이 가능합니다.
아래 코드를 보면 74 라인부터 For Loop 구문이 보이는데요.
이 영역에서 explode 연산이 실행됩니다.
import org.apache.spark.sql.execution.debug._ explodedDF.debugCodegen()
Found 1 WholeStageCodegen subtrees. == Subtree 1 / 1 (maxMethodCodeSize:334; maxConstantPoolSize:157(0.24% used); numInnerClasses:0) == *(1) Generate explode(eats#5), [name#3, age#4, eats#5], true, [food#10] +- *(1) Scan ExistingRDD[name#3,age#4,eats#5] Generated code: /* 001 */ public Object generate(Object[] references) { /* 002 */ return new GeneratedIteratorForCodegenStage1(references); /* 003 */ } /* 004 */ /* 005 */ // codegenStageId=1 /* 006 */ final class GeneratedIteratorForCodegenStage1 extends org.apache.spark.sql.execution.BufferedRowIterator { /* 007 */ private Object[] references; /* 008 */ private scala.collection.Iterator[] inputs; /* 009 */ private scala.collection.Iterator rdd_input_0; /* 010 */ private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[] rdd_mutableStateArray_0 = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[3]; /* 011 */ private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeArrayWriter[] rdd_mutableStateArray_1 = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeArrayWriter[3]; /* 074 */ for (int generate_index_2 = 0; generate_index_2 < generate_numElements_2; generate_index_2++) { /* 075 */ if (generate_tmpInput_1.isNullAt(generate_index_2)) { /* 076 */ rdd_mutableStateArray_1[2].setNull8Bytes(generate_index_2); /* 077 */ } else { /* 078 */ rdd_mutableStateArray_1[2].write(generate_index_2, generate_tmpInput_1.getUTF8String(generate_index_2)); /* 079 */ } /* 080 */ /* 081 */ }
유용한 명령어들.
docker network create spark docker run -d --name master --hostname master --network spark -p 8080:8080 -p 7077:7077 bitnami/spark:3.3.2 start-master.sh docker run -d --name worker1 --hostname worker1 --memory 5g --network spark bitnami/spark:3.3.2 start-worker.sh spark://master:7077 docker run -it --rm --name client --network spark -p 4040:4040 bitnami/spark:3.3.2 spark-shell --master spark://master:7077 \ --conf spark.ui.port=4040 \ --conf spark.executor.memory=2g \ --conf spark.executor.memoryOverhead=2g \ --conf spark.executor.offHeap.enabled=true \ --conf spark.executor.offHeap.size=1536m \ --conf spark.memory.fraction=0.6 \ --conf spark.memory.storageFraction=0.5
반응형'Spark' 카테고리의 다른 글
[Spark] SortMergeJoin 알아보기 (0) 2024.08.17 [Spark] DataFrameReader 와 Parquet 최적화 알아보기 (Column Pruning, Predicate Pushdown) (0) 2024.08.15 [Spark] DataFrameWriter PartitionBy 알아보기 (0) 2024.08.12 [Spark] RDD Cogroup 연산 알아보기 (0) 2024.07.29 [Spark] RangePartitioner 알아보기 (0) 2024.07.29