ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • [Flink] FileSource 알아보기 (json, csv, parquet, avro)
    Flink 2024. 2. 4. 19:53
    728x90
    반응형

    - 목차

     

    들어가며.

    Flink 는 File IO 를 위한 Source 로 FileSource class 를 가집니다.

    FileSource 는 File Read 를 위한 세가지 Factory Method 를 가지는데요.

    forRecordStreamFormat, forBulkFileFormat, forRecordFileFormat 에 해당하는 Method 들을 가집니다.

    단순한 텍스트 파일부터 JSON, CSV 포맷의 파일과 Avro, Parquet 같은 Binary File 을 처리하는 FileSource 에 대해서 알아보려고 합니다.

     

    File Source 의 Fully Qualifed Name 은 org.apache.flink.connector.file.src.reader.FileSource 이구요.

    1.16.1 버전의 모듈을 사용할 예정입니다.

    // https://mvnrepository.com/artifact/org.apache.flink/flink-connector-files
    implementation group: 'org.apache.flink', name: 'flink-connector-files', version: '1.16.1'

     

     

     

    File DataSource 의 구성요소 살펴보기.

    DataSource 는 SourceSplit, SourceReader, SourceEnumerator 로 구성됩니다.

    이들의 관계를 그림으로 나타내면 아래와 같습니다.

    출처 : https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/datastream/sources/

     

    SourceSplit 은 IO 의 대상이 될 수 있는 가장 작은 단위입니다.

    만약 HDFS 나 s3a 처럼 여러 파일들을 읽게 된다면, Split 은 하나의 파일입니다.

    그리고 Kafka 를 DataSource 로 연결하는 경우에는 Partition 이 SourceSplit 이 됩니다.

    만약에 하나의 파일을 잘라서 각 Chunk 단위를 Split 으로 삼을 수도 있습니다.

    ( 예를 들어 Split-1 : 1 ~ 100 line, Split-2 : 101 ~ 200 line, Split-3 : 201 ~ 300 line ... 이런 느낌으로 말이죠.)

     

    이는 구현하는 프로그래머의 몫입니다.

    다만 FIle 또는 Partition 이 흔히 적용되는 Split 의 단위라는 것만 알고 계시면 좋을 것 같습니다.

    FileSource 의 경우의 SourceSplit 은 FilePath, Offset, Length 와 같은 정보로 구현됩니다.

     

    그리고 SplitReader 는 SourceSplit 을 조회해서 이를 역직렬화하고 DownStream 으로 흘려보냅니다.

    예를 들어, 아래의 이미지처럼 File SourceSplit 의 정보를 토대로 File 을 읽어들여 Buffer 를 구성합니다.

    그리고 그 Buffer Read 하고 Deserialize 합니다.

    모든 SourceSplit 들을 처리할 때까지 이러한 과정을 반복합니다.

     

    이어서 설명하겠지만, TextFile 의 SourceReader 는 단순한 BufferedReader 입니다.

    즉, Line-by-Line 으로 Text 을 읽어들이는 단순한 구조입니다.

     

    마지막으로 SourceEnumerator 는 JobManager 에서 TaskManager 의 SourceReader 에게 Split 을 제공합니다.

     

    StreamFormat, BulkFormat.

    FileSource 외에도 KafkaSource, Hadoop Presto Source 등이 존재합니다.

    StreamFormat 과 BulkFormat 의 FileSource 만의 클래스이고,

    StreamFormat 과 BulkFormat 내부에 SourceReader 를 정의하는 구조를 취합니다.

    즉, SourceReader 를 가지는 Wrapper 또는 Factory 라고 생각하셔도 됩니다.

    혹시나 StreamFormat 과 BulkFormat 을 접하시고 혼란을 느끼실까봐 이런 내용을 작성하였습니다.

     

    Text FileSource.

    이제 본격적으로 FileSource 의 다양한 케이스에 대해서 알아보겠습니다.

    먼저 단순한 TextFile 를 읽어들이는 Flink DataStream 프로그램은 아래와 같이 구성됩니다.

     

    < test.txt >

    123
    456
    789
    123
    456
    789
    123
    456
    789
    123
    456
    789

     

    < Flink TestFileSource.java >

    package com.westlife.jobs;
    
    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.connector.file.src.FileSource;
    import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
    import org.apache.flink.core.fs.Path;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    
    public class TestFileSource {
      public static void main (String[] args) throws Exception {
        buildTextFileSource();
      }
    
      private static void buildTextFileSource() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        FileSource<String> fileSource = FileSource.forRecordStreamFormat(new TextLineInputFormat(), new Path("files/test.txt")).build();
        DataStream<String> source = env.<String>fromSource(fileSource, WatermarkStrategy.noWatermarks(), "file-source");
        source.print();
        env.execute();
      }
    }

     

     

    단순한 File 을 DataSource 로 사용할 때에는 TextLineInputFormat 을 사용합니다.

    TextLineInputFormat 은 StreamFormat 을 구현한 클래스이구요.

    StreamFormat 은 SourceReader 를 정의한 ? 구현한 ? Wrapper 또는 Factory 라고 말씀드렸죠 ?

    TextLineInputFormat 은 이름에서 알 수 있듯이, Line-By-Line 으로 텍스트 파일을 읽어들이는 기능을 가집니다.

    그리고 Java IO 에서 파일을 Line-By-Line 으로 읽어들이는 역할을 담당하는 것이 BufferedReader 죠 ?

    그래서 TextLineInputFormat 은 단순히 BufferedReader 를 SourceReader 로 취합니다.

    아래 캡쳐본은 TextLineInputFormat 이 정의하는 SourceReader 의 내용입니다.

     

     

    그리고 FileSourceSplit 이라는 Class 를 SourceSplit 으로 사용합니다.

    FileSourceSplit 의 Full Name 은 org.apache.flink.connector.file.src.FileSourceSplit 이구요.

    FilePath, File Position, File Length 등의 정보를 가집니다.

    위 세가지 정보만 있으면 FileInputStream 을 만들 수 있고, 현재 어떤 Position 까지 읽었는지 등의 정보를 다 알 수 있겠죠 ?

     

    Flink Program 의 출력은 아래와 같습니다.

    4> 123
    4> 456
    4> 789
    4> 123
    4> 456
    4> 789
    4> 123
    4> 456
    4> 789
    4> 123
    4> 456
    4> 789

     

     

     

    Json FileSource.

    아래 코드는 TextLineInputFormat 을 토대로 구현한 JsonInputFormat 파일입니다.

    Jackson Library 를 추가하여 Deserialization 기능만 추가하였습니다.

    package com.westlife.connector.file;
    
    import com.fasterxml.jackson.databind.ObjectMapper;
    import com.fasterxml.jackson.databind.node.ObjectNode;
    import org.apache.flink.api.common.typeinfo.*;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.connector.file.src.reader.*;
    import org.apache.flink.core.fs.FSDataInputStream;
    
    import javax.annotation.Nullable;
    import java.io.*;
    
    public class JsonInputFormat extends SimpleStreamFormat<ObjectNode> {
      private static final long serialVersionUID = 1L;
    
      public static final String DEFAULT_CHARSET_NAME = "UTF-8";
    
      private final String charsetName;
    
      public JsonInputFormat() {
        this(DEFAULT_CHARSET_NAME);
      }
    
      public JsonInputFormat(String charsetName) {
        this.charsetName = charsetName;
      }
    
      @Override
      public JsonInputFormat.JsonFormatReader createReader(Configuration config, FSDataInputStream stream) throws IOException {
        final BufferedReader reader =
                new BufferedReader(new InputStreamReader(stream, charsetName));
        return new JsonInputFormat.JsonFormatReader(reader);
      }
    
      @Override
      public TypeInformation<ObjectNode> getProducedType() {
        return Types.GENERIC(ObjectNode.class);
      }
    
      public static final class JsonFormatReader implements StreamFormat.Reader<ObjectNode> {
    
        private final BufferedReader reader;
        private final ObjectMapper objectMapper;
    
        JsonFormatReader(final BufferedReader reader) {
          this.reader = reader;
          this.objectMapper = new ObjectMapper();
        }
    
        @Nullable
        @Override
        public ObjectNode read() throws IOException {
          String content = reader.readLine();
          if (content == null) return null;
          return (ObjectNode) this.objectMapper.readTree(content);
        }
    
        @Override
        public void close() throws IOException {
          reader.close();
        }
      }
    }

     

    그리고 아래와 같이 Flink FileSource 의 StreamFormat 으로 JsonInputFormat 을 사용하였습니다.

    package com.westlife.jobs;
    
    import com.fasterxml.jackson.databind.node.BaseJsonNode;
    import com.fasterxml.jackson.databind.node.ObjectNode;
    import com.westlife.connector.file.JsonInputFormat;
    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.connector.file.src.FileSource;
    import org.apache.flink.core.fs.Path;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    
    public class TestFileSource {
      public static void main (String[] args) throws Exception {
        buildJsonFileSource();
      }
    
      private static void buildJsonFileSource() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        FileSource<ObjectNode> fileSource = FileSource.forRecordStreamFormat(new JsonInputFormat(), new Path("files/users.json")).build();
        DataStream<String> source = env.fromSource(fileSource, WatermarkStrategy.noWatermarks(), "file-source").map(BaseJsonNode::toString);
        source.print();
        env.execute();
      }
    }

     

    < 출력 >

    2> {"name":"Andy","age":10}
    2> {"name":"Brad","age":12}
    2> {"name":"Chris","age":42}
    2> {"name":"Daniel","age":32}
    2> {"name":"Emma","age":22}

     

     

    CSV FileSource.

    CSV File 를 DataSource 로 사용하기 위한 Connector Module 이 존재합니다.

    // https://mvnrepository.com/artifact/org.apache.flink/flink-csv
    implementation group: 'org.apache.flink', name: 'flink-csv', version: '1.16.1'

     

    먼저 아래와 같이 csv 파일을 Deserialization 할 Pojo 타입이 필요하고,

    package com.westlife.model;
    
    import java.io.Serializable;
    
    public class User implements Serializable {
    
      private String name;
    
      private Integer age;
    
      public String getName() {return this.name;};
      public Integer getAge() {return this.age;};
      public void setName(String name) {this.name = name;};
      public void setAge(Integer age) {this.age = age;};
    
      public String toString() {return String.format("%s, %s", this.name, this.age);};
    }

     

    아래와 같이 CsvReaderFormat 인 StreamFormat 으로부터 FileSource 를 구현할 수 있습니다.

    forSchema, forPojo 와 같인 Helper Function 들이 존재합니다.

    저는 Schema 와 Pojo 클래스를 활용하는 forSchema 를 사용하였습니다.

    package com.westlife.jobs;
    
    import com.westlife.model.User;
    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.connector.file.src.FileSource;
    import org.apache.flink.core.fs.Path;
    import org.apache.flink.formats.csv.CsvReaderFormat;
    import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvSchema;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    
    public class TestFileSource {
      public static void main (String[] args) throws Exception {
        buildCSVFileSource();
      }
    
      private static void buildCSVFileSource() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        CsvSchema schema = CsvSchema.builder()
                .setUseHeader(true)
                .addColumn("name", CsvSchema.ColumnType.STRING)
                .addColumn("age", CsvSchema.ColumnType.NUMBER).build();
        FileSource<User> fileSource = FileSource.forRecordStreamFormat(CsvReaderFormat.forSchema(schema, TypeInformation.of(User.class)), new Path("files/users.csv")).build();
        DataStream<User> source = env.fromSource(fileSource, WatermarkStrategy.noWatermarks(), "file-source");
        source.print();
        env.execute();
      }
    }

     

    < users.csv >

    name,age
    Andy,10
    Brad,12
    Chris,42
    Daniel,32
    Emma,22

     

     

    Parquet FileSource.

    아래 코드는 users.parquet 파일을 생성하는 Python 코드입니다.

    import pandas as pd
    
    ls = [("Andy", 32), ("Brad", 44), ("Chris", 23), ("Daniel", 19)]
    df = pd.DataFrame(ls, columns=["name", "age"])
    df.to_parquet("/tmp/users.parquet")

     

    생성된 users.parquet 파일의 Schema 는 아래와 같이 구성됩니다.

    {
      "type" : "record",
      "name" : "schema",
      "fields" : [ {
        "name" : "name",
        "type" : [ "null", "string" ],
        "default" : null
      }, {
        "name" : "age",
        "type" : [ "null", "long" ],
        "default" : null
      } ]
    }

     

    그리고 Parquet FileSource 를 구성하기 위해서 사용한 Dependencies 는 아래와 같습니다.

    총 3개의 Module 을 사용하였는데요.

    flink-parquet 와 parquet-avro 모듈은 Parquet FileSource 와 관련된 클래스들이 존재하며,

    hadoop-client 는 Hadoop 과 관련된 설정을 위해서 필요합니다.

    저의 경우에는 hdfs 를 사용하지 않았지만, 기본적으로 hdfs 에 관한 설정이 요구되므로 hadoop-client 모듈이 필요합니다.

    // https://mvnrepository.com/artifact/org.apache.flink/flink-parquet
    implementation group: 'org.apache.flink', name: 'flink-parquet', version: '1.16.1'
    // https://mvnrepository.com/artifact/org.apache.parquet/parquet-avro
    implementation('org.apache.parquet:parquet-avro:1.12.2')
    implementation group: 'org.apache.hadoop', name: 'hadoop-client', version: '2.4.1'

     

    결과적인 코드는 아래와 같습니다.

    ParquetAvroReader 는 Parquet 파일을 Avro Record 로 변환하는 FileSource 입니다.

     

    아래는 Parquet FileSource 에 대한 공식문서이므로 참고해보시면 좋을 것 같습니다.

    https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/connectors/datastream/formats/parquet/

     

    Parquet

    Parquet format # Flink supports reading Parquet files, producing Flink RowData and producing Avro records. To use the format you need to add the flink-parquet dependency to your project: <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-pa

    nightlies.apache.org

     

    package com.westlife.jobs;
    
    import com.westlife.model.User;
    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.connector.file.src.FileSource;
    import org.apache.flink.core.fs.Path;
    import org.apache.flink.formats.parquet.avro.AvroParquetReaders;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    
    public class TestFileSource {
      public static void main (String[] args) throws Exception {
        buildParquetFileSource();
      }
    
      private static void buildParquetFileSource() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        final Schema schema =
                new Schema.Parser()
                        .parse(
                                "{\"type\": \"record\", "
                                        + "\"name\": \"User\", "
                                        + "\"fields\": [\n"
                                        + "        {\"name\": \"name\", \"type\": \"string\" },\n"
                                        + "        {\"name\": \"age\",  \"type\": \"long\" }\n"
                                        + "    ]\n"
                                        + "    }");
    
        final FileSource<GenericRecord> fileSource =
                FileSource.forRecordStreamFormat(
                                AvroParquetReaders.forGenericRecord(schema), new Path("files/users.parquet"))
                        .build();
        DataStream<User> source = env.fromSource(fileSource, WatermarkStrategy.noWatermarks(), "file-source")
                .map(record -> new User(record.get("name").toString(), Integer.parseInt(record.get("age").toString())));
        source.print();
        env.execute();
      }
    }

     

     

    마치며.

    여기까지 읽어주셔서 감사합니다.

    저는 여러가지 Flink DataSource 들에 대해서 설명하는 글들을 작성하고 있는데요.

    좀 더 자세히 알고싶으신 내용이 새롭게 알고 싶으시면 정보가 있으시면 댓글로 알려주시면 정말 좋을 것 같습니다.

     

    반응형
Designed by Tistory.