-
[Spark] DataFrameWriter PartitionBy 알아보기Spark 2024. 8. 12. 05:50반응형
- 목차
- partitionBy 이란 ?
- partitionBy 사용 예시.
- partitionBy 와 생성되는 파일 수의 관계.
- maxRecordPerFile.
- repartition 과 partitionBy 의 관계.
- 관련된 명령어.
partitionBy 이란 ?
Spark DataFrame API 에서 write 명령어를 사용하면 손쉽게 DataFrameWriter 객체를 생성할 수 있습니다.
이름에서 알 수 있듯이 DataFrameWriter 는 데이터를 외부 데이터소스에 추가하거나 파일을 생성하는 작업을 수행합니다.
여기서 partitionBy 라는 함수가 사용되면, 생성하고자하는 파일을 물리적인 레벨에서 디렉토리를 세분화할 수 있습니다.
예를 들어, DataFrame 이 country 라는 칼럼을 가지고 이 칼럼은 KR 또는 US 라는 값을 가진다고 가정합니다.
그리고 DataFrame.partitionBy("country").parquet("/data/output") 와 같이 명령을 실행해게 되면,
아래와 같이 Partitioning Column 을 기준으로 Directory 가 생성되게 됩니다.
/data/output/ ├── country=KR/ │ ├── part-00000.parquet ├── country=US/ │ ├── part-00001.parquet
partitionBy 사용 예시.
partitionBy 를 사용하게 되면 Disk Level 에서 물리적인 파티셔닝이 가능합니다.
위 문장의 의미는 실제 생성되는 파일들이 partitionBy 에 사용된 Column 의 값으로 기준으로 분류가 됩니다.
Partitioning Column 으로 사용된 major 칼럼이 총 4개의 값을 가진다면, 결과적으로 4개의 폴더가 생성되게 됩니다.
import org.apache.spark.sql.types._ val studentSchema = StructType(Seq( StructField("id", IntegerType, nullable = true), StructField("name", StringType, nullable = true), StructField("age", IntegerType, nullable = true), StructField("major", StringType, nullable = true) )) val df = (spark.read.format("csv") .schema(studentSchema) .option("header", "false") .option("inferSchema", "false") .option("delimiter", ",") .load("hdfs://namenode:8020/user/spark/input/students.csv")) df.write .format("csv") .partitionBy("major") .mode("overwrite") .save("hdfs://namenode:8020/user/spark/output")
/user/spark/output2/major=Biology/part-00000-d02c48b3-aeae-4d41-9442-6c23ab8faca3.c000.csv /user/spark/output2/major=CS/part-00000-d02c48b3-aeae-4d41-9442-6c23ab8faca3.c000.csv /user/spark/output2/major=Math/part-00000-d02c48b3-aeae-4d41-9442-6c23ab8faca3.c000.csv /user/spark/output2/major=Physics/part-00000-d02c48b3-aeae-4d41-9442-6c23ab8faca3.c000.csv
위의 경우에는 major 값은 각각 Biology, CS, Math, Physics 인 4개의 값을 가집니다.
partitionBy 와 생성되는 파일 수의 관계.
partitionBy 가 적용되면 Partitioning Column 을 기준으로 새로운 디렉토리가 생성됩니다.
그리고 Partitioning Directory 하위에 여러개의 File 들이 생성됩니다.
이때에 각 Partitioning Directory 마다 몇개의 파일들이 생성될까요 ?
이 기준은 DataFrame 의 Partition 갯수와 레코드 갯수에 의해서 결정됩니다.
val df = ((1 to 4).toDF("raw_key").repartitionByRange(4, col("raw_key")) .withColumn("digits", array(lit(1), lit(2), lit(3), lit(4))) .withColumn("key", explode(col("digits"))) .withColumn("value", lit("_")) .select("key", "value") ) df.write .format("csv") .partitionBy("key") .mode("overwrite") .save("hdfs://namenode:8020/user/spark/output11")
아래의 상태에 DataFrame 은 4개의 Partition 을 가지며, 각 파티션이 1, 2, 3, 4 라는 동일한 Key 를 가집니다.
df.rdd.glom().collect() res114: Array[Array[org.apache.spark.sql.Row]] = Array( Array([1, _], [2, _], [3, _], [4, _]), Array([1, _], [2, _], [3, _], [4, _]), Array([1, _], [2, _], [3, _], [4, _]), Array([1, _], [2, _], [3, _], [4, _]) ) df.show(5, truncate = false) +---+-----+ |key|value| +---+-----+ |1 |_ | |2 |_ | |3 |_ | |4 |_ | |1 |_ | +---+-----+ only showing top 5 rows
이 상황에서 partitionBy 를 적용하게 되면, 각 Partitioning Column 마다 총 4개의 File 이 생성됩니다.
/user/spark/output11/_SUCCESS /user/spark/output11/key=1 /spark/output11/key=1/part-00000-6f26ced9-db47-4788-8e56-1a38a8f34420.c000.csv /user/spark/output11/key=1/part-00001-6f26ced9-db47-4788-8e56-1a38a8f34420.c000.csv /user/spark/output11/key=1/part-00002-6f26ced9-db47-4788-8e56-1a38a8f34420.c000.csv /user/spark/output11/key=1/part-00003-6f26ced9-db47-4788-8e56-1a38a8f34420.c000.csv /user/spark/output11/key=2 /user/spark/output11/key=2/part-00000-6f26ced9-db47-4788-8e56-1a38a8f34420.c000.csv /user/spark/output11/key=2/part-00001-6f26ced9-db47-4788-8e56-1a38a8f34420.c000.csv /user/spark/output11/key=2/part-00002-6f26ced9-db47-4788-8e56-1a38a8f34420.c000.csv /user/spark/output11/key=2/part-00003-6f26ced9-db47-4788-8e56-1a38a8f34420.c000.csv /user/spark/output11/key=3 /user/spark/output11/key=3/part-00000-6f26ced9-db47-4788-8e56-1a38a8f34420.c000.csv /user/spark/output11/key=3/part-00001-6f26ced9-db47-4788-8e56-1a38a8f34420.c000.csv /user/spark/output11/key=3/part-00002-6f26ced9-db47-4788-8e56-1a38a8f34420.c000.csv /user/spark/output11/key=3/part-00003-6f26ced9-db47-4788-8e56-1a38a8f34420.c000.csv /user/spark/output11/key=4 /user/spark/output11/key=4/part-00000-6f26ced9-db47-4788-8e56-1a38a8f34420.c000.csv /user/spark/output11/key=4/part-00001-6f26ced9-db47-4788-8e56-1a38a8f34420.c000.csv /user/spark/output11/key=4/part-00002-6f26ced9-db47-4788-8e56-1a38a8f34420.c000.csv /user/spark/output11/key=4/part-00003-6f26ced9-db47-4788-8e56-1a38a8f34420.c000.csv
위의 상황을 요약하자면,
여러개의 Task 또는 Partition 이 특정 파티셔닝 키에 해당하는 디렉토리 하위에 파일을 생성합니다.
Partition 1 : [1,2,3,4]
Partition 2 : [1,2,3,4]
Partition 3 : [1,2,3,4]
Partition 4 : [1,2,3,4]
와 같이 모든 파티션이 1, 2, 3, 4 라는 똑같은 Key 를 가지는 레코드를 가집니다.
따라서 각 파티셔닝 키의 폴더 하위에 모든 파티션이 파일을 생성하기 때문에 파티션 갯수만큼 파일이 생성되게 됩니다.
maxRecordPerFile.
maxRecordsPerFile 은 DataFrameWriter 에서 사용되는 옵션입니다.
maxRecordsPerFile 옵션은 생성되는 파일이 최대 몇개의 레코드를 저장할지 지정합니다.
예를 들어 아래와 같이 students.csv 파일의 major 별 수치를 확인해보면 각 major 당 200명의 Record 가 존재합니다.
import org.apache.spark.sql.types._ val studentSchema = StructType(Seq( StructField("id", IntegerType, nullable = true), StructField("name", StringType, nullable = true), StructField("age", IntegerType, nullable = true), StructField("major", StringType, nullable = true) )) val df = (spark.read .format("csv") .schema(studentSchema) .option("inferSchema", "true") .option("header", "false") .option("path", "hdfs://namenode:8020/user/spark/input/students.csv") .load() ) df.groupBy("major").count().show()
+-------+-----+ | major|count| +-------+-----+ | Math| 232| | CS| 272| |Physics| 249| |Biology| 247| +-------+-----+
이러한 데이터셋을 대상으로 maxRecordsPerFile 을 200 과 300 으로 설정해보도록 하겠습니다.
df.write .format("csv") .partitionBy("major") .mode("overwrite") .option("maxRecordsPerFile", "300") // 또는 200 .save("hdfs://namenode:8020/user/spark/output3")
// maxRecordsPerFile 이 200 인 결과 /user/spark/output2/major=Biology/part-00000-d02c48b3-aeae-4d41-9442-6c23ab8faca3.c000.csv /user/spark/output2/major=Biology/part-00000-d02c48b3-aeae-4d41-9442-6c23ab8faca3.c001.csv /user/spark/output2/major=CS/part-00000-d02c48b3-aeae-4d41-9442-6c23ab8faca3.c000.csv /user/spark/output2/major=CS/part-00000-d02c48b3-aeae-4d41-9442-6c23ab8faca3.c001.csv /user/spark/output2/major=Math/part-00000-d02c48b3-aeae-4d41-9442-6c23ab8faca3.c000.csv /user/spark/output2/major=Math/part-00000-d02c48b3-aeae-4d41-9442-6c23ab8faca3.c001.csv /user/spark/output2/major=Physics/part-00000-d02c48b3-aeae-4d41-9442-6c23ab8faca3.c000.csv /user/spark/output2/major=Physics/part-00000-d02c48b3-aeae-4d41-9442-6c23ab8faca3.c001.csv // maxRecordsPerFile 이 300 인 결과 /user/spark/output3/major=Biology/part-00000-1f60dba0-46cb-4517-ab77-20effad2bc76.c000.csv /user/spark/output3/major=CS/part-00000-1f60dba0-46cb-4517-ab77-20effad2bc76.c000.csv /user/spark/output3/major=Math/part-00000-1f60dba0-46cb-4517-ab77-20effad2bc76.c000.csv /user/spark/output3/major=Physics/part-00000-1f60dba0-46cb-4517-ab77-20effad2bc76.c000.csv
위와 같이 파일이 수용할 수 있는 최대 레코드 수를 제한함으로써 생성되는 파일의 갯수가 달라지게 됩니다.
repartition 과 partitionBy 의 관계.
repartition 을 사용하면, 파티셔닝 키 폴더 하위에 하나의 파일을 생성할 수 있습니다.
당연히 maxRecordsPerFile 설정에 의해서 파일의 갯수가 증가할 순 있겠지만,
논리적으로 repartition 와 partitionBy 의 칼럼을 일치시켜주면 파티셔닝 키마다 하나의 파일이 생성될 수 있습니다.
아래와 같이 partitionBy 가 단독으로 사용되는 경우에는 Partition 의 갯수만큼 파일이 생성됩니다.
val df = ((1 to 1000).toDF("raw_key").repartitionByRange(4, col("raw_key")) .withColumn("digits", array(lit(1), lit(2), lit(3), lit(4))) .withColumn("key", explode(col("digits"))) .withColumn("value", lit("_")) .select("key", "value") ) df.write .format("csv") .partitionBy("key") .mode("overwrite") .save("hdfs://namenode:8020/user/spark/output12")
/user/spark/output12/_SUCCESS /user/spark/output12/key=1 /user/spark/output12/key=1/part-00000-ecd27fbc-b9c1-46e9-9f49-c417ae3a5acb.c000.csv /user/spark/output12/key=1/part-00001-ecd27fbc-b9c1-46e9-9f49-c417ae3a5acb.c000.csv /user/spark/output12/key=1/part-00002-ecd27fbc-b9c1-46e9-9f49-c417ae3a5acb.c000.csv /user/spark/output12/key=1/part-00003-ecd27fbc-b9c1-46e9-9f49-c417ae3a5acb.c000.csv /user/spark/output12/key=2 /user/spark/output12/key=2/part-00000-ecd27fbc-b9c1-46e9-9f49-c417ae3a5acb.c000.csv /user/spark/output12/key=2/part-00001-ecd27fbc-b9c1-46e9-9f49-c417ae3a5acb.c000.csv /user/spark/output12/key=2/part-00002-ecd27fbc-b9c1-46e9-9f49-c417ae3a5acb.c000.csv /user/spark/output12/key=2/part-00003-ecd27fbc-b9c1-46e9-9f49-c417ae3a5acb.c000.csv /user/spark/output12/key=3 /user/spark/output12/key=3/part-00000-ecd27fbc-b9c1-46e9-9f49-c417ae3a5acb.c000.csv /user/spark/output12/key=3/part-00001-ecd27fbc-b9c1-46e9-9f49-c417ae3a5acb.c000.csv /user/spark/output12/key=3/part-00002-ecd27fbc-b9c1-46e9-9f49-c417ae3a5acb.c000.csv /user/spark/output12/key=3/part-00003-ecd27fbc-b9c1-46e9-9f49-c417ae3a5acb.c000.csv /user/spark/output12/key=4 /user/spark/output12/key=4/part-00000-ecd27fbc-b9c1-46e9-9f49-c417ae3a5acb.c000.csv /user/spark/output12/key=4/part-00001-ecd27fbc-b9c1-46e9-9f49-c417ae3a5acb.c000.csv /user/spark/output12/key=4/part-00002-ecd27fbc-b9c1-46e9-9f49-c417ae3a5acb.c000.csv /user/spark/output12/key=4/part-00003-ecd27fbc-b9c1-46e9-9f49-c417ae3a5acb.c000.csv
하지만 아래와 같이 DataFrame 에 repartition 연산을 추가하며, partitionBy 에 적용된 파티셔닝 키와 동일하게 repartition 을 시도합니다.
이렇게 되면 각 파티셔닝 키마다 Partition 이 분배되므로 파일 생성 시에 각 파티셔닝 키 마다 하나의 파티션이 존재하게 됩니다.
val df = ((1 to 1000).toDF("raw_key").repartitionByRange(4, col("raw_key")) .withColumn("digits", array(lit(1), lit(2), lit(3), lit(4))) .withColumn("key", explode(col("digits"))) .withColumn("value", lit("_")) .select("key", "value") ) df.repartition(col("key")) .write .format("csv") .partitionBy("key") .mode("overwrite") .save("hdfs://namenode:8020/user/spark/output13")
/user/spark/output13/_SUCCESS /user/spark/output13/key=1 /user/spark/output13/key=1/part-00000-8a527ff9-3b1d-4126-aec5-3db9e9cad8b5.c000.csv /user/spark/output13/key=2 /user/spark/output13/key=2/part-00000-8a527ff9-3b1d-4126-aec5-3db9e9cad8b5.c000.csv /user/spark/output13/key=3 /user/spark/output13/key=3/part-00000-8a527ff9-3b1d-4126-aec5-3db9e9cad8b5.c000.csv /user/spark/output13/key=4 /user/spark/output13/key=4/part-00000-8a527ff9-3b1d-4126-aec5-3db9e9cad8b5.c000.csv
관련된 명령어.
cat <<EOF> /tmp/core-site.xml <configuration> <property> <name>fs.defaultFS</name> <value>hdfs://namenode:8020</value> </property> </configuration> EOF cat <<EOF> /tmp/start-namenode.sh #!/bin/bash NAMENODE_DIR="/tmp/hadoop-hadoop/dfs/name" if [ ! -d "$NAMENODE_DIR/current" ]; then echo "❗️NameNode directory not found. Formatting..." hdfs namenode -format -force else echo "✅ NameNode directory exists. Skipping format." fi echo "🚀 Starting NameNode..." hdfs namenode EOF chmod 755 /tmp/start-namenode.sh docker network create spark docker run -d --network spark --name namenode --hostname namenode -p 9870:9870 -v /tmp/core-site.xml:/opt/hadoop/etc/hadoop/core-site.xml -v /tmp/start-namenode.sh:/tmp/start-namenode.sh apache/hadoop:3.4.1 /tmp/start-namenode.sh docker run -d --network spark --name datanode --hostname datanode -v /tmp/core-site.xml:/opt/hadoop/etc/hadoop/core-site.xml apache/hadoop:3.4.1 hdfs datanode docker run -d --name master --hostname master --network spark -p 8080:8080 -p 7077:7077 bitnami/spark:3.2.3 start-master.sh docker run -d --name worker1 --hostname worker1 --network spark bitnami/spark:3.2.3 start-worker.sh spark://master:7077 docker run -it --rm --network spark apache/hadoop:3.4.1 bash awk -v OFS=',' ' BEGIN { majors[0]="CS"; majors[1]="Math"; majors[2]="Physics"; majors[3]="Biology"; for (i = 1; i <= 1000000; i++) { id = 1000 + int(rand() * 9000); age = 18 + int(rand() * 10); name = "student" id; major = majors[int(rand() * 4)]; print id, name, age, major; } } ' > /tmp/students.csv export HADOOP_USER_NAME=hadoop hdfs dfs -fs hdfs://namenode:8020 -mkdir -p /user/spark/input/ export HADOOP_USER_NAME=hadoop hdfs dfs -fs hdfs://namenode:8020 -chown spark /user/spark hdfs dfs -fs hdfs://namenode:8020 -chown spark /user/spark/input export HADOOP_USER_NAME=spark hdfs dfs -fs hdfs://namenode:8020 -put /tmp/students.csv /user/spark/input/students.csv docker run -it --rm --name client --network spark -p 4040:4040 bitnami/spark:3.2.3 spark-shell --master spark://master:7077 --conf spark.ui.port=4040
반응형'Spark' 카테고리의 다른 글
[Spark] RDD Cogroup 연산 알아보기 (0) 2024.07.29 [Spark] RangePartitioner 알아보기 (0) 2024.07.29 [Spark] RDD combineByKey 알아보기 (0) 2024.07.29 [Spark] MapOutputTracker 알아보기 (0) 2024.07.29 [Spark] Block 알아보기 (0) 2024.07.26