ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • [Flink] JSON FileSink 만들어보기 (Row Format)
    Flink 2024. 2. 12. 10:12
    728x90
    반응형

     

    - 목차

     

    들어가며.

    이번 글에서는 Flink DataStream Connector 중에서 File Sink 에 대해서 다루어보려고 합니다.

    특히 JSON File 을 취급하는 File Sink 에 대해서 작성할 예정이구요.

    관련된 구성요소들과 특징들에 대해서 작성해보겠습니다.

     

     

    Row-encoded Format.

    JSON 파일의 특징은 텍스트 기반의 파일이면서 Row 단위로 데이터를 저장합니다.

    이는 csv 파일과 동일한 특징을 가집니다.

    FileSink Connector 는 인코딩을 수행하기 위해서 Row 또는 Bulk 기반의 포맷이 존재하며,

    JSON 파일은 Row 기반의 인코딩 포맷을 사용합니다.

     

    Stream Data 를 처리하는 Row Format FileSink 는 다음과 같이 구성됩니다.

    0 부터 1만까지의 숫자를 출력 파일에 작성하는 FileSink 프로그램입니다.

    public static void main (String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
    env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
    env.setParallelism(1);
    DataStream<String> source = env.fromSequence(0, 10000)
            .map(i -> "" + i);
    
    final String outputPath = "/tmp/output";
    final FileSink<String> sink = FileSink
            .forRowFormat(new Path(outputPath), new SimpleStringEncoder<String>("UTF-8"))
            .withRollingPolicy(DefaultRollingPolicy.builder().build())
            .withBucketAssigner(new BasePathBucketAssigner())
            .build();
    
    source.sinkTo(sink);
    env.execute();
    }

     

    위 프로그램의 결과로써 아래와 같은 구조로 파일이 생성되구요.

    /tmp/output 디렉토리 하위에 생성된 파일이 위치합니다.

    그리고 파일의 내용은 0부터 1만까지의 숫자가 출력됩니다.

    ├── output
    │   └── part-54441ce9-e98e-452f-a11a-40decca7549a-0
    0
    1
    2
    3
    4
    5
    6
    7
    8
    // ... 생략
    9998
    9999
    10000

     

     

    Json Encoder 구현하기.

    이제 Pojo 데이터를 Json 형식으로 File Sink 를 수행하기 위한 Json Encoder 를 구현해보겠습니다.

    Pojo 를 Json String 으로 Encoding 하기 위해서 Jackson 모듈을 사용하였습니다.

     

      public static class JSONEncoder<IN> implements Encoder<IN> {
        private ObjectMapper objectMapper;
    
        public JSONEncoder () {
          this.objectMapper = new ObjectMapper();
        }
    
        @Override
        public void encode(IN element, OutputStream stream) throws IOException {
          JsonNode json = objectMapper.convertValue(element, JsonNode.class);
          stream.write(json.toString().getBytes(Charset.forName("UTF-8")));
          stream.write('\n');
        }
      }

     

    실행 코드는 아래와 같습니다.

    1만개의 Event Pojo 를 생성하고, 이를 Json 으로 직렬화하여 파일로 저장합니다.

      public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
        env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
        env.setParallelism(1);
        DataStream<Event> source = env.fromSequence(0, 10000)
                .map(i -> new Event(
                        Instant.parse("2023-01-01T00:00:00.000Z").plus(i, ChronoUnit.HOURS).toEpochMilli(),
                        "" + i)
                );
        final String outputPath = "/tmp/output";
    
        final FileSink<Event> sink = FileSink
                .forRowFormat(new Path(outputPath), new JSONEncoder<Event>())
                .withRollingPolicy(DefaultRollingPolicy.builder().build())
                .withBucketAssigner(new BasePathBucketAssigner())
                .build();
    
        source.sinkTo(sink);
        env.execute();
      }
    
      public static class Event {
        public Long timestamp;
        public String data;
        public Event(Long timestamp, String data) {
          this.timestamp = timestamp;
          this.data = data;
        }
      }
    }

     

    < 파일 내용 >

    // ...생략
    {"timestamp":1704758400000,"data":"8952"}
    {"timestamp":1704762000000,"data":"8953"}
    {"timestamp":1704765600000,"data":"8954"}
    {"timestamp":1704769200000,"data":"8955"}
    {"timestamp":1704772800000,"data":"8956"}
    {"timestamp":1704776400000,"data":"8957"}
    {"timestamp":1704780000000,"data":"8958"}
    {"timestamp":1704783600000,"data":"8959"}
    {"timestamp":1704787200000,"data":"8960"}
    {"timestamp":1704790800000,"data":"8961"}
    // ...생략

     

     

    BucketAssigner.

    FileSink Connector 는 BucketAssigner 기능이 존재합니다.

    Bucket Assigner 는 생성된 파일들을 파일 구조로 저장할지를 결정합니다.

    기본적으로 제공되는 BucketAssigner 는 두가지이며,

    BasePathBucketAssigner 와 DateTimeBucketAssigner 입니다.

     

    BasePathBucketAssigner.

    BasePathBucketAssigner 는 Output Path 로 설정한 디렉토리 위치에 파일들을 저장합니다.

    추가적인 계층 구조는 존재하지 않습니다.

    만약 Output Path 가 "/tmp/output" 으로 설정되었다면, 생성되는 파일들은 아래 예시와 같이 output 디렉토리 하위에 위치하게 됩니다.

    ├── output
    │   ├── part-02e49d54-e7ee-431c-ba31-be38f3d65173-0
    │   ├── part-040ee39a-0f87-4445-b5aa-4fce9c88af30-0
    │   ├── part-04558ecf-79b9-4625-8b93-7f8f9b74f192-0
    │   ├── part-04c7b0d4-6970-445f-9653-1f0cf95d8d26-0
    │   ├── part-066f5615-a6a3-4762-8c6a-e12a84a7c7fd-0
    │   ├── part-097ea513-8a4b-4001-a047-792a75053348-0
    │   ├── part-0f881593-7bfc-4c2a-b089-9289e5ded16b-0
    │   ├── part-1799f67d-6575-46e2-8304-a035476f94df-0
    │   ├── part-1e5374cb-de6f-462d-8735-f9991c6bba1d-0
    │   ├── part-1f39fb32-cb2f-4c94-8da3-cdeec509b94a-0
    │   ├── part-1f56d2d4-0405-49f9-89bb-9c17e2e4d302-0
    │   ├── part-235c8c3e-b441-4b2e-a488-65e802eb5d9a-0
    │   ├── part-25cb31bc-1b0d-474b-8bd4-eb52caaa54da-0
    │   ├── part-27ce3ea0-fbed-4c86-87e6-aad95ec020ae-0
    │   ├── part-2861117c-e4f3-4e34-a49c-c0acb25d566d-0
    │   ├── part-28ba46aa-d11a-4624-b1b5-d324cb604ba6-0

     

     

    DateTimeBucketAssigner.

    Flink 는 Event Stream 을 처리하는 프레임워크이기 때문에 DateTime 과 관련하여 파일 위치를 설정하는 것이 유용합니다.

    DateTimeBucketAssigner 는 시간을 기준으로 Bucketing 을 수행하기 때문에 시간단위로 File 의 계층 구조를 구성할 수 있습니다.

     

    아래의 FileSink 는 DateTimeBucketAssigner 를 활용한 예시이구요.

    저는 분단위로 Bucket Assignment 를 적용하였습니다.

    final FileSink<Event> sink = FileSink
                .forRowFormat(new Path(outputPath), new JSONEncoder<Event>())
                .withRollingPolicy(DefaultRollingPolicy.builder().build())
                .withBucketAssigner(new DateTimeBucketAssigner<>("yyyy-MM-dd-HH-mm"))
                .build();

     

    결과는 아래 파일 계층구조처럼 분단위로 디렉토리가 설정됩니다.

    하지만 이는 현재 시간을 반영하기 때문에 Event Time Semantics 로 설정하는 방법이 아닙니다.

    ├── output
    │   ├── 2024-02-11-16-22
    │   │   ├── part-6047b1db-6360-47ae-9e7d-8a76edff8116-0
    │   │   ├── part-6047b1db-6360-47ae-9e7d-8a76edff8116-1
    │   │   └── part-6047b1db-6360-47ae-9e7d-8a76edff8116-2
    │   └── 2024-02-11-16-23
    │       └── part-27248823-7e91-430e-a73f-6d87587c4b31-0

     

     

    Custom Bucket Assigner 구현하기.

    Event Time 을 기준으로 Bucketing 을 수행하는 방법에 대해서 설명하도록 하겠습니다.

    아래 코드 예시는 Event Time 을 기준으로 파일 계층구조를 구성하는 FileSink 코드입니다.

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
        env.setParallelism(1);
        DataStream<Event> source = env.fromSequence(0, 72)
                .map(i -> new Event(
                        Instant.parse("2023-01-01T00:00:00.000Z").plus(i, ChronoUnit.HOURS).toEpochMilli(),
                        "" + i)
                ).assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofHours(0))
                        .withTimestampAssigner((event, timestamp) -> event.timestamp));
    
        final String outputPath = "/tmp/output";
    
        final FileSink<Event> sink = FileSink
                .forRowFormat(new Path(outputPath), new JSONEncoder<Event>())
                .withRollingPolicy(DefaultRollingPolicy.builder().build())
                .withBucketAssigner(new BucketAssigner<>() {
                  @Override
                  public String getBucketId(Event element, Context context) {
                    return LocalDateTime.ofInstant(Instant.ofEpochMilli(element.timestamp), ZoneOffset.UTC)
                            .truncatedTo(ChronoUnit.HOURS)
                            .format(DateTimeFormatter.ofPattern("yyyy-MM-dd-HH"));
                  }
                  @Override
                  public SimpleVersionedSerializer<String> getSerializer() {
                    return SimpleVersionedStringSerializer.INSTANCE;
                  }
                })
                .build();
    
        source.sinkTo(sink);
        env.execute();
      }
    
      public static class Event {
        public Long timestamp;
        public String data;
        public Event(Long timestamp, String data) {
          this.timestamp = timestamp;
          this.data = data;
        }
      }
    
      public static class JSONEncoder<IN> implements Encoder<IN> {
        private ObjectMapper objectMapper;
    
        public JSONEncoder () {
          this.objectMapper = new ObjectMapper();
        }
    
        @Override
        public void encode(IN element, OutputStream stream) throws IOException {
          JsonNode json = objectMapper.convertValue(element, JsonNode.class);
          stream.write(json.toString().getBytes(Charset.forName("UTF-8")));
          stream.write('\n');
        }
      }

     

    ├── output
    │   ├── 2023-01-01-00
    │   │   └── part-c0e22d43-7641-41aa-93ac-f861034a1354-0
    │   ├── 2023-01-01-01
    │   │   └── part-2f92a2f2-49c6-41b3-95a4-3287d2b00223-0
    │   ├── 2023-01-01-02
    │   │   └── part-53592c1d-2ac6-443b-ad91-5ac5fd26a242-0
    │   ├── 2023-01-01-03
    │   │   └── part-8b99dbf7-f96c-446b-a132-e2854025363a-0
    │   ├── 2023-01-01-04
    │   │   └── part-23db3827-b655-469a-9e7b-a231114a61db-0
    // ... 생략
    │   ├── 2023-01-03-22
    │   │   └── part-e8c2f1ca-57fa-47e7-bc7c-14e9b1067a20-0
    │   ├── 2023-01-03-23
    │   │   └── part-e3c3b49f-679c-43d4-83db-ce3ffadcacaf-0
    │   └── 2023-01-04-00
    │       └── part-0b4fcc61-0cc3-4ab6-b629-7a7f9c6e1bb3-0

     

    FileSink 에 BucketAssigner 익명클래스를 통해서 구현합니다.

    getBucketId 함수의 내용은 Bucket 의 ID 를 생성하는 코드로 구성되는데요.

    저의 경우에는 Event 의 Timestamp 를 시간단위로 Truncated 하여 BucketID 로 사용하였습니다.

     

    final FileSink<Event> sink = FileSink
    .forRowFormat(new Path(outputPath), new JSONEncoder<Event>())
    .withRollingPolicy(DefaultRollingPolicy.builder().build())
    .withBucketAssigner(new BucketAssigner<>() {
      @Override
      public String getBucketId(Event element, Context context) {
        return LocalDateTime.ofInstant(Instant.ofEpochMilli(element.timestamp), ZoneOffset.UTC)
                .truncatedTo(ChronoUnit.HOURS)
                .format(DateTimeFormatter.ofPattern("yyyy-MM-dd-HH"));
      }
      @Override
      public SimpleVersionedSerializer<String> getSerializer() {
        return SimpleVersionedStringSerializer.INSTANCE;
      }
    })
    .build();

     

     

     

    RollingPolicy.

    이벤트 스트림 처리 분야에서 Rolling 이라는 표현이 자주 사용됩니다.

    이벤트 스트림이란 끝없는 데이터의 흐름이며, 이를 Unbounded 라고 표현합니다.

    Rolling 이라는 표현은 Unbounded Data Stream 을 Flush 할때에 사용되는 표현이고,

    Kafka, Flink Stream 등의 영역에서 사용됩니다.

    즉, 끝없는 이벤트 스트림에서 일부분의 데이터를 Write, Sink 하는 행위라고 생각하시면 됩니다.

    그래서 Rolling Policy 는 "이벤트 스트림의 데이터를 언제 DataSink 에 저장할래 ? " 의 설정입니다.

     

    Flink File Sink Connector 에서의 Rolling Policy 는 생성 중인 임시파일을 설정된 원격저장소에 영구적으로 저장하는 설정입니다.

    Flink 의 File 들은 In-Progress --> Finished 로의 상태 변화가 존재하는데,

    In-progress Files 들이 Rolling 되면서 원격 저장소로 영구적으로 옮겨집니다.

     

    RollingPolicy 는 두가지가 존재합니다.

    DefaultRollingPolicy 와 OnCheckpointRollingPolicy 입니다.

     

    DefaultRollingPolicy.

    DefaultRollingPolicy 는 용량 기준 그리고 시간 기준으로 FIle 을 생성합니다.

    용량 기준은 생성 중인 파일이 몇 바이트 이상이 되면 생성 중인 파일을 닫고 새로운 파일을 생성합니다.

    시간 기준도 유사합니다.

    정해진 기준의 시간동안 하나의 파일을 생성하고, 기준 시간이 지나면 새로운 파일을 생성합니다.

    DefaultRollingPolicy 는 용량과 시간 기준을 활용하여 File 의 갯수를 결정합니다.

    참고로 DefaultRollingPolicy 에서 의미하는 시간 기준은 Event Time Semantics 를 따르지 않습니다.

    시스템 시간을 따르니 이 점을 참고하셔야합니다.

     

    withMaxPartSize.

    먼저 용량 기준으로 어떻게 파일이 생성되는지에 대해서 알아보겠습니다.

    용량 기준을 설정하는 방법은 withMaxPartSize 함수를 사용하는 것입니다.

    코드 예시는 아래와 같습니다.

    1024 즉 1KB 기준으로 파일을 생성합니다.

    final FileSink<Event> sink = FileSink
                .forRowFormat(new Path(outputPath), new JSONEncoder<Event>())
                .withRollingPolicy(DefaultRollingPolicy.builder()
                        .withInactivityInterval(Duration.ofMinutes(1000))
                        .withRolloverInterval(Duration.ofMinutes(1000))
                        .withMaxPartSize(MemorySize.parse("1024"))
                        .build())
                .withBucketAssigner(new BasePathBucketAssigner())
                .build();

     

    먼저 990 byte 의 데이터를 처리한 결과, 아래와 같이 1개의 파일이 생성됩니다.

    990 part-1ca580bf-110f-42d6-8cc6-efe1ceeca9dc-0

     

    이번엔 3000 byte 정보의 데이터를 처리하였고, 아래와 같이 3개의 파일이 생성됩니다.

    첫번째 파일은 1030, 두번째 파일은 1040 의 크기를 가집니다.

    1030  part-0a781bce-671b-4b8f-b9c7-e3794601c5d2-0
    1040  part-0a781bce-671b-4b8f-b9c7-e3794601c5d2-1
     840  part-0a781bce-671b-4b8f-b9c7-e3794601c5d2-2

     

     

    withRolloverInterval.

    시간 기준으로 Rolling Update 를 설정하기 위해서는 withRolloverInterval 함수를 사용합니다.

    withRolloverInterval 는 파일을 생성하는 시간 기준입니다.

    예시 코드는 아래와 같구요.

    10초를 기준으로 파일이 생성됩니다.

    ( withMaxPartSize 는 넉넉히 1GB 로 설정하였습니다. )

        final FileSink<Event> sink = FileSink
                .forRowFormat(new Path(outputPath), new JSONEncoder<Event>())
                .withRollingPolicy(DefaultRollingPolicy.builder()
                        .withInactivityInterval(Duration.ofSeconds(60))
                        .withRolloverInterval(Duration.ofSeconds(10))
                        .withMaxPartSize(MemorySize.parse("1073741824"))
                        .build())
                .withBucketAssigner(new BasePathBucketAssigner())
                .build();

     

    < 생성된 파일 구조 >

    424M  part-ba6bf9d8-f617-4513-a46a-47c16f1ec5c3-0
     12M  part-ba6bf9d8-f617-4513-a46a-47c16f1ec5c3-1

     

    withInactivityInterval.

    withInactivityInterval 은 Open 된 파일이 어떠한 Write 나 Append 동작없이 Idle 상태로 지속되는 허용 시간을 의미합니다.

    즉, 파일이 생성되어 Open 되었는데 Inactivity 시간이 지속되면 파일을 닫아버립니다.

     

    테스트를 위해서 아래와 같은 1분 주기로 데이터를 생성하는 DataSource 를 사용하였구요.

    withInactivityInterval 는 30초로 설정하여 InactivityInterval 를 테스트하였습니다.

    3개의 데이터들이 InactivityInterval 을 넘어서는 지연시간이 존재하므로 3개의 파일이 생성됩니다.

     

        DataStream<Event> source = env.addSource(new SourceFunction<Event>() {
          @Override
          public void run(SourceContext<Event> ctx) throws Exception {
            Event event = new Event(
                        Instant.parse("2023-01-01T00:00:00.000Z").toEpochMilli(),
                        "1");
            ctx.collect(event);
            Thread.sleep(1000 * 60);
            ctx.collect(event);
            Thread.sleep(1000 * 60);
            ctx.collect(event);
            Thread.sleep(1000 * 60);
          }
    
          @Override
          public void cancel() {
    
          }
        });
    final FileSink<Event> sink = FileSink
                .forRowFormat(new Path(outputPath), new JSONEncoder<Event>())
                .withRollingPolicy(DefaultRollingPolicy.builder()
                        .withInactivityInterval(Duration.ofSeconds(30))
                        .withRolloverInterval(Duration.ofSeconds(120))
                        .withMaxPartSize(MemorySize.parse("1073741824"))
                        .build())
                .withBucketAssigner(new BasePathBucketAssigner())
                .build();

     

    < 생성된 파일 구조 >

    39B  2 12 20:07 part-9ef84e4a-c1ba-470f-bb7f-f7346397f983-0
    39B  2 12 20:08 part-9ef84e4a-c1ba-470f-bb7f-f7346397f983-1
    39B  2 12 20:09 part-9ef84e4a-c1ba-470f-bb7f-f7346397f983-2

     

     

    OnCheckpointRollingPolicy.

    OnCheckpointRollingPolicy 는 가장 일반적으로 사용되는 RollingUpdate 방식입니다.

    Checkpoint 를 사용하기 때문에 내결함성이나 Exactly Once 를 구현할 수 있는 장점이 존재합니다.

    아래 그림과 같이 Checkpoint 가 생성되면서 File 또한 Rolling Update 가 됩니다.

     

    Checkpoint 와 OnCheckpointRollingPolicy 를 적용한 예시 코드는 아래와 같습니다.

      public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
        env.setParallelism(1);
        env.enableCheckpointing(1000 * 10);
        env.getCheckpointConfig().setCheckpointStorage(new Path("file:///tmp/checkpoint"));
        DataStream<Event> source = env.fromSequence(0, 10000000)
                .map(i -> new Event(
                        Instant.parse("2023-01-01T00:00:00.000Z").plus(i, ChronoUnit.HOURS).toEpochMilli(),
                        "" + i)
                ).assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofHours(0))
                        .withTimestampAssigner((event, timestamp) -> event.timestamp));
    
        final String outputPath = "/tmp/output";
        final FileSink<Event> sink = FileSink
                .forRowFormat(new Path(outputPath), new JSONEncoder<Event>())
                .withRollingPolicy(OnCheckpointRollingPolicy.build())
                .withBucketAssigner(new BasePathBucketAssigner())
                .build();
    
        source.sinkTo(sink);
        env.execute();
      }
    
      public static class Event {
        public Long timestamp;
        public String data;
        public Event(Long timestamp, String data) {
          this.timestamp = timestamp;
          this.data = data;
        }
      }
    
      public static class JSONEncoder<IN> implements Encoder<IN> {
        private ObjectMapper objectMapper;
    
        public JSONEncoder () {
          this.objectMapper = new ObjectMapper();
        }
    
        @Override
        public void encode(IN element, OutputStream stream) throws IOException {
          JsonNode json = objectMapper.convertValue(element, JsonNode.class);
          stream.write(json.toString().getBytes(Charset.forName("UTF-8")));
          stream.write('\n');
        }
      }

     

    < 생성된 파일 구조 >

    47M  part-ac41c832-1ec0-4488-91f5-14949acbe532-0
    79M  part-ac41c832-1ec0-4488-91f5-14949acbe532-1
    80M  part-ac41c832-1ec0-4488-91f5-14949acbe532-2
    78M  part-ac41c832-1ec0-4488-91f5-14949acbe532-3
    80M  part-ac41c832-1ec0-4488-91f5-14949acbe532-4
    71M  part-ac41c832-1ec0-4488-91f5-14949acbe532-5

     

     

    Output File Configuration.

    생성되는 파일의 이름과 압축 방식 등을 설정할 수 있는 withOutputFileConfig 함수가 제공됩니다.

    아래와 같은 형식으로 파일의 Prefix, Suffix 를 설정할 수 있습니다.

    final String outputPath = "/tmp/output";
        final FileSink<Event> sink = FileSink
                .forRowFormat(new Path(outputPath), new JSONEncoder<Event>())
                .withRollingPolicy(DefaultRollingPolicy.builder().build())
                .withBucketAssigner(new BasePathBucketAssigner())
                .withOutputFileConfig(OutputFileConfig.builder()
                        .withPartSuffix(".json")
                        .withPartPrefix("test")
                        .build())
                .build();

     

    생성된 파일명은 아래와 같이 test Prefix 와 .json Suffix 가 추가됩니다.

    test-3d5a3513-ebcb-4ef2-b1a7-a96f90c1222a-0.json

     

     

    마치며.

    읽어주셔서 감사합니다.

    혹시나 잘못된 내용일 있으면 피드백을 받아 수정하도록 하겠습니다.

     

     

    반응형
Designed by Tistory.