ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • [Flink] Async IO 알아보기 ( AsyncDataStream )
    Flink 2024. 3. 23. 06:48
    728x90
    반응형

     

    - 목차

     

    들어가며.

    이번 글에서는 Flink DataStream API 의 Async IO 에 대해서 알아보도록 하겠습니다.

    Async IO 는 Flink DataStream API 에서 제공하는 비동기처리를 위한 기술입니다.

    Java 가 기본적으로 제공하는 Future 모듈과 그 형식은 유사합니다.

    그렇기 때문에 N 개의 Input 을 동시에 처리할 수 있습니다.

    Map, FlatMap 같은 Stateless Operator 는 event by event 형식으로 동기적인 순서로 데이터를 처리하는 반면,

    Async IO 를 통하여 1개 이상의 데이터를 효율적으로 처리할 수 있게 됩니다.

     

    https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/operators/asyncio/

     

    이 이미지처럼 Async IO 를 사용하게 되면 Async IO Operator 인 AsyncDataStream 은 Capacity 로 설정된 용량만큼 Event 들을 수용할 수 있습니다.

    그래서 여러 이벤트를 동시에 처리할 수 있고, 이전 단계의 DataStream 에게 가해지는 Backpressure 부하를 줄일 수 있습니다.

    즉, 하나의 이벤트를 처리하는데에 1초가 걸린다고 할 때에

    N 개의 이벤트에 대하여 일반적인 Stateless Operator 인 Map, FlatMap 은 N 초가 걸리게 됩니다.

    하지만 Async IO 는 Capacity 범위만큼 처리 시간이 감소합니다.

     

    Async IO 는 보통 외부의 API Server 또는 데이터베이스와의 네트워크 통신을 효율적으로 수행하기 위해서 사용됩니다.

    Network IO 는 반드시 지연시간이 발생하기 마련이고 이러한 지연시간동안 네트워크 관련 태스트를 수행하는 쓰레드는 Waiting 상태가 됩니다.

    즉, CPU 를 점유하지 않고 Network 응답이 발생하기까지 대기하게 되죠.

    이를 Network Socket 관점에서 본다면 send 와 recv 사이의 시간적인 지연을 의미합니다.

    Async IO 는 이러한 지연 시간 동안에 다른 Task 를 수행하여 병렬적으로 수행됩니다.

     

    AsyncDataStream.

    Async IO 는 Flink DataStream 코드레벨에서 AsyncDataStream 으로 추상화되어 있습니다.

    그리고 Timeout, Capacity 과 설정을 가지는데요.

    이 설정들에 대해서 알아보도록 하겠습니다.

     

    아래의 예시 코드는 AsyncDataStream 을 테스트하기 위한 기본적인 프로그램을 구성하였습니다.

    package com.westlife.jobs;
    
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.datastream.AsyncDataStream;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.async.ResultFuture;
    import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
    import org.apache.flink.streaming.api.functions.sink.SinkFunction;
    import org.apache.flink.streaming.api.functions.source.SourceFunction;
    
    import java.io.Serializable;
    import java.util.Collections;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.TimeUnit;
    
    
    public class TestAsyncIO {
    
      public static void main (String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
        env.setParallelism(1);
        DataStream<Integer> source = env.addSource(new IntegerSource(100, 100)).name("source").uid("source");
        DataStream<Integer> stream = AsyncDataStream.unorderedWait(source, new AsyncIO(), 30, TimeUnit.SECONDS, 5).name("async-io").uid("async-io");
        stream.rebalance().addSink(new CustomPrint()).name("print").uid("print");
        env.execute();
      }
    
      public static class IntegerSource implements SourceFunction<Integer> {
    
        private int delay;
        private int count;
    
        public IntegerSource (int delay, int count) {
          this.delay = delay;
          this.count = count;
        }
    
        @Override
        public void run(SourceContext<Integer> ctx) throws Exception {
          int counter = 1;
          while (counter <= this.count) {
            ctx.collect(counter++);
            Thread.sleep(this.delay);
          }
        }
    
        @Override
        public void cancel() {}
      }
    
      public static class AsyncIO extends RichAsyncFunction<Integer, Integer> implements Serializable {
        @Override
        public void timeout(Integer input, ResultFuture<Integer> resultFuture) {
          System.out.println("AsyncIO Timeouts. value : " + input);
          resultFuture.complete(Collections.emptyList());
        }
    
        private void delay (int second) {
          try {
            Thread.sleep(second * 1000L);
          } catch (InterruptedException e) {
            System.out.printf("Failed to delay %ss %n", second);
          }
        }
    
        @Override
        public void asyncInvoke(Integer input, ResultFuture<Integer> resultFuture) {
          System.out.println("AsyncIO Processes. value : " + input);
          CompletableFuture.supplyAsync(() -> {
            delay(10);
            return input;
          }).thenAccept(integer -> resultFuture.complete(Collections.singletonList(integer)));
        }
      }
    
      public static class CustomPrint implements SinkFunction<Integer> {
        @Override
        public void invoke(Integer value, Context context) throws Exception {
          System.out.println("### print " + value);
        }
      }
    }

     

     

    그리고 데이터 스트림의 그래프는 아래와 같습니다.

     

    위 코드에 대한 설정을 간단히 드리겠습니다.

    1. Source 와 2. AsyncDataStream, 3. Print 3개로 구성된 Flink Application 입니다.

    Source 는 1부터 100까지의 숫자를 0.1초 간격으로 생성합니다.

    AsyncDataStream 은 Source 로 부터 전달받은 데이터를 Sink 로 전달하는데, 의도적으로 10초의 지연시간을 적용하였습니다.

    즉, async-io 는 데이터 하나를 처리하는데에 10초가 소요됩니다.

    그리고 Print Sink 는 단순히 입력값을 표준출력으로 출력합니다.

     

    Timeout.

    Timeout 설정은 Async IO 에서 실행되는 비동기 처리의 제한 시간을 설정합니다.

    Async IO 는 Capacity 로 설정한 N 개의 Input 을 동시에 처리할 수 있습니다.

    아래의 그림은 Async IO Operator 가 4개의 Input 을 동시에 처리하는 모습입니다.

    하지만 비동기 처리의 처리 시간이 너무 길어지면 전체적인 데이터 처리 흐름에 병목현상이 발생하게 됩니다.

    따라서 적절한 Timeout 을 설정하여 처리 시간이 긴 데이터를 제거하거나 예외 처리를 적용해야합니다.

     

     

    아래 Main Function 을 통해서 테스트를 수행하였습니다.

    AsyncDataStream 의 Timeout 을 5초로 설정하였습니다.

    하지만 AsyncIO 클래스는 각 Input 데이터를 처리하는데에 10초가 소요됩니다.

    즉, 모든 이벤트는 Timeout 의 대상이 됩니다.

      public static void main (String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
        env.setParallelism(1);
        
        // 1부터 100까지의 숫자 데이터가 생성됩니다. 
        DataStream<Integer> source = env.addSource(new IntegerSource(100, 100)).name("source").uid("source");
        
        // 1부터 100까지의 숫자 데이터를 처리합니다. 
        // 각 Input 을 처리하는데에 10초가 소요됩니다. 
        DataStream<Integer> stream = AsyncDataStream.unorderedWait(source, new AsyncIO(), 5, TimeUnit.SECONDS, 100).name("async-io").uid("async-io");
        
        stream.rebalance().addSink(new CustomPrint()).name("print").uid("print");
        env.execute();
      }

     

    위 Flink Application 의 실행 후 출력 결과는 아래와 같습니다.

    모든 이벤트는 Timeout 됩니다.

    AsyncIO Timeouts. value : 1
    AsyncIO Timeouts. value : 2
    AsyncIO Timeouts. value : 3
    AsyncIO Timeouts. value : 4
    AsyncIO Timeouts. value : 5
    AsyncIO Timeouts. value : 6
    AsyncIO Timeouts. value : 7
    AsyncIO Timeouts. value : 8
    AsyncIO Timeouts. value : 9
    AsyncIO Timeouts. value : 10
    AsyncIO Timeouts. value : 11
    // ...
    AsyncIO Timeouts. value : 94
    AsyncIO Timeouts. value : 95
    AsyncIO Timeouts. value : 96
    AsyncIO Timeouts. value : 97
    AsyncIO Timeouts. value : 98
    AsyncIO Timeouts. value : 99
    AsyncIO Timeouts. value : 100

     

     

    그럼 Timeout 은 왜 필요한 것일까요 ? 그냥 무한히 큰 값을 주면 되지 않을까요 ?

    만약 Timeout 이 매우 길어지게 되면 Source -> AsyncDataStream 사이에 병목이 발생하게 됩니다.

    아래의 이미지처럼 async-io 는 Busy 100% 상태가 되며, Source 의 데이터 생성 속도를 따라가지 못합니다.

     

     

    Capacity.

    AsyncDataStream 의 또 다른 설정은 Capacity 가 존재합니다.

    Capacity 설정은 동시에 처리할 수 있는 데이터의 수를 의미합니다.

    ThreadPool 에서 설정하는 Pool 사이즈와 유사한 개념입니다.

    만약 AsyncDataStream 을 Database 로부터 데이터를 조회하거나 생성하는 용도로 Capacity 와 ConnectionPool 의 사이즈를 함께 고려하는 것이 좋습니다.

    왜냐하면 Async IO 의 Capacity 수가 Connection 수보다 월등히 많게 되면, Connection 을 획득하는데에 시간이 소요되다보니

    Timeout 이 발생할 확률이 높습니다.

     

    Capacity 또한 Timeout 처럼 값이 크다고 능사가 아닙니다.

     

    Capacity 1 의 처리 속도.

    100개의 데이터는 Capacity 1 인 상태에서 얼마나 처리 시간이 드는지 확인해보겠습니다.

      public static void main (String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
        env.setParallelism(1);
        DataStream<Integer> source = env.addSource(new IntegerSource(100, 100)).name("source").uid("source");
        DataStream<Integer> stream = AsyncDataStream.unorderedWait(source, new AsyncIO(), 30, TimeUnit.SECONDS, 1).name("async-io").uid("async-io");
        stream.rebalance().addSink(new CustomPrint()).name("print").uid("print");
        env.execute();
      }

     

    위 Flink Application 의 실행 출력 결과는 아래와 같습니다.

    처리 시간은 5분 이상 소요되었구요.

    Capacity 가 1 로 설정되었기 때문에 1개의 Input 을 전달받아 처리하게 됩니다.

    이는 Map, FlatMap 과 같은 여타 Sync Operator 와 다르지 않습니다.

    AsyncIO Processes. value : 1
    AsyncIO Processes. value : 2
    ### print 1
    AsyncIO Processes. value : 3
    ### print 2
    AsyncIO Processes. value : 4
    ### print 3
    AsyncIO Processes. value : 5
    ### print 4
    // ... 생략
    AsyncIO Processes. value : 97
    ### print 96
    AsyncIO Processes. value : 98
    ### print 97
    AsyncIO Processes. value : 99
    ### print 98
    AsyncIO Processes. value : 100
    ### print 99
    ### print 100

     

    Capacity 5 의 처리 속도.

    Capacity 5 의 설정은 동시에 5개의 입력 데이터를 처리할 수 있기 때문에 처리 시간이 단축됩니다.

    CPU Core 등 여러 변수가 존재하긴 하지만 3분 이래의 처리가 완료됩니다.

      public static void main (String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
        env.setParallelism(1);
        DataStream<Integer> source = env.addSource(new IntegerSource(100, 100)).name("source").uid("source");
        DataStream<Integer> stream = AsyncDataStream.unorderedWait(source, new AsyncIO(), 30, TimeUnit.SECONDS, 5).name("async-io").uid("async-io");
        stream.rebalance().addSink(new CustomPrint()).name("print").uid("print");
        env.execute();
      }

     

    출력 결과는 아래와 같습니다.

    아래 로그와 같이 5개의 입력 데이터가 병렬 처리됩니다.

    AsyncIO Processes. value : 1
    AsyncIO Processes. value : 2
    AsyncIO Processes. value : 3
    AsyncIO Processes. value : 4
    AsyncIO Processes. value : 5
    AsyncIO Processes. value : 6
    ### print 1
    AsyncIO Processes. value : 7
    ### print 2
    AsyncIO Processes. value : 8
    ### print 3
    AsyncIO Processes. value : 9
    ### print 4
    AsyncIO Processes. value : 10
    ### print 5
    // ... 생략
    AsyncIO Processes. value : 97
    ### print 92
    AsyncIO Processes. value : 98
    ### print 93
    AsyncIO Processes. value : 99
    ### print 94
    AsyncIO Processes. value : 100
    ### print 95
    ### print 96
    ### print 97
    ### print 98
    ### print 99
    ### print 100

     

     

    마치며.

    AsyncDataStream 을 통해서 Network IO 작업이 많은 Task 의 최적화를 적용할 수 있습니다.

    Network IO 에 대한 시간 지연을 고려해야 Timeout 과 Capacity 를 적용한다면 데이터 처리 시간을 단축시킬 수 있습니다.

     

    Capacity 의 값은 반드시 Network Connection 수와 CPU Core 의 수를 고려해야합니다.

    Capacity 보다 Connection 의 수가 적으면 Async Task 에서 Connection 을 획득하는데에 시간이 소요됩니다.

    그리고 CPU Core 가 적게 되면 Capacity 가 늘어나도 Async Task 가 적절히 수행되지 못합니다.

    그래서 Timeout 이 발생할 확률이 커집니다.

     

     

     

    반응형
Designed by Tistory.