-
[Flink] Async IO 알아보기 ( AsyncDataStream )Flink 2024. 3. 23. 06:48728x90반응형
- 목차
들어가며.
이번 글에서는 Flink DataStream API 의 Async IO 에 대해서 알아보도록 하겠습니다.
Async IO 는 Flink DataStream API 에서 제공하는 비동기처리를 위한 기술입니다.
Java 가 기본적으로 제공하는 Future 모듈과 그 형식은 유사합니다.
그렇기 때문에 N 개의 Input 을 동시에 처리할 수 있습니다.
Map, FlatMap 같은 Stateless Operator 는 event by event 형식으로 동기적인 순서로 데이터를 처리하는 반면,
Async IO 를 통하여 1개 이상의 데이터를 효율적으로 처리할 수 있게 됩니다.
이 이미지처럼 Async IO 를 사용하게 되면 Async IO Operator 인 AsyncDataStream 은 Capacity 로 설정된 용량만큼 Event 들을 수용할 수 있습니다.
그래서 여러 이벤트를 동시에 처리할 수 있고, 이전 단계의 DataStream 에게 가해지는 Backpressure 부하를 줄일 수 있습니다.
즉, 하나의 이벤트를 처리하는데에 1초가 걸린다고 할 때에
N 개의 이벤트에 대하여 일반적인 Stateless Operator 인 Map, FlatMap 은 N 초가 걸리게 됩니다.
하지만 Async IO 는 Capacity 범위만큼 처리 시간이 감소합니다.
Async IO 는 보통 외부의 API Server 또는 데이터베이스와의 네트워크 통신을 효율적으로 수행하기 위해서 사용됩니다.
Network IO 는 반드시 지연시간이 발생하기 마련이고 이러한 지연시간동안 네트워크 관련 태스트를 수행하는 쓰레드는 Waiting 상태가 됩니다.
즉, CPU 를 점유하지 않고 Network 응답이 발생하기까지 대기하게 되죠.
이를 Network Socket 관점에서 본다면 send 와 recv 사이의 시간적인 지연을 의미합니다.
Async IO 는 이러한 지연 시간 동안에 다른 Task 를 수행하여 병렬적으로 수행됩니다.
AsyncDataStream.
Async IO 는 Flink DataStream 코드레벨에서 AsyncDataStream 으로 추상화되어 있습니다.
그리고 Timeout, Capacity 과 설정을 가지는데요.
이 설정들에 대해서 알아보도록 하겠습니다.
아래의 예시 코드는 AsyncDataStream 을 테스트하기 위한 기본적인 프로그램을 구성하였습니다.
package com.westlife.jobs; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.AsyncDataStream; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.async.ResultFuture; import org.apache.flink.streaming.api.functions.async.RichAsyncFunction; import org.apache.flink.streaming.api.functions.sink.SinkFunction; import org.apache.flink.streaming.api.functions.source.SourceFunction; import java.io.Serializable; import java.util.Collections; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; public class TestAsyncIO { public static void main (String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration()); env.setParallelism(1); DataStream<Integer> source = env.addSource(new IntegerSource(100, 100)).name("source").uid("source"); DataStream<Integer> stream = AsyncDataStream.unorderedWait(source, new AsyncIO(), 30, TimeUnit.SECONDS, 5).name("async-io").uid("async-io"); stream.rebalance().addSink(new CustomPrint()).name("print").uid("print"); env.execute(); } public static class IntegerSource implements SourceFunction<Integer> { private int delay; private int count; public IntegerSource (int delay, int count) { this.delay = delay; this.count = count; } @Override public void run(SourceContext<Integer> ctx) throws Exception { int counter = 1; while (counter <= this.count) { ctx.collect(counter++); Thread.sleep(this.delay); } } @Override public void cancel() {} } public static class AsyncIO extends RichAsyncFunction<Integer, Integer> implements Serializable { @Override public void timeout(Integer input, ResultFuture<Integer> resultFuture) { System.out.println("AsyncIO Timeouts. value : " + input); resultFuture.complete(Collections.emptyList()); } private void delay (int second) { try { Thread.sleep(second * 1000L); } catch (InterruptedException e) { System.out.printf("Failed to delay %ss %n", second); } } @Override public void asyncInvoke(Integer input, ResultFuture<Integer> resultFuture) { System.out.println("AsyncIO Processes. value : " + input); CompletableFuture.supplyAsync(() -> { delay(10); return input; }).thenAccept(integer -> resultFuture.complete(Collections.singletonList(integer))); } } public static class CustomPrint implements SinkFunction<Integer> { @Override public void invoke(Integer value, Context context) throws Exception { System.out.println("### print " + value); } } }
그리고 데이터 스트림의 그래프는 아래와 같습니다.
위 코드에 대한 설정을 간단히 드리겠습니다.
1. Source 와 2. AsyncDataStream, 3. Print 3개로 구성된 Flink Application 입니다.
Source 는 1부터 100까지의 숫자를 0.1초 간격으로 생성합니다.
AsyncDataStream 은 Source 로 부터 전달받은 데이터를 Sink 로 전달하는데, 의도적으로 10초의 지연시간을 적용하였습니다.
즉, async-io 는 데이터 하나를 처리하는데에 10초가 소요됩니다.
그리고 Print Sink 는 단순히 입력값을 표준출력으로 출력합니다.
Timeout.
Timeout 설정은 Async IO 에서 실행되는 비동기 처리의 제한 시간을 설정합니다.
Async IO 는 Capacity 로 설정한 N 개의 Input 을 동시에 처리할 수 있습니다.
아래의 그림은 Async IO Operator 가 4개의 Input 을 동시에 처리하는 모습입니다.
하지만 비동기 처리의 처리 시간이 너무 길어지면 전체적인 데이터 처리 흐름에 병목현상이 발생하게 됩니다.
따라서 적절한 Timeout 을 설정하여 처리 시간이 긴 데이터를 제거하거나 예외 처리를 적용해야합니다.
아래 Main Function 을 통해서 테스트를 수행하였습니다.
AsyncDataStream 의 Timeout 을 5초로 설정하였습니다.
하지만 AsyncIO 클래스는 각 Input 데이터를 처리하는데에 10초가 소요됩니다.
즉, 모든 이벤트는 Timeout 의 대상이 됩니다.
public static void main (String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration()); env.setParallelism(1); // 1부터 100까지의 숫자 데이터가 생성됩니다. DataStream<Integer> source = env.addSource(new IntegerSource(100, 100)).name("source").uid("source"); // 1부터 100까지의 숫자 데이터를 처리합니다. // 각 Input 을 처리하는데에 10초가 소요됩니다. DataStream<Integer> stream = AsyncDataStream.unorderedWait(source, new AsyncIO(), 5, TimeUnit.SECONDS, 100).name("async-io").uid("async-io"); stream.rebalance().addSink(new CustomPrint()).name("print").uid("print"); env.execute(); }
위 Flink Application 의 실행 후 출력 결과는 아래와 같습니다.
모든 이벤트는 Timeout 됩니다.
AsyncIO Timeouts. value : 1 AsyncIO Timeouts. value : 2 AsyncIO Timeouts. value : 3 AsyncIO Timeouts. value : 4 AsyncIO Timeouts. value : 5 AsyncIO Timeouts. value : 6 AsyncIO Timeouts. value : 7 AsyncIO Timeouts. value : 8 AsyncIO Timeouts. value : 9 AsyncIO Timeouts. value : 10 AsyncIO Timeouts. value : 11 // ... AsyncIO Timeouts. value : 94 AsyncIO Timeouts. value : 95 AsyncIO Timeouts. value : 96 AsyncIO Timeouts. value : 97 AsyncIO Timeouts. value : 98 AsyncIO Timeouts. value : 99 AsyncIO Timeouts. value : 100
그럼 Timeout 은 왜 필요한 것일까요 ? 그냥 무한히 큰 값을 주면 되지 않을까요 ?
만약 Timeout 이 매우 길어지게 되면 Source -> AsyncDataStream 사이에 병목이 발생하게 됩니다.
아래의 이미지처럼 async-io 는 Busy 100% 상태가 되며, Source 의 데이터 생성 속도를 따라가지 못합니다.
Capacity.
AsyncDataStream 의 또 다른 설정은 Capacity 가 존재합니다.
Capacity 설정은 동시에 처리할 수 있는 데이터의 수를 의미합니다.
ThreadPool 에서 설정하는 Pool 사이즈와 유사한 개념입니다.
만약 AsyncDataStream 을 Database 로부터 데이터를 조회하거나 생성하는 용도로 Capacity 와 ConnectionPool 의 사이즈를 함께 고려하는 것이 좋습니다.
왜냐하면 Async IO 의 Capacity 수가 Connection 수보다 월등히 많게 되면, Connection 을 획득하는데에 시간이 소요되다보니
Timeout 이 발생할 확률이 높습니다.
Capacity 또한 Timeout 처럼 값이 크다고 능사가 아닙니다.
Capacity 1 의 처리 속도.
100개의 데이터는 Capacity 1 인 상태에서 얼마나 처리 시간이 드는지 확인해보겠습니다.
public static void main (String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration()); env.setParallelism(1); DataStream<Integer> source = env.addSource(new IntegerSource(100, 100)).name("source").uid("source"); DataStream<Integer> stream = AsyncDataStream.unorderedWait(source, new AsyncIO(), 30, TimeUnit.SECONDS, 1).name("async-io").uid("async-io"); stream.rebalance().addSink(new CustomPrint()).name("print").uid("print"); env.execute(); }
위 Flink Application 의 실행 출력 결과는 아래와 같습니다.
처리 시간은 5분 이상 소요되었구요.
Capacity 가 1 로 설정되었기 때문에 1개의 Input 을 전달받아 처리하게 됩니다.
이는 Map, FlatMap 과 같은 여타 Sync Operator 와 다르지 않습니다.
AsyncIO Processes. value : 1 AsyncIO Processes. value : 2 ### print 1 AsyncIO Processes. value : 3 ### print 2 AsyncIO Processes. value : 4 ### print 3 AsyncIO Processes. value : 5 ### print 4 // ... 생략 AsyncIO Processes. value : 97 ### print 96 AsyncIO Processes. value : 98 ### print 97 AsyncIO Processes. value : 99 ### print 98 AsyncIO Processes. value : 100 ### print 99 ### print 100
Capacity 5 의 처리 속도.
Capacity 5 의 설정은 동시에 5개의 입력 데이터를 처리할 수 있기 때문에 처리 시간이 단축됩니다.
CPU Core 등 여러 변수가 존재하긴 하지만 3분 이래의 처리가 완료됩니다.
public static void main (String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration()); env.setParallelism(1); DataStream<Integer> source = env.addSource(new IntegerSource(100, 100)).name("source").uid("source"); DataStream<Integer> stream = AsyncDataStream.unorderedWait(source, new AsyncIO(), 30, TimeUnit.SECONDS, 5).name("async-io").uid("async-io"); stream.rebalance().addSink(new CustomPrint()).name("print").uid("print"); env.execute(); }
출력 결과는 아래와 같습니다.
아래 로그와 같이 5개의 입력 데이터가 병렬 처리됩니다.
AsyncIO Processes. value : 1 AsyncIO Processes. value : 2 AsyncIO Processes. value : 3 AsyncIO Processes. value : 4 AsyncIO Processes. value : 5 AsyncIO Processes. value : 6 ### print 1 AsyncIO Processes. value : 7 ### print 2 AsyncIO Processes. value : 8 ### print 3 AsyncIO Processes. value : 9 ### print 4 AsyncIO Processes. value : 10 ### print 5 // ... 생략 AsyncIO Processes. value : 97 ### print 92 AsyncIO Processes. value : 98 ### print 93 AsyncIO Processes. value : 99 ### print 94 AsyncIO Processes. value : 100 ### print 95 ### print 96 ### print 97 ### print 98 ### print 99 ### print 100
마치며.
AsyncDataStream 을 통해서 Network IO 작업이 많은 Task 의 최적화를 적용할 수 있습니다.
Network IO 에 대한 시간 지연을 고려해야 Timeout 과 Capacity 를 적용한다면 데이터 처리 시간을 단축시킬 수 있습니다.
Capacity 의 값은 반드시 Network Connection 수와 CPU Core 의 수를 고려해야합니다.
Capacity 보다 Connection 의 수가 적으면 Async Task 에서 Connection 을 획득하는데에 시간이 소요됩니다.
그리고 CPU Core 가 적게 되면 Capacity 가 늘어나도 Async Task 가 적절히 수행되지 못합니다.
그래서 Timeout 이 발생할 확률이 커집니다.
반응형'Flink' 카테고리의 다른 글
[Flink] Operator Chaining 알아보기 (0) 2024.03.25 [Flink] Async IO Retry Strategy 알아보기 (0) 2024.03.23 [Flink] JSON FileSink 만들어보기 (Row Format) (0) 2024.02.12 [Flink] Tumbling Time Window 알아보기 (TumblingEventTimeWindows) (0) 2024.02.12 [Flink] Window AllowedLateness 알아보기 (Watermark) (0) 2024.02.12