ABOUT ME

와주셔서 감사합니다. 좋은 글을 많이 쓰겠습니다.

Today
Yesterday
Total
  • [Spark] DataFrameWriter PartitionBy 알아보기
    Spark 2024. 8. 12. 05:50
    반응형

    - 목차

     

     

    partitionBy 이란 ?

    Spark DataFrame API 에서 write 명령어를 사용하면 손쉽게 DataFrameWriter 객체를 생성할 수 있습니다.

    이름에서 알 수 있듯이 DataFrameWriter 는 데이터를 외부 데이터소스에 추가하거나 파일을 생성하는 작업을 수행합니다.

    여기서 partitionBy 라는 함수가 사용되면, 생성하고자하는 파일을 물리적인 레벨에서 디렉토리를 세분화할 수 있습니다.

     

    예를 들어, DataFrame 이 country 라는 칼럼을 가지고 이 칼럼은 KR 또는 US 라는 값을 가진다고 가정합니다.

    그리고 DataFrame.partitionBy("country").parquet("/data/output") 와 같이 명령을 실행해게 되면, 

    아래와 같이 Partitioning Column 을 기준으로 Directory 가 생성되게 됩니다. 

    /data/output/
    ├── country=KR/
    │   ├── part-00000.parquet
    ├── country=US/
    │   ├── part-00001.parquet

     

    partitionBy 사용 예시.

    partitionBy 를 사용하게 되면 Disk Level 에서 물리적인 파티셔닝이 가능합니다.

    위 문장의 의미는 실제 생성되는 파일들이 partitionBy 에 사용된 Column 의 값으로 기준으로 분류가 됩니다.

    Partitioning Column 으로 사용된 major 칼럼이 총 4개의 값을 가진다면, 결과적으로 4개의 폴더가 생성되게 됩니다.

    import org.apache.spark.sql.types._
    
    val studentSchema = StructType(Seq(
      StructField("id", IntegerType, nullable = true),
      StructField("name", StringType, nullable = true),
      StructField("age", IntegerType, nullable = true),
      StructField("major", StringType, nullable = true)
    ))
    
    val df = (spark.read.format("csv")
    .schema(studentSchema)
    .option("header", "false")
    .option("inferSchema", "false")
    .option("delimiter", ",")
    .load("hdfs://namenode:8020/user/spark/input/students.csv"))
    
    df.write
    	.format("csv")
    	.partitionBy("major")
    	.mode("overwrite")
    	.save("hdfs://namenode:8020/user/spark/output")
    /user/spark/output2/major=Biology/part-00000-d02c48b3-aeae-4d41-9442-6c23ab8faca3.c000.csv
    /user/spark/output2/major=CS/part-00000-d02c48b3-aeae-4d41-9442-6c23ab8faca3.c000.csv
    /user/spark/output2/major=Math/part-00000-d02c48b3-aeae-4d41-9442-6c23ab8faca3.c000.csv
    /user/spark/output2/major=Physics/part-00000-d02c48b3-aeae-4d41-9442-6c23ab8faca3.c000.csv

     

     

    위의 경우에는 major 값은 각각 Biology, CS, Math, Physics 인 4개의 값을 가집니다.

     

    partitionBy 와 생성되는 파일 수의 관계.

    partitionBy 가 적용되면 Partitioning Column 을 기준으로 새로운 디렉토리가 생성됩니다.

    그리고 Partitioning Directory 하위에 여러개의 File 들이 생성됩니다.

    이때에 각 Partitioning Directory 마다 몇개의 파일들이 생성될까요 ?

    이 기준은 DataFrame 의 Partition 갯수와 레코드 갯수에 의해서 결정됩니다.

     

    val df = ((1 to 4).toDF("raw_key").repartitionByRange(4, col("raw_key"))
    	.withColumn("digits", array(lit(1), lit(2), lit(3), lit(4)))
      .withColumn("key", explode(col("digits")))
      .withColumn("value", lit("_"))
      .select("key", "value")
    )
    
    df.write
    	.format("csv")
    	.partitionBy("key")
    	.mode("overwrite")
    	.save("hdfs://namenode:8020/user/spark/output11")

     

     

    아래의 상태에 DataFrame 은 4개의 Partition 을 가지며, 각 파티션이 1, 2, 3, 4 라는 동일한 Key 를 가집니다.

    df.rdd.glom().collect()
    res114: Array[Array[org.apache.spark.sql.Row]] = Array(
        Array([1, _], [2, _], [3, _], [4, _]), 
        Array([1, _], [2, _], [3, _], [4, _]), 
        Array([1, _], [2, _], [3, _], [4, _]), 
        Array([1, _], [2, _], [3, _], [4, _])
    )
    
    df.show(5, truncate = false)
    +---+-----+
    |key|value|
    +---+-----+
    |1  |_    |
    |2  |_    |
    |3  |_    |
    |4  |_    |
    |1  |_    |
    +---+-----+
    only showing top 5 rows

     

     

    이 상황에서 partitionBy 를 적용하게 되면, 각 Partitioning Column 마다 총 4개의 File 이 생성됩니다.

    /user/spark/output11/_SUCCESS
    /user/spark/output11/key=1
     /spark/output11/key=1/part-00000-6f26ced9-db47-4788-8e56-1a38a8f34420.c000.csv
    /user/spark/output11/key=1/part-00001-6f26ced9-db47-4788-8e56-1a38a8f34420.c000.csv
    /user/spark/output11/key=1/part-00002-6f26ced9-db47-4788-8e56-1a38a8f34420.c000.csv
    /user/spark/output11/key=1/part-00003-6f26ced9-db47-4788-8e56-1a38a8f34420.c000.csv
    /user/spark/output11/key=2
    /user/spark/output11/key=2/part-00000-6f26ced9-db47-4788-8e56-1a38a8f34420.c000.csv
    /user/spark/output11/key=2/part-00001-6f26ced9-db47-4788-8e56-1a38a8f34420.c000.csv
    /user/spark/output11/key=2/part-00002-6f26ced9-db47-4788-8e56-1a38a8f34420.c000.csv
    /user/spark/output11/key=2/part-00003-6f26ced9-db47-4788-8e56-1a38a8f34420.c000.csv
    /user/spark/output11/key=3
    /user/spark/output11/key=3/part-00000-6f26ced9-db47-4788-8e56-1a38a8f34420.c000.csv
    /user/spark/output11/key=3/part-00001-6f26ced9-db47-4788-8e56-1a38a8f34420.c000.csv
    /user/spark/output11/key=3/part-00002-6f26ced9-db47-4788-8e56-1a38a8f34420.c000.csv
    /user/spark/output11/key=3/part-00003-6f26ced9-db47-4788-8e56-1a38a8f34420.c000.csv
    /user/spark/output11/key=4
    /user/spark/output11/key=4/part-00000-6f26ced9-db47-4788-8e56-1a38a8f34420.c000.csv
    /user/spark/output11/key=4/part-00001-6f26ced9-db47-4788-8e56-1a38a8f34420.c000.csv
    /user/spark/output11/key=4/part-00002-6f26ced9-db47-4788-8e56-1a38a8f34420.c000.csv
    /user/spark/output11/key=4/part-00003-6f26ced9-db47-4788-8e56-1a38a8f34420.c000.csv

     

     

    위의 상황을 요약하자면,

    여러개의 Task 또는 Partition 이 특정 파티셔닝 키에 해당하는 디렉토리 하위에 파일을 생성합니다.

    Partition 1 : [1,2,3,4]

    Partition 2 : [1,2,3,4]

    Partition 3 : [1,2,3,4]

    Partition 4 : [1,2,3,4]

    와 같이 모든 파티션이 1, 2, 3, 4 라는 똑같은 Key 를 가지는 레코드를 가집니다.

    따라서 각 파티셔닝 키의 폴더 하위에 모든 파티션이 파일을 생성하기 때문에 파티션 갯수만큼 파일이 생성되게 됩니다. 

     

    maxRecordPerFile.

    maxRecordsPerFile 은 DataFrameWriter 에서 사용되는 옵션입니다.

    maxRecordsPerFile 옵션은 생성되는 파일이 최대 몇개의 레코드를 저장할지 지정합니다.

     

    예를 들어 아래와 같이 students.csv 파일의 major 별 수치를 확인해보면 각 major 당 200명의 Record 가 존재합니다.

    import org.apache.spark.sql.types._
    
    val studentSchema = StructType(Seq(
      StructField("id", IntegerType, nullable = true),
      StructField("name", StringType, nullable = true),
      StructField("age", IntegerType, nullable = true),
      StructField("major", StringType, nullable = true)
    ))
    
    val df = (spark.read
    	.format("csv")
    	.schema(studentSchema)
    	.option("inferSchema", "true")
    	.option("header", "false")
    	.option("path", "hdfs://namenode:8020/user/spark/input/students.csv")
    	.load()
    )
    
    df.groupBy("major").count().show()
    +-------+-----+
    |  major|count|
    +-------+-----+
    |   Math|  232|
    |     CS|  272|
    |Physics|  249|
    |Biology|  247|
    +-------+-----+

     

    이러한 데이터셋을 대상으로 maxRecordsPerFile 을 200 과 300 으로 설정해보도록 하겠습니다.

     

    df.write
    	.format("csv")
    	.partitionBy("major")
    	.mode("overwrite")
    	.option("maxRecordsPerFile", "300") // 또는 200
    	.save("hdfs://namenode:8020/user/spark/output3")

     

    
    // maxRecordsPerFile 이 200 인 결과
    
    /user/spark/output2/major=Biology/part-00000-d02c48b3-aeae-4d41-9442-6c23ab8faca3.c000.csv
    /user/spark/output2/major=Biology/part-00000-d02c48b3-aeae-4d41-9442-6c23ab8faca3.c001.csv
    /user/spark/output2/major=CS/part-00000-d02c48b3-aeae-4d41-9442-6c23ab8faca3.c000.csv
    /user/spark/output2/major=CS/part-00000-d02c48b3-aeae-4d41-9442-6c23ab8faca3.c001.csv
    /user/spark/output2/major=Math/part-00000-d02c48b3-aeae-4d41-9442-6c23ab8faca3.c000.csv
    /user/spark/output2/major=Math/part-00000-d02c48b3-aeae-4d41-9442-6c23ab8faca3.c001.csv
    /user/spark/output2/major=Physics/part-00000-d02c48b3-aeae-4d41-9442-6c23ab8faca3.c000.csv
    /user/spark/output2/major=Physics/part-00000-d02c48b3-aeae-4d41-9442-6c23ab8faca3.c001.csv
    
    // maxRecordsPerFile 이 300 인 결과
    
    /user/spark/output3/major=Biology/part-00000-1f60dba0-46cb-4517-ab77-20effad2bc76.c000.csv
    /user/spark/output3/major=CS/part-00000-1f60dba0-46cb-4517-ab77-20effad2bc76.c000.csv
    /user/spark/output3/major=Math/part-00000-1f60dba0-46cb-4517-ab77-20effad2bc76.c000.csv
    /user/spark/output3/major=Physics/part-00000-1f60dba0-46cb-4517-ab77-20effad2bc76.c000.csv

     

    위와 같이 파일이 수용할 수 있는 최대 레코드 수를 제한함으로써 생성되는 파일의 갯수가 달라지게 됩니다.

     

     

    repartition 과 partitionBy 의 관계.

    repartition 을 사용하면, 파티셔닝 키 폴더 하위에 하나의 파일을 생성할 수 있습니다.

    당연히 maxRecordsPerFile 설정에 의해서 파일의 갯수가 증가할 순 있겠지만,

    논리적으로 repartition 와 partitionBy 의 칼럼을 일치시켜주면 파티셔닝 키마다 하나의 파일이 생성될 수 있습니다.

     

    아래와 같이 partitionBy 가 단독으로 사용되는 경우에는 Partition 의 갯수만큼 파일이 생성됩니다.

    val df = ((1 to 1000).toDF("raw_key").repartitionByRange(4, col("raw_key"))
    	.withColumn("digits", array(lit(1), lit(2), lit(3), lit(4)))
      .withColumn("key", explode(col("digits")))
      .withColumn("value", lit("_"))
      .select("key", "value")
    )
    
    df.write
    	.format("csv")
    	.partitionBy("key")
    	.mode("overwrite")
    	.save("hdfs://namenode:8020/user/spark/output12")

     

    /user/spark/output12/_SUCCESS
    /user/spark/output12/key=1
    /user/spark/output12/key=1/part-00000-ecd27fbc-b9c1-46e9-9f49-c417ae3a5acb.c000.csv
    /user/spark/output12/key=1/part-00001-ecd27fbc-b9c1-46e9-9f49-c417ae3a5acb.c000.csv
    /user/spark/output12/key=1/part-00002-ecd27fbc-b9c1-46e9-9f49-c417ae3a5acb.c000.csv
    /user/spark/output12/key=1/part-00003-ecd27fbc-b9c1-46e9-9f49-c417ae3a5acb.c000.csv
    /user/spark/output12/key=2
    /user/spark/output12/key=2/part-00000-ecd27fbc-b9c1-46e9-9f49-c417ae3a5acb.c000.csv
    /user/spark/output12/key=2/part-00001-ecd27fbc-b9c1-46e9-9f49-c417ae3a5acb.c000.csv
    /user/spark/output12/key=2/part-00002-ecd27fbc-b9c1-46e9-9f49-c417ae3a5acb.c000.csv
    /user/spark/output12/key=2/part-00003-ecd27fbc-b9c1-46e9-9f49-c417ae3a5acb.c000.csv
    /user/spark/output12/key=3
    /user/spark/output12/key=3/part-00000-ecd27fbc-b9c1-46e9-9f49-c417ae3a5acb.c000.csv
    /user/spark/output12/key=3/part-00001-ecd27fbc-b9c1-46e9-9f49-c417ae3a5acb.c000.csv
    /user/spark/output12/key=3/part-00002-ecd27fbc-b9c1-46e9-9f49-c417ae3a5acb.c000.csv
    /user/spark/output12/key=3/part-00003-ecd27fbc-b9c1-46e9-9f49-c417ae3a5acb.c000.csv
    /user/spark/output12/key=4
    /user/spark/output12/key=4/part-00000-ecd27fbc-b9c1-46e9-9f49-c417ae3a5acb.c000.csv
    /user/spark/output12/key=4/part-00001-ecd27fbc-b9c1-46e9-9f49-c417ae3a5acb.c000.csv
    /user/spark/output12/key=4/part-00002-ecd27fbc-b9c1-46e9-9f49-c417ae3a5acb.c000.csv
    /user/spark/output12/key=4/part-00003-ecd27fbc-b9c1-46e9-9f49-c417ae3a5acb.c000.csv

     

     

    하지만 아래와 같이 DataFrame 에 repartition 연산을 추가하며, partitionBy 에 적용된 파티셔닝 키와 동일하게 repartition 을 시도합니다.

    이렇게 되면 각 파티셔닝 키마다 Partition 이 분배되므로 파일 생성 시에 각 파티셔닝 키 마다 하나의 파티션이 존재하게 됩니다.

     

    val df = ((1 to 1000).toDF("raw_key").repartitionByRange(4, col("raw_key"))
    	.withColumn("digits", array(lit(1), lit(2), lit(3), lit(4)))
      .withColumn("key", explode(col("digits")))
      .withColumn("value", lit("_"))
      .select("key", "value")
    )
    
    df.repartition(col("key"))
    	.write
    	.format("csv")
    	.partitionBy("key")
    	.mode("overwrite")
    	.save("hdfs://namenode:8020/user/spark/output13")
    /user/spark/output13/_SUCCESS
    /user/spark/output13/key=1
    /user/spark/output13/key=1/part-00000-8a527ff9-3b1d-4126-aec5-3db9e9cad8b5.c000.csv
    /user/spark/output13/key=2
    /user/spark/output13/key=2/part-00000-8a527ff9-3b1d-4126-aec5-3db9e9cad8b5.c000.csv
    /user/spark/output13/key=3
    /user/spark/output13/key=3/part-00000-8a527ff9-3b1d-4126-aec5-3db9e9cad8b5.c000.csv
    /user/spark/output13/key=4
    /user/spark/output13/key=4/part-00000-8a527ff9-3b1d-4126-aec5-3db9e9cad8b5.c000.csv

     

     

    관련된 명령어.

    cat <<EOF> /tmp/core-site.xml
    <configuration>
    	<property>
    		<name>fs.defaultFS</name>
    		<value>hdfs://namenode:8020</value>
    	</property>
    </configuration>
    EOF
    
    cat <<EOF> /tmp/start-namenode.sh
    #!/bin/bash
    
    NAMENODE_DIR="/tmp/hadoop-hadoop/dfs/name"
    
    if [ ! -d "$NAMENODE_DIR/current" ]; then
      echo "❗️NameNode directory not found. Formatting..."
      hdfs namenode -format -force
    else
      echo "✅ NameNode directory exists. Skipping format."
    fi
    
    echo "🚀 Starting NameNode..."
    hdfs namenode
    EOF
    
    chmod 755 /tmp/start-namenode.sh
    
    docker network create spark
    docker run -d --network spark --name namenode --hostname namenode -p 9870:9870 -v /tmp/core-site.xml:/opt/hadoop/etc/hadoop/core-site.xml -v /tmp/start-namenode.sh:/tmp/start-namenode.sh apache/hadoop:3.4.1 /tmp/start-namenode.sh
    docker run -d --network spark --name datanode --hostname datanode -v /tmp/core-site.xml:/opt/hadoop/etc/hadoop/core-site.xml apache/hadoop:3.4.1 hdfs datanode
    
    docker run -d --name master --hostname master --network spark -p 8080:8080 -p 7077:7077 bitnami/spark:3.2.3 start-master.sh
    
    docker run -d --name worker1 --hostname worker1 --network spark bitnami/spark:3.2.3 start-worker.sh spark://master:7077
    
    
    
    
    
    
    
    docker run -it --rm --network spark apache/hadoop:3.4.1 bash
    
    awk -v OFS=',' '
      BEGIN {
        majors[0]="CS"; majors[1]="Math"; majors[2]="Physics"; majors[3]="Biology";
        for (i = 1; i <= 1000000; i++) {
          id = 1000 + int(rand() * 9000);
          age = 18 + int(rand() * 10);
          name = "student" id;
          major = majors[int(rand() * 4)];
          print id, name, age, major;
        }
      }
    ' > /tmp/students.csv
    
    
    export HADOOP_USER_NAME=hadoop 
    hdfs dfs -fs hdfs://namenode:8020 -mkdir -p /user/spark/input/
    export HADOOP_USER_NAME=hadoop 
    hdfs dfs -fs hdfs://namenode:8020 -chown spark /user/spark
    hdfs dfs -fs hdfs://namenode:8020 -chown spark /user/spark/input
    
    
    export HADOOP_USER_NAME=spark
    hdfs dfs -fs hdfs://namenode:8020 -put /tmp/students.csv /user/spark/input/students.csv
    
    
    docker run -it --rm --name client --network spark -p 4040:4040 bitnami/spark:3.2.3 spark-shell --master spark://master:7077 --conf spark.ui.port=4040

     

     

     

     

     

     

    반응형

    'Spark' 카테고리의 다른 글

Designed by Tistory.