-
[Spark] SparkSQL CSV 파일 Aggregation 하기Spark 2023. 10. 3. 22:01728x90반응형
- 목차
소개.
Spark 를 학습하기 위해서 간단한 예시를 기록해두려고 합니다.
MovieRating 이라는 csv 파일을 읽어들이고 Aggregation 을 진행합니다.
MovieRating.csv 파일은 아래 Web Link 에서 제공되는 영화 평점과 관련된 데이터셋입니다.
https://raw.githubusercontent.com/prasertcbs/basic-dataset/master/Movie%20Ratings.csv
아래와 같은 형식으로 작성된 csv 파일입니다.
UserId,MovieId,Rating,Timestamp 1,68646,10,1381620027 1,113277,10,1379466669 2,454876,8,1394818630 2,790636,7,1389963947 2,816711,8,1379963769 2,1091191,7,1391173869
csv 파일 처리하기.
MovieRatings.csv 를 처리하여 3가지 결과를 만들어냅니다.
1. avgRatingPerUser : 사용자 당 평균 평점.
2. countMoviePerUser : 사용자 당 평점을 매긴 영화의 수.
3. recentRatingRow : Timestamp 가 1381620027 인 Row 들.
전체 Row 를 가지는 dataFrameWithColumns Dataset 을 기반으로
groupBy, agg, filter 연산을 수행하였습니다.
package org.example; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.functions; public class ReadTextFile { public static void main(String[] args) { SparkSession session = SparkSession.builder() .appName("processor-movie-ratings") .master("local[*]") .config("spark.driver.bindAddress", "127.0.0.1") .config("spark.driver.port", "9001") .getOrCreate(); Dataset<Row> dataFrame = session.read().csv("/tmp/MovieRatings.csv"); String[] columns = new String[]{"UserId", "MovieId", "Rating", "Timestamp"}; Dataset<Row> dataFrameWithColumns = dataFrame.toDF(columns); Dataset<Row> avgRatingPerUser = dataFrameWithColumns.groupBy("UserId") .agg(functions.avg("Rating")); avgRatingPerUser.show(); Dataset<Row> countMoviePerUser = dataFrameWithColumns.groupBy("UserId") .agg(functions.count("MovieId")); countMoviePerUser.show(); Dataset<Row> recentRatingRow = dataFrameWithColumns .filter("Timestamp >= 1381620027") .filter("Timestamp <= 1381620027"); recentRatingRow.show(); session.stop(); } }
< avgRatingPerUser Dataset >
+------+-----------------+ |UserId| avg(Rating)| +------+-----------------+ | 296| 8.0| | 467| 6.0| | 675| 7.25| | 691| 8.0| | 829|5.333333333333333| | 1090| 10.0| | 1159|6.631578947368421| | 1436|8.166666666666666| | 1512| 7.0| | 1572| 10.0| | 2069| 5.0| | 2088| 7.1| | 2136| 8.0| | 2162| 8.6| | 2294| 7.0| | 2904| 10.0| | 3210| 9.0| | 3414|7.904761904761905| | 3606| 9.0| | 3959|7.529411764705882| +------+-----------------+ only showing top 20 rows
< countMoviePerUser Dataset >
+------+--------------+ |UserId|count(MovieId)| +------+--------------+ | 296| 1| | 467| 1| | 675| 4| | 691| 1| | 829| 21| | 1090| 1| | 1159| 57| | 1436| 6| | 1512| 1| | 1572| 1| | 2069| 1| | 2088| 10| | 2136| 2| | 2162| 5| | 2294| 1| | 2904| 1| | 3210| 1| | 3414| 21| | 3606| 1| | 3959| 17| +------+--------------+ only showing top 20 rows
< recentRatingRow Dataset >
+------+-------+------+----------+ |UserId|MovieId|Rating| Timestamp| +------+-------+------+----------+ | 1| 68646| 10|1381620027| +------+-------+------+----------+
반응형'Spark' 카테고리의 다른 글
Helm 으로 Spark 구축하기 (0) 2023.12.15 Spark Driver Program 알아보기 (0) 2023.12.06 [Spark] Spark Lazy Evaluation 알아보기 (0) 2023.05.22 [Spark] RDD 로 숫자 처리하기 (0) 2023.01.19 [Spark] Docker 로 Spark Cluster 구현하기 ( Standalone Mode ) (0) 2022.07.18