ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • [Spark] Spark 로 Web File Reader 구현하기 ( SparkFiles )
    Spark 2024. 3. 18. 07:04
    728x90
    반응형

     

    - 목차

     

    들어가며.

    Spark 를 활용하여 http 프로토콜로 호스팅되는 웹 파일을 다운로드해야하는 경우가 존재합니다.

    일반적인 Spark 의 File Reader 를 통해서 웹파일을 다운로드하는 것을 불가능합니다.

    이 과정에서 SparkFiles 모듈을 사용하며, 이는 여러 이점이 존재합니다.

    첫번째 이점은 Spark Application 의 메모리보다 큰 용량의 파일을 손쉽게 읽어들일 수 있습니다.

    Http Streaming 방식으로 웹파일을 읽어들이는 수고로움을 덜 수 있습니다.

    두번째 이점은 기존의 Spark 프로그래밍 패턴을 사용할 수 있습니다.

    외부의 모듈을 활용하게 되면, 외부 모듈의 사용법에 따라 프로그래밍을 수행해야합니다.

    하지만 SparkFiles 는 Spark 의 내장 모듈로써 Spark 의 프로그래밍 패턴과 유사합니다.

     

    Nginx 로 웹파일 서빙하기.

    주제가 Spark 로 웹파일을 다운로드하는 내용이기 때문에 웹서버로써 Nginx 를 사용하도록 하겠습니다.

    Nginx 를 활용하여 간단한 웹서버를 구축하는 것은 아래의 예시로 대체하도록 하겠습니다.

     

    https://westlife0615.tistory.com/774

     

    [Nginx] Web Content Serving 알아보기 ( default.conf )

    - 목차 들어가며.이번 글에서는 Nginx 를 활용하여 웹 서버를 구성하는 방법에 대해서 알아보도록 하겠습니다.xml, csv, 이미지 파일 등 여러가지 웹파일을 서빙하는 웹서버를 Nginx 로 구성할 수 있

    westlife0615.tistory.com

     

     

    SparkFiles.

    SparkFiles 은 SparkContext 에서 사용할 수 있는 Spark 모듈입니다.

    이는 로컬 파일 이외의 외부 파일 자원을 Spark Application 의 파일 시스템 내부로 인식할 수 있도록 돕습니다.

    이 과정에서 사용되는 함수는 addFile 를 사용하구요.

    아래의 문장은 공식 문서에서 확인할 수 있는 내용입니다.

    Resolves paths to files added through SparkContext.addFile().

     

    간단한 예시를 작성해볼까요 ?

    먼저 간단한 csv 파일들을 생성해보도록 하겠습니다.

     

    csv 파일 생성하기.

    아주 간단한 csv 파일을 생성합니다.

    그리고 이 csv 파일을 Nginx Web Server 에서 서빙할 수 있도록 설정합니다.

    import pandas as pd
    
    rows = [
        ("Andy", 38, "Seoul"),
        ("Bob", 48, "LA"),
        ("Chris", 28, "Busan"),
        ("Daniel", 18, "Tokyo"),
    ]
    pd.DataFrame(rows, columns=["name", "age", "city"]).to_csv("/tmp/test.csv", index=False)

     

         name  age   city
    0    Andy   38  Seoul
    1     Bob   48     LA
    2   Chris   28  Busan
    3  Daniel   18  Tokyo

     

    Nginx 설정하기.

    아래의 파일은 csv 파일을 서빙하기 위한 default.conf 설정입니다.

    cat <<EOF> /tmp/default.conf
    server {
        listen       80;
        server_name  _;
    
        location / {
            root   /usr/share/nginx/html;
            index  index.html index.htm;
        }
        
        location /csv/ {
            root   /usr/share/nginx;
        }    
    
    }
    EOF

     

    그리고 아래와 같이 Nginx Docker Container 를 실행하면 test.csv 이라는 웹파일을 조회할 수 있습니다.

    docker run -d --name nginx \
    -v /tmp/default.conf:/etc/nginx/conf.d/default.conf:ro \
    --mount type=bind,source=/tmp/test.csv,target=/usr/share/nginx/csv/test.csv \
    -p 8080:80 nginx

     

    curl http://localhost:8080/csv/test.csv
    
    name,age,city
    Andy,38,Seoul
    Bob,48,LA
    Chris,28,Busan
    Daniel,18,Tokyo

     

     

    SparkFiles 로 웹파일 조회하기.

    아래와 같이 Http 웹파일을 조회하게 되면, Spark 는 웹파일을 인식하지 못합니다.

    from pyspark.sql import SparkSession
    from pyspark import SparkConf
    
    conf = (SparkConf()
            .setMaster("local[*]")
            .setAppName("test-spark-files")
            .set("spark.driver.bindAddress", "0.0.0.0"))
    spark = SparkSession.builder.config(conf=conf).getOrCreate()
    
    df = (spark.read
          .format("csv")
          .option("header", True)
          .load("http://localhost:8080/csv/test.csv"))
    df.show()
    spark.stop()
    py4j.protocol.Py4JJavaError: An error occurred while calling o35.load.
    : java.lang.UnsupportedOperationException

     

    그래서 HTTP 프로토콜로 서빙되는 웹파일을 읽어들이기 위해서 SparkFIles 가 사용됩니다.

    SparkFiles 이 사용되는 예시는 아래와 같습니다.

    from pyspark.sql import SparkSession
    from pyspark import SparkConf, SparkFiles
    
    conf = (SparkConf()
            .setMaster("local[*]")
            .setAppName("test-spark-files")
            .set("spark.driver.bindAddress", "0.0.0.0"))
    spark = SparkSession.builder.config(conf=conf).getOrCreate()
    context = spark.sparkContext
    context.addFile("http://localhost:8080/csv/test.csv")
    file = SparkFiles.get("test.csv")
    
    df = (spark.read
          .format("csv")
          .option("header", True)
          .load(file))
    df.show()
    spark.stop()

     

    +------+---+-----+
    |  name|age| city|
    +------+---+-----+
    |  Andy| 38|Seoul|
    |   Bob| 48|   LA|
    | Chris| 28|Busan|
    |Daniel| 18|Tokyo|
    +------+---+-----+

     

     

    메모리보다 웹파일의 크기가 더 큰 경우.

    조회하는 웹파일의 크기가 Spark Application 이 사용하는 메모리의 크기보다 훨씬 큰 경우에 SparkFiles 는 유용하게 사용됩니다.

    예를 들어 웹파일의 크기가 10GB 이고, Spark Application 의 메모리가 1GB 라고 가정하겠습니다.

    일반적으로 이러한 경우에는 파일의 처리가 쉽지 않습니다.

    OOM 에러에 빠질 가능성이 큽니다.

     

    하지만 SparkFiles 을 사용하는 경우에는 웹파일을 조회하는 과정에서 OOM 케이스에 빠지지 않습니다.

    먼저 아래와 같은 도커파일을 생성합니다.

     

    5GB 웹파일 생성.

    import pandas as pd
    
    rows = [
        ("Andy", 38, "Seoul"),
        ("Bob", 48, "LA"),
        ("Chris", 28, "Busan"),
        ("Daniel", 18, "Tokyo"),
    ]
    df = pd.DataFrame(rows, columns=["name", "age", "city"])
    for i in range(30):
        print(i)
        df = pd.concat([df, df])
    
    df.to_csv("/tmp/test.csv", index=False)

     

    test.py

    from pyspark.sql import SparkSession
    from pyspark import  SparkFiles
    
    spark = SparkSession.builder.getOrCreate()
    context = spark.sparkContext
    context.addFile("http://host.docker.internal:8080/csv/test.csv")
    file = SparkFiles.get("test.csv")
    
    df = (spark.read
          .format("csv")
          .option("header", True)
          .load(file))
    df.show()
    spark.stop()

     

    Dockerfile

    docker build -t test:0.0.1 .

    FROM spark:3.4.1
    WORKDIR /opt/app/
    USER ${spark_uid}
    ADD test.py test.py
    RUN pip install pyspark==3.4.1
    RUN pip install py4j==0.10.9.5

     

    아래와 같이 1GB 의 메모리를 할당한 Spark Docker Container 에서 5GB 인 웹파일을 조회합니다.

     

    docker run -d --cpus='1' -m='1g' test:0.0.1 \
    spark-submit --master 'local[*]' \
    --name test --conf spark.driver.bindAddress=0.0.0.0 \
    /opt/app/test.py

     

    조회된 파일은 아래와 같이 특정 위치의 디렉토리에 저장되며, 메모리는 할당한 1GB 를 넘지 않습니다.

    ls -lh
    total 14G
    -rw-r--r-- 1 root root 6.9G May 30 11:55 fetchFileTemp4976345169363839730.tmp
    -rwxr-xr-x 1 root root 6.9G May 30 11:55 test.csv

     

    <출력>

    +------+---+-----+
    |  name|age| city|
    +------+---+-----+
    |  Andy| 38|Seoul|
    |   Bob| 48|   LA|
    | Chris| 28|Busan|
    |Daniel| 18|Tokyo|
    |  Andy| 38|Seoul|
    |   Bob| 48|   LA|
    | Chris| 28|Busan|
    |Daniel| 18|Tokyo|
    |  Andy| 38|Seoul|
    |   Bob| 48|   LA|
    | Chris| 28|Busan|
    |Daniel| 18|Tokyo|
    |  Andy| 38|Seoul|
    |   Bob| 48|   LA|
    | Chris| 28|Busan|
    |Daniel| 18|Tokyo|
    |  Andy| 38|Seoul|
    |   Bob| 48|   LA|
    | Chris| 28|Busan|
    |Daniel| 18|Tokyo|
    +------+---+-----+
    only showing top 20 rows

     

    반응형
Designed by Tistory.