-
Parquet Reader 알아보기BigData/Parquet 2023. 12. 8. 05:37728x90반응형
- 목차
함께 보면 좋은 글.
https://westlife0615.tistory.com/50
소개.
Parquet Reader 들이 어떠한 방식으로 Parquet 파일을 읽어들이는지 자세히 살펴보려고 합니다.
Apache Arrow 기반의 라이브러리들은 많은 부분이 추상화되어 있어, Parquet 파일을 읽어들이는 방식이 명료하게 나타나지 않으며,
Python 관련 라이브러리들 또한 추상화된 부분이 많았습니다.
그래서 java 의 org.apache.parquet parquet-hadoop 라이브러리를 사용할 예정입니다.
https://mvnrepository.com/artifact/org.apache.parquet/parquet-hadoop
Column 기반의 Parquet 파일이 어떠한 방식으로 저장되고 로드되는지 살펴보겠습니다.
Parquet File 생성.
먼저 테스트를 위한 Parquet File 을 생성합니다.
간단하게 생성하기 위해서 python 스크립트를 활용하겠습니다.
먼저 pyarrow 모듈을 설치합니다.
pip install pyarrow
그리고 터미널을 열어 python3 인터프린터 내부로 진입합니다.
python3
그리고 Parquet 파일을 생성합니다.
저는 /tmp/ 디렉토리 내부에 생성하였습니다.
import pandas as pd import pyarrow as pa import pyarrow.parquet as pq # Create a Pandas DataFrame data = {'name': [], 'city': []} for i in range(0, 100000): data["name"].append("name" + str(i)) data["city"].append("city" + str(i)) df = pd.DataFrame(data) # Convert Pandas DataFrame to PyArrow Table table = pa.Table.from_pandas(df) # Specify the Parquet file path parquet_file_path = '/tmp/user.parquet' # Write the PyArrow Table to a Parquet file pq.write_table(table, parquet_file_path, row_group_size=1000) print(f'Parquet file written to: {parquet_file_path}')
위 스크립트로 생성되는 user.parquet 파일은
name 과 city 칼럼을 가집니다.
그리고 row_group_size 를 1000 으로 두어서, 100 개의 RowGroup 이 생성되도록 조작하였습니다.
생성된 Parquet 파일 구조는 아래와 같습니다.
100 개의 RowGroup 이 있구요.
각 RowGroup 내부에 Column 별로 데이터들이 존재하게 됩니다.
하나의 Column 내부에 저장되는 데이터들의 단위는 Page 라고하며,
RowGroup -> Column -> Page 로 이어지는 구성을 어떻게 읽어들이는지 알아보려고 합니다.
Parquet Reader 로 Parquet 메타데이터 읽기.
저는 java 11 버전에서 테스트를 진행하고 있구요.
아래 모듈의 Parquet Reader 를 사용합니다.
// https://mvnrepository.com/artifact/org.apache.parquet/parquet-hadoop implementation group: 'org.apache.parquet', name: 'parquet-hadoop', version: '1.13.1'
package org.example; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.parquet.hadoop.ParquetFileReader; import org.apache.parquet.hadoop.metadata.BlockMetaData; import org.apache.parquet.hadoop.metadata.ParquetMetadata; import org.apache.parquet.schema.MessageType; import java.io.IOException; public class ParquetTest { public static void main(String[] args) throws IOException { Path path = new Path("/tmp/user.parquet"); ParquetFileReader parquetFileReader = ParquetFileReader.open(new Configuration(), path); ParquetMetadata parquetMetadata = parquetFileReader.getFooter(); MessageType schema = parquetMetadata.getFileMetaData().getSchema(); for (int i = 0; i < parquetMetadata.getBlocks().size(); i++) { BlockMetaData metaData = parquetMetadata.getBlocks().get(i); System.out.println("RowCount " + i + ": " + metaData.getRowCount()); System.out.println("RowIndexOffset " + i + ": " + metaData.getRowIndexOffset()); System.out.println("schema " + i + ": " + schema.toString()); } parquetFileReader.close(); } }
Parquet 은 Header 와 Footer 를 가집니다.
Footer 에는 Parquet 를 구성하는 여러가지 메타데이터들이 존재하는데요.
Schema, Row 갯수, 각 RowGroup 의 위치 정보 등이 존재합니다.
위 코드 예시는 해당 메타데이터를 조회하는 내용입니다.
( 참고로 RowGroup 의 offset 를 통해서 특정 RowGroup 의 조회가 즉각적으로 가능합니다. )
RowCount 0: 1000 RowIndexOffset 0: 0 schema 0: message schema { optional binary name (STRING); optional binary city (STRING); } // 생략 RowCount 98: 1000 RowIndexOffset 98: 98000 schema 98: message schema { optional binary name (STRING); optional binary city (STRING); } RowCount 99: 1000 RowIndexOffset 99: 99000 schema 99: message schema { optional binary name (STRING); optional binary city (STRING); }
Parquet Reader 로 Parquet Row Group 읽기.
ParquetReader 라이브러리 레벨에서 RowGroup 을 PageReadStore 라고 부릅니다.
Page 와 Column 들이 모여있는 Store 라는 의미에서 PageReadStore 라고 부르는 것 같습니다.
package org.example; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.parquet.column.page.PageReadStore; import org.apache.parquet.hadoop.ParquetFileReader; import org.apache.parquet.hadoop.metadata.ParquetMetadata; import org.apache.parquet.schema.MessageType; import java.io.IOException; public class ParquetTest { public static void main(String[] args) throws IOException { Path path = new Path("/tmp/user.parquet"); ParquetFileReader parquetFileReader = ParquetFileReader.open(new Configuration(), path); ParquetMetadata parquetMetadata = parquetFileReader.getFooter(); MessageType schema = parquetMetadata.getFileMetaData().getSchema(); PageReadStore pageReadStore; while ((pageReadStore = parquetFileReader.readNextRowGroup()) != null) { System.out.println("row Count : " + pageReadStore.getRowCount()); System.out.println("row index : " + pageReadStore.getRowIndexOffset().get()); } parquetFileReader.close(); } }
각 Row Group 이 가지는 1000 개의 Row 들의 정보들을 알 수 있습니다.
< 출력 >
row Count : 1000 row index : 0 // ... 생략 row Count : 1000 row index : 1000 row Count : 1000 row index : 2000 row Count : 1000 row index : 99000
Parquet Reader 로 Row 읽기.
이제 RowGroup -> Column -> Page 로 이어지는 실질적인 데이터를 읽어보겠습니다.
아래 예시에선 ColumnReader 와 PageReader 가 사용됩니다.
package org.example; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.parquet.column.ColumnDescriptor; import org.apache.parquet.column.ColumnReadStore; import org.apache.parquet.column.ColumnReader; import org.apache.parquet.column.impl.ColumnReadStoreImpl; import org.apache.parquet.column.page.*; import org.apache.parquet.example.data.simple.convert.GroupRecordConverter; import org.apache.parquet.hadoop.ParquetFileReader; import org.apache.parquet.hadoop.metadata.ParquetMetadata; import org.apache.parquet.schema.MessageType; import java.io.IOException; import java.util.List; public class ParquetTest { public static void main(String[] args) throws IOException { Path path = new Path("/tmp/user.parquet"); ParquetFileReader parquetFileReader = ParquetFileReader.open(new Configuration(), path); ParquetMetadata parquetMetadata = parquetFileReader.getFooter(); MessageType schema = parquetMetadata.getFileMetaData().getSchema(); PageReadStore pageReadStore; while ((pageReadStore = parquetFileReader.readNextRowGroup()) != null) { ColumnReadStore colReadStore = new ColumnReadStoreImpl( pageReadStore, new GroupRecordConverter(schema).getRootConverter(), schema, "" ); for (int columnIndex = 0; columnIndex < schema.getColumns().size(); columnIndex++) { ColumnDescriptor columnDescriptor = schema.getColumns().get(columnIndex); PageReader pageReader = pageReadStore.getPageReader(columnDescriptor); ColumnReader colReader = colReadStore.getColumnReader(columnDescriptor); long valueSize = pageReader.getTotalValueCount(); for (int i = 0; i < valueSize; i++) { String value = colReader.getBinary().toStringUsingUTF8(); System.out.println(value); colReader.consume(); } } } parquetFileReader.close(); } }
아래 결과는 생략된 버전이긴하지만,
각 Column 은 row_group_size 만큼씩 읽어들입니다.
name0 name1 name2 name3 name4 name5 name6 name7 name8 name9 city0 city1 city2 city3 city4 city5 city6 city7
반응형'BigData > Parquet' 카테고리의 다른 글
[Parquet] Dictionary Encoding 알아보기 (0) 2024.03.24 [Apache Arrow] Pyarrow Table 알아보기 (0) 2024.03.08 Parquet 알아보기 (0) 2023.01.05