-
[Spark] Spark 로 Web File Reader 구현하기 ( SparkFiles )Spark 2024. 3. 18. 07:04728x90반응형
- 목차
들어가며.
Spark 를 활용하여 http 프로토콜로 호스팅되는 웹 파일을 다운로드해야하는 경우가 존재합니다.
일반적인 Spark 의 File Reader 를 통해서 웹파일을 다운로드하는 것을 불가능합니다.
이 과정에서 SparkFiles 모듈을 사용하며, 이는 여러 이점이 존재합니다.
첫번째 이점은 Spark Application 의 메모리보다 큰 용량의 파일을 손쉽게 읽어들일 수 있습니다.
Http Streaming 방식으로 웹파일을 읽어들이는 수고로움을 덜 수 있습니다.
두번째 이점은 기존의 Spark 프로그래밍 패턴을 사용할 수 있습니다.
외부의 모듈을 활용하게 되면, 외부 모듈의 사용법에 따라 프로그래밍을 수행해야합니다.
하지만 SparkFiles 는 Spark 의 내장 모듈로써 Spark 의 프로그래밍 패턴과 유사합니다.
Nginx 로 웹파일 서빙하기.
주제가 Spark 로 웹파일을 다운로드하는 내용이기 때문에 웹서버로써 Nginx 를 사용하도록 하겠습니다.
Nginx 를 활용하여 간단한 웹서버를 구축하는 것은 아래의 예시로 대체하도록 하겠습니다.
https://westlife0615.tistory.com/774
SparkFiles.
SparkFiles 은 SparkContext 에서 사용할 수 있는 Spark 모듈입니다.
이는 로컬 파일 이외의 외부 파일 자원을 Spark Application 의 파일 시스템 내부로 인식할 수 있도록 돕습니다.
이 과정에서 사용되는 함수는 addFile 를 사용하구요.
아래의 문장은 공식 문서에서 확인할 수 있는 내용입니다.
Resolves paths to files added through SparkContext.addFile().
간단한 예시를 작성해볼까요 ?
먼저 간단한 csv 파일들을 생성해보도록 하겠습니다.
csv 파일 생성하기.
아주 간단한 csv 파일을 생성합니다.
그리고 이 csv 파일을 Nginx Web Server 에서 서빙할 수 있도록 설정합니다.
import pandas as pd rows = [ ("Andy", 38, "Seoul"), ("Bob", 48, "LA"), ("Chris", 28, "Busan"), ("Daniel", 18, "Tokyo"), ] pd.DataFrame(rows, columns=["name", "age", "city"]).to_csv("/tmp/test.csv", index=False)
name age city 0 Andy 38 Seoul 1 Bob 48 LA 2 Chris 28 Busan 3 Daniel 18 Tokyo
Nginx 설정하기.
아래의 파일은 csv 파일을 서빙하기 위한 default.conf 설정입니다.
cat <<EOF> /tmp/default.conf server { listen 80; server_name _; location / { root /usr/share/nginx/html; index index.html index.htm; } location /csv/ { root /usr/share/nginx; } } EOF
그리고 아래와 같이 Nginx Docker Container 를 실행하면 test.csv 이라는 웹파일을 조회할 수 있습니다.
docker run -d --name nginx \ -v /tmp/default.conf:/etc/nginx/conf.d/default.conf:ro \ --mount type=bind,source=/tmp/test.csv,target=/usr/share/nginx/csv/test.csv \ -p 8080:80 nginx
curl http://localhost:8080/csv/test.csv name,age,city Andy,38,Seoul Bob,48,LA Chris,28,Busan Daniel,18,Tokyo
SparkFiles 로 웹파일 조회하기.
아래와 같이 Http 웹파일을 조회하게 되면, Spark 는 웹파일을 인식하지 못합니다.
from pyspark.sql import SparkSession from pyspark import SparkConf conf = (SparkConf() .setMaster("local[*]") .setAppName("test-spark-files") .set("spark.driver.bindAddress", "0.0.0.0")) spark = SparkSession.builder.config(conf=conf).getOrCreate() df = (spark.read .format("csv") .option("header", True) .load("http://localhost:8080/csv/test.csv")) df.show() spark.stop()
py4j.protocol.Py4JJavaError: An error occurred while calling o35.load. : java.lang.UnsupportedOperationException
그래서 HTTP 프로토콜로 서빙되는 웹파일을 읽어들이기 위해서 SparkFIles 가 사용됩니다.
SparkFiles 이 사용되는 예시는 아래와 같습니다.
from pyspark.sql import SparkSession from pyspark import SparkConf, SparkFiles conf = (SparkConf() .setMaster("local[*]") .setAppName("test-spark-files") .set("spark.driver.bindAddress", "0.0.0.0")) spark = SparkSession.builder.config(conf=conf).getOrCreate() context = spark.sparkContext context.addFile("http://localhost:8080/csv/test.csv") file = SparkFiles.get("test.csv") df = (spark.read .format("csv") .option("header", True) .load(file)) df.show() spark.stop()
+------+---+-----+ | name|age| city| +------+---+-----+ | Andy| 38|Seoul| | Bob| 48| LA| | Chris| 28|Busan| |Daniel| 18|Tokyo| +------+---+-----+
메모리보다 웹파일의 크기가 더 큰 경우.
조회하는 웹파일의 크기가 Spark Application 이 사용하는 메모리의 크기보다 훨씬 큰 경우에 SparkFiles 는 유용하게 사용됩니다.
예를 들어 웹파일의 크기가 10GB 이고, Spark Application 의 메모리가 1GB 라고 가정하겠습니다.
일반적으로 이러한 경우에는 파일의 처리가 쉽지 않습니다.
OOM 에러에 빠질 가능성이 큽니다.
하지만 SparkFiles 을 사용하는 경우에는 웹파일을 조회하는 과정에서 OOM 케이스에 빠지지 않습니다.
먼저 아래와 같은 도커파일을 생성합니다.
5GB 웹파일 생성.
import pandas as pd rows = [ ("Andy", 38, "Seoul"), ("Bob", 48, "LA"), ("Chris", 28, "Busan"), ("Daniel", 18, "Tokyo"), ] df = pd.DataFrame(rows, columns=["name", "age", "city"]) for i in range(30): print(i) df = pd.concat([df, df]) df.to_csv("/tmp/test.csv", index=False)
test.py
from pyspark.sql import SparkSession from pyspark import SparkFiles spark = SparkSession.builder.getOrCreate() context = spark.sparkContext context.addFile("http://host.docker.internal:8080/csv/test.csv") file = SparkFiles.get("test.csv") df = (spark.read .format("csv") .option("header", True) .load(file)) df.show() spark.stop()
Dockerfile
docker build -t test:0.0.1 .
FROM spark:3.4.1 WORKDIR /opt/app/ USER ${spark_uid} ADD test.py test.py RUN pip install pyspark==3.4.1 RUN pip install py4j==0.10.9.5
아래와 같이 1GB 의 메모리를 할당한 Spark Docker Container 에서 5GB 인 웹파일을 조회합니다.
docker run -d --cpus='1' -m='1g' test:0.0.1 \ spark-submit --master 'local[*]' \ --name test --conf spark.driver.bindAddress=0.0.0.0 \ /opt/app/test.py
조회된 파일은 아래와 같이 특정 위치의 디렉토리에 저장되며, 메모리는 할당한 1GB 를 넘지 않습니다.
ls -lh total 14G -rw-r--r-- 1 root root 6.9G May 30 11:55 fetchFileTemp4976345169363839730.tmp -rwxr-xr-x 1 root root 6.9G May 30 11:55 test.csv
<출력>
+------+---+-----+ | name|age| city| +------+---+-----+ | Andy| 38|Seoul| | Bob| 48| LA| | Chris| 28|Busan| |Daniel| 18|Tokyo| | Andy| 38|Seoul| | Bob| 48| LA| | Chris| 28|Busan| |Daniel| 18|Tokyo| | Andy| 38|Seoul| | Bob| 48| LA| | Chris| 28|Busan| |Daniel| 18|Tokyo| | Andy| 38|Seoul| | Bob| 48| LA| | Chris| 28|Busan| |Daniel| 18|Tokyo| | Andy| 38|Seoul| | Bob| 48| LA| | Chris| 28|Busan| |Daniel| 18|Tokyo| +------+---+-----+ only showing top 20 rows
반응형'Spark' 카테고리의 다른 글
[Spark] Spark Cluster 를 구축하기 ( Docker ) (0) 2024.05.17 [Spark] Window 알아보기 ( lag, lead, sum ) (0) 2024.05.15 [Spark] Row 알아보기 (0) 2024.03.03 [Spark] approxCountDistinct 알아보기 (0) 2024.02.22 [Spark] Logical Plan 알아보기 1 (Catalyst Optimizer) (2) 2024.01.28