ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • Spark DataFrame 알아보기
    Spark 2023. 12. 15. 06:39
    728x90
    반응형

     

    - 목차

     

     

    소개.

    Spark 의 DataFrame 은 Pandas 의 DataFrame 처럼 행과 열을 가지는 데이터를 표현하는 방식입니다.

    즉, Matrix 를 표현할 수 있는 Spark 의 자료구조입니다.

    뿐만 아니라 DataFrame 은 Distributed Collection of Data 라고 불립니다.

    Matrix 를 표현하는 범위를 넘어서 분산처리가 가능한 자료구조입니다.

    정리하면, DataFrame 을 분산처리가 가능한 Spark 의 데이터 단위입니다.

    이번 글에서 Spark 의 DataFrame 에 대해서 자세히 일아보려고 합니다.

     

    DataFrame 이란.

    DataFrame 에 대해서 알아보기 이전에 Spark 의 자료구조들에 대해서 알아보려고 합니다.

    - Row

    - Seq

    - Dataset

     

    Row.

    Row 는 DataFrame 을 구성하는 개별 데이터를 의미합니다.

    SQL 기반의 데이터베이스나 Matrix 에서 뜻하는 로우와 의미를 같이합니다.

     

    아래 예시와 같이 Row 를 생성할 수 있는데요.

    StructType structType = new StructType()
                .add("name", DataTypes.StringType)
                .add("age", DataTypes.IntegerType)
                .add("city", DataTypes.StringType);
                
    Row row1 = RowFactory.create("Andy", 31, "Seoul");
    Row row2 = RowFactory.create("Bob", 44, "Daegu");
    Row row3 = RowFactory.create("Chris", 22, "Busan");

     

    Column 이 "이름", "나이", "도시" 와 같은 Matrix 에서 Row 들은

    - ("Andy", 31, "Seoul")
    - ("Bob", 44, "Daegu")
    - ("Chris", 22, "Busan")

    이런 방식으로 표현될 수 있겠죠?

     

    Row 는 DataFrame 을 이루는 데이터의 최소 단위가 됩니다.

     

    Seq.

    Seq 는 Row 들을 구성하는 List 형식의 자료구조입니다.

    아래와 같은 형식으로 사용되구요.

    Java 의 List 자료구조와 거의 유사하다고 생각하시면 됩니다.

     val rows = Seq(
      Row(1, "John", 25),
      Row(2, "Jane", 30),
      Row(3, "Bob", 22)
    )

     

     

    Dataset.

    DataFrame 은 Row 와 Seq 를 조합한 Dataset<Row> 라는 형식으로 표현됩니다.

    예시 코드를 작성해보면 아래와 같습니다.

    Row row1 = RowFactory.create("Andy", 31, "Seoul");
    Row row2 = RowFactory.create("Bob", 44, "Daegu");
    Row row3 = RowFactory.create("Chris", 22, "Busan");
    
    StructType structType = new StructType()
            .add("name", DataTypes.StringType)
            .add("age", DataTypes.IntegerType)
            .add("city", DataTypes.StringType);
    
    List<Row> source = new ArrayList<>();
    source.add(row1);
    source.add(row2);
    source.add(row3);
    
    Dataset<Row> dataset = session.createDataFrame(source, structType);

     

    Row, Seq 그리고 StructType 이 모여 DataFrame 을 구성하게 되구요.

    DataFrame 은 Dataset<Row> 타입의 데이터가 됩니다.

    생성된 Dataset 은 Transformation & Action 을 통하여 분산처리가 됩니다.

     

    Dataset 구성.

    DataFrame 은 Dataset<Row> 의 타입의 객체로써 관리됩니다.

    그리고 Dataset 는 Matrix 를 표현하는 자료구조 이외의 중요한 기능들을 가지는데요.

    이러한 데이터를 표현하는 기능 외에도

     

    - Lazy Evaluation

    - 분산 처리

    - Optimization

    - Serialization/Deserialization

    등의 데이터 처리 관점의 기능 또한 가지고 있습니다.

     

    logicalPlan.

    Dataset 클래스는 logicalPlan 이라는 구성요소를 가집니다.

    이는 Dataset 또는 DataFrame 이 어떤 Transformation 과정을 거치는지에 대한 기록입니다.

    실질적으로 Logical Execution Plan 이라고 불리며,

    이는 Optimizer 에 의해서 Physical Execution Plan 으로 바뀌게 됩니다.

     

    아래 예시는 각각이 Dataset 이 Transformation 을 거치게 되면,

    어떤 localPlan 을 가지는지를 표현하고 있습니다.

    Dataset<Row> dataset = session.createDataFrame(source, structType);
    --> LocalRelation [name#0, age#1, city#2]
    
    dataset = dataset.filter("age > 22");
    --> Filter (age#1 > 22)
    --> +- LocalRelation [name#0, age#1, city#2]
    
    dataset = dataset.filter("city != 'Seoul'");
    --> 'Filter NOT ('city = Seoul)
    --> +- Filter (age#1 > 22)
    -->    +- LocalRelation [name#0, age#1, city#2]

     

    예를 들어,

    dataset.filter("age > 22")

    "Filter (age#1 > 22 )" 라는 Logical Plan 으로 표현되게 되는 것이죠.

     

    encoder.

    Spark 의 DataFrame 은 분산 처리 가능한 데이터 형식입니다.

    그리고 데이터의 분산 처리에는 반드시 "직렬화/역직렬화" 에 대한 정의가 필수적입니다.

    왜냐하면 여러 서버로 데이터들이 전달되는 과정을 겪기 때문이죠.

    Dataset 의 encoder 은 어떤 Serialization 방식을 사용하는지에 대한 정의가 되어있습니다.

     

    Dataset<Row> 로 표현되는 DataFrame 은 Row 가 어떤 순서로 어떤 타입의 칼럼을 가지는지에 대한 정의가 되어 있습니다.

    아래와 같이 말이죠.

    class[name[0]: string, age[0]: int, city[0]: string]

     

    반면, Custom Class 를 사용하는 경우에는 해당하는 Custom Class 는 반드시 Serializable 해야만 합니다.

    반응형
Designed by Tistory.