ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • [Spark] S3 DataFrameReader 구현하기 (s3a)
    Spark 2024. 1. 28. 07:39
    728x90
    반응형

    - 목차

     

    Hadoop-AWS module.

    Spark 에서 S3 Object 들을 읽어들이기 위해서 s3a 프로토콜을 사용해야합니다.

    s3a 는 BigData Processing Workload 를 처리하기 위해서 개발된 S3 의 새로운 기능이구요.

    BigData Processing Workload 라 함은 분산저장된 여러 Blocks 들을 효율적으로 조회하고 처리함을 뜻합니다.

     

    s3a 은 S3 의 Object File 들을 Hadoop 의 분산 저장된 여러 Block 들을 조회하는 것과 같이

    효율적인 Object 조회를 돕는 프로토콜 또는 파일시스템이구요.

    아래 이미지와 같이 여러 Object 들을 병렬/동시적으로 조회하여 데이터를 처리할 수 있습니다.

    s3a File System

     

    Hadoop-AWS 모듈은 s3a 를 처리하기 위한 클라이언트 기능을 제공합니다.

    즉, s3a 프로토콜을 해석할 수 있는 S3 Storage Server 의 클라이언트인 것이죠.

    그래서 S3 에 저장된 파일들을 처리하기 위해서 우선적으로 Hadoop-AWS 모듈이 필요합니다.

     

    Minio 설치하기.

    저는 S3 대신 Minio 를 사용하였고, Minio 세팅을 위한 링크를 아래에 첨부하겠습니다.

     

    https://westlife0615.tistory.com/582

     

    Docker 로 Minio Storage 구현하기

    - 목차 들어가며. Docker Container 를 기반으로 Minio Storage 를 생성하는 방법에 대해서 알아보도록 하겠습니다. AWS S3 같은 Object Storage 는 Data-Lake 의 용도로 활용됩니다. 그래서 Kafka-Connect, Spark, Flink 와

    westlife0615.tistory.com

     

    저는 아래와 같은 구조로 Bucket 과 파일들을 구성하였습니다.

    test-bucket
        └── year=2023
            └── month=12
                ├── date=11
                │   ├── users.json10
                │   ├── users.json11
                │   ├── users.json12
                │   ├── users.json13
                │   ├── users.json14
                │   ├── users.json15
                │   ├── users.json16
                │   ├── users.json17
                │   ├── users.json18
                │   └── users.json19
                ├── date=12
                │   ├── users.json0
                │   ├── users.json1
                │   ├── users.json2
                │   ├── users.json3
                │   ├── users.json4
                │   ├── users.json5
                │   ├── users.json6
                │   ├── users.json7
                │   ├── users.json8
                │   └── users.json9
                └── date=13
                    ├── users.json20
                    ├── users.json21
                    ├── users.json22
                    ├── users.json23
                    ├── users.json24
                    ├── users.json25
                    ├── users.json26
                    ├── users.json27
                    ├── users.json28
                    └── users.json29

     

    그리고 users.json 은 아래와 같은 형식의 user 정보가 json & singleline 형식으로 저장됩니다.

    {"id":10,"name":"bokZJrVHsM","age":73,"gender":"Female","birth":"1991-02-08"}

     

     

     

     

     

    Spark 로 S3 Object 조회하기.

    먼저 build.sbt 은 아래와 같습니다.

    저는 Spark 와 Hadoop 버전을 3.2.4 를 사용하였구요.

    아래와 같이 버저닝을 한 경우에 충돌없이 실행되었습니다.

    version := "0.1.0-SNAPSHOT"
    scalaVersion := "2.13.12"
    val sparkVersion = "3.2.4"
    val hadoopVersion = "3.2.4"
    
    lazy val root = (project in file("."))
      .settings(
        name := "spark"
      )
    
    libraryDependencies ++= Seq(
      "org.apache.spark" %% "spark-core" % sparkVersion,
      "org.apache.spark" %% "spark-sql" % sparkVersion % "provided",
      "org.apache.hadoop" % "hadoop-aws" % hadoopVersion
    )

     

     

    Spark S3 Json Reader.

    Minio 에 저장된 Json 파일들을 읽는 Spark Program 은 아래와 같습니다.

    코드의 내용은 단순히 Json 파일들을 조회하여 Count 와 Sampling 하는 코드입니다.

     

    package test.spark.reader
    
    import org.apache.spark.sql.SparkSession
    
    
    object S3Reader {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .appName("test")
          .master("local[*]")
          .config("spark.driver.bindAddress", "localhost")
          .config("spark.hadoop.fs.s3a.access.key", "EJ258LieJe9TAA3mY8pO")
          .config("spark.hadoop.fs.s3a.secret.key", "7b0gq3HYiw7qgAcYbv6imOZFc1BohSKAgxIptcG2")
          .config("spark.hadoop.fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider")
          .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
          .config("fs.s3a.path.style.access", "true")
          .config("fs.s3a.endpoint", "http://localhost:9010")
          .getOrCreate();
    
       val df = spark.read.format("json").load("s3a://test-bucket/year=2023/")
        val count = df.count()
        print(count)
        df.show()
        spark.close()
    
      }
    }

     

    spark.hadoop.fs.s3a.aws.credentials.provider

    spark.hadoop.fs.s3a.aws.credentials.provider 는 S3 데이터에 접근하기 위한 Credential 을 등록하는 방식입니다.

    저는 SimpleAWSCredentialsProvider 를 사용하였구요.

    SimpleAWSCredentialsProvider 방식은 직접적으로 Access Key 와 Secret Key 를 등록하는 절차입니다.

     

    spark.hadoop.fs.s3a.impl

    사용할 FileSystem 의 구현체를 입력합니다.

    Fully Qualifed Name 을 사용해야하구요.

    s3a 를 사용하기 위한 필수값이라고 생각하시면 됩니다.

    org.apache.hadoop.fs.s3a.S3AFileSystem 클래스를 직접 확인해보게 되면,

    open, read, mkdir, rmdir, close 등에 해당하는 FileSystem 의 필수 기능들이 S3 sdk 를 통해서 구현되어있습니다.

    아래와 같이 말이죠.

    public String getBucketLocation(String bucketName) throws IOException {
        return invoker.retry("getBucketLocation()", bucketName, true,
            ()-> s3.getBucketLocation(bucketName));
      }

     

    fs.s3a.path.style.access

    데이터 조회 또는 생성을 위하여 s3 의 Object 에 접근할 때에 어떤 방식으로 URI 를 입력할지에 대한 설정입니다.

    종류로는 path style 과 virtual host style 이 존재합니다.

    path style 은 "Bucket / File Path" 와 같이 Bucket 과 File Path 를 입력하는 방식입니다.

    만약 Bucket 이 "test-bucket" 이고, File 이 "/year=2023/month=12/date=11/users.json" 이라고 한다면

    "test-bucket/year=2023/month=12/date=11/users.json" 와 같이 특정 파일을 지칭할 수 있습니다.

     

    virtual host style 은 bucket 의 이름을 subdomain 처럼 사용하여 파일을 지칭하는 방식입니다.

    "test-bucket.s3.amazon.com/year=2023/month=12/date=11/users.json" 과 같이

    s3.amazon.com 이라는 Domain 과 test-bucket 이라는 Subdomain 을 결합하여 파일을 지정할 수 있습니다.

     

    그리고 fs.s3a.path.style.access 의 기본값은 False 입니다.

     

     

    fs.s3a.endpoint

    마지막으로 저는 S3 가 아니라 Minio 환경에서 데이터를 조회하고 있기 때문에 fs.s3a.endpoint 를 제 로컬 네트워크 주소로 설정하였습니다.

    실제 s3 스토리지를 사용한다면, s3.amazonaws.com 으로 설정하시면 됩니다.

     

     

    마지막으로 실행 결과는 아래와 같습니다.

    +---+----------+------+---+----------+-----+----+
    |age|     birth|gender| id|      name|month|date|
    +---+----------+------+---+----------+-----+----+
    | 27|1951-02-18|  Male|  0|DGxbImPvOF|   12|  13|
    | 97|1946-02-19|  Male|  1|gJntLcXVkE|   12|  13|
    | 87|1962-02-15|Female|  2|oDSqVOYTEc|   12|  13|
    | 27|1932-02-23|  Male|  3|cNEeqWIdBz|   12|  13|
    | 24|2008-02-04|Female|  4|SOMWgcvodL|   12|  13|
    | 45|2012-02-03|Female|  5|qkTGvAeVzO|   12|  13|
    | 87|1940-02-21|Female|  6|vKiELTfhBx|   12|  13|
    | 17|1946-02-19|Female|  7|UbOFMgdQox|   12|  13|
    | 33|1969-02-13|Female|  8|xmSPEDlyGd|   12|  13|
    | 70|1942-02-20|  Male|  9|cMGCzveAjQ|   12|  13|
    | 73|1991-02-08|Female| 10|bokZJrVHsM|   12|  13|
    | 39|2002-02-05|  Male| 11|ABUFiXMwhr|   12|  13|
    | 71|1997-02-06|  Male| 12|OWIcPeDLaq|   12|  13|
    | 93|2013-02-02|  Male| 13|QtVhBgYyFw|   12|  13|
    |  3|1952-02-18|Female| 14|DqifMZygNY|   12|  13|
    | 13|1955-02-17|Female| 15|lNbwagWCmy|   12|  13|
    | 12|1958-02-16|Female| 16|WheVZMrQkq|   12|  13|
    | 86|1939-02-21|  Male| 17|UOFJQcWajP|   12|  13|
    | 18|1965-02-14|Female| 18|hAxGdYSqtC|   12|  13|
    | 91|1976-02-12|  Male| 19|KahxjFczGO|   12|  13|
    +---+----------+------+---+----------+-----+----+
    only showing top 20 rows

     

     

    마치며.

    이번 글을 시작으로 여러 DataSource 로부터의 Spark DataFrameReader 에 대한 글을 작성하려고 합니다.

    감사합니다.

     

    반응형
Designed by Tistory.