ABOUT ME

와주셔서 감사합니다. 좋은 글을 많이 쓰겠습니다.

Today
Yesterday
Total
  • [Spark] s3a Protocol 에서 Driver 가 파일 존재를 확인하는 과정
    Spark 2024. 7. 23. 06:11
    반응형

    - 목차

     

    들어가며.

    Spark Driver 는 s3a Protocol 을 사용함으로써 S3 에 존재하는 Object File 을 Read 할 수 있습니다.

    S3 에 존재하는 파일을 실질적으로 다운로드하는 주체는 Spark 의 Worker Node 이지만,

    Driver 는 S3 에 지정된 File 이 존재하는지 유무를 먼저 체크하게 됩니다. 

    이번 글에서는 Driver 가 어떠한 방식으로 S3 로부터 File 존재 유무를 체크하는지 알아보는 시간을 가지겠습니다.

     

     

     

    Driver 는 S3 ListObjectV2 API 를 활용한다.

    먼저 아래와 같은 형식으로 Spark Application 스크립트를 준비합니다.

    그리고 S3 를 Minio 로 대체합니다.

    from pyspark.sql import SparkSession
    from pyspark.sql.functions import col, sum
    
    spark = SparkSession.builder \
        .appName("Driver_Prefetch_Files") \
        .master("spark://master:7077") \
        .config("spark.hadoop.fs.s3a.access.key", "ROOTUSER") \
        .config("spark.hadoop.fs.s3a.secret.key", "123456789") \
        .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
        .config('spark.hadoop.fs.s3a.aws.credentials.provider', 'org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider') \
        .config("spark.hadoop.fs.s3a.path.style.access", "true") \
        .config("spark.hadoop.fs.s3a.endpoint", "http://minio:9000") \
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.2.3,com.amazonaws/aws-java-sdk-bundle:1.12.262,org.apache.hadoop:hadoop-common:3.2.3") \
        .getOrCreate()
    
    
    s3_path = "s3a://newyork/yellow_tripdata_2019-01.csv"
    df = spark.read.csv(s3_path)
    df.show(100)
    spark.stop()

     

    그리고 아래는 Spark Driver 가 S3 에게 요청하는 2개의 TCP Packet 을 일부 가져왔습니다.

    총 2개의 HTTP Request 를 요청합니다.

    첫번째 요청은 ListObjectV2 요청을 전송합니다.

    delimeter, prefix, max-keys 등의 파라미터와 함께 Spark Application 이 Fetch 해야할 데이터가 존재하는지 목록을 요청합니다.

    그리고 두번째 요청은 Head Method 를 통해서 Fetch 해야할 Object File 을 FindOne 요청을 보냅니다.

    이를 통해서 S3 Bucket 내부에 실제로 데이터가 존재하는지 체크할 수 있습니다.

     

    00:57:58.783017 IP modest_swanson.spark.59492 > minio.9000: Flags [P.], seq 0:907, ack 1, win 502, options [nop,nop,TS val 3771253838 ecr 2048661565], length 907
    	0x0000:  4500 03bf 4a45 4000 4006 94c0 ac13 0007  E...JE@.@.......
    	0x0010:  ac13 0006 e864 2328 80ce d7ef f3f6 6f46  .....d#(......oF
    	0x0020:  8018 01f6 5be5 0000 0101 080a e0c8 c44e  ....[..........N
    	0x0030:  7a1c 183d 4745 5420 2f6e 6577 796f 726b  z..=GET./newyork
    	0x0040:  2f3f 6c69 7374 2d74 7970 653d 3226 6465  /?list-type=2&de
    	0x0050:  6c69 6d69 7465 723d 2532 4626 6d61 782d  limiter=%2F&max-
    	0x0060:  6b65 7973 3d35 3030 3026 7072 6566 6978  keys=5000&prefix
    	0x0070:  3d79 656c 6c6f 775f 7472 6970 6461 7461  =yellow_tripdata
    	0x0080:  5f32 3031 392d 3031 2e63 7376 2532 4626  _2019-01.csv%2F&
    	0x0090:  6665 7463 682d 6f77 6e65 723d 6661 6c73  fetch-owner=fals
    	0x00a0:  6520 4854 5450 2f31 2e31 0d0a 486f 7374  e.HTTP/1.1..Host
    	0x00b0:  3a20 6d69 6e69 6f3a 3930 3030 0d0a 616d  :.minio:9000..am
        
    00:57:58.786001 IP modest_swanson.spark.59492 > minio.9000: Flags [P.], seq 907:1748, ack 771, win 496, options [nop,nop,TS val 3771253841 ecr 2048661566], length 841
    	0x0000:  4500 037d 4a47 4000 4006 9500 ac13 0007  E..}JG@.@.......
    	0x0010:  ac13 0006 e864 2328 80ce db7a f3f6 7248  .....d#(...z..rH
    	0x0020:  8018 01f0 5ba3 0000 0101 080a e0c8 c451  ....[..........Q
    	0x0030:  7a1c 183e 4845 4144 202f 6e65 7779 6f72  z..>HEAD./newyor
    	0x0040:  6b2f 7965 6c6c 6f77 5f74 7269 7064 6174  k/yellow_tripdat
    	0x0050:  615f 3230 3139 2d30 312e 6373 7620 4854  a_2019-01.csv.HT
    	0x0060:  5450 2f31 2e31 0d0a 486f 7374 3a20 6d69  TP/1.1..Host:.mi
    	0x0070:  6e69 6f3a 3930 3030 0d0a 616d 7a2d 7364  nio:9000..amz-sd
    	0x0080:  6b2d 696e 766f 6361 7469 6f6e 2d69 643a  k-invocation-id:
    	0x0090:  2065 6430 6365 3163 312d 3861 3630 2d37  .ed0ce1c1-8a60-7
    	0x00a0:  3031 332d 3862 6436 2d33 3661 3034 3634  013-8bd6-36a0464

     

     

    S3 Bucket 에 파일이 존재하지 않는다면 ? 

    Driver 가 존재하지 않는 파일을 S3 에게 요청하게 된다면, 아래와 같은 상태 결과를 확인할 수 있습니다.

    Driver 는 실행 계획을 준비하는 과정에서 아래와 같이 "Path does not exist" 인 예외 상황을 출력하며

    pyspark.sql.utils.AnalysisException: Path does not exist: s3a://newyork/not_exists.csv

     

    S3 로부터 File 이 존재하지 않는다는 응답을 받게 됩니다.

    • <KeyCount>0</KeyCount>
    • The sepcified key does not exist
    22:36:08.325296 IP minio.9000 > modest_swanson.spark.42860: Flags [P.], seq 1:755, ack 892, win 53607, options [nop,nop,TS val 2050817946 ecr 3773410217], length 754
    	// .. 생략
    	0x0210:  0d0a 0d0a 3c3f 786d 6c20 7665 7273 696f  ....<?xml.versio
    	0x0220:  6e3d 2231 2e30 2220 656e 636f 6469 6e67  n="1.0".encoding
    	0x0230:  3d22 5554 462d 3822 3f3e 0a3c 4c69 7374  ="UTF-8"?>.<List
    	0x0240:  4275 636b 6574 5265 7375 6c74 2078 6d6c  BucketResult.xml
    	0x0250:  6e73 3d22 6874 7470 3a2f 2f73 332e 616d  ns="http://s3.am
    	0x0260:  617a 6f6e 6177 732e 636f 6d2f 646f 632f  azonaws.com/doc/
    	0x0270:  3230 3036 2d30 332d 3031 2f22 3e3c 4e61  2006-03-01/"><Na
    	0x0280:  6d65 3e6e 6577 796f 726b 3c2f 4e61 6d65  me>newyork</Name
    	0x0290:  3e3c 5072 6566 6978 3e6e 6f74 5f65 7869  ><Prefix>not_exi
    	0x02a0:  7374 732e 6373 762f 3c2f 5072 6566 6978  sts.csv/</Prefix
    	0x02b0:  3e3c 4b65 7943 6f75 6e74 3e30 3c2f 4b65  ><KeyCount>0</Ke
    	0x02c0:  7943 6f75 6e74 3e3c 4d61 784b 6579 733e  yCount><MaxKeys>
    	0x02d0:  323c 2f4d 6178 4b65 7973 3e3c 4465 6c69  2</MaxKeys><Deli
    	0x02e0:  6d69 7465 723e 2f3c 2f44 656c 696d 6974  miter>/</Delimit
    	0x02f0:  6572 3e3c 4973 5472 756e 6361 7465 643e  er><IsTruncated>
    	0x0300:  6661 6c73 653c 2f49 7354 7275 6e63 6174  false</IsTruncat
    	0x0310:  6564 3e3c 2f4c 6973 7442 7563 6b65 7452  ed></ListBucketR
    	0x0320:  6573 756c 743e                           esult>
    
    22:36:08.332189 IP minio.9000 > modest_swanson.spark.42862: Flags [P.], seq 1:543, ack 808, win 53013, options [nop,nop,TS val 2050817953 ecr 3773410224], length 542
    	// .. 생략
    	0x0190:  4e6f 5375 6368 4b65 790d 0a58 2d4d 696e  NoSuchKey..X-Min
    	0x01a0:  696f 2d45 7272 6f72 2d44 6573 633a 2022  io-Error-Desc:."
    	0x01b0:  5468 6520 7370 6563 6966 6965 6420 6b65  The.specified.ke
    	0x01c0:  7920 646f 6573 206e 6f74 2065 7869 7374  y.does.not.exist
    	0x01d0:  2e22 0d0a 582d 5261 7465 6c69 6d69 742d  ."..X-Ratelimit-
    	0x01e0:  4c69 6d69 743a 2031 3535 300d 0a58 2d52  Limit:.1550..X-R
    	0x01f0:  6174 656c 696d 6974 2d52 656d 6169 6e69  atelimit-Remaini

     

     

     

    관련된 명령어들.

    docker network create spark
    
    docker run -d --name minio --hostname minio --network spark -e MINIO_ROOT_USER=ROOTUSER -e MINIO_ROOT_PASSWORD=123456789 -p 9001:9001 bitnami/minio:latest
    
    docker run -d --name master --hostname master --network spark -p 8080:8080 -p 7077:7077 bitnami/spark:3.2.3 start-master.sh
    
    docker run -d --name worker1 --hostname worker1 --cpus 1 --memory 1g --network spark bitnami/spark:3.2.3-s3a start-worker.sh spark://master:7077
    
    wget https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/3.3.2/hadoop-aws-3.3.2.jar -P $SPARK_HOME/jars/
    wget https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-common/3.3.2/hadoop-common-3.3.2.jar -P $SPARK_HOME/jars/
    wget https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-bundle/1.12.262/aws-java-sdk-bundle-1.12.262.jar -P $SPARK_HOME/jars/
    
    docker commit worker1 bitnami/spark:3.2.3-s3a
    
    docker run -it --rm --user root --network spark --privileged -p 4040:4040 bitnami/spark:3.2.3 bash
    
    docker run -it --rm apache/hadoop:3.4.1 bash
    
    /opt/spark/sbin/start-master.sh -i 0.0.0.0 -p 8080

     

     

    반응형
Designed by Tistory.