-
Spark DataFrame 알아보기Spark 2023. 12. 15. 06:39728x90반응형
- 목차
소개.
Spark 의 DataFrame 은 Pandas 의 DataFrame 처럼 행과 열을 가지는 데이터를 표현하는 방식입니다.
즉, Matrix 를 표현할 수 있는 Spark 의 자료구조입니다.
뿐만 아니라 DataFrame 은 Distributed Collection of Data 라고 불립니다.
Matrix 를 표현하는 범위를 넘어서 분산처리가 가능한 자료구조입니다.
정리하면, DataFrame 을 분산처리가 가능한 Spark 의 데이터 단위입니다.
이번 글에서 Spark 의 DataFrame 에 대해서 자세히 일아보려고 합니다.
DataFrame 이란.
DataFrame 에 대해서 알아보기 이전에 Spark 의 자료구조들에 대해서 알아보려고 합니다.
- Row
- Seq
- Dataset
Row.
Row 는 DataFrame 을 구성하는 개별 데이터를 의미합니다.
SQL 기반의 데이터베이스나 Matrix 에서 뜻하는 로우와 의미를 같이합니다.
아래 예시와 같이 Row 를 생성할 수 있는데요.
StructType structType = new StructType() .add("name", DataTypes.StringType) .add("age", DataTypes.IntegerType) .add("city", DataTypes.StringType); Row row1 = RowFactory.create("Andy", 31, "Seoul"); Row row2 = RowFactory.create("Bob", 44, "Daegu"); Row row3 = RowFactory.create("Chris", 22, "Busan");
Column 이 "이름", "나이", "도시" 와 같은 Matrix 에서 Row 들은
- ("Andy", 31, "Seoul")
- ("Bob", 44, "Daegu")
- ("Chris", 22, "Busan")이런 방식으로 표현될 수 있겠죠?
Row 는 DataFrame 을 이루는 데이터의 최소 단위가 됩니다.
Seq.
Seq 는 Row 들을 구성하는 List 형식의 자료구조입니다.
아래와 같은 형식으로 사용되구요.
Java 의 List 자료구조와 거의 유사하다고 생각하시면 됩니다.
val rows = Seq( Row(1, "John", 25), Row(2, "Jane", 30), Row(3, "Bob", 22) )
Dataset.
DataFrame 은 Row 와 Seq 를 조합한 Dataset<Row> 라는 형식으로 표현됩니다.
예시 코드를 작성해보면 아래와 같습니다.
Row row1 = RowFactory.create("Andy", 31, "Seoul"); Row row2 = RowFactory.create("Bob", 44, "Daegu"); Row row3 = RowFactory.create("Chris", 22, "Busan"); StructType structType = new StructType() .add("name", DataTypes.StringType) .add("age", DataTypes.IntegerType) .add("city", DataTypes.StringType); List<Row> source = new ArrayList<>(); source.add(row1); source.add(row2); source.add(row3); Dataset<Row> dataset = session.createDataFrame(source, structType);
Row, Seq 그리고 StructType 이 모여 DataFrame 을 구성하게 되구요.
DataFrame 은 Dataset<Row> 타입의 데이터가 됩니다.
생성된 Dataset 은 Transformation & Action 을 통하여 분산처리가 됩니다.
Dataset 구성.
DataFrame 은 Dataset<Row> 의 타입의 객체로써 관리됩니다.
그리고 Dataset 는 Matrix 를 표현하는 자료구조 이외의 중요한 기능들을 가지는데요.
이러한 데이터를 표현하는 기능 외에도
- Lazy Evaluation
- 분산 처리
- Optimization
- Serialization/Deserialization
등의 데이터 처리 관점의 기능 또한 가지고 있습니다.
logicalPlan.
Dataset 클래스는 logicalPlan 이라는 구성요소를 가집니다.
이는 Dataset 또는 DataFrame 이 어떤 Transformation 과정을 거치는지에 대한 기록입니다.
실질적으로 Logical Execution Plan 이라고 불리며,
이는 Optimizer 에 의해서 Physical Execution Plan 으로 바뀌게 됩니다.
아래 예시는 각각이 Dataset 이 Transformation 을 거치게 되면,
어떤 localPlan 을 가지는지를 표현하고 있습니다.
Dataset<Row> dataset = session.createDataFrame(source, structType); --> LocalRelation [name#0, age#1, city#2] dataset = dataset.filter("age > 22"); --> Filter (age#1 > 22) --> +- LocalRelation [name#0, age#1, city#2] dataset = dataset.filter("city != 'Seoul'"); --> 'Filter NOT ('city = Seoul) --> +- Filter (age#1 > 22) --> +- LocalRelation [name#0, age#1, city#2]
예를 들어,
dataset.filter("age > 22") 는
"Filter (age#1 > 22 )" 라는 Logical Plan 으로 표현되게 되는 것이죠.
encoder.
Spark 의 DataFrame 은 분산 처리 가능한 데이터 형식입니다.
그리고 데이터의 분산 처리에는 반드시 "직렬화/역직렬화" 에 대한 정의가 필수적입니다.
왜냐하면 여러 서버로 데이터들이 전달되는 과정을 겪기 때문이죠.
Dataset 의 encoder 은 어떤 Serialization 방식을 사용하는지에 대한 정의가 되어있습니다.
Dataset<Row> 로 표현되는 DataFrame 은 Row 가 어떤 순서로 어떤 타입의 칼럼을 가지는지에 대한 정의가 되어 있습니다.
아래와 같이 말이죠.
class[name[0]: string, age[0]: int, city[0]: string]
반면, Custom Class 를 사용하는 경우에는 해당하는 Custom Class 는 반드시 Serializable 해야만 합니다.
반응형'Spark' 카테고리의 다른 글
Spark RDD Storage 알아보기 (Persist, Cache) (0) 2023.12.15 [Spark] parallelize 알아보기 (0) 2023.12.15 Helm 으로 Spark 구축하기 (0) 2023.12.15 Spark Driver Program 알아보기 (0) 2023.12.06 [Spark] SparkSQL CSV 파일 Aggregation 하기 (0) 2023.10.03