ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • RxJava Flowable 알아보기
    Language/ReactiveX 2023. 10. 2. 02:40
    728x90
    반응형

    - 목차

     

    소개.

    Flowable 은 BackPressure 기능을 가진 Observable 입니다.

    Flowable 은 단순히 데이터 스트림을 관리하는 수준을 넘어서 데이터 처리 속도의 동기화를 조율할 수 있습니다.

    Flowable 은 Downstream 의 Consumer 의 데이터 소비 속도에 맞추어 생산 속도를 조율합니다.

     

    처리 속도를 조율하기 위해서 데이터를 임시적으로 저장하기 위한 버퍼를 사용합니다.

    Flowable 은 Floawable 내부의 버퍼를 가지고 있고,

    Backpressure Buffer 라는 외부의 버퍼를 활용합니다.

    Flowable Buffer 와 Backpressure Buffer 를 통해서 데이터 처리를 잠시 유예하거나

    버퍼가 오버플로우되었을 때, 어떤식으로 Drop 할지 선택할 수 있습니다.

     

     

    Backpressure Strategy.

    Backpressure Strategy 에 대해서 설명해보도록 하겠습니다.

    Backpressure Strategy 는 Downstream Consumer 의 처리 속도가 느려서

    Flowable 의 데이터

     

    종류는 다음과 같습니다.

    - Buffer

    - Drop

    - Latest

    - Error

     

    각각 Backpressure Strategy 의 설명과 예시에 대해서 작성해보도록 하겠습니다.

     

    DROP.

    DROP 전략은 버퍼 사이즈보다 많은 양의 데이터가 생성된 경우에 추가적인 데이터를 삭제하는 전략입니다.

    예를 들어, 생성된 데이터가 30개가 존재하고, 버퍼 사이즈가 10입니다.

    그리거 Downstream Consumer 들이 제대로 소비를 진행하지 못하는 경우에

    버퍼 사이즈를 넘어선 20개의 데이터가 Drop 되는 전략입니다.

     

    버퍼 사이즈는 시스템 변수인 rx3.buffer-size 를 통해서 설정할 수 있구요.

    Subscriber 가 데이터의 처리를 진행할 수 없을 때에 rx3.buffer-size 가 10 이라면

    Flowable 은 10개의 데이터만큼 버퍼에 저장하고 나머지는 모두 DROP 합니다.

     

    아래의 예시에서

    Flowable 은 1 부터 30 까지의 데이터를 생성합니다.

    그리고 Subscriber 는 10초 이후부터 본격적으로 데이터 소비를 진행할 수 있습니다.

    따라서 실행 시점부터 10초가 될 때까지 Subscriber 는 제대로된 데이터 소비를 진행할 수 없으므로

    DROP 전략에 따라 데이터는 모두 DROP 됩니다.

     

    하지만 Flowable 의 버퍼 사이즈가 10 이므로 10개의 데이터는 버퍼에 저장됩니다.

    그래서  Subscriber 는 Upstream Flowable 의 버퍼에 저장되어 있던 1 ~ 10 까지 데이터를 소비하며

    나머지 11 ~ 30 의 데이터는 모두 DROP 됩니다.

     

    package org.example;
    
    import io.reactivex.rxjava3.annotations.NonNull;
    import io.reactivex.rxjava3.core.BackpressureStrategy;
    import io.reactivex.rxjava3.core.Flowable;
    import io.reactivex.rxjava3.core.FlowableEmitter;
    import io.reactivex.rxjava3.core.FlowableOnSubscribe;
    import io.reactivex.rxjava3.schedulers.Schedulers;
    
    import java.time.Instant;
    import java.time.temporal.ChronoUnit;
    
    public class Main {
    
      public static void main(String[] args) {
    
        System.setProperty("rx3.buffer-size", "10");
    
        Instant after10Sec = Instant.now().plus(10, ChronoUnit.SECONDS);
        Flowable<Integer> flowable = Flowable.create(new FlowableOnSubscribe<Integer>() {
          @Override
          public void subscribe(@NonNull FlowableEmitter<Integer> emitter) throws Throwable {
            for (int i = 1; i < 30; i++) {
              emitter.onNext(i);
              System.out.printf("Data Source value is %s \n", i);
            }
            emitter.onComplete();
          }
        }, BackpressureStrategy.DROP);
    
        flowable
                .observeOn(Schedulers.io())
                .subscribe((value) -> {
                  while (after10Sec.isAfter(Instant.now())) {}
                  System.out.printf("Subscriber value is %s \n", value);
                });
    
        while (true) {}
      }
    }

    <실행 결과>

    Data Source value is 1 
    Data Source value is 2 
    Data Source value is 3 
    Data Source value is 4 
    Data Source value is 5 
    Data Source value is 6 
    Data Source value is 7 
    Data Source value is 8 
    Data Source value is 9 
    Data Source value is 10 
    Data Source value is 11 
    Data Source value is 12 
    Data Source value is 13 
    Data Source value is 14 
    Data Source value is 15 
    Data Source value is 16 
    Data Source value is 17 
    Data Source value is 18 
    Data Source value is 19 
    Data Source value is 20 
    Data Source value is 21 
    Data Source value is 22 
    Data Source value is 23 
    Data Source value is 24 
    Data Source value is 25 
    Data Source value is 26 
    Data Source value is 27 
    Data Source value is 28 
    Data Source value is 29 
    Subscriber value is 1 
    Subscriber value is 2 
    Subscriber value is 3 
    Subscriber value is 4 
    Subscriber value is 5 
    Subscriber value is 6 
    Subscriber value is 7 
    Subscriber value is 8 
    Subscriber value is 9 
    Subscriber value is 10

     

    BUFFER.

    BUFFER 는 Backpressure Buffer 를 사용하는 전략입니다.

    Flowable 의 자체 버퍼 뿐만 아니라 Backpressure 을 위한 Buffer 를 사용합니다.

    Backpressure Buffer 의 사이즈와 Drop 전략 등을 설정해야합니다.

     

    BackpressureOverflowStrategy.DROP_OLDEST.

    아래 예시는

    Flowable Buffer Size : 10

    Backpressure Buffer Size : 10

    Backpressure Overflow Drop : oldest

    인 상황에 대한 케이스입니다.

     

    Flowable 은 1 ~ 30 까지의 데이터를 생성하고,

    실행 시작부터 5초까지 Subscriber 는 대기상태에 머뭅니다.

    그리고 5초 이후부터 Subscribe 가 시작되는데, DROP_OLDEST 전략에 의해서

    Flowable 버퍼에 담겨진 1 ~ 10 의 데이터와

    Backpressure Buffer 에 저장된 20 ~ 29 의 데이터가 소비됩니다.

    나머지 11 ~ 19 의 데이터는 Drop 됩니다.

     

    package org.example;
    
    import io.reactivex.rxjava3.annotations.NonNull;
    import io.reactivex.rxjava3.core.*;
    import io.reactivex.rxjava3.schedulers.Schedulers;
    
    import java.time.Instant;
    import java.time.temporal.ChronoUnit;
    
    public class Main {
    
      public static void main(String[] args) {
    
        System.setProperty("rx3.buffer-size", "10");
    
        Instant after5Sec = Instant.now().plus(5, ChronoUnit.SECONDS);
        Flowable<Integer> flowable = Flowable.create(new FlowableOnSubscribe<Integer>() {
          @Override
          public void subscribe(@NonNull FlowableEmitter<Integer> emitter) throws Throwable {
            for (int i = 1; i < 30; i++) {
              emitter.onNext(i);
              System.out.printf("Data Source value is %s \n", i);
            }
            emitter.onComplete();
          }
        }, BackpressureStrategy.BUFFER);
    
        flowable
                .onBackpressureBuffer(
                        10,
                        () -> System.out.print("overflow !! help me!  "),
                        BackpressureOverflowStrategy.DROP_OLDEST,
                        integer -> System.out.printf("Dropped value is %s \n", integer)
                )
                .observeOn(Schedulers.io())
                .subscribe((value) -> {
                  while (after5Sec.isAfter(Instant.now())) {}
                  System.out.printf("Subscriber value is %s \n", value);
                });
    
        while (true) {}
      }
    }

     

    <실행 결과>

    Data Source value is 1 
    Data Source value is 2 
    Data Source value is 3 
    Data Source value is 4 
    Data Source value is 5 
    Data Source value is 6 
    Data Source value is 7 
    Data Source value is 8 
    Data Source value is 9 
    Data Source value is 10 
    Data Source value is 11 
    Data Source value is 12 
    Data Source value is 13 
    Data Source value is 14 
    Data Source value is 15 
    Data Source value is 16 
    Data Source value is 17 
    Data Source value is 18 
    Data Source value is 19 
    Data Source value is 20 
    overflow !! help me!  Dropped value is 11 
    Data Source value is 21 
    overflow !! help me!  Dropped value is 12 
    Data Source value is 22 
    overflow !! help me!  Dropped value is 13 
    Data Source value is 23 
    overflow !! help me!  Dropped value is 14 
    Data Source value is 24 
    overflow !! help me!  Dropped value is 15 
    Data Source value is 25 
    overflow !! help me!  Dropped value is 16 
    Data Source value is 26 
    overflow !! help me!  Dropped value is 17 
    Data Source value is 27 
    overflow !! help me!  Dropped value is 18 
    Data Source value is 28 
    overflow !! help me!  Dropped value is 19 
    Data Source value is 29 
    Subscriber value is 1 
    Subscriber value is 2 
    Subscriber value is 3 
    Subscriber value is 4 
    Subscriber value is 5 
    Subscriber value is 6 
    Subscriber value is 7 
    Subscriber value is 8 
    Subscriber value is 9 
    Subscriber value is 10 
    Subscriber value is 20 
    Subscriber value is 21 
    Subscriber value is 22 
    Subscriber value is 23 
    Subscriber value is 24 
    Subscriber value is 25 
    Subscriber value is 26 
    Subscriber value is 27 
    Subscriber value is 28 
    Subscriber value is 29

     

     

    BackpressureOverflowStrategy.DROP_LATEST.

    DROP_LATEST 는 DROP_OLDEST 와 반대의 전략입니다.

    Backpressure Buffer 에 쌓인 데이터 중 최신 데이터가 Drop 되는 전략입니다.

     

    Flowable 의 설정과 상황은 DROP_OLDEST 케이스와 동일합니다.

    Flowable Buffer Size : 10

    Backpressure Buffer Size : 10

    Backpressure Overflow Drop : latest

    인 상황에 대한 케이스입니다.

     

    Flowable 은 1 ~ 30 까지의 데이터를 생성하고,

    실행 시작부터 5초까지 Subscriber 는 대기상태에 머뭅니다.

    그리고 5초 이후부터 Subscribe 가 시작되는데, DROP_LATEST 전략에 의해서

     

    1 ~ 20 까지의 데이터는 Flowable Buffer 와 Backpressure Buffer 에 10개씩 저장됩니다.

    Flowable Buffer 에는 1 부터 10 까지의 데이터가,

    Backpressure Buffer 에는 11 부터 20 까지의 데이터가 저장됩니다.

     

    문제는 21부터 30까지의 데이터인데요.

    Flowable 이 21 데이터를 생성할 때에 모든 버퍼가 가득 찬 상태입니다.

    따라서 DROP_LATEST 전략에 따라 가장 최근에 버퍼에 추가된 20 데이터가 Drop 됩니다.

    그리고 21, 22, ... ,29 가 Drop 됩니다.

    따라서 1 ~ 19 그리고 30 이 Flowable 의 Backpressure 전략에 의해서 Subscriber 에 전달됩니다.

     

    package org.example;
    
    import io.reactivex.rxjava3.annotations.NonNull;
    import io.reactivex.rxjava3.core.*;
    import io.reactivex.rxjava3.schedulers.Schedulers;
    
    import java.time.Instant;
    import java.time.temporal.ChronoUnit;
    
    public class Main {
    
      public static void main(String[] args) {
    
        System.setProperty("rx3.buffer-size", "10");
    
        Instant after5Sec = Instant.now().plus(5, ChronoUnit.SECONDS);
        Flowable<Integer> flowable = Flowable.create(new FlowableOnSubscribe<Integer>() {
          @Override
          public void subscribe(@NonNull FlowableEmitter<Integer> emitter) throws Throwable {
            for (int i = 1; i <= 30; i++) {
              emitter.onNext(i);
              System.out.printf("Data Source value is %s \n", i);
            }
            emitter.onComplete();
          }
        }, BackpressureStrategy.BUFFER);
    
        flowable
                .onBackpressureBuffer(
                        10,
                        () -> System.out.print("overflow !! help me!  "),
                        BackpressureOverflowStrategy.DROP_LATEST,
                        integer -> System.out.printf("Dropped value is %s \n", integer)
                )
                .observeOn(Schedulers.io())
                .subscribe((value) -> {
                  while (after5Sec.isAfter(Instant.now())) {}
                  System.out.printf("Subscriber value is %s \n", value);
                });
    
        while (true) {}
      }
    }

     

    <실행 결과>

    Data Source value is 1 
    Data Source value is 2 
    Data Source value is 3 
    Data Source value is 4 
    Data Source value is 5 
    Data Source value is 6 
    Data Source value is 7 
    Data Source value is 8 
    Data Source value is 9 
    Data Source value is 10 
    Data Source value is 11 
    Data Source value is 12 
    Data Source value is 13 
    Data Source value is 14 
    Data Source value is 15 
    Data Source value is 16 
    Data Source value is 17 
    Data Source value is 18 
    Data Source value is 19 
    Data Source value is 20 
    overflow !! help me!  Dropped value is 20 
    Data Source value is 21 
    overflow !! help me!  Dropped value is 21 
    Data Source value is 22 
    overflow !! help me!  Dropped value is 22 
    Data Source value is 23 
    overflow !! help me!  Dropped value is 23 
    Data Source value is 24 
    overflow !! help me!  Dropped value is 24 
    Data Source value is 25 
    overflow !! help me!  Dropped value is 25 
    Data Source value is 26 
    overflow !! help me!  Dropped value is 26 
    Data Source value is 27 
    overflow !! help me!  Dropped value is 27 
    Data Source value is 28 
    overflow !! help me!  Dropped value is 28 
    Data Source value is 29 
    overflow !! help me!  Dropped value is 29
    Data Source value is 30
    Subscriber value is 1 
    Subscriber value is 2 
    Subscriber value is 3 
    Subscriber value is 4 
    Subscriber value is 5 
    Subscriber value is 6 
    Subscriber value is 7 
    Subscriber value is 8 
    Subscriber value is 9 
    Subscriber value is 10 
    Subscriber value is 11 
    Subscriber value is 12 
    Subscriber value is 13 
    Subscriber value is 14 
    Subscriber value is 15 
    Subscriber value is 16 
    Subscriber value is 17 
    Subscriber value is 18 
    Subscriber value is 19 
    Subscriber value is 30

     

     

    Latest.

    Latest 전략은 가장 최근의 데이터 하나만이 Backpressure Buffer 에 의해서 살아남는 전략입니다.

    Backpressure Buffer 사이즈가 1인 상황과 유사합니다.

     

    아래 예시는

    1 부터 30 까지의 데이터 생성하는 Flowable 과 5초 동안 대기 상태에 머무는 Subscriber 의 예시입니다.

     

    생성된 1 부터 10 까지의 데이터는 Flowable Buffer 에 의해서 저장되고, 

    30 데이터는 Latest 전략에 의해서 Backpressure Buffer 에 저장되어 Subscriber 에게 전달됩니다.

     

    package org.example;
    
    import io.reactivex.rxjava3.annotations.NonNull;
    import io.reactivex.rxjava3.core.*;
    import io.reactivex.rxjava3.schedulers.Schedulers;
    
    import java.time.Instant;
    import java.time.temporal.ChronoUnit;
    
    public class Main {
    
      public static void main(String[] args) {
    
        System.setProperty("rx3.buffer-size", "10");
    
        Instant after5Sec = Instant.now().plus(5, ChronoUnit.SECONDS);
        Flowable<Integer> flowable = Flowable.create(new FlowableOnSubscribe<Integer>() {
          @Override
          public void subscribe(@NonNull FlowableEmitter<Integer> emitter) throws Throwable {
            for (int i = 1; i <= 30; i++) {
              emitter.onNext(i);
              System.out.printf("Data Source value is %s \n", i);
            }
            emitter.onComplete();
          }
        }, BackpressureStrategy.LATEST);
    
        flowable
                .observeOn(Schedulers.io())
                .subscribe((value) -> {
                  while (after5Sec.isAfter(Instant.now())) {}
                  System.out.printf("Subscriber value is %s \n", value);
                });
    
        while (true) {}
      }
    }

     

    <실행 결과>

    Data Source value is 1 
    Data Source value is 2 
    Data Source value is 3 
    Data Source value is 4 
    Data Source value is 5 
    Data Source value is 6 
    Data Source value is 7 
    Data Source value is 8 
    Data Source value is 9 
    Data Source value is 10 
    Data Source value is 11 
    Data Source value is 12 
    Data Source value is 13 
    Data Source value is 14 
    Data Source value is 15 
    Data Source value is 16 
    Data Source value is 17 
    Data Source value is 18 
    Data Source value is 19 
    Data Source value is 20 
    Data Source value is 21 
    Data Source value is 22 
    Data Source value is 23 
    Data Source value is 24 
    Data Source value is 25 
    Data Source value is 26 
    Data Source value is 27 
    Data Source value is 28 
    Data Source value is 29 
    Subscriber value is 1 
    Subscriber value is 2 
    Subscriber value is 3 
    Subscriber value is 4 
    Subscriber value is 5 
    Subscriber value is 6 
    Subscriber value is 7 
    Subscriber value is 8 
    Subscriber value is 9 
    Subscriber value is 10 
    Subscriber value is 30

     

     

    Error.

    Backpressure 상황 시에 Error 시그널을 Downstream Consumer 에게 전달합니다.

    Subscriber 와 같은 Downstream Consumer 들은 Error Handling 을 통해서

    에러 로그를 남기거나 Sentry 와 같은 알람을 제공하거나

    더 나아가서 Flowable Chain 을 재실행하는 방법 등이 있습니다.

     

    Flowable Chain 이 Side-Effect 가 없는 구조이면서 Batch 형식의 구조라면 오히려 다시 실행하는 것이 좋을 수도 있습니다.

     

    아래는 Error 전략에 대한 예시 코드입니다.

     

    package org.example;
    
    import io.reactivex.rxjava3.annotations.NonNull;
    import io.reactivex.rxjava3.core.BackpressureStrategy;
    import io.reactivex.rxjava3.core.Flowable;
    import io.reactivex.rxjava3.core.FlowableEmitter;
    import io.reactivex.rxjava3.core.FlowableOnSubscribe;
    import io.reactivex.rxjava3.schedulers.Schedulers;
    
    import java.time.Instant;
    import java.time.temporal.ChronoUnit;
    
    public class Main {
    
      public static void main(String[] args) {
    
        System.setProperty("rx3.buffer-size", "10");
    
        Instant after5Sec = Instant.now().plus(5, ChronoUnit.SECONDS);
        Flowable<Integer> flowable = Flowable.create(new FlowableOnSubscribe<Integer>() {
          @Override
          public void subscribe(@NonNull FlowableEmitter<Integer> emitter) throws Throwable {
            for (int i = 1; i <= 30; i++) {
              emitter.onNext(i);
              System.out.printf("Data Source value is %s \n", i);
            }
            emitter.onComplete();
          }
        }, BackpressureStrategy.ERROR);
    
        flowable
                .observeOn(Schedulers.io())
                .subscribe((value) -> {
                  while (after5Sec.isAfter(Instant.now())) {}
                  System.out.printf("Subscriber value is %s \n", value);
                }, (throwable) -> {
                  System.err.printf("Subscriber value is %s \n", throwable);
                });
    
        while (true) {}
      }
    }

     

    <실행 결과>

    Data Source value is 1 
    Data Source value is 2 
    Data Source value is 3 
    Data Source value is 4 
    Data Source value is 5 
    Data Source value is 6 
    Data Source value is 7 
    Data Source value is 8 
    Data Source value is 9 
    Data Source value is 10 
    Data Source value is 11 
    Data Source value is 12 
    Data Source value is 13 
    Data Source value is 14 
    Data Source value is 15 
    Data Source value is 16 
    Data Source value is 17 
    Data Source value is 18 
    Data Source value is 19 
    Data Source value is 20 
    Data Source value is 21 
    Data Source value is 22 
    Data Source value is 23 
    Data Source value is 24 
    Data Source value is 25 
    Data Source value is 26 
    Data Source value is 27 
    Data Source value is 28 
    Data Source value is 29 
    Data Source value is 30 
    Subscriber value is 1 
    Subscriber value is io.reactivex.rxjava3.exceptions.MissingBackpressureException: create: Could not emit value due to lack of requests

     

     

     

     

    반응형
Designed by Tistory.