ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • Parquet Reader 알아보기
    BigData/Parquet 2023. 12. 8. 05:37
    728x90
    반응형

    - 목차

     

    함께 보면 좋은 글.

    https://westlife0615.tistory.com/50

     

    Parquet 알아보기

    - 목차 관련된 글 https://westlife0615.tistory.com/333 Avro Serialization 알아보기. - 목차 관련된 글 https://westlife0615.tistory.com/332 Avro File 알아보기 - 목차 소개. Avro 는 두가지 기능을 제공합니다. 첫번째는 직

    westlife0615.tistory.com

     

    소개.

    Parquet Reader 들이 어떠한 방식으로 Parquet 파일을 읽어들이는지 자세히 살펴보려고 합니다.

    Apache Arrow 기반의 라이브러리들은 많은 부분이 추상화되어 있어, Parquet 파일을 읽어들이는 방식이 명료하게 나타나지 않으며,

    Python 관련 라이브러리들 또한 추상화된 부분이 많았습니다.

    그래서 java 의 org.apache.parquet parquet-hadoop 라이브러리를 사용할 예정입니다.

    https://mvnrepository.com/artifact/org.apache.parquet/parquet-hadoop

     

    Column 기반의 Parquet 파일이 어떠한 방식으로 저장되고 로드되는지 살펴보겠습니다.

     

    Parquet File 생성.

    먼저 테스트를 위한 Parquet File 을 생성합니다.

    간단하게 생성하기 위해서 python 스크립트를 활용하겠습니다.

     

    먼저 pyarrow 모듈을 설치합니다.

    pip install pyarrow

     

    그리고 터미널을 열어 python3 인터프린터 내부로 진입합니다.

    python3

     

    그리고 Parquet 파일을 생성합니다.

    저는 /tmp/ 디렉토리 내부에 생성하였습니다.

    import pandas as pd
    import pyarrow as pa
    import pyarrow.parquet as pq
    
    # Create a Pandas DataFrame
    data = {'name': [], 'city': []}
    for i in range(0, 100000):
        data["name"].append("name" + str(i))
        data["city"].append("city" + str(i))
    
    df = pd.DataFrame(data)
    
    # Convert Pandas DataFrame to PyArrow Table
    table = pa.Table.from_pandas(df)
    
    # Specify the Parquet file path
    parquet_file_path = '/tmp/user.parquet'
    
    # Write the PyArrow Table to a Parquet file
    pq.write_table(table, parquet_file_path, row_group_size=1000)
    
    print(f'Parquet file written to: {parquet_file_path}')

     

    위 스크립트로 생성되는 user.parquet 파일은

    name 과 city 칼럼을 가집니다.

    그리고 row_group_size 를 1000 으로 두어서, 100 개의 RowGroup 이 생성되도록 조작하였습니다.

     

    생성된 Parquet 파일 구조는 아래와 같습니다.

    100 개의 RowGroup 이 있구요.

    각 RowGroup 내부에 Column 별로 데이터들이 존재하게 됩니다.

    하나의 Column 내부에 저장되는 데이터들의 단위는 Page 라고하며,

    RowGroup -> Column -> Page 로 이어지는 구성을 어떻게 읽어들이는지 알아보려고 합니다.

     

     

     

    Parquet Reader 로 Parquet 메타데이터 읽기.

     

    저는 java 11 버전에서 테스트를 진행하고 있구요.

    아래 모듈의 Parquet Reader 를 사용합니다.

    // https://mvnrepository.com/artifact/org.apache.parquet/parquet-hadoop
    implementation group: 'org.apache.parquet', name: 'parquet-hadoop', version: '1.13.1'

     

     

    package org.example;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.parquet.hadoop.ParquetFileReader;
    import org.apache.parquet.hadoop.metadata.BlockMetaData;
    import org.apache.parquet.hadoop.metadata.ParquetMetadata;
    import org.apache.parquet.schema.MessageType;
    
    import java.io.IOException;
    
    public class ParquetTest {
      public static void main(String[] args) throws IOException {
        Path path = new Path("/tmp/user.parquet");
        ParquetFileReader parquetFileReader = ParquetFileReader.open(new Configuration(), path);
        ParquetMetadata parquetMetadata = parquetFileReader.getFooter();
        MessageType schema = parquetMetadata.getFileMetaData().getSchema();
    
        for (int i = 0; i < parquetMetadata.getBlocks().size(); i++) {
          BlockMetaData metaData = parquetMetadata.getBlocks().get(i);
          System.out.println("RowCount " + i + ": " + metaData.getRowCount());
          System.out.println("RowIndexOffset " + i + ": " + metaData.getRowIndexOffset());
          System.out.println("schema " + i + ": " + schema.toString());
        }
    
        parquetFileReader.close();
      }
    }

     

    Parquet 은 Header 와 Footer 를 가집니다.

    Footer 에는 Parquet 를 구성하는 여러가지 메타데이터들이 존재하는데요.

    Schema, Row 갯수, 각 RowGroup 의 위치 정보 등이 존재합니다.

    위 코드 예시는 해당 메타데이터를 조회하는 내용입니다.

    ( 참고로 RowGroup 의 offset 를 통해서 특정 RowGroup 의 조회가 즉각적으로 가능합니다. )

     

    RowCount 0: 1000
    RowIndexOffset 0: 0
    schema 0: message schema {
      optional binary name (STRING);
      optional binary city (STRING);
    }
    
    // 생략
    
    RowCount 98: 1000
    RowIndexOffset 98: 98000
    schema 98: message schema {
      optional binary name (STRING);
      optional binary city (STRING);
    }
    
    RowCount 99: 1000
    RowIndexOffset 99: 99000
    schema 99: message schema {
      optional binary name (STRING);
      optional binary city (STRING);
    }

     

     

    Parquet Reader 로 Parquet Row Group 읽기.

    ParquetReader 라이브러리 레벨에서 RowGroup 을 PageReadStore 라고 부릅니다.

    Page 와 Column 들이 모여있는 Store 라는 의미에서 PageReadStore 라고 부르는 것 같습니다.

     

    package org.example;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.parquet.column.page.PageReadStore;
    import org.apache.parquet.hadoop.ParquetFileReader;
    import org.apache.parquet.hadoop.metadata.ParquetMetadata;
    import org.apache.parquet.schema.MessageType;
    
    import java.io.IOException;
    
    public class ParquetTest {
      public static void main(String[] args) throws IOException {
        Path path = new Path("/tmp/user.parquet");
        ParquetFileReader parquetFileReader = ParquetFileReader.open(new Configuration(), path);
        ParquetMetadata parquetMetadata = parquetFileReader.getFooter();
        MessageType schema = parquetMetadata.getFileMetaData().getSchema();
    
    
        PageReadStore pageReadStore;
        while ((pageReadStore = parquetFileReader.readNextRowGroup()) != null) {
          System.out.println("row Count : " + pageReadStore.getRowCount());
          System.out.println("row index : " + pageReadStore.getRowIndexOffset().get());
        }
        parquetFileReader.close();
      }
    }

     

    각 Row Group 이 가지는 1000 개의 Row 들의 정보들을 알 수 있습니다.

     

    < 출력 >

    row Count : 1000
    row index : 0
    
    // ... 생략
    
    row Count : 1000
    row index : 1000
    
    row Count : 1000
    row index : 2000
    
    row Count : 1000
    row index : 99000

     

     

     

    Parquet Reader 로 Row 읽기.

    이제 RowGroup -> Column -> Page 로 이어지는 실질적인 데이터를 읽어보겠습니다.

    아래 예시에선 ColumnReader 와 PageReader 가 사용됩니다.

     

     

    package org.example;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.parquet.column.ColumnDescriptor;
    import org.apache.parquet.column.ColumnReadStore;
    import org.apache.parquet.column.ColumnReader;
    import org.apache.parquet.column.impl.ColumnReadStoreImpl;
    import org.apache.parquet.column.page.*;
    import org.apache.parquet.example.data.simple.convert.GroupRecordConverter;
    import org.apache.parquet.hadoop.ParquetFileReader;
    import org.apache.parquet.hadoop.metadata.ParquetMetadata;
    import org.apache.parquet.schema.MessageType;
    
    import java.io.IOException;
    import java.util.List;
    
    public class ParquetTest {
      public static void main(String[] args) throws IOException {
        Path path = new Path("/tmp/user.parquet");
        ParquetFileReader parquetFileReader = ParquetFileReader.open(new Configuration(), path);
        ParquetMetadata parquetMetadata = parquetFileReader.getFooter();
        MessageType schema = parquetMetadata.getFileMetaData().getSchema();
    
        PageReadStore pageReadStore;
        while ((pageReadStore = parquetFileReader.readNextRowGroup()) != null) {
          ColumnReadStore colReadStore = new ColumnReadStoreImpl(
                  pageReadStore,
                  new GroupRecordConverter(schema).getRootConverter(),
                  schema,
                  ""
          );
    
          for (int columnIndex = 0; columnIndex < schema.getColumns().size(); columnIndex++) {
            ColumnDescriptor columnDescriptor = schema.getColumns().get(columnIndex);
            PageReader pageReader = pageReadStore.getPageReader(columnDescriptor);
            ColumnReader colReader = colReadStore.getColumnReader(columnDescriptor);
            long valueSize = pageReader.getTotalValueCount();
            for (int i = 0; i < valueSize; i++) {
              String value = colReader.getBinary().toStringUsingUTF8();
              System.out.println(value);
              colReader.consume();
            }
          }
    
        }
        parquetFileReader.close();
      }
    }

     

     

    아래 결과는 생략된 버전이긴하지만,

    각 Column 은 row_group_size 만큼씩 읽어들입니다.

    name0
    name1
    name2
    name3
    name4
    name5
    name6
    name7
    name8
    name9
    city0
    city1
    city2
    city3
    city4
    city5
    city6
    city7

     

     

    반응형

    'BigData > Parquet' 카테고리의 다른 글

    [Parquet] Dictionary Encoding 알아보기  (0) 2024.03.24
    [Apache Arrow] Pyarrow Table 알아보기  (0) 2024.03.08
    Parquet 알아보기  (0) 2023.01.05
Designed by Tistory.