ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • [Flink] Async IO Retry Strategy 알아보기
    Flink 2024. 3. 23. 10:50
    728x90
    반응형

     

    - 목차

     

    들어가며.

    이번 글은 Flink Async IO 의 Retry Strategy 에 대해서 알아보려고 합니다.

    이전에 작성하였던 https://westlife0615.tistory.com/781 글과 이어지는 내용입니다.

     

     

    [Flink] Async IO 알아보기 ( AsyncDataStream )

    - 목차 들어가며. 이번 글에서는 Flink DataStream API 의 Async IO 에 대해서 알아보도록 하겠습니다. Async IO 는 Flink DataStream API 에서 제공하는 비동기처리를 위한 기술입니다. Java 가 기본적으로 제공하

    westlife0615.tistory.com

     

    Async IO 는 네트워크 통신을 수행하는 Task 를 효율적으로 수행하기 위해서 주로 사용됩니다.

    네트워크 통신은 필연적으로 지연 시간이 발생합니다.

    원격의 서버에게 트래픽을 전달하고, 원격 서버가 클라이언트의 요청을 처리하는데에 드는 시간이 존재하기 때문입니다.

    이 시간이 1초라고 가정한다면, Flink Application 은 1초간 Waiting 상태에 머뭅니다.

    그래서 Flink Application 은 Async IO 를 통해서 지연 시간동안 최대한 많은 요청을 보냅니다.

     

    원격 서버와의 통신은 에러가 발생할 확률이 높습니다.

    인터넷망이라는 변수가 존재하기 때문이죠.

    그래서 Retry 전략을 설정하여 Network Call 을 Retry 하도록 설정합니다.

     

     

    Retry Strategy 설정하기.

     

    아래 Flink Application 은 Retry 전략을 테스트하기 위해서 구성한 코드예시입니다.

    Source, AsyncDataStream, PrintSink 세가지로 구성됩니다.

    Source 는 1부터 10까지의 데이터를 생성하구요.

    AsyncDataStream 은 오직 홀수 값만 처리할 수 있습니다.

    그리고 PrintSink 는 단순히 표준출력으로 로그를 생성합니다.

     

    package com.westlife.jobs;
    
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.datastream.AsyncDataStream;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.async.AsyncRetryStrategy;
    import org.apache.flink.streaming.api.functions.async.ResultFuture;
    import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
    import org.apache.flink.streaming.api.functions.sink.SinkFunction;
    import org.apache.flink.streaming.api.functions.source.SourceFunction;
    import org.apache.flink.streaming.util.retryable.AsyncRetryStrategies;
    import org.apache.flink.streaming.util.retryable.RetryPredicates;
    
    import java.util.Collections;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.TimeUnit;
    
    
    public class TestAsyncIO {
    
      public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
        env.setParallelism(1);
    
        AsyncRetryStrategy asyncRetryStrategy =
                new AsyncRetryStrategies.FixedDelayRetryStrategyBuilder(3, 100L)
                        .ifResult(RetryPredicates.EMPTY_RESULT_PREDICATE)
                        .build();
    
        DataStream<Integer> source = env.addSource(new CustomSource(1, 10)).name("source").uid("source");
        DataStream<Integer> stream = AsyncDataStream.unorderedWaitWithRetry(source, new AsyncIO(), 5, TimeUnit.SECONDS, 5, asyncRetryStrategy).name("async-io").uid("async-io");
        stream.rebalance().addSink(new CustomPrint()).name("print").uid("print");
        env.execute();
      }
    
      public static class CustomSource implements SourceFunction<Integer> {
        int from;
        int to;
        public CustomSource (int from, int to) {
          this.from = from;
          this.to = to;
        }
        @Override
        public void run(SourceContext<Integer> ctx) throws Exception {
          for (int i = from; i <= to; i++) {
            ctx.collect(i);
            Thread.sleep(100L);
          }
          Thread.sleep(60 * 1000L);
        }
    
        @Override
        public void cancel() {
    
        }
      }
    
      public static class AsyncIO extends RichAsyncFunction<Integer, Integer> {
        @Override
        public void timeout(Integer input, ResultFuture<Integer> resultFuture) {
          System.out.println("AsyncIO Timeouts. value : " + input);
          resultFuture.complete(Collections.emptyList());
        }
    
        @Override
        public void asyncInvoke(Integer input, ResultFuture<Integer> resultFuture) {
          CompletableFuture.supplyAsync(() -> {
            System.out.println("AsyncIO is Processing value " + input);
            if (input % 2 == 0) {
              System.out.println("AsyncIO is Throwing EvenNumberException value " + input);
              return null;
            }
            return input;
          }).thenAccept(integer -> {
            if (integer == null) resultFuture.complete(Collections.emptyList());
            else resultFuture.complete(Collections.singletonList(integer));
          });
        }
      }
    
      public static class CustomPrint implements SinkFunction<Integer> {
        @Override
        public void invoke(Integer value, Context context) {
          System.out.println("### print " + value);
        }
      }
    }

     

    위 프로그램을 실행하게 되면 아래의 로그가 생성됩니다.

    "### print 1, ### print 3, ### print 5, ### print 7, ### print 9"

    와 같이 홀수 값만이 PrintSink 로 넘어갈 수 있고, 짝수는 AsyncDataStream 에 의해서 필터링됩니다.

    AsyncIO is Processing value 1
    AsyncIO is Processing value 2
    AsyncIO is Throwing EvenNumberException value 2
    ### print 1
    AsyncIO is Processing value 3
    ### print 3
    AsyncIO is Processing value 2
    AsyncIO is Throwing EvenNumberException value 2
    AsyncIO is Processing value 4
    AsyncIO is Throwing EvenNumberException value 4
    AsyncIO is Processing value 2
    AsyncIO is Throwing EvenNumberException value 2
    AsyncIO is Processing value 5
    ### print 5
    // ... 생략
    ### print 7
    // ... 생략
    ### print 9
    // ... 생략
    AsyncIO is Throwing EvenNumberException value 10
    AsyncIO is Processing value 10
    AsyncIO is Throwing EvenNumberException value 10

     

     

    ifResult.

    AsyncRetryStrategy 객체는 아래와 같이 생성합니다.

    저는 Retry 제한 횟수는 3번으로 설정했고, Retry 사이에 Backoff 시간은 1초로 설정하였습니다.

    그리고 ifResult 이라는 Builder 함수를 통해서 Retry 조건을 생성할 수 있는데요.

    제가 구성한 AsyncDataStream 은 입력값이 짝수인 경우에 Collections.emptyList() 을 반환합니다.

    그리고 ifResult 의 인자로써 RetryPredicates.EMPTY_RESULT_PREDICATE 를 적용함으로써

    빈 값이 반환되는 경우에 Retry 가 실행됩니다.

        AsyncRetryStrategy asyncRetryStrategy =
                new AsyncRetryStrategies.FixedDelayRetryStrategyBuilder(3, 100L)
                        .ifResult(RetryPredicates.EMPTY_RESULT_PREDICATE)
                        .build();

     

    아래 로그를 보시면 AsyncIO Processes. value : 2 이 두차례 실행됨을 확인할 수 있습니다.

    아래와 같이 입력값인 2 는 3차례의 Retry 가 수행됩니다.

    AsyncIO is Processing value 2
    AsyncIO is Throwing EvenNumberException value 2
    AsyncIO is Processing value 2
    AsyncIO is Throwing EvenNumberException value 2
    AsyncIO is Processing value 2
    AsyncIO is Throwing EvenNumberException value 2
    AsyncIO is Processing value 2
    AsyncIO is Throwing EvenNumberException value 2

     

     

    ifException.

    ifException Builder 함수를 통해서 Retry 조건을 설정할 수도 있습니다.

    ifException 을 적용하는 방식을 아래와 같습니다.

     

    ifException 은 모든 retry 시도 동안에 예외를 처리하지 못하면 Application 전역으로 Exception 이 Throw 되기 때문에

    적절한 예외 처리가 필요합니다.

     

    package com.westlife.jobs;
    
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.datastream.AsyncDataStream;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.async.AsyncRetryStrategy;
    import org.apache.flink.streaming.api.functions.async.ResultFuture;
    import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
    import org.apache.flink.streaming.api.functions.sink.SinkFunction;
    import org.apache.flink.streaming.api.functions.source.SourceFunction;
    import org.apache.flink.streaming.util.retryable.AsyncRetryStrategies;
    import org.apache.flink.streaming.util.retryable.RetryPredicates;
    
    import java.util.Collections;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.TimeUnit;
    
    
    public class TestAsyncIO {
    
      public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
        env.setParallelism(1);
    
        AsyncRetryStrategy asyncRetryStrategy =
                new AsyncRetryStrategies.FixedDelayRetryStrategyBuilder(3, 100L)
                        .ifException(RetryPredicates.HAS_EXCEPTION_PREDICATE)
                        .build();
    
        DataStream<Integer> source = env.addSource(new CustomSource(1, 10)).name("source").uid("source");
        DataStream<Integer> stream = AsyncDataStream.unorderedWaitWithRetry(source, new AsyncIO(), 5, TimeUnit.SECONDS, 5, asyncRetryStrategy).name("async-io").uid("async-io");
        stream.rebalance().addSink(new CustomPrint()).name("print").uid("print");
        env.execute();
      }
    
      public static class CustomSource implements SourceFunction<Integer> {
        int from;
        int to;
        public CustomSource (int from, int to) {
          this.from = from;
          this.to = to;
        }
        @Override
        public void run(SourceContext<Integer> ctx) throws Exception {
          for (int i = from; i <= to; i++) {
            ctx.collect(i);
            Thread.sleep(100L);
          }
          Thread.sleep(60 * 1000L);
        }
    
        @Override
        public void cancel() {
    
        }
      }
    
      public static class AsyncIO extends RichAsyncFunction<Integer, Integer> {
        @Override
        public void timeout(Integer input, ResultFuture<Integer> resultFuture) {
          System.out.println("AsyncIO Timeouts. value : " + input);
          resultFuture.complete(Collections.emptyList());
        }
    
        @Override
        public void asyncInvoke(Integer input, ResultFuture<Integer> resultFuture) {
          CompletableFuture.supplyAsync(() -> {
            System.out.println("AsyncIO is Processing value " + input);
            if (input % 2 == 0) {
              System.out.println("AsyncIO is Throwing EvenNumberException value " + input);
              throw new EvenNumberException();
            }
            return input;
          }).thenAccept(integer -> {
            resultFuture.complete(Collections.singletonList(integer));
          }).exceptionallyAsync(throwable -> {
            resultFuture.completeExceptionally(throwable);
            return null;
          });
        }
      }
    
      public static class CustomPrint implements SinkFunction<Integer> {
        @Override
        public void invoke(Integer value, Context context) {
          System.out.println("### print " + value);
        }
      }
    
      public static class EvenNumberException extends RuntimeException {
        public EvenNumberException () {
          super("Even Number Can not be processed!");
        }
      }
    }

     

    입력값 2 에 대한 출력 로그는 아래와 같습니다.

    AsyncIO is Processing value 2
    AsyncIO is Throwing EvenNumberException value 2
    AsyncIO is Processing value 2
    AsyncIO is Throwing EvenNumberException value 2
    AsyncIO is Processing value 2
    AsyncIO is Throwing EvenNumberException value 2
    AsyncIO is Processing value 2
    AsyncIO is Throwing EvenNumberException value 2

     

     

    마치며.

    이번 글에서는 AsyncDataStream 의 Retry 전략에 대해서 알아보았습니다.

    저는 개인적으로 Flink Async IO 의 Retry 보다는 Future 의 Retry 나 함수 자체를 Retry 할 수 있도록

    코드를 작성하는 것이 좋다고 생각합니다.

    그래서 개인적으로는 Flink Async IO 는 잘 사용하진 않지만, 여러 Use Case 가 존재하기 때문에

    적절한 사용이 필요하다고 생각합니다.

    감사합니다.

    반응형
Designed by Tistory.