-
RxJava Observable 종류 알아보기 (Single, Maybe, Completable, Flowable)Language/ReactiveX 2023. 10. 1. 16:40728x90반응형
- 목차
관련된 글
https://westlife0615.tistory.com/317
https://westlife0615.tistory.com/2
소개.
RxJava 에서 Observable 이외에 여러 변형들이 존재합니다.
그 종류로 Single, Maybe, Completable, Flowable 등이 있습니다.
하나씩 예시와 함께 알아보도록 하겠습니다.
Single.
Single 은 Observable 과 달리 데이터 스트림을 처리할 수 없습니다.
단 하나의 데이터만을 처리할 수 있기 때문에 Single 이라고 불립니다.
Observable.fromArray
Observable.range
와 같이 복수개의 데이터 스트림을 다루는 것이 아닌 단 하나의 데이터만을 Observable Pipeline 으로 흘려보낼 수 있습니다.
아래는 Single 와 관련된 예시입니다.
"test1" 이라는 String 하나만이 Pipeline 을 흘러갑니다.
package org.example; import io.reactivex.rxjava3.core.Single; public class Main { public static void main(String[] args) { Single<String> single1 = Single.just("test1"); single1.subscribe(System.out::println); } }
아래 예시는 Observable Stream 을 강제로 Single 로 변형하는 예시입니다.
이를 에러를 발생시키는 코드입니다.
Caused by: java.lang.IllegalArgumentException: Sequence contains more than one element!
package org.example; import io.reactivex.rxjava3.core.Observable; import io.reactivex.rxjava3.core.Single; public class Main { public static void main(String[] args) { Observable<Integer> observable = Observable.fromArray(1, 2, 3, 4, 5, 6, 7, 8, 9 ); Single<Integer> single2 = Single.fromObservable(observable); single2.subscribe(System.out::println); } }
Single 은 하나의 데이터만을 취급하는 특수한 Observable 의 한 형태입니다.
Maybe.
Maybe 는 Single 과 유사합니다.
다만 차이가 있다면 Single 은 반드시 하나의 데이터를 Emit 해야하는 Observable 입니다.
반면 Maybe 는 하나의 데이터를 Emit 하거나 아니면 Completion 해도 되는 Observable 입니다.
Maybe 의 의미처럼 1개 또는 0개의 데이터를 다룰 수 있는 Observable 입니다.
Maybe 와 관련된 예시코드입니다.
Pipeline 의 데이터 소스로 Maybe 가 사용됩니다.
데이터를 Emit 하여 데이터 파이프라인에 흘려보낼 수도 있고, Completion 할 수도 있습니다.
랜덤하게 onSuccess 와 onComplete 를 호출하는 구조로 Maybe 를 구현하였습니다.
package org.example; import io.reactivex.rxjava3.core.Maybe; import io.reactivex.rxjava3.functions.Consumer; import java.util.Random; public class Main { public static void main(String[] args) { Maybe<Integer> maybe = Maybe.create(emitter -> { if (new Random().nextBoolean()) { emitter.onSuccess(1); } else { emitter.onComplete(); } }); maybe .map((value) -> { System.out.println(value); return value; }) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Throwable { System.out.println(integer); } }, new Consumer<Throwable>() { @Override public void accept(Throwable throwable) throws Throwable { System.err.println(throwable); } }); } }
emitter.onSuccess(1) 이 호출되는 경우에는 1이 출력되면서 파이파라인이 종료됩니다.
emitter.onCompletion() 이 호출되는 경우에는 즉시 전체 파이프라인이 종료됩니다.
Completable.
Completable 은 onError 또는 onCompletion 두 가지 행동만을 취할 수 있는 특수한 Observable 입니다.
Completable 을 통해서 구축할 수 있는 파이프라인 구조는 성공 또는 실패 (에러) 의 두가지 결과만을 활용하는 케이스인데요.
예를 들어, 스위치를 Turn On/Off 의 결과만을 수집하는 IoT 같은 환경에서 사용할 수 있을 것 같고,
On/Off, 역치값의 이상/이하 처럼 binary 형식의 데이터를 취급함으로써 파이프라인을 타고 흐르는 데이터의 사이즈를 줄이는 효과도 있을 것 같네요.
아래 코드는 Completable 과 관련된 예시입니다.
package org.example; import io.reactivex.rxjava3.annotations.NonNull; import io.reactivex.rxjava3.core.Completable; import io.reactivex.rxjava3.core.CompletableEmitter; import io.reactivex.rxjava3.core.CompletableOnSubscribe; import io.reactivex.rxjava3.functions.Action; public class Main { public static void main(String[] args) { Completable completable = Completable.create(new CompletableOnSubscribe() { @Override public void subscribe(@NonNull CompletableEmitter emitter) throws Throwable { emitter.onComplete(); } }); completable .subscribe(new Action() { @Override public void run() throws Throwable { System.out.println("finished"); } }); } }
Flowable.
Flowable 은 BackPressure 기능을 하진 Observable 입니다.
관련 내용은 BackPressure 와 관련해서 다른 글로 포스팅하도록 하겠습니다.
반응형'Language > ReactiveX' 카테고리의 다른 글
RxJava Terminating Operator 알아보기 (0) 2023.10.02 RxJava Combining Operator 알아보기 (0) 2023.10.01 RxJava Observable 알아보기 (0) 2023.09.29 RxJava Thread Scheduling 알아보기 (0) 2023.09.28 ReactiveX 알아보기 (0) 2019.07.23