ABOUT ME

와주셔서 감사합니다. 좋은 글을 많이 쓰겠습니다.

Today
Yesterday
Total
  • [Spark] explode 함수와 Generate 연산자 알아보기
    Spark 2024. 8. 15. 10:07
    반응형

    - 목차

     

    explode 함수란 ?

    Spark 의 explode 함수는 ArrayType 또는 MapType 의 타입의 칼럼의 값을 여러개의 Row 로 Flatten 하게 됩니다.

    즉, 1개의 Row 가 0개 또는 N개의 Row 들로 Flatten 될 수 있습니다.

    만약 ArrayType 의 칼럼의 값이 하나도 없다면, 변환되는 Row 의 갯수는 0개가 됩니다.

    그리고 3개의 값을 가지는 ArrayType 의 칼럼은 3개의 Row 들로 변환됩니다.

     

    이를 직접 코드로 살펴보면 아래와 같습니다.

    import org.apache.spark.sql.types._
    import org.apache.spark.sql.Row
    
    val rdd = spark.sparkContext.parallelize(Seq(
      Row("Andy", 45, Seq("Apple", "Banana")),
      Row("Bruce", 25, Seq("Candy", "Drink")),
      Row("Chris", 33, null),
      Row("Daniel", 21, Seq.empty[String])
    ))
    
    val schema = StructType(Seq(
      StructField("name", StringType, nullable = false),
      StructField("age", IntegerType, nullable = false),
      StructField("eats", ArrayType(StringType), nullable = true)
    ))
    
    val df = spark.createDataFrame(rdd, schema)
    df.show(truncate = false)

     

    +------+---+---------------+
    |name  |age|eats           |
    +------+---+---------------+
    |Andy  |45 |[Apple, Banana]|
    |Bruce |25 |[Candy, Drink] |
    |Chris |33 |null           |
    |Daniel|21 |[]             |
    +------+---+---------------+

     

    여기서 Chris 와 Daniel 에 해당하는 Row 는 eats 칼럼의 값이 null 또는 빈 리스트입니다.

    만약 eats 칼럼을 대상으로 explode 함수를 사용하게 되면, 이 두가지 Row 는 사라지게 됩니다.

     

    import org.apache.spark.sql.functions._
    
    df.withColumn("food", explode(col("eats"))).show()
    +-----+---+---------------+------+
    | name|age|           eats|  food|
    +-----+---+---------------+------+
    | Andy| 45|[Apple, Banana]| Apple|
    | Andy| 45|[Apple, Banana]|Banana|
    |Bruce| 25| [Candy, Drink]| Candy|
    |Bruce| 25| [Candy, Drink]| Drink|
    +-----+---+---------------+------+

     

     

    explode_outer.

    null 또는 빈 리스트의 존재를 보장하고 싶다면, explode_outer 함수를 사용합니다.

    import org.apache.spark.sql.functions._
    
    df.withColumn("food", explode_outer(col("eats"))).show()
    +------+---+---------------+------+
    |  name|age|           eats|  food|
    +------+---+---------------+------+
    |  Andy| 45|[Apple, Banana]| Apple|
    |  Andy| 45|[Apple, Banana]|Banana|
    | Bruce| 25| [Candy, Drink]| Candy|
    | Bruce| 25| [Candy, Drink]| Drink|
    | Chris| 33|           null|  null|
    |Daniel| 21|             []|  null|
    +------+---+---------------+------+

     

    Generate 연산자란 ?

    explode 함수는 내부적으로 Generate 연산자로 변환됩니다.

    즉, Logical 또는 Physical Plan 관점에서 Generate 로써 표현되게 됩니다.

    아래의 예시들은 explode 함수를 사용한 경우의 실행계획을 나타냅니다.

     

    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.types._
    import org.apache.spark.sql.Row
    
    val rdd = spark.sparkContext.parallelize(Seq(
      Row("Andy", 45, Seq("Apple", "Banana")),
      Row("Bruce", 25, Seq("Candy", "Drink")),
      Row("Chris", 33, null),
      Row("Daniel", 21, Seq.empty[String])
    ))
    
    val schema = StructType(Seq(
      StructField("name", StringType, nullable = false),
      StructField("age", IntegerType, nullable = false),
      StructField("eats", ArrayType(StringType), nullable = true)
    ))
    
    val df = spark.createDataFrame(rdd, schema)
    val explodedDF = df.withColumn("food", explode_outer(col("eats")))
    explodedDF.explain("extended")

     

    == Parsed Logical Plan ==
    'Project [name#71, age#72, eats#73, generatorouter(explode('eats)) AS food#83]
    +- LogicalRDD [name#71, age#72, eats#73], false
    
    == Analyzed Logical Plan ==
    name: string, age: int, eats: array<string>, food: string
    Project [name#71, age#72, eats#73, food#84]
    +- Generate explode(eats#73), true, [food#84]
       +- LogicalRDD [name#71, age#72, eats#73], false
    
    == Optimized Logical Plan ==
    Generate explode(eats#73), true, [food#84]
    +- LogicalRDD [name#71, age#72, eats#73], false
    
    == Physical Plan ==
    *(1) Generate explode(eats#73), [name#71, age#72, eats#73], true, [food#84]
    +- *(1) Scan ExistingRDD[name#71,age#72,eats#73]

     

    위 실행계획을 보면, Optimized Logical Plan 와 Physical Plan  에서 Generate explode 라는 내용을 확인할 수 있습니다.

    Generate 연산자는 Filter, Project, Sort, Limit 등과 같이 1개의 Child Node 를 가지는 UnaryNode 로 표현됩니다.

    이것이 무슨 의미냐하면, LogicalRDD 라는 Child Node 로부터 데이터를 전달받아 연산을 수행하는 하나의 계획임을 뜻합니다.

    따라서 Generate 는 Child Node 로부터 데이터를 전달받아 ( InternalRow 또는 UnsafeRow ) 를 전달받아서 explode 라는 Flatten 연산을 수행합니다.

     

    그리고 아래와 같이 텅스텐 최적화에 의해서 WholeStageCodegen 처리된 결과도 확인이 가능합니다.

    아래 코드를 보면 74 라인부터 For Loop 구문이 보이는데요.

    이 영역에서 explode 연산이 실행됩니다.

    import org.apache.spark.sql.execution.debug._
    
    explodedDF.debugCodegen()
    Found 1 WholeStageCodegen subtrees.
    == Subtree 1 / 1 (maxMethodCodeSize:334; maxConstantPoolSize:157(0.24% used); numInnerClasses:0) ==
    *(1) Generate explode(eats#5), [name#3, age#4, eats#5], true, [food#10]
    +- *(1) Scan ExistingRDD[name#3,age#4,eats#5]
    
    Generated code:
    /* 001 */ public Object generate(Object[] references) {
    /* 002 */   return new GeneratedIteratorForCodegenStage1(references);
    /* 003 */ }
    /* 004 */
    /* 005 */ // codegenStageId=1
    /* 006 */ final class GeneratedIteratorForCodegenStage1 extends org.apache.spark.sql.execution.BufferedRowIterator {
    /* 007 */   private Object[] references;
    /* 008 */   private scala.collection.Iterator[] inputs;
    /* 009 */   private scala.collection.Iterator rdd_input_0;
    /* 010 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[] rdd_mutableStateArray_0 = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[3];
    /* 011 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeArrayWriter[] rdd_mutableStateArray_1 = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeArrayWriter[3];
    
    /* 074 */           for (int generate_index_2 = 0; generate_index_2 < generate_numElements_2; generate_index_2++) {
    /* 075 */             if (generate_tmpInput_1.isNullAt(generate_index_2)) {
    /* 076 */               rdd_mutableStateArray_1[2].setNull8Bytes(generate_index_2);
    /* 077 */             } else {
    /* 078 */               rdd_mutableStateArray_1[2].write(generate_index_2, generate_tmpInput_1.getUTF8String(generate_index_2));
    /* 079 */             }
    /* 080 */
    /* 081 */           }

     

     

    유용한 명령어들.

    docker network create spark
    
    docker run -d --name master --hostname master --network spark -p 8080:8080 -p 7077:7077 bitnami/spark:3.3.2 start-master.sh
    
    docker run -d --name worker1 --hostname worker1  --memory 5g --network spark bitnami/spark:3.3.2 start-worker.sh spark://master:7077
    
    docker run -it --rm --name client --network spark -p 4040:4040 bitnami/spark:3.3.2 spark-shell --master spark://master:7077 \
    --conf spark.ui.port=4040 \
    --conf spark.executor.memory=2g \
    --conf spark.executor.memoryOverhead=2g \
    --conf spark.executor.offHeap.enabled=true \
    --conf spark.executor.offHeap.size=1536m \
    --conf spark.memory.fraction=0.6 \
    --conf spark.memory.storageFraction=0.5

     

     

     

    반응형
Designed by Tistory.