ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • RxJava Observable 종류 알아보기 (Single, Maybe, Completable, Flowable)
    Language/ReactiveX 2023. 10. 1. 16:40
    728x90
    반응형

    - 목차

     

    관련된 글

    https://westlife0615.tistory.com/317

     

    RxJava Observable 알아보기

    - 목차 관련된 글 https://westlife0615.tistory.com/2 ReactiveX 알아보기 - 목차 소개. Reactive X 패러다임에 대해서 알아보려고 합니다. Push and Pull. 데이터 커뮤니케이션에는 Push 와 Pull 두가지 방식이 있습니

    westlife0615.tistory.com

     

    https://westlife0615.tistory.com/2

     

    ReactiveX 알아보기

    - 목차 소개. Reactive X 패러다임에 대해서 알아보려고 합니다. Push and Pull. 데이터 커뮤니케이션에는 Push 와 Pull 두가지 방식이 있습니다. Pull. Pull 모델에 대해서 이야기하기 이전에 데이터의 생성

    westlife0615.tistory.com

     

    소개.

    RxJava 에서 Observable 이외에 여러 변형들이 존재합니다.

    그 종류로 Single, Maybe, Completable, Flowable 등이 있습니다.

    하나씩 예시와 함께 알아보도록 하겠습니다.

     

    Single.

    Single 은 Observable 과 달리 데이터 스트림을 처리할 수 없습니다.

    단 하나의 데이터만을 처리할 수 있기 때문에 Single 이라고 불립니다.

     

    Observable.fromArray

    Observable.range

     

    와 같이 복수개의 데이터 스트림을 다루는 것이 아닌 단 하나의 데이터만을 Observable Pipeline 으로 흘려보낼 수 있습니다.

     

    아래는 Single 와 관련된 예시입니다.

    "test1" 이라는 String 하나만이 Pipeline 을 흘러갑니다.

    package org.example;
    
    import io.reactivex.rxjava3.core.Single;
    
    public class Main {
    
      public static void main(String[] args) {
    
        Single<String> single1 = Single.just("test1");
        single1.subscribe(System.out::println);
        
      }
    
    }

     

     

    아래 예시는 Observable Stream 을 강제로 Single 로 변형하는 예시입니다.

    이를 에러를 발생시키는 코드입니다.

     

    Caused by: java.lang.IllegalArgumentException: Sequence contains more than one element!

     

    package org.example;
    
    import io.reactivex.rxjava3.core.Observable;
    import io.reactivex.rxjava3.core.Single;
    
    public class Main {
    
      public static void main(String[] args) {
    
        Observable<Integer> observable = Observable.fromArray(1, 2, 3, 4, 5, 6, 7, 8, 9 );
        Single<Integer> single2 = Single.fromObservable(observable);
        single2.subscribe(System.out::println);
      }
    
    }

     

    Single 은 하나의 데이터만을 취급하는 특수한 Observable 의 한 형태입니다.

     

     

    Maybe.

    Maybe 는 Single 과 유사합니다.

    다만 차이가 있다면 Single 은 반드시 하나의 데이터를 Emit 해야하는 Observable 입니다.

    반면 Maybe 는 하나의 데이터를 Emit 하거나 아니면 Completion 해도 되는 Observable 입니다.

    Maybe 의 의미처럼 1개 또는 0개의 데이터를 다룰 수 있는 Observable 입니다.

     

    Maybe 와 관련된 예시코드입니다.

    Pipeline 의 데이터 소스로 Maybe 가 사용됩니다.

    데이터를 Emit 하여 데이터 파이프라인에 흘려보낼 수도 있고, Completion 할 수도 있습니다.

    랜덤하게 onSuccess 와 onComplete 를 호출하는 구조로 Maybe 를 구현하였습니다.

     

    package org.example;
    
    import io.reactivex.rxjava3.core.Maybe;
    import io.reactivex.rxjava3.functions.Consumer;
    
    import java.util.Random;
    
    public class Main {
    
      public static void main(String[] args) {
    
        Maybe<Integer> maybe = Maybe.create(emitter -> {
          if (new Random().nextBoolean()) {
            emitter.onSuccess(1);
          } else {
            emitter.onComplete();
          }
    
        });
        maybe
                .map((value) -> {
                  System.out.println(value);
                  return value;
                })
                .subscribe(new Consumer<Integer>() {
          @Override
          public void accept(Integer integer) throws Throwable {
            System.out.println(integer);
          }
        }, new Consumer<Throwable>() {
          @Override
          public void accept(Throwable throwable) throws Throwable {
            System.err.println(throwable);
          }
        });
    
      }
    
    }

     

    emitter.onSuccess(1) 이 호출되는 경우에는 1이 출력되면서 파이파라인이 종료됩니다.

    emitter.onCompletion() 이 호출되는 경우에는 즉시 전체 파이프라인이 종료됩니다.

     

    Completable.

    Completable 은 onError 또는 onCompletion 두 가지 행동만을 취할 수 있는 특수한 Observable 입니다.

    Completable 을 통해서 구축할 수 있는 파이프라인 구조는 성공 또는 실패 (에러) 의 두가지 결과만을 활용하는 케이스인데요.

    예를 들어, 스위치를 Turn On/Off 의 결과만을 수집하는 IoT 같은 환경에서 사용할 수 있을 것 같고,

    On/Off, 역치값의 이상/이하 처럼 binary 형식의 데이터를 취급함으로써 파이프라인을 타고 흐르는 데이터의 사이즈를 줄이는 효과도 있을 것 같네요.

     

    아래 코드는 Completable 과 관련된 예시입니다.

     

    package org.example;
    
    import io.reactivex.rxjava3.annotations.NonNull;
    import io.reactivex.rxjava3.core.Completable;
    import io.reactivex.rxjava3.core.CompletableEmitter;
    import io.reactivex.rxjava3.core.CompletableOnSubscribe;
    import io.reactivex.rxjava3.functions.Action;
    
    public class Main {
    
      public static void main(String[] args) {
    
        Completable completable = Completable.create(new CompletableOnSubscribe() {
          @Override
          public void subscribe(@NonNull CompletableEmitter emitter) throws Throwable {
            emitter.onComplete();
          }
        });
    
        completable
                .subscribe(new Action() {
                  @Override
                  public void run() throws Throwable {
                    System.out.println("finished");
                  }
                });
      }
    }

     

     

    Flowable.

    Flowable 은 BackPressure 기능을 하진 Observable 입니다.

    관련 내용은 BackPressure 와 관련해서 다른 글로 포스팅하도록 하겠습니다.

    반응형

    'Language > ReactiveX' 카테고리의 다른 글

    RxJava Terminating Operator 알아보기  (0) 2023.10.02
    RxJava Combining Operator 알아보기  (0) 2023.10.01
    RxJava Observable 알아보기  (0) 2023.09.29
    RxJava Thread Scheduling 알아보기  (0) 2023.09.28
    ReactiveX 알아보기  (0) 2019.07.23
Designed by Tistory.