-
[Spark] S3 DataFrameReader 구현하기 (s3a)Spark 2024. 1. 28. 07:39728x90반응형
- 목차
Hadoop-AWS module.
Spark 에서 S3 Object 들을 읽어들이기 위해서 s3a 프로토콜을 사용해야합니다.
s3a 는 BigData Processing Workload 를 처리하기 위해서 개발된 S3 의 새로운 기능이구요.
BigData Processing Workload 라 함은 분산저장된 여러 Blocks 들을 효율적으로 조회하고 처리함을 뜻합니다.
s3a 은 S3 의 Object File 들을 Hadoop 의 분산 저장된 여러 Block 들을 조회하는 것과 같이
효율적인 Object 조회를 돕는 프로토콜 또는 파일시스템이구요.
아래 이미지와 같이 여러 Object 들을 병렬/동시적으로 조회하여 데이터를 처리할 수 있습니다.
Hadoop-AWS 모듈은 s3a 를 처리하기 위한 클라이언트 기능을 제공합니다.
즉, s3a 프로토콜을 해석할 수 있는 S3 Storage Server 의 클라이언트인 것이죠.
그래서 S3 에 저장된 파일들을 처리하기 위해서 우선적으로 Hadoop-AWS 모듈이 필요합니다.
Minio 설치하기.
저는 S3 대신 Minio 를 사용하였고, Minio 세팅을 위한 링크를 아래에 첨부하겠습니다.
https://westlife0615.tistory.com/582
저는 아래와 같은 구조로 Bucket 과 파일들을 구성하였습니다.
test-bucket └── year=2023 └── month=12 ├── date=11 │ ├── users.json10 │ ├── users.json11 │ ├── users.json12 │ ├── users.json13 │ ├── users.json14 │ ├── users.json15 │ ├── users.json16 │ ├── users.json17 │ ├── users.json18 │ └── users.json19 ├── date=12 │ ├── users.json0 │ ├── users.json1 │ ├── users.json2 │ ├── users.json3 │ ├── users.json4 │ ├── users.json5 │ ├── users.json6 │ ├── users.json7 │ ├── users.json8 │ └── users.json9 └── date=13 ├── users.json20 ├── users.json21 ├── users.json22 ├── users.json23 ├── users.json24 ├── users.json25 ├── users.json26 ├── users.json27 ├── users.json28 └── users.json29
그리고 users.json 은 아래와 같은 형식의 user 정보가 json & singleline 형식으로 저장됩니다.
{"id":10,"name":"bokZJrVHsM","age":73,"gender":"Female","birth":"1991-02-08"}
Spark 로 S3 Object 조회하기.
먼저 build.sbt 은 아래와 같습니다.
저는 Spark 와 Hadoop 버전을 3.2.4 를 사용하였구요.
아래와 같이 버저닝을 한 경우에 충돌없이 실행되었습니다.
version := "0.1.0-SNAPSHOT" scalaVersion := "2.13.12" val sparkVersion = "3.2.4" val hadoopVersion = "3.2.4" lazy val root = (project in file(".")) .settings( name := "spark" ) libraryDependencies ++= Seq( "org.apache.spark" %% "spark-core" % sparkVersion, "org.apache.spark" %% "spark-sql" % sparkVersion % "provided", "org.apache.hadoop" % "hadoop-aws" % hadoopVersion )
Spark S3 Json Reader.
Minio 에 저장된 Json 파일들을 읽는 Spark Program 은 아래와 같습니다.
코드의 내용은 단순히 Json 파일들을 조회하여 Count 와 Sampling 하는 코드입니다.
package test.spark.reader import org.apache.spark.sql.SparkSession object S3Reader { def main(args: Array[String]): Unit = { val spark = SparkSession.builder() .appName("test") .master("local[*]") .config("spark.driver.bindAddress", "localhost") .config("spark.hadoop.fs.s3a.access.key", "EJ258LieJe9TAA3mY8pO") .config("spark.hadoop.fs.s3a.secret.key", "7b0gq3HYiw7qgAcYbv6imOZFc1BohSKAgxIptcG2") .config("spark.hadoop.fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider") .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") .config("fs.s3a.path.style.access", "true") .config("fs.s3a.endpoint", "http://localhost:9010") .getOrCreate(); val df = spark.read.format("json").load("s3a://test-bucket/year=2023/") val count = df.count() print(count) df.show() spark.close() } }
spark.hadoop.fs.s3a.aws.credentials.provider
spark.hadoop.fs.s3a.aws.credentials.provider 는 S3 데이터에 접근하기 위한 Credential 을 등록하는 방식입니다.
저는 SimpleAWSCredentialsProvider 를 사용하였구요.
SimpleAWSCredentialsProvider 방식은 직접적으로 Access Key 와 Secret Key 를 등록하는 절차입니다.
spark.hadoop.fs.s3a.impl
사용할 FileSystem 의 구현체를 입력합니다.
Fully Qualifed Name 을 사용해야하구요.
s3a 를 사용하기 위한 필수값이라고 생각하시면 됩니다.
org.apache.hadoop.fs.s3a.S3AFileSystem 클래스를 직접 확인해보게 되면,
open, read, mkdir, rmdir, close 등에 해당하는 FileSystem 의 필수 기능들이 S3 sdk 를 통해서 구현되어있습니다.
아래와 같이 말이죠.
public String getBucketLocation(String bucketName) throws IOException { return invoker.retry("getBucketLocation()", bucketName, true, ()-> s3.getBucketLocation(bucketName)); }
fs.s3a.path.style.access
데이터 조회 또는 생성을 위하여 s3 의 Object 에 접근할 때에 어떤 방식으로 URI 를 입력할지에 대한 설정입니다.
종류로는 path style 과 virtual host style 이 존재합니다.
path style 은 "Bucket / File Path" 와 같이 Bucket 과 File Path 를 입력하는 방식입니다.
만약 Bucket 이 "test-bucket" 이고, File 이 "/year=2023/month=12/date=11/users.json" 이라고 한다면
"test-bucket/year=2023/month=12/date=11/users.json" 와 같이 특정 파일을 지칭할 수 있습니다.
virtual host style 은 bucket 의 이름을 subdomain 처럼 사용하여 파일을 지칭하는 방식입니다.
"test-bucket.s3.amazon.com/year=2023/month=12/date=11/users.json" 과 같이
s3.amazon.com 이라는 Domain 과 test-bucket 이라는 Subdomain 을 결합하여 파일을 지정할 수 있습니다.
그리고 fs.s3a.path.style.access 의 기본값은 False 입니다.
fs.s3a.endpoint
마지막으로 저는 S3 가 아니라 Minio 환경에서 데이터를 조회하고 있기 때문에 fs.s3a.endpoint 를 제 로컬 네트워크 주소로 설정하였습니다.
실제 s3 스토리지를 사용한다면, s3.amazonaws.com 으로 설정하시면 됩니다.
마지막으로 실행 결과는 아래와 같습니다.
+---+----------+------+---+----------+-----+----+ |age| birth|gender| id| name|month|date| +---+----------+------+---+----------+-----+----+ | 27|1951-02-18| Male| 0|DGxbImPvOF| 12| 13| | 97|1946-02-19| Male| 1|gJntLcXVkE| 12| 13| | 87|1962-02-15|Female| 2|oDSqVOYTEc| 12| 13| | 27|1932-02-23| Male| 3|cNEeqWIdBz| 12| 13| | 24|2008-02-04|Female| 4|SOMWgcvodL| 12| 13| | 45|2012-02-03|Female| 5|qkTGvAeVzO| 12| 13| | 87|1940-02-21|Female| 6|vKiELTfhBx| 12| 13| | 17|1946-02-19|Female| 7|UbOFMgdQox| 12| 13| | 33|1969-02-13|Female| 8|xmSPEDlyGd| 12| 13| | 70|1942-02-20| Male| 9|cMGCzveAjQ| 12| 13| | 73|1991-02-08|Female| 10|bokZJrVHsM| 12| 13| | 39|2002-02-05| Male| 11|ABUFiXMwhr| 12| 13| | 71|1997-02-06| Male| 12|OWIcPeDLaq| 12| 13| | 93|2013-02-02| Male| 13|QtVhBgYyFw| 12| 13| | 3|1952-02-18|Female| 14|DqifMZygNY| 12| 13| | 13|1955-02-17|Female| 15|lNbwagWCmy| 12| 13| | 12|1958-02-16|Female| 16|WheVZMrQkq| 12| 13| | 86|1939-02-21| Male| 17|UOFJQcWajP| 12| 13| | 18|1965-02-14|Female| 18|hAxGdYSqtC| 12| 13| | 91|1976-02-12| Male| 19|KahxjFczGO| 12| 13| +---+----------+------+---+----------+-----+----+ only showing top 20 rows
마치며.
이번 글을 시작으로 여러 DataSource 로부터의 Spark DataFrameReader 에 대한 글을 작성하려고 합니다.
감사합니다.
반응형'Spark' 카테고리의 다른 글
[Spark] Logical Plan 알아보기 1 (Catalyst Optimizer) (2) 2024.01.28 [Spark] JDBC DataFrameReader 알아보기 (MySQL) (0) 2024.01.28 [Spark] groupBy, RelationalGroupedDataset 알아보기 (0) 2024.01.26 [Spark] Union 알아보기 (0) 2024.01.26 [Spark] fold, reduce 알아보기 ( RDD, Action, Aggregation ) (0) 2024.01.19