ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • [Spark] SparkSQL CSV 파일 Aggregation 하기
    Spark 2023. 10. 3. 22:01
    728x90
    반응형

    - 목차

     

     

    소개.

    Spark 를 학습하기 위해서 간단한 예시를 기록해두려고 합니다.

    MovieRating 이라는 csv 파일을 읽어들이고 Aggregation 을 진행합니다.

    MovieRating.csv 파일은 아래 Web Link 에서 제공되는 영화 평점과 관련된 데이터셋입니다.

     

    https://raw.githubusercontent.com/prasertcbs/basic-dataset/master/Movie%20Ratings.csv

     

    아래와 같은 형식으로 작성된 csv 파일입니다.

    UserId,MovieId,Rating,Timestamp
    1,68646,10,1381620027
    1,113277,10,1379466669
    2,454876,8,1394818630
    2,790636,7,1389963947
    2,816711,8,1379963769
    2,1091191,7,1391173869

     

     

    csv 파일 처리하기.

     

    MovieRatings.csv 를 처리하여 3가지 결과를 만들어냅니다.

    1. avgRatingPerUser : 사용자 당 평균 평점.

    2. countMoviePerUser : 사용자 당 평점을 매긴 영화의 수.

    3. recentRatingRow : Timestamp 가 1381620027 인 Row 들.

     

    전체 Row 를 가지는 dataFrameWithColumns Dataset 을 기반으로

    groupBy, agg, filter 연산을 수행하였습니다.

     

    package org.example;
    
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;
    import org.apache.spark.sql.functions;
    
    
    public class ReadTextFile {
      public static void main(String[] args) {
        SparkSession session = SparkSession.builder()
                .appName("processor-movie-ratings")
                .master("local[*]")
                .config("spark.driver.bindAddress", "127.0.0.1")
                .config("spark.driver.port", "9001")
                .getOrCreate();
    
        Dataset<Row> dataFrame = session.read().csv("/tmp/MovieRatings.csv");
        String[] columns = new String[]{"UserId", "MovieId", "Rating", "Timestamp"};
        Dataset<Row> dataFrameWithColumns = dataFrame.toDF(columns);
    
        Dataset<Row> avgRatingPerUser = dataFrameWithColumns.groupBy("UserId")
                        .agg(functions.avg("Rating"));
        avgRatingPerUser.show();
    
        Dataset<Row> countMoviePerUser = dataFrameWithColumns.groupBy("UserId")
                .agg(functions.count("MovieId"));
        countMoviePerUser.show();
    
        Dataset<Row> recentRatingRow = dataFrameWithColumns
                .filter("Timestamp >= 1381620027")
                .filter("Timestamp <= 1381620027");
        recentRatingRow.show();
    
        session.stop();
      }
    }

     

     

    < avgRatingPerUser Dataset >

    +------+-----------------+
    |UserId|      avg(Rating)|
    +------+-----------------+
    |   296|              8.0|
    |   467|              6.0|
    |   675|             7.25|
    |   691|              8.0|
    |   829|5.333333333333333|
    |  1090|             10.0|
    |  1159|6.631578947368421|
    |  1436|8.166666666666666|
    |  1512|              7.0|
    |  1572|             10.0|
    |  2069|              5.0|
    |  2088|              7.1|
    |  2136|              8.0|
    |  2162|              8.6|
    |  2294|              7.0|
    |  2904|             10.0|
    |  3210|              9.0|
    |  3414|7.904761904761905|
    |  3606|              9.0|
    |  3959|7.529411764705882|
    +------+-----------------+
    only showing top 20 rows

     

     

    < countMoviePerUser Dataset >

     

    +------+--------------+
    |UserId|count(MovieId)|
    +------+--------------+
    |   296|             1|
    |   467|             1|
    |   675|             4|
    |   691|             1|
    |   829|            21|
    |  1090|             1|
    |  1159|            57|
    |  1436|             6|
    |  1512|             1|
    |  1572|             1|
    |  2069|             1|
    |  2088|            10|
    |  2136|             2|
    |  2162|             5|
    |  2294|             1|
    |  2904|             1|
    |  3210|             1|
    |  3414|            21|
    |  3606|             1|
    |  3959|            17|
    +------+--------------+
    only showing top 20 rows

     

     

     

    < recentRatingRow Dataset >

    +------+-------+------+----------+
    |UserId|MovieId|Rating| Timestamp|
    +------+-------+------+----------+
    |     1|  68646|    10|1381620027|
    +------+-------+------+----------+

     

    반응형
Designed by Tistory.