ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • Flink Parquet FileSink 알아보기
    Flink 2023. 10. 4. 09:04
    728x90
    반응형

    - 목차

     

    소개.

    Flink 를 통해서 ETL 프로세스를 구성할 수 있습니다.

    FileSink 는 Extract-Tranform-Load 의 구분에서 Load 영역의 구성요소이며,

    로컬 또는 원격의 파일시스템에 생성된 파일을 저장할 수 있습니다.

     

    이번 페이지에서는 Parquet FileSink 에 대한 설명과 예시를 작성해보려고 합니다.

    참고로 저는 java 11, Flink 1.13.6 환경에서 진행하도록 하겠습니다.

     

    Parquet.

    Parquet 파일에 대한 디테일한 설명을 원하신다면 아래 페이지에서 확인 가능합니다.

    https://westlife0615.tistory.com/50

     

    Parquet 알아보기

    - 목차 관련된 글 https://westlife0615.tistory.com/333 Avro Serialization 알아보기. - 목차 관련된 글 https://westlife0615.tistory.com/332 Avro File 알아보기 - 목차 소개. Avro 는 두가지 기능을 제공합니다. 첫번째는 직

    westlife0615.tistory.com

     

     

    파일은 크게 2가지 분류로 나뉩니다.

    1. 바이너리 파일 (binary file) vs 텍스트 파일 (text file)

    2. 칼럼 베이스 파일 (Column-based) vs 로우 베이스 파일 (Row-based)

     

    텍스트 파일은 JSON, CSV, 엑셀 파일 처럼 human-reable 한 포맷의 파일입니다.

    텍스트 파일의 내용은 사람이 읽을 수 있는 언어로 구성되어 있고, 컴퓨터 친화적인 파일이 아닙니다.

    따라서 별도의 압출을 수행하기 때문에 용량이 클 수 밖에 없습니다.

    반면 binary file 은 인코딩 과정과 압축을 통해서 비교적 적은 용량을 유지하며,

    Binary File Format 의 의거해서 파일의 내용이 구성됩니다.

    예를 들어, Parquet 의 경우에는 효율적인 Read/Write 를 위하여 Header, Footer, Schema 등이 존재합니다.

     

    그리고 파일이 JSON 이나 CSV 처럼 일종의 스키마 형식이 존재한다면 이들은 칼럼 기반이나 로우 기반이냐로 분류할 수 있습니다.

    당연히 이미지 파일이나 텍스트 파일과 같은 일반적은 파일은 비정형 데이터로써 스키마가 필요없습니다.

    그 외의 정형 데이터의 경우에는 스키마가 필요한데요.

    JSON 이나 CSV 는 일반 텍스트 파일처럼 스키마와 관계없이 저장되지만,

    Parquet 와 Avro 같은 형식의 데이터는 스키마가 존재합니다.

     

    Parquet 는 Binary format File 이면서 Column based File 입니다.

     

     

    Avro.

    Flink Parquet File Sink 는 내부적으로 Avro Serialization Framework 를 사용합니다.

    Java Object -> Avro Record -> Parquet File 의 순서로 데이터가 파일로 저장됩니다.

    Avro Record 는 Java 로 작성된 프로그램과 Parquet File 사의 DTO 로써 동작하게 됩니다.

     

    Avro Serialization 의 자세한 설명은 아래 링크로 대체하겠습니다.

     

    https://westlife0615.tistory.com/333

     

    Avro Serialization 알아보기.

    - 목차 관련된 글 https://westlife0615.tistory.com/332 Avro File 알아보기 - 목차 소개. Avro 는 두가지 기능을 제공합니다. 첫번째는 직렬화 기능입니다. Avro 는 Serialization Framework 로서 직렬화와 역직렬화를

    westlife0615.tistory.com

     

    예시.

     

    flink-parquet module.

    Flink 의 Parquet FileSink 를 사용하기 위해서는 아래 module 이 추가되어야합니다.

    참고로 저는 flink 1.13.6 환경에서 테스트 진행 중입니다.

    ext {
        scalaBinaryVersion = "2.11"
        flinkVersion = "1.13.6"
        slf4jVersion = '1.7.15'
        log4jVersion = '2.17.1'
        hadoopVersion = '2.4.1'
    }
    
    implementation "org.apache.flink:flink-parquet_${scalaBinaryVersion}:${flinkVersion}"
    implementation "org.apache.flink:flink-avro:${flinkVersion}"
    implementation 'org.apache.parquet:parquet-avro:1.12.1'
    implementation "org.apache.hadoop:hadoop-common:${hadoopVersion}"
    implementation "org.apache.hadoop:hadoop-hdfs:${hadoopVersion}"
    implementation "org.apache.hadoop:hadoop-client:${hadoopVersion}"

     

    Row Write.

     

    아래는 Stream Mode 인 Flink 에서 실시간으로 처리되는 데이터를 File Sink 하는 예시입니다.

    아래 예시는 간단히 설명하면,

    Flink 프로그램 시작부터 10분 동안 Random 한 User 객체를 생성하는 Source 가 있습니다.

    그리고 User Java Object 를 Generic Record 로 변환하는 Mapper Operator 하나와

    Parquet File Sink 로 구성됩니다.

     

     

    package org.example.job;
    
    
    import org.apache.avro.Schema;
    import org.apache.avro.SchemaBuilder;
    import org.apache.avro.generic.GenericData;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.flink.api.common.RuntimeExecutionMode;
    import org.apache.flink.connector.file.sink.FileSink;
    import org.apache.flink.core.fs.Path;
    import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo;
    import org.apache.flink.formats.parquet.avro.ParquetAvroWriters;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;
    import org.apache.flink.streaming.api.functions.source.SourceFunction;
    
    import java.io.Serializable;
    import java.time.Instant;
    import java.util.Random;
    
    public class ParquetSinkExample implements Serializable {
      public static Schema schema = SchemaBuilder.record("User").namespace("com.example.kafka").fields().requiredString("name").requiredLong("age").endRecord();
    
      public static void main(String[] args) throws Exception {
        Instant tenMinuteFromNow = Instant.now().plusSeconds(10 * 60);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(5000);
        env.getCheckpointConfig().setCheckpointStorage(new Path("file:///tmp/checkpoints"));
        env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
        DataStream<User> source = env.addSource(new SourceFunction<>() {
          @Override
          public void run(SourceContext<User> ctx) {
            while (Instant.now().isBefore(tenMinuteFromNow)) {
              ctx.collect(User.generateRandom());
            }
          }
    
          @Override
          public void cancel() {
    
          }
        });
    
        String outputBasePath = "src/main/resources/test";
        FileSink<GenericRecord> parquetSink = FileSink
                .forBulkFormat(new Path(outputBasePath), ParquetAvroWriters.forGenericRecord(schema))
                .withRollingPolicy(OnCheckpointRollingPolicy.build())
                .build();
    
        source.map(user -> {
          GenericRecord record = new GenericData.Record(schema);
          record.put("name", user.name);
          record.put("age", user.age);
          return record;
        }).returns(new GenericRecordAvroTypeInfo(schema)).sinkTo(parquetSink).setParallelism(1);
    
    
        env.execute();
      }
    }
    
    class User implements Serializable {
    
      public String name;
      public Long age;
    
      public User(String name, Long age) {
        this.name = name;
        this.age = age;
      }
    
      public static User generateRandom() {
        int leftLimit = 97; // letter 'a'
        int rightLimit = 122; // letter 'z'
        int targetStringLength = 10;
        String randomName = new Random().ints(leftLimit, rightLimit + 1).limit(targetStringLength).collect(StringBuilder::new, StringBuilder::appendCodePoint, StringBuilder::append).toString();
    
        Long randomAge = Instant.now().toEpochMilli() % 100;
        return new User(randomName, randomAge);
      }
    }

     

    bulk write.

    Flink 에서 Bulk (or Batch) 단위로 데이터 쓰기가 가능합니다.

    Flink 는 Batch 모드 또는 Stream 모드로 동작하는데요,

    Flink 가 Batch 모드로 동작하는 경우, 모든 실행이 완료되는 시점에 Parquet File 에 Data Write 가 한번에 진행됩니다.

    반면, Stream 모드인 경우에는 시간 주기로 혹은 Checkpoint (or Savepoint) 생성 시점에 Data Write 가 가능합니다.

     

    아래의 예시는 User 라는 Java 객체가 Parquet 파일로 저장되는 예시코드입니다.

    1000개의 랜덤인 User 가 Avro Record 형태로 변환되고,

    Parquer File 로 저장됩니다.

     

    package org.example.job;
    
    
    import org.apache.avro.Schema;
    import org.apache.avro.SchemaBuilder;
    import org.apache.avro.generic.GenericData;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.flink.api.common.RuntimeExecutionMode;
    import org.apache.flink.connector.file.sink.FileSink;
    import org.apache.flink.core.fs.Path;
    import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo;
    import org.apache.flink.formats.parquet.avro.ParquetAvroWriters;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    
    import java.io.Serializable;
    import java.time.Instant;
    import java.util.Random;
    import java.util.stream.Collectors;
    import java.util.stream.IntStream;
    
    public class ParquetSinkExample implements Serializable {
      public static Schema schema = SchemaBuilder.record("User").namespace("com.example.kafka").fields().requiredString("name").requiredLong("age").endRecord();
    
      public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
        DataStream<User> source = env.fromCollection(IntStream.of(1000).boxed().map(i -> User.generateRandom()).collect(Collectors.toList()));
        String outputBasePath = "src/main/resources/test";
        FileSink<GenericRecord> parquetSink = FileSink.forBulkFormat(new Path(outputBasePath), ParquetAvroWriters.forGenericRecord(schema)).build();
    
        source.map(user -> {
          GenericRecord record = new GenericData.Record(schema);
          record.put("name", user.name);
          record.put("age", user.age);
          return record;
        }).returns(new GenericRecordAvroTypeInfo(schema)).sinkTo(parquetSink).setParallelism(1);
    
    
        env.execute();
      }
    }
    
    class User implements Serializable {
    
      public String name;
      public Long age;
    
      public User(String name, Long age) {
        this.name = name;
        this.age = age;
      }
    
      public static User generateRandom() {
        int leftLimit = 97; // letter 'a'
        int rightLimit = 122; // letter 'z'
        int targetStringLength = 10;
        String randomName = new Random().ints(leftLimit, rightLimit + 1).limit(targetStringLength).collect(StringBuilder::new, StringBuilder::appendCodePoint, StringBuilder::append).toString();
    
        Long randomAge = Instant.now().toEpochMilli() % 100;
        return new User(randomName, randomAge);
      }
    }

     

    실행 결과는 아래의 파일 위치에 생성됩니다.

    part 로 시작하는 Parquet File 이름은 prefix + 난수 + suffix 와 같은 형식으로 작성됩니다.

    /src/main/resources/test/part-956cb2ab-8963-4bfa-a9c6-3ee2f568b41d-0

     

     

     

    반응형

    'Flink' 카테고리의 다른 글

    Flink State 알아보기  (0) 2024.01.10
    Flink Watermark 알아보기  (0) 2024.01.10
    Flink Checkpoint 알아보기  (0) 2024.01.10
    [Flink] 바이너리 파일 실행하기 (Binary Execution File)  (0) 2023.12.29
    Flink KeyedStream 알아보기  (0) 2023.10.11
Designed by Tistory.