-
RxJava Terminating Operator 알아보기Language/ReactiveX 2023. 10. 2. 01:08728x90반응형
- 목차
관련된 글
https://westlife0615.tistory.com/317
https://westlife0615.tistory.com/321
소개.
Terminating Operator 는 종결 연산자라고 불리는데요.
종결 연산자의 의미는 데이터의 흐름을 끝맺는 연산자입니다.
Java 의 Stream api 중에서 join, reduce, sum 등이 종결 연산자에 해당합니다.
join 을 예로 들면, Stream 의 모든 데이터를 한데 묶어 처리할 수 있고,
reduce 와 sum 또한 그 쓰임새가 일맥상통합니다.
RxJava 에서 Stream API 의 종결 연산자와 유사한 API 를 제공합니다.
Observable Pipeline 을 끝맺는 연산자들로 subscribe 가 대표적인 예시입니다.
subscribe.
subscribe 는 대표적인 Terminating Operator 입니다.
Observable 의 subscribe Operator 를 통해서 Observable 에 Consumer 들을 등록할 수 있는데요.
Consumer 들이 Observable 에 구독자로써 등록되는 순간부터 Observable Pipeline 의 데이터 스트리밍이 시작됩니다.
Observable Pipeline 을 통해 데이터들이 흘러가며, subscribe 로 등록된 Consumer 에 도달하게 됩니다.
Consumer 는 크게 3가지가 존재합니다.
- 데이터를 소비하는 Consumer (Consumer<Data>)
- 에러를 핸들링하는 Consumer (Consumer<Throwable>)
- 스트리밍의 종료를 핸들링하는 Consumer (Action<>)
위 3가지 Consumer 를 통해서 데이터 스트림을 처리하게 됩니다.
사용 예시는 아래와 같습니다.
package org.example; import io.reactivex.rxjava3.core.Observable; public class Main { public static void main(String[] args) { Observable<Integer> observable = Observable.range(1, 10); observable.subscribe( System.out::println, System.err::println, () -> System.out.println("finished") ); } }
<실행 결과>
1 2 3 4 5 6 7 8 9 10
forEach.
forEach 또한 subscribe 처럼 종결 연산자입니다.
그리고 forEach 도 subscribe 의 기능 중 하나인 Observable 의 Data Emission 을 consume 할 수 있습니다.
차이점이 있다면, forEach 연산자는 onError 와 onCompletion 에 대한 consume 은 수행할 수 없습니다.
오직 onNext 에 해당하는 Data Emission 의 Consume 에 특화되어 있습니다.
그래서 Error Handling 이 필요없는 케이스에 대하여 (Error 가 발생할 일이 없는 구조인 경우) forEach 를 사용할 수 있습니다.
아래 예시는 Observable 의 Data Emission 에 대한 Consume 과정입니다.
package org.example; import io.reactivex.rxjava3.core.Observable; public class Main { public static void main(String[] args) { Observable<Integer> observable = Observable.create(emitter -> { for (int i = 0; i < 10; i++) { emitter.onNext(i); } }); observable.forEach(System.out::println); } }
<실행 결과>
0 1 2 3 4 5 6 7 8 9
다만, Observable 이 onError 를 실행하는 경우에 forEach 는 Error Handling 을 수행할 수 없습니다.
그래서 위 Observable Pipeline 은 Observable 에서 Error 가 발생한다면 곧바로 종료되는 취약한 코드 구조입니다.
아래 코드는 Throw Error 이후에 forEach 연산자 레벨에서 에러가 발생하는 예시입니다.
package org.example; import io.reactivex.rxjava3.core.Observable; public class Main { public static void main(String[] args) { Observable<Integer> observable = Observable.create(emitter -> { for (int i = 0; i < 10; i++) { emitter.onNext(i); } emitter.onError(new Throwable()); for (int i = 10; i < 20; i++) { emitter.onNext(i); } }); observable.forEach(System.out::println); } }
<실행 결과>
0 1 2 3 4 5 6 7 8 9 io.reactivex.rxjava3.exceptions.OnErrorNotImplementedException: The exception was not handled due to missing onError handler in the subscribe() method call.
blockingFirst.
blockingFirst Operator 는 Observable Chain 의 데이터 스트림에서 첫번째 데이터를 얻을 수 있는 Interface 입니다.
두번째나 세번째 또는 그 이상의 index 의 데이터를 얻고자하는 상황이면 skip Operator 를 함께 사용해야합니다.
다만, 비동기 상황에서는 데이터의 시퀀스가 보장되지 않으니 이점을 유의해야합니다.
그리고 blockingFirst 를 통해서 전체적인 Observable chain 이 종료되진 않습니다.
아래의 예시가 관련 코드입니다.
Data Source 에 해당하는 Observable 이 종결되지 않고 unbound 하게 꾸준히 지속되는 상황이라면
blockingFirst Operator 를 사용한다고 하더라도 Observable Chain 이 종료되진 않습니다.
이 점을 유의해서 blockingFirst 를 사용하는 것이 중요합니다.
package org.example; import io.reactivex.rxjava3.annotations.NonNull; import io.reactivex.rxjava3.core.Observable; import io.reactivex.rxjava3.core.ObservableEmitter; import io.reactivex.rxjava3.core.ObservableOnSubscribe; import io.reactivex.rxjava3.schedulers.Schedulers; public class Main { public static void main(String[] args) { Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Throwable { for (int i = 0; i < 100000000; i++) { emitter.onNext(i); System.out.println(i); System.out.println(Thread.currentThread().getName()); Thread.sleep(1000); } } }); Integer result = observable .subscribeOn(Schedulers.computation()) .blockingFirst(); System.out.println(result); while (true) {} } }
<실행 결과>
0 RxComputationThreadPool-1 blockingFirst is 0 1 RxComputationThreadPool-1 2 RxComputationThreadPool-1 3 RxComputationThreadPool-1 4 RxComputationThreadPool-1 5 RxComputationThreadPool-1 6 // 생략 ...
blockingLast.
blockingLast Operator 는 blockingFirst 와 반대로 마지막 데이터를 얻을 수 있는 interface 입니다.
blockingLast Operator 가 마지막 데이터를 식별하는 기준은 Observable 의 종료 여부입니다.
Observable 이 onComplete 를 호출하여 종료되었다면, 가장 마지막으로 유입된 데이터를 last data 로 간주합니다.
만약 Observable 이 onComplete 를 호출하지 않는 Observable Chain 이라면 blockingLast 에 의해서 Observable Chain 이 종료되지 않습니다.
package org.example; import io.reactivex.rxjava3.annotations.NonNull; import io.reactivex.rxjava3.core.Observable; import io.reactivex.rxjava3.core.ObservableEmitter; import io.reactivex.rxjava3.core.ObservableOnSubscribe; import io.reactivex.rxjava3.schedulers.Schedulers; public class Main { public static void main(String[] args) { Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Throwable { for (int i = 0; i < 10; i++) { emitter.onNext(i); System.out.println(i); System.out.println(Thread.currentThread().getName()); } emitter.onComplete(); } }); Integer result = observable .subscribeOn(Schedulers.computation()) .blockingLast(); System.out.println("blockingFirst is " + result); while (true) {} } }
<실행 결과>
0 RxComputationThreadPool-1 1 RxComputationThreadPool-1 2 RxComputationThreadPool-1 3 RxComputationThreadPool-1 4 RxComputationThreadPool-1 5 RxComputationThreadPool-1 6 RxComputationThreadPool-1 7 RxComputationThreadPool-1 8 RxComputationThreadPool-1 9 RxComputationThreadPool-1 blockingFirst is 9
toList.
반응형'Language > ReactiveX' 카테고리의 다른 글
RxJava Hot vs Cold Observable 알아보기 (0) 2023.10.02 RxJava Flowable 알아보기 (0) 2023.10.02 RxJava Combining Operator 알아보기 (0) 2023.10.01 RxJava Observable 종류 알아보기 (Single, Maybe, Completable, Flowable) (0) 2023.10.01 RxJava Observable 알아보기 (0) 2023.09.29