-
Flink Parquet FileSink 알아보기Flink 2023. 10. 4. 09:04728x90반응형
- 목차
소개.
Flink 를 통해서 ETL 프로세스를 구성할 수 있습니다.
FileSink 는 Extract-Tranform-Load 의 구분에서 Load 영역의 구성요소이며,
로컬 또는 원격의 파일시스템에 생성된 파일을 저장할 수 있습니다.
이번 페이지에서는 Parquet FileSink 에 대한 설명과 예시를 작성해보려고 합니다.
참고로 저는 java 11, Flink 1.13.6 환경에서 진행하도록 하겠습니다.
Parquet.
Parquet 파일에 대한 디테일한 설명을 원하신다면 아래 페이지에서 확인 가능합니다.
https://westlife0615.tistory.com/50
파일은 크게 2가지 분류로 나뉩니다.
1. 바이너리 파일 (binary file) vs 텍스트 파일 (text file)
2. 칼럼 베이스 파일 (Column-based) vs 로우 베이스 파일 (Row-based)
텍스트 파일은 JSON, CSV, 엑셀 파일 처럼 human-reable 한 포맷의 파일입니다.
텍스트 파일의 내용은 사람이 읽을 수 있는 언어로 구성되어 있고, 컴퓨터 친화적인 파일이 아닙니다.
따라서 별도의 압출을 수행하기 때문에 용량이 클 수 밖에 없습니다.
반면 binary file 은 인코딩 과정과 압축을 통해서 비교적 적은 용량을 유지하며,
Binary File Format 의 의거해서 파일의 내용이 구성됩니다.
예를 들어, Parquet 의 경우에는 효율적인 Read/Write 를 위하여 Header, Footer, Schema 등이 존재합니다.
그리고 파일이 JSON 이나 CSV 처럼 일종의 스키마 형식이 존재한다면 이들은 칼럼 기반이나 로우 기반이냐로 분류할 수 있습니다.
당연히 이미지 파일이나 텍스트 파일과 같은 일반적은 파일은 비정형 데이터로써 스키마가 필요없습니다.
그 외의 정형 데이터의 경우에는 스키마가 필요한데요.
JSON 이나 CSV 는 일반 텍스트 파일처럼 스키마와 관계없이 저장되지만,
Parquet 와 Avro 같은 형식의 데이터는 스키마가 존재합니다.
Parquet 는 Binary format File 이면서 Column based File 입니다.
Avro.
Flink 의 Parquet File Sink 는 내부적으로 Avro Serialization Framework 를 사용합니다.
Java Object -> Avro Record -> Parquet File 의 순서로 데이터가 파일로 저장됩니다.
Avro Record 는 Java 로 작성된 프로그램과 Parquet File 사의 DTO 로써 동작하게 됩니다.
Avro Serialization 의 자세한 설명은 아래 링크로 대체하겠습니다.
https://westlife0615.tistory.com/333
예시.
flink-parquet module.
Flink 의 Parquet FileSink 를 사용하기 위해서는 아래 module 이 추가되어야합니다.
참고로 저는 flink 1.13.6 환경에서 테스트 진행 중입니다.
ext { scalaBinaryVersion = "2.11" flinkVersion = "1.13.6" slf4jVersion = '1.7.15' log4jVersion = '2.17.1' hadoopVersion = '2.4.1' } implementation "org.apache.flink:flink-parquet_${scalaBinaryVersion}:${flinkVersion}" implementation "org.apache.flink:flink-avro:${flinkVersion}" implementation 'org.apache.parquet:parquet-avro:1.12.1' implementation "org.apache.hadoop:hadoop-common:${hadoopVersion}" implementation "org.apache.hadoop:hadoop-hdfs:${hadoopVersion}" implementation "org.apache.hadoop:hadoop-client:${hadoopVersion}"
Row Write.
아래는 Stream Mode 인 Flink 에서 실시간으로 처리되는 데이터를 File Sink 하는 예시입니다.
아래 예시는 간단히 설명하면,
Flink 프로그램 시작부터 10분 동안 Random 한 User 객체를 생성하는 Source 가 있습니다.
그리고 User Java Object 를 Generic Record 로 변환하는 Mapper Operator 하나와
Parquet File Sink 로 구성됩니다.
package org.example.job; import org.apache.avro.Schema; import org.apache.avro.SchemaBuilder; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericRecord; import org.apache.flink.api.common.RuntimeExecutionMode; import org.apache.flink.connector.file.sink.FileSink; import org.apache.flink.core.fs.Path; import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo; import org.apache.flink.formats.parquet.avro.ParquetAvroWriters; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy; import org.apache.flink.streaming.api.functions.source.SourceFunction; import java.io.Serializable; import java.time.Instant; import java.util.Random; public class ParquetSinkExample implements Serializable { public static Schema schema = SchemaBuilder.record("User").namespace("com.example.kafka").fields().requiredString("name").requiredLong("age").endRecord(); public static void main(String[] args) throws Exception { Instant tenMinuteFromNow = Instant.now().plusSeconds(10 * 60); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.enableCheckpointing(5000); env.getCheckpointConfig().setCheckpointStorage(new Path("file:///tmp/checkpoints")); env.setRuntimeMode(RuntimeExecutionMode.STREAMING); DataStream<User> source = env.addSource(new SourceFunction<>() { @Override public void run(SourceContext<User> ctx) { while (Instant.now().isBefore(tenMinuteFromNow)) { ctx.collect(User.generateRandom()); } } @Override public void cancel() { } }); String outputBasePath = "src/main/resources/test"; FileSink<GenericRecord> parquetSink = FileSink .forBulkFormat(new Path(outputBasePath), ParquetAvroWriters.forGenericRecord(schema)) .withRollingPolicy(OnCheckpointRollingPolicy.build()) .build(); source.map(user -> { GenericRecord record = new GenericData.Record(schema); record.put("name", user.name); record.put("age", user.age); return record; }).returns(new GenericRecordAvroTypeInfo(schema)).sinkTo(parquetSink).setParallelism(1); env.execute(); } } class User implements Serializable { public String name; public Long age; public User(String name, Long age) { this.name = name; this.age = age; } public static User generateRandom() { int leftLimit = 97; // letter 'a' int rightLimit = 122; // letter 'z' int targetStringLength = 10; String randomName = new Random().ints(leftLimit, rightLimit + 1).limit(targetStringLength).collect(StringBuilder::new, StringBuilder::appendCodePoint, StringBuilder::append).toString(); Long randomAge = Instant.now().toEpochMilli() % 100; return new User(randomName, randomAge); } }
bulk write.
Flink 에서 Bulk (or Batch) 단위로 데이터 쓰기가 가능합니다.
Flink 는 Batch 모드 또는 Stream 모드로 동작하는데요,
Flink 가 Batch 모드로 동작하는 경우, 모든 실행이 완료되는 시점에 Parquet File 에 Data Write 가 한번에 진행됩니다.
반면, Stream 모드인 경우에는 시간 주기로 혹은 Checkpoint (or Savepoint) 생성 시점에 Data Write 가 가능합니다.
아래의 예시는 User 라는 Java 객체가 Parquet 파일로 저장되는 예시코드입니다.
1000개의 랜덤인 User 가 Avro Record 형태로 변환되고,
Parquer File 로 저장됩니다.
package org.example.job; import org.apache.avro.Schema; import org.apache.avro.SchemaBuilder; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericRecord; import org.apache.flink.api.common.RuntimeExecutionMode; import org.apache.flink.connector.file.sink.FileSink; import org.apache.flink.core.fs.Path; import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo; import org.apache.flink.formats.parquet.avro.ParquetAvroWriters; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import java.io.Serializable; import java.time.Instant; import java.util.Random; import java.util.stream.Collectors; import java.util.stream.IntStream; public class ParquetSinkExample implements Serializable { public static Schema schema = SchemaBuilder.record("User").namespace("com.example.kafka").fields().requiredString("name").requiredLong("age").endRecord(); public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setRuntimeMode(RuntimeExecutionMode.BATCH); DataStream<User> source = env.fromCollection(IntStream.of(1000).boxed().map(i -> User.generateRandom()).collect(Collectors.toList())); String outputBasePath = "src/main/resources/test"; FileSink<GenericRecord> parquetSink = FileSink.forBulkFormat(new Path(outputBasePath), ParquetAvroWriters.forGenericRecord(schema)).build(); source.map(user -> { GenericRecord record = new GenericData.Record(schema); record.put("name", user.name); record.put("age", user.age); return record; }).returns(new GenericRecordAvroTypeInfo(schema)).sinkTo(parquetSink).setParallelism(1); env.execute(); } } class User implements Serializable { public String name; public Long age; public User(String name, Long age) { this.name = name; this.age = age; } public static User generateRandom() { int leftLimit = 97; // letter 'a' int rightLimit = 122; // letter 'z' int targetStringLength = 10; String randomName = new Random().ints(leftLimit, rightLimit + 1).limit(targetStringLength).collect(StringBuilder::new, StringBuilder::appendCodePoint, StringBuilder::append).toString(); Long randomAge = Instant.now().toEpochMilli() % 100; return new User(randomName, randomAge); } }
실행 결과는 아래의 파일 위치에 생성됩니다.
part 로 시작하는 Parquet File 이름은 prefix + 난수 + suffix 와 같은 형식으로 작성됩니다.
/src/main/resources/test/part-956cb2ab-8963-4bfa-a9c6-3ee2f568b41d-0
반응형'Flink' 카테고리의 다른 글
Flink State 알아보기 (0) 2024.01.10 Flink Watermark 알아보기 (0) 2024.01.10 Flink Checkpoint 알아보기 (0) 2024.01.10 [Flink] 바이너리 파일 실행하기 (Binary Execution File) (0) 2023.12.29 Flink KeyedStream 알아보기 (0) 2023.10.11