ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • RxJava Terminating Operator 알아보기
    Language/ReactiveX 2023. 10. 2. 01:08
    728x90
    반응형

    - 목차

     

     

    관련된 글

    https://westlife0615.tistory.com/317

     

    RxJava Observable 알아보기

    - 목차 관련된 글 https://westlife0615.tistory.com/2 ReactiveX 알아보기 - 목차 소개. Reactive X 패러다임에 대해서 알아보려고 합니다. Push and Pull. 데이터 커뮤니케이션에는 Push 와 Pull 두가지 방식이 있습니

    westlife0615.tistory.com

    https://westlife0615.tistory.com/321

     

    RxJava Combining Operator 알아보기

    - 목차 관련된 글 https://westlife0615.tistory.com/317 RxJava Observable 알아보기 - 목차 관련된 글 https://westlife0615.tistory.com/2 ReactiveX 알아보기 - 목차 소개. Reactive X 패러다임에 대해서 알아보려고 합니다. Pus

    westlife0615.tistory.com

     

    소개.

    Terminating Operator 는 종결 연산자라고 불리는데요.

    종결 연산자의 의미는 데이터의 흐름을 끝맺는 연산자입니다.

    Java 의 Stream api 중에서 join, reduce, sum 등이 종결 연산자에 해당합니다.

    join 을 예로 들면, Stream 의 모든 데이터를 한데 묶어 처리할 수 있고,

    reduce 와 sum 또한 그 쓰임새가 일맥상통합니다.

     

    RxJava 에서 Stream API 의 종결 연산자와 유사한 API 를 제공합니다.

    Observable Pipeline 을 끝맺는 연산자들로 subscribe 가 대표적인 예시입니다.

     

     

    subscribe.

    subscribe 는 대표적인 Terminating Operator 입니다.

    Observable 의 subscribe Operator 를 통해서 Observable 에 Consumer 들을 등록할 수 있는데요.

    Consumer 들이 Observable 에 구독자로써 등록되는 순간부터 Observable Pipeline 의 데이터 스트리밍이 시작됩니다.

    Observable Pipeline 을 통해 데이터들이 흘러가며, subscribe 로 등록된 Consumer 에 도달하게 됩니다.

     

    Consumer 는 크게 3가지가 존재합니다.

    - 데이터를 소비하는 Consumer (Consumer<Data>)

    - 에러를 핸들링하는 Consumer (Consumer<Throwable>)

    - 스트리밍의 종료를 핸들링하는 Consumer (Action<>)

     

    위 3가지 Consumer 를 통해서 데이터 스트림을 처리하게 됩니다.

     

    사용 예시는 아래와 같습니다.

     

    package org.example;
    
    import io.reactivex.rxjava3.core.Observable;
    
    public class Main {
    
      public static void main(String[] args) {
    
        Observable<Integer> observable = Observable.range(1, 10);
        observable.subscribe(
                System.out::println,
                System.err::println,
                () -> System.out.println("finished")
        );
      }
    }

     

    <실행 결과>

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10

     

     

    forEach.

    forEach 또한 subscribe 처럼 종결 연산자입니다.

    그리고 forEach 도 subscribe 의 기능 중 하나인 Observable 의 Data Emission 을 consume 할 수 있습니다.

    차이점이 있다면, forEach 연산자는 onError 와 onCompletion 에 대한 consume 은 수행할 수 없습니다.

    오직 onNext 에 해당하는 Data Emission 의 Consume 에 특화되어 있습니다.

    그래서 Error Handling 이 필요없는 케이스에 대하여 (Error 가 발생할 일이 없는 구조인 경우) forEach 를 사용할 수 있습니다.

     

    아래 예시는 Observable 의 Data Emission 에 대한 Consume 과정입니다.

    package org.example;
    
    import io.reactivex.rxjava3.core.Observable;
    
    public class Main {
    
      public static void main(String[] args) {
    
        Observable<Integer> observable = Observable.create(emitter -> {
          for (int i = 0; i < 10; i++) {
            emitter.onNext(i);
          }
        });
        observable.forEach(System.out::println);
      }
    }

     

    <실행 결과>

    0
    1
    2
    3
    4
    5
    6
    7
    8
    9

     

    다만, Observable 이 onError 를 실행하는 경우에 forEach 는 Error Handling 을 수행할 수 없습니다.

    그래서 위 Observable Pipeline 은 Observable 에서 Error 가 발생한다면 곧바로 종료되는 취약한 코드 구조입니다.

     

    아래 코드는 Throw Error 이후에 forEach 연산자 레벨에서 에러가 발생하는 예시입니다.

    package org.example;
    
    import io.reactivex.rxjava3.core.Observable;
    
    public class Main {
    
      public static void main(String[] args) {
    
        Observable<Integer> observable = Observable.create(emitter -> {
          for (int i = 0; i < 10; i++) {
            emitter.onNext(i);
          }
          emitter.onError(new Throwable());
          for (int i = 10; i < 20; i++) {
            emitter.onNext(i);
          }
        });
        observable.forEach(System.out::println);
      }
    }

     

    <실행 결과>

    0
    1
    2
    3
    4
    5
    6
    7
    8
    9
    
    io.reactivex.rxjava3.exceptions.OnErrorNotImplementedException: The exception was not handled due to missing onError handler in the subscribe() method call.

     

     

    blockingFirst.

    blockingFirst Operator 는 Observable Chain 의 데이터 스트림에서 첫번째 데이터를 얻을 수 있는 Interface 입니다.

    두번째나 세번째 또는 그 이상의 index 의 데이터를 얻고자하는 상황이면 skip Operator 를 함께 사용해야합니다.

    다만, 비동기 상황에서는 데이터의 시퀀스가 보장되지 않으니 이점을 유의해야합니다.

     

    그리고 blockingFirst 를 통해서 전체적인 Observable chain 이 종료되진 않습니다.

    아래의 예시가 관련 코드입니다.

     

    Data Source 에 해당하는 Observable 이 종결되지 않고 unbound 하게 꾸준히 지속되는 상황이라면

    blockingFirst Operator 를 사용한다고 하더라도 Observable Chain 이 종료되진 않습니다.

    이 점을 유의해서 blockingFirst 를 사용하는 것이 중요합니다.

     

    package org.example;
    
    import io.reactivex.rxjava3.annotations.NonNull;
    import io.reactivex.rxjava3.core.Observable;
    import io.reactivex.rxjava3.core.ObservableEmitter;
    import io.reactivex.rxjava3.core.ObservableOnSubscribe;
    import io.reactivex.rxjava3.schedulers.Schedulers;
    
    public class Main {
    
      public static void main(String[] args) {
    
        Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
          @Override
          public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Throwable {
            for (int i = 0; i < 100000000; i++) {
              emitter.onNext(i);
              System.out.println(i);
              System.out.println(Thread.currentThread().getName());
              Thread.sleep(1000);
            }
          }
        });
        Integer result = observable
                .subscribeOn(Schedulers.computation())
                .blockingFirst();
        System.out.println(result);
    
        while (true) {}
      }
    }

     

    <실행 결과>

    0
    RxComputationThreadPool-1
    blockingFirst is 0
    1
    RxComputationThreadPool-1
    2
    RxComputationThreadPool-1
    3
    RxComputationThreadPool-1
    4
    RxComputationThreadPool-1
    5
    RxComputationThreadPool-1
    6
    // 생략 ...

     

    blockingLast.

    blockingLast Operator 는 blockingFirst 와 반대로 마지막 데이터를 얻을 수 있는 interface 입니다.

    blockingLast Operator 가 마지막 데이터를 식별하는 기준은 Observable 의 종료 여부입니다.

    Observable 이 onComplete 를 호출하여 종료되었다면, 가장 마지막으로 유입된 데이터를 last data 로 간주합니다.

     

    만약 Observable 이 onComplete 를 호출하지 않는 Observable Chain 이라면 blockingLast 에 의해서 Observable Chain 이 종료되지 않습니다.

     

    package org.example;
    
    import io.reactivex.rxjava3.annotations.NonNull;
    import io.reactivex.rxjava3.core.Observable;
    import io.reactivex.rxjava3.core.ObservableEmitter;
    import io.reactivex.rxjava3.core.ObservableOnSubscribe;
    import io.reactivex.rxjava3.schedulers.Schedulers;
    
    public class Main {
    
      public static void main(String[] args) {
    
        Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
          @Override
          public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Throwable {
            for (int i = 0; i < 10; i++) {
              emitter.onNext(i);
              System.out.println(i);
              System.out.println(Thread.currentThread().getName());
            }
            emitter.onComplete();
          }
        });
        Integer result = observable
                .subscribeOn(Schedulers.computation())
                .blockingLast();
        System.out.println("blockingFirst is " + result);
    
        while (true) {}
      }
    }

     

    <실행 결과>

    0
    RxComputationThreadPool-1
    1
    RxComputationThreadPool-1
    2
    RxComputationThreadPool-1
    3
    RxComputationThreadPool-1
    4
    RxComputationThreadPool-1
    5
    RxComputationThreadPool-1
    6
    RxComputationThreadPool-1
    7
    RxComputationThreadPool-1
    8
    RxComputationThreadPool-1
    9
    RxComputationThreadPool-1
    blockingFirst is 9

     

     

    toList.

     

    반응형
Designed by Tistory.