-
[Flink] FileSource 알아보기 (json, csv, parquet, avro)Flink 2024. 2. 4. 19:53728x90반응형
- 목차
들어가며.
Flink 는 File IO 를 위한 Source 로 FileSource class 를 가집니다.
FileSource 는 File Read 를 위한 세가지 Factory Method 를 가지는데요.
forRecordStreamFormat, forBulkFileFormat, forRecordFileFormat 에 해당하는 Method 들을 가집니다.
단순한 텍스트 파일부터 JSON, CSV 포맷의 파일과 Avro, Parquet 같은 Binary File 을 처리하는 FileSource 에 대해서 알아보려고 합니다.
File Source 의 Fully Qualifed Name 은 org.apache.flink.connector.file.src.reader.FileSource 이구요.
1.16.1 버전의 모듈을 사용할 예정입니다.
// https://mvnrepository.com/artifact/org.apache.flink/flink-connector-files implementation group: 'org.apache.flink', name: 'flink-connector-files', version: '1.16.1'
File DataSource 의 구성요소 살펴보기.
DataSource 는 SourceSplit, SourceReader, SourceEnumerator 로 구성됩니다.
이들의 관계를 그림으로 나타내면 아래와 같습니다.
SourceSplit 은 IO 의 대상이 될 수 있는 가장 작은 단위입니다.
만약 HDFS 나 s3a 처럼 여러 파일들을 읽게 된다면, Split 은 하나의 파일입니다.
그리고 Kafka 를 DataSource 로 연결하는 경우에는 Partition 이 SourceSplit 이 됩니다.
만약에 하나의 파일을 잘라서 각 Chunk 단위를 Split 으로 삼을 수도 있습니다.
( 예를 들어 Split-1 : 1 ~ 100 line, Split-2 : 101 ~ 200 line, Split-3 : 201 ~ 300 line ... 이런 느낌으로 말이죠.)
이는 구현하는 프로그래머의 몫입니다.
다만 FIle 또는 Partition 이 흔히 적용되는 Split 의 단위라는 것만 알고 계시면 좋을 것 같습니다.
FileSource 의 경우의 SourceSplit 은 FilePath, Offset, Length 와 같은 정보로 구현됩니다.
그리고 SplitReader 는 SourceSplit 을 조회해서 이를 역직렬화하고 DownStream 으로 흘려보냅니다.
예를 들어, 아래의 이미지처럼 File SourceSplit 의 정보를 토대로 File 을 읽어들여 Buffer 를 구성합니다.
그리고 그 Buffer Read 하고 Deserialize 합니다.
모든 SourceSplit 들을 처리할 때까지 이러한 과정을 반복합니다.
이어서 설명하겠지만, TextFile 의 SourceReader 는 단순한 BufferedReader 입니다.
즉, Line-by-Line 으로 Text 을 읽어들이는 단순한 구조입니다.
마지막으로 SourceEnumerator 는 JobManager 에서 TaskManager 의 SourceReader 에게 Split 을 제공합니다.
StreamFormat, BulkFormat.
FileSource 외에도 KafkaSource, Hadoop Presto Source 등이 존재합니다.
StreamFormat 과 BulkFormat 의 FileSource 만의 클래스이고,
StreamFormat 과 BulkFormat 내부에 SourceReader 를 정의하는 구조를 취합니다.
즉, SourceReader 를 가지는 Wrapper 또는 Factory 라고 생각하셔도 됩니다.
혹시나 StreamFormat 과 BulkFormat 을 접하시고 혼란을 느끼실까봐 이런 내용을 작성하였습니다.
Text FileSource.
이제 본격적으로 FileSource 의 다양한 케이스에 대해서 알아보겠습니다.
먼저 단순한 TextFile 를 읽어들이는 Flink DataStream 프로그램은 아래와 같이 구성됩니다.
< test.txt >
123 456 789 123 456 789 123 456 789 123 456 789
< Flink TestFileSource.java >
package com.westlife.jobs; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.connector.file.src.FileSource; import org.apache.flink.connector.file.src.reader.TextLineInputFormat; import org.apache.flink.core.fs.Path; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; public class TestFileSource { public static void main (String[] args) throws Exception { buildTextFileSource(); } private static void buildTextFileSource() throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); FileSource<String> fileSource = FileSource.forRecordStreamFormat(new TextLineInputFormat(), new Path("files/test.txt")).build(); DataStream<String> source = env.<String>fromSource(fileSource, WatermarkStrategy.noWatermarks(), "file-source"); source.print(); env.execute(); } }
단순한 File 을 DataSource 로 사용할 때에는 TextLineInputFormat 을 사용합니다.
TextLineInputFormat 은 StreamFormat 을 구현한 클래스이구요.
StreamFormat 은 SourceReader 를 정의한 ? 구현한 ? Wrapper 또는 Factory 라고 말씀드렸죠 ?
TextLineInputFormat 은 이름에서 알 수 있듯이, Line-By-Line 으로 텍스트 파일을 읽어들이는 기능을 가집니다.
그리고 Java IO 에서 파일을 Line-By-Line 으로 읽어들이는 역할을 담당하는 것이 BufferedReader 죠 ?
그래서 TextLineInputFormat 은 단순히 BufferedReader 를 SourceReader 로 취합니다.
아래 캡쳐본은 TextLineInputFormat 이 정의하는 SourceReader 의 내용입니다.
그리고 FileSourceSplit 이라는 Class 를 SourceSplit 으로 사용합니다.
FileSourceSplit 의 Full Name 은 org.apache.flink.connector.file.src.FileSourceSplit 이구요.
FilePath, File Position, File Length 등의 정보를 가집니다.
위 세가지 정보만 있으면 FileInputStream 을 만들 수 있고, 현재 어떤 Position 까지 읽었는지 등의 정보를 다 알 수 있겠죠 ?
Flink Program 의 출력은 아래와 같습니다.
4> 123 4> 456 4> 789 4> 123 4> 456 4> 789 4> 123 4> 456 4> 789 4> 123 4> 456 4> 789
Json FileSource.
아래 코드는 TextLineInputFormat 을 토대로 구현한 JsonInputFormat 파일입니다.
Jackson Library 를 추가하여 Deserialization 기능만 추가하였습니다.
package com.westlife.connector.file; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; import org.apache.flink.api.common.typeinfo.*; import org.apache.flink.configuration.Configuration; import org.apache.flink.connector.file.src.reader.*; import org.apache.flink.core.fs.FSDataInputStream; import javax.annotation.Nullable; import java.io.*; public class JsonInputFormat extends SimpleStreamFormat<ObjectNode> { private static final long serialVersionUID = 1L; public static final String DEFAULT_CHARSET_NAME = "UTF-8"; private final String charsetName; public JsonInputFormat() { this(DEFAULT_CHARSET_NAME); } public JsonInputFormat(String charsetName) { this.charsetName = charsetName; } @Override public JsonInputFormat.JsonFormatReader createReader(Configuration config, FSDataInputStream stream) throws IOException { final BufferedReader reader = new BufferedReader(new InputStreamReader(stream, charsetName)); return new JsonInputFormat.JsonFormatReader(reader); } @Override public TypeInformation<ObjectNode> getProducedType() { return Types.GENERIC(ObjectNode.class); } public static final class JsonFormatReader implements StreamFormat.Reader<ObjectNode> { private final BufferedReader reader; private final ObjectMapper objectMapper; JsonFormatReader(final BufferedReader reader) { this.reader = reader; this.objectMapper = new ObjectMapper(); } @Nullable @Override public ObjectNode read() throws IOException { String content = reader.readLine(); if (content == null) return null; return (ObjectNode) this.objectMapper.readTree(content); } @Override public void close() throws IOException { reader.close(); } } }
그리고 아래와 같이 Flink FileSource 의 StreamFormat 으로 JsonInputFormat 을 사용하였습니다.
package com.westlife.jobs; import com.fasterxml.jackson.databind.node.BaseJsonNode; import com.fasterxml.jackson.databind.node.ObjectNode; import com.westlife.connector.file.JsonInputFormat; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.connector.file.src.FileSource; import org.apache.flink.core.fs.Path; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; public class TestFileSource { public static void main (String[] args) throws Exception { buildJsonFileSource(); } private static void buildJsonFileSource() throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); FileSource<ObjectNode> fileSource = FileSource.forRecordStreamFormat(new JsonInputFormat(), new Path("files/users.json")).build(); DataStream<String> source = env.fromSource(fileSource, WatermarkStrategy.noWatermarks(), "file-source").map(BaseJsonNode::toString); source.print(); env.execute(); } }
< 출력 >
2> {"name":"Andy","age":10} 2> {"name":"Brad","age":12} 2> {"name":"Chris","age":42} 2> {"name":"Daniel","age":32} 2> {"name":"Emma","age":22}
CSV FileSource.
CSV File 를 DataSource 로 사용하기 위한 Connector Module 이 존재합니다.
// https://mvnrepository.com/artifact/org.apache.flink/flink-csv implementation group: 'org.apache.flink', name: 'flink-csv', version: '1.16.1'
먼저 아래와 같이 csv 파일을 Deserialization 할 Pojo 타입이 필요하고,
package com.westlife.model; import java.io.Serializable; public class User implements Serializable { private String name; private Integer age; public String getName() {return this.name;}; public Integer getAge() {return this.age;}; public void setName(String name) {this.name = name;}; public void setAge(Integer age) {this.age = age;}; public String toString() {return String.format("%s, %s", this.name, this.age);}; }
아래와 같이 CsvReaderFormat 인 StreamFormat 으로부터 FileSource 를 구현할 수 있습니다.
forSchema, forPojo 와 같인 Helper Function 들이 존재합니다.
저는 Schema 와 Pojo 클래스를 활용하는 forSchema 를 사용하였습니다.
package com.westlife.jobs; import com.westlife.model.User; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.connector.file.src.FileSource; import org.apache.flink.core.fs.Path; import org.apache.flink.formats.csv.CsvReaderFormat; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvSchema; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; public class TestFileSource { public static void main (String[] args) throws Exception { buildCSVFileSource(); } private static void buildCSVFileSource() throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); CsvSchema schema = CsvSchema.builder() .setUseHeader(true) .addColumn("name", CsvSchema.ColumnType.STRING) .addColumn("age", CsvSchema.ColumnType.NUMBER).build(); FileSource<User> fileSource = FileSource.forRecordStreamFormat(CsvReaderFormat.forSchema(schema, TypeInformation.of(User.class)), new Path("files/users.csv")).build(); DataStream<User> source = env.fromSource(fileSource, WatermarkStrategy.noWatermarks(), "file-source"); source.print(); env.execute(); } }
< users.csv >
name,age Andy,10 Brad,12 Chris,42 Daniel,32 Emma,22
Parquet FileSource.
아래 코드는 users.parquet 파일을 생성하는 Python 코드입니다.
import pandas as pd ls = [("Andy", 32), ("Brad", 44), ("Chris", 23), ("Daniel", 19)] df = pd.DataFrame(ls, columns=["name", "age"]) df.to_parquet("/tmp/users.parquet")
생성된 users.parquet 파일의 Schema 는 아래와 같이 구성됩니다.
{ "type" : "record", "name" : "schema", "fields" : [ { "name" : "name", "type" : [ "null", "string" ], "default" : null }, { "name" : "age", "type" : [ "null", "long" ], "default" : null } ] }
그리고 Parquet FileSource 를 구성하기 위해서 사용한 Dependencies 는 아래와 같습니다.
총 3개의 Module 을 사용하였는데요.
flink-parquet 와 parquet-avro 모듈은 Parquet FileSource 와 관련된 클래스들이 존재하며,
hadoop-client 는 Hadoop 과 관련된 설정을 위해서 필요합니다.
저의 경우에는 hdfs 를 사용하지 않았지만, 기본적으로 hdfs 에 관한 설정이 요구되므로 hadoop-client 모듈이 필요합니다.
// https://mvnrepository.com/artifact/org.apache.flink/flink-parquet implementation group: 'org.apache.flink', name: 'flink-parquet', version: '1.16.1' // https://mvnrepository.com/artifact/org.apache.parquet/parquet-avro implementation('org.apache.parquet:parquet-avro:1.12.2') implementation group: 'org.apache.hadoop', name: 'hadoop-client', version: '2.4.1'
결과적인 코드는 아래와 같습니다.
ParquetAvroReader 는 Parquet 파일을 Avro Record 로 변환하는 FileSource 입니다.
아래는 Parquet FileSource 에 대한 공식문서이므로 참고해보시면 좋을 것 같습니다.
package com.westlife.jobs; import com.westlife.model.User; import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.connector.file.src.FileSource; import org.apache.flink.core.fs.Path; import org.apache.flink.formats.parquet.avro.AvroParquetReaders; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; public class TestFileSource { public static void main (String[] args) throws Exception { buildParquetFileSource(); } private static void buildParquetFileSource() throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); final Schema schema = new Schema.Parser() .parse( "{\"type\": \"record\", " + "\"name\": \"User\", " + "\"fields\": [\n" + " {\"name\": \"name\", \"type\": \"string\" },\n" + " {\"name\": \"age\", \"type\": \"long\" }\n" + " ]\n" + " }"); final FileSource<GenericRecord> fileSource = FileSource.forRecordStreamFormat( AvroParquetReaders.forGenericRecord(schema), new Path("files/users.parquet")) .build(); DataStream<User> source = env.fromSource(fileSource, WatermarkStrategy.noWatermarks(), "file-source") .map(record -> new User(record.get("name").toString(), Integer.parseInt(record.get("age").toString()))); source.print(); env.execute(); } }
마치며.
여기까지 읽어주셔서 감사합니다.
저는 여러가지 Flink DataSource 들에 대해서 설명하는 글들을 작성하고 있는데요.
좀 더 자세히 알고싶으신 내용이 새롭게 알고 싶으시면 정보가 있으시면 댓글로 알려주시면 정말 좋을 것 같습니다.
반응형'Flink' 카테고리의 다른 글
[Flink] Window AllowedLateness 알아보기 (Watermark) (0) 2024.02.12 [Kryo] Kyro Serialization 알아보기 (0) 2024.02.05 [Flink] KafkaSource Connector 알아보기 (0) 2024.02.04 [Flink] Stateless Transform Operator 알아보기 (Map, Filter, FlatMap) (0) 2024.01.25 Flink Window 이해하기 (0) 2024.01.13