-
[Spark] s3a Protocol 에서 Driver 가 파일 존재를 확인하는 과정Spark 2024. 7. 23. 06:11반응형
- 목차
들어가며.
Spark Driver 는 s3a Protocol 을 사용함으로써 S3 에 존재하는 Object File 을 Read 할 수 있습니다.
S3 에 존재하는 파일을 실질적으로 다운로드하는 주체는 Spark 의 Worker Node 이지만,
Driver 는 S3 에 지정된 File 이 존재하는지 유무를 먼저 체크하게 됩니다.
이번 글에서는 Driver 가 어떠한 방식으로 S3 로부터 File 존재 유무를 체크하는지 알아보는 시간을 가지겠습니다.
Driver 는 S3 ListObjectV2 API 를 활용한다.
먼저 아래와 같은 형식으로 Spark Application 스크립트를 준비합니다.
그리고 S3 를 Minio 로 대체합니다.
from pyspark.sql import SparkSession from pyspark.sql.functions import col, sum spark = SparkSession.builder \ .appName("Driver_Prefetch_Files") \ .master("spark://master:7077") \ .config("spark.hadoop.fs.s3a.access.key", "ROOTUSER") \ .config("spark.hadoop.fs.s3a.secret.key", "123456789") \ .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \ .config('spark.hadoop.fs.s3a.aws.credentials.provider', 'org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider') \ .config("spark.hadoop.fs.s3a.path.style.access", "true") \ .config("spark.hadoop.fs.s3a.endpoint", "http://minio:9000") \ .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.2.3,com.amazonaws/aws-java-sdk-bundle:1.12.262,org.apache.hadoop:hadoop-common:3.2.3") \ .getOrCreate() s3_path = "s3a://newyork/yellow_tripdata_2019-01.csv" df = spark.read.csv(s3_path) df.show(100) spark.stop()
그리고 아래는 Spark Driver 가 S3 에게 요청하는 2개의 TCP Packet 을 일부 가져왔습니다.
총 2개의 HTTP Request 를 요청합니다.
첫번째 요청은 ListObjectV2 요청을 전송합니다.
delimeter, prefix, max-keys 등의 파라미터와 함께 Spark Application 이 Fetch 해야할 데이터가 존재하는지 목록을 요청합니다.
그리고 두번째 요청은 Head Method 를 통해서 Fetch 해야할 Object File 을 FindOne 요청을 보냅니다.
이를 통해서 S3 Bucket 내부에 실제로 데이터가 존재하는지 체크할 수 있습니다.
00:57:58.783017 IP modest_swanson.spark.59492 > minio.9000: Flags [P.], seq 0:907, ack 1, win 502, options [nop,nop,TS val 3771253838 ecr 2048661565], length 907 0x0000: 4500 03bf 4a45 4000 4006 94c0 ac13 0007 E...JE@.@....... 0x0010: ac13 0006 e864 2328 80ce d7ef f3f6 6f46 .....d#(......oF 0x0020: 8018 01f6 5be5 0000 0101 080a e0c8 c44e ....[..........N 0x0030: 7a1c 183d 4745 5420 2f6e 6577 796f 726b z..=GET./newyork 0x0040: 2f3f 6c69 7374 2d74 7970 653d 3226 6465 /?list-type=2&de 0x0050: 6c69 6d69 7465 723d 2532 4626 6d61 782d limiter=%2F&max- 0x0060: 6b65 7973 3d35 3030 3026 7072 6566 6978 keys=5000&prefix 0x0070: 3d79 656c 6c6f 775f 7472 6970 6461 7461 =yellow_tripdata 0x0080: 5f32 3031 392d 3031 2e63 7376 2532 4626 _2019-01.csv%2F& 0x0090: 6665 7463 682d 6f77 6e65 723d 6661 6c73 fetch-owner=fals 0x00a0: 6520 4854 5450 2f31 2e31 0d0a 486f 7374 e.HTTP/1.1..Host 0x00b0: 3a20 6d69 6e69 6f3a 3930 3030 0d0a 616d :.minio:9000..am 00:57:58.786001 IP modest_swanson.spark.59492 > minio.9000: Flags [P.], seq 907:1748, ack 771, win 496, options [nop,nop,TS val 3771253841 ecr 2048661566], length 841 0x0000: 4500 037d 4a47 4000 4006 9500 ac13 0007 E..}JG@.@....... 0x0010: ac13 0006 e864 2328 80ce db7a f3f6 7248 .....d#(...z..rH 0x0020: 8018 01f0 5ba3 0000 0101 080a e0c8 c451 ....[..........Q 0x0030: 7a1c 183e 4845 4144 202f 6e65 7779 6f72 z..>HEAD./newyor 0x0040: 6b2f 7965 6c6c 6f77 5f74 7269 7064 6174 k/yellow_tripdat 0x0050: 615f 3230 3139 2d30 312e 6373 7620 4854 a_2019-01.csv.HT 0x0060: 5450 2f31 2e31 0d0a 486f 7374 3a20 6d69 TP/1.1..Host:.mi 0x0070: 6e69 6f3a 3930 3030 0d0a 616d 7a2d 7364 nio:9000..amz-sd 0x0080: 6b2d 696e 766f 6361 7469 6f6e 2d69 643a k-invocation-id: 0x0090: 2065 6430 6365 3163 312d 3861 3630 2d37 .ed0ce1c1-8a60-7 0x00a0: 3031 332d 3862 6436 2d33 3661 3034 3634 013-8bd6-36a0464
S3 Bucket 에 파일이 존재하지 않는다면 ?
Driver 가 존재하지 않는 파일을 S3 에게 요청하게 된다면, 아래와 같은 상태 결과를 확인할 수 있습니다.
Driver 는 실행 계획을 준비하는 과정에서 아래와 같이 "Path does not exist" 인 예외 상황을 출력하며
pyspark.sql.utils.AnalysisException: Path does not exist: s3a://newyork/not_exists.csv
S3 로부터 File 이 존재하지 않는다는 응답을 받게 됩니다.
- <KeyCount>0</KeyCount>
- The sepcified key does not exist
22:36:08.325296 IP minio.9000 > modest_swanson.spark.42860: Flags [P.], seq 1:755, ack 892, win 53607, options [nop,nop,TS val 2050817946 ecr 3773410217], length 754 // .. 생략 0x0210: 0d0a 0d0a 3c3f 786d 6c20 7665 7273 696f ....<?xml.versio 0x0220: 6e3d 2231 2e30 2220 656e 636f 6469 6e67 n="1.0".encoding 0x0230: 3d22 5554 462d 3822 3f3e 0a3c 4c69 7374 ="UTF-8"?>.<List 0x0240: 4275 636b 6574 5265 7375 6c74 2078 6d6c BucketResult.xml 0x0250: 6e73 3d22 6874 7470 3a2f 2f73 332e 616d ns="http://s3.am 0x0260: 617a 6f6e 6177 732e 636f 6d2f 646f 632f azonaws.com/doc/ 0x0270: 3230 3036 2d30 332d 3031 2f22 3e3c 4e61 2006-03-01/"><Na 0x0280: 6d65 3e6e 6577 796f 726b 3c2f 4e61 6d65 me>newyork</Name 0x0290: 3e3c 5072 6566 6978 3e6e 6f74 5f65 7869 ><Prefix>not_exi 0x02a0: 7374 732e 6373 762f 3c2f 5072 6566 6978 sts.csv/</Prefix 0x02b0: 3e3c 4b65 7943 6f75 6e74 3e30 3c2f 4b65 ><KeyCount>0</Ke 0x02c0: 7943 6f75 6e74 3e3c 4d61 784b 6579 733e yCount><MaxKeys> 0x02d0: 323c 2f4d 6178 4b65 7973 3e3c 4465 6c69 2</MaxKeys><Deli 0x02e0: 6d69 7465 723e 2f3c 2f44 656c 696d 6974 miter>/</Delimit 0x02f0: 6572 3e3c 4973 5472 756e 6361 7465 643e er><IsTruncated> 0x0300: 6661 6c73 653c 2f49 7354 7275 6e63 6174 false</IsTruncat 0x0310: 6564 3e3c 2f4c 6973 7442 7563 6b65 7452 ed></ListBucketR 0x0320: 6573 756c 743e esult> 22:36:08.332189 IP minio.9000 > modest_swanson.spark.42862: Flags [P.], seq 1:543, ack 808, win 53013, options [nop,nop,TS val 2050817953 ecr 3773410224], length 542 // .. 생략 0x0190: 4e6f 5375 6368 4b65 790d 0a58 2d4d 696e NoSuchKey..X-Min 0x01a0: 696f 2d45 7272 6f72 2d44 6573 633a 2022 io-Error-Desc:." 0x01b0: 5468 6520 7370 6563 6966 6965 6420 6b65 The.specified.ke 0x01c0: 7920 646f 6573 206e 6f74 2065 7869 7374 y.does.not.exist 0x01d0: 2e22 0d0a 582d 5261 7465 6c69 6d69 742d ."..X-Ratelimit- 0x01e0: 4c69 6d69 743a 2031 3535 300d 0a58 2d52 Limit:.1550..X-R 0x01f0: 6174 656c 696d 6974 2d52 656d 6169 6e69 atelimit-Remaini
관련된 명령어들.
docker network create spark docker run -d --name minio --hostname minio --network spark -e MINIO_ROOT_USER=ROOTUSER -e MINIO_ROOT_PASSWORD=123456789 -p 9001:9001 bitnami/minio:latest docker run -d --name master --hostname master --network spark -p 8080:8080 -p 7077:7077 bitnami/spark:3.2.3 start-master.sh docker run -d --name worker1 --hostname worker1 --cpus 1 --memory 1g --network spark bitnami/spark:3.2.3-s3a start-worker.sh spark://master:7077 wget https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/3.3.2/hadoop-aws-3.3.2.jar -P $SPARK_HOME/jars/ wget https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-common/3.3.2/hadoop-common-3.3.2.jar -P $SPARK_HOME/jars/ wget https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-bundle/1.12.262/aws-java-sdk-bundle-1.12.262.jar -P $SPARK_HOME/jars/ docker commit worker1 bitnami/spark:3.2.3-s3a docker run -it --rm --user root --network spark --privileged -p 4040:4040 bitnami/spark:3.2.3 bash docker run -it --rm apache/hadoop:3.4.1 bash /opt/spark/sbin/start-master.sh -i 0.0.0.0 -p 8080
반응형'Spark' 카테고리의 다른 글
[Spark] MapOutputTracker 알아보기 (0) 2024.07.29 [Spark] Block 알아보기 (0) 2024.07.26 [Spark] RDD GroupBy 와 Shuffle 알아보기 (0) 2024.07.09 [Spark] RDD treeReduce 알아보기 (0) 2024.07.09 [Spark] RDD Reduce 내부 동작 알아보기 (0) 2024.07.09