ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • RxJava Combining Operator 알아보기
    Language/ReactiveX 2023. 10. 1. 17:50
    728x90
    반응형

     

    - 목차

     

     

    관련된 글

    https://westlife0615.tistory.com/317

     

    RxJava Observable 알아보기

    - 목차 관련된 글 https://westlife0615.tistory.com/2 ReactiveX 알아보기 - 목차 소개. Reactive X 패러다임에 대해서 알아보려고 합니다. Push and Pull. 데이터 커뮤니케이션에는 Push 와 Pull 두가지 방식이 있습니

    westlife0615.tistory.com

     

    소개.

    RxJava 의 결합 연산자에 대해서 알아보려고 합니다.

     

    mergeWith.

    mergeWith 는 두개 이상의 데이터 소스를 하나의 데이터 소스로 결합하는 결합 연산자입니다.

     

    merge Synchronously.

     

    아래의 예시는 동기적인 방식으로 두 소스코드를 결합한 예시입니다.

    dataSource1 과 dataSource2 에 해당하는 두 Observable 을 결합한 예시이며,

    두 Observable 모두 Main Thread 에서 실행됩니다.

    그래서 실행 결과 또한 dataSource1 의 subscribe 이 먼저 실행되고, 그 후에 dataSource2 의 subscribe 가 실행됩니다.

     

    package org.example;
    
    import io.reactivex.rxjava3.core.Observable;
    
    public class Main {
    
      public static void main(String[] args) {
    
        Observable<Integer> dataSource1 = Observable.range(1, 10);
        Observable<Integer> dataSource2 = Observable.create(emitter -> {
          for (int i = 0; i < 10; i++) {
            emitter.onNext(i + 100);
          }
          emitter.onComplete();
        });
    
        dataSource1
                .mergeWith(dataSource2)
                .subscribe(s -> System.out.println(s), t -> System.err.println(t));
        
        while(true) {}
      }
    }

     

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    100
    101
    102
    103
    104
    105
    106
    107
    108
    109

     

    merge Asynchronously.

    subscribeOn 연산자를 통해서 Observable 의 Data 처리를 비동기적으로 처리할 수 있습니다.

    아래 예시는 두 dataSource Observable 을 개별적인 Worker Thread 에서 실행되게 처리한 코드인데요.

    실행결과를 보면 Multi Threaded 구조로 동시 실행됨을 파악할 수 있습니다.

     

    mergeWith 와 subscribeOn 연산자들을 적용하여 데이터 흐름의 순서대로 둘 이상의 DataSource 를 결합할 수 있습니다.

    package org.example;
    
    import io.reactivex.rxjava3.core.Observable;
    import io.reactivex.rxjava3.schedulers.Schedulers;
    
    public class Main {
    
      public static void main(String[] args) {
    
        Observable<String> dataSource1 = Observable.create(emitter -> {
          for (int i = 0; i < 50; i++) {
            emitter.onNext(String.format("$dataSource1_%s", i));
          }
          emitter.onComplete();
        });
        Observable<String> dataSource2 = Observable.create(emitter -> {
          for (int i = 0; i < 50; i++) {
            emitter.onNext(String.format("$dataSource2_%s", i));
          }
          emitter.onComplete();
        });
    
        dataSource1
                .subscribeOn(Schedulers.computation())
                .mergeWith(dataSource2.subscribeOn(Schedulers.computation()))
                .subscribe(s -> System.out.println(s), t -> System.err.println(t));
    
        while(true) {}
      }
    }

     

    <실행 결과>

    // 생략 ...
    $dataSource1_39
    $dataSource1_40
    $dataSource1_41
    $dataSource1_42
    $dataSource1_43
    $dataSource2_0
    $dataSource1_44
    $dataSource1_45
    $dataSource1_46
    $dataSource2_1
    $dataSource1_47
    $dataSource2_2
    $dataSource1_48
    $dataSource1_49
    $dataSource2_3
    $dataSource2_4
    $dataSource2_5
    $dataSource2_6'
    // 생략 ...

     

     

    데이터 소스의 타입이 다를 경우.

     

    데이터 소스의 타입이 다른 경우에는 결합이 불가능합니다.

    Map 연산자를 동원하여 타입을 통일시켜야합니다.

     

    dataSource1 이 Observable<Integer> 타입이며, map 연산을 통해 Observable<String> 으로 변환하는 코드입니다.

     

    package org.example;
    
    import io.reactivex.rxjava3.core.Observable;
    import io.reactivex.rxjava3.schedulers.Schedulers;
    
    public class Main {
    
      public static void main(String[] args) {
    
        Observable<Integer> dataSource1 = Observable.create(emitter -> {
          for (int i = 0; i < 50; i++) {
            emitter.onNext(i);
          }
          emitter.onComplete();
        });
        Observable<String> dataSource2 = Observable.create(emitter -> {
          for (int i = 0; i < 50; i++) {
            emitter.onNext(String.format("$dataSource2_%s", i));
          }
          emitter.onComplete();
        });
    
        dataSource1
                .map(integer -> String.format("$dataSource1_%s", integer))
                .subscribeOn(Schedulers.computation())
                .mergeWith(dataSource2.subscribeOn(Schedulers.computation()))
                .subscribe(s -> System.out.println(s), t -> System.err.println(t));
    
        while(true) {}
      }
    }

     

    concatWith.

    concatWith Operator 는 2개 이상의 Observable 의 순서를 보장합니다.

    2개의 Observable 이 concatWith 로 결합된다면 첫번째 Observable 이 Completion 된 이후에 두번째 Observable 의 data emission 이 시작됩니다.

     

    아래는 concatWith 의 예시 코드입니다.

     

    dataSource1 과 dataSource2 Observable 는 각각 다른 Thread 에서 동작하는 비동기 작업이지만,

    dataSource1 의 모든 Data Emission 이 완료된 이후에 dataSource2 의 Data Emission 이 시작됩니다.

    즉, Observable 의 Sequence 가 보장됩니다.

     

    package org.example;
    
    import io.reactivex.rxjava3.core.Observable;
    import io.reactivex.rxjava3.schedulers.Schedulers;
    
    public class Main {
    
      public static void main(String[] args) {
    
        Observable<String> dataSource1 = Observable.create(emitter -> {
          for (int i = 0; i < 50; i++) {
            emitter.onNext(String.format("$dataSource1_%s_%s", i, Thread.currentThread().getName()));
          }
          emitter.onComplete();
        });
        Observable<String> dataSource2 = Observable.create(emitter -> {
          for (int i = 0; i < 50; i++) {
            emitter.onNext(String.format("$dataSource2_%s_%s", i, Thread.currentThread().getName()));
          }
          emitter.onComplete();
        });
    
        dataSource1
                .subscribeOn(Schedulers.computation())
                .concatWith(dataSource2.subscribeOn(Schedulers.computation()))
                .subscribe(s -> System.out.println(s), t -> System.err.println(t));
    
        while(true) {}
      }
    }

     

    // 생략 ...
    $dataSource1_45_RxComputationThreadPool-1
    $dataSource1_46_RxComputationThreadPool-1
    $dataSource1_47_RxComputationThreadPool-1
    $dataSource1_48_RxComputationThreadPool-1
    $dataSource1_49_RxComputationThreadPool-1
    $dataSource2_0_RxComputationThreadPool-2
    $dataSource2_1_RxComputationThreadPool-2
    $dataSource2_2_RxComputationThreadPool-2
    $dataSource2_3_RxComputationThreadPool-2
    // 생략 ...

     

    zipWith.

     

    zipWith 결합 연산자는 두개의 Observable 에서 emit 되는 Data 를 결합합니다.

    단, 서로 순서가 같은 Data 끼리 결합되는 구조로 몇 가지 제약사항이 있습니다.

     

    DataSource1, DataSource2 의 data 갯수가 다른 경우.

    DataSource1 이 emit 하는 데이터의 수가 1개이고, DataSource2 가 emit 하는 데이터의 수가 100개라고 가정하겠습니다.

    이때, zipWith 연산자에 의해서 Emit 되는 데이터의 갯수는 1개입니다.

    왜냐하면 두 Observable 사이에 pairing 될 수 있는 데이터는 1개 이기 때문입니다.

    DataSource2 의 나머지 99개의 데이터는 DataSource1 의 1개의 데이터와 쌍을 지을 수 없기 때문에 ZipWith 이후의 파이프라인을 통과할 수 없습니다.

     

    package org.example;
    
    import io.reactivex.rxjava3.core.Observable;
    import io.reactivex.rxjava3.schedulers.Schedulers;
    
    public class Main {
    
      public static void main(String[] args) {
    
        Observable<String> dataSource1 = Observable.create(emitter -> {
          for (int i = 0; i < 1; i++) {
            emitter.onNext(String.format("$dataSource1_%s_%s", i, Thread.currentThread().getName()));
          }
          emitter.onComplete();
        });
        Observable<String> dataSource2 = Observable.create(emitter -> {
          for (int i = 0; i < 100; i++) {
            emitter.onNext(String.format("$dataSource2_%s_%s", i, Thread.currentThread().getName()));
          }
          emitter.onComplete();
        });
    
        dataSource1
                .subscribeOn(Schedulers.computation())
                .zipWith(dataSource2.subscribeOn(Schedulers.computation()), (s, s2) -> s + s2)
                .subscribe(s -> System.out.println(s), t -> System.err.println(t));
    
        while (true) {
        }
      }
    }

     

    <실행 결과>

    $dataSource1_0_RxComputationThreadPool-1$dataSource2_0_RxComputationThreadPool-2

     

     

    zipWith 는 BiFunction 으로 반드시 하나의 output 을 가진다.

     

    두 Observable 인 DataSource1 과 DataSource2 는 zipWith 의 결합 연산자에 의해서 하나의 output 으로 변형됩니다.

    이때, BiFunction 함수형 인터페이스가 사용됩니다.

    이 과정에서 형변환도 가능합니다.

     

    아래 예시는 Observable<Integer> 타입의 dataSource1 과 dataSource2 가 Observable<String> 으로 결합 및 변환되는 과정입니다.

    zipWith 를 통해서 결합 뿐만 아니라 Map/FlatMap 와 같은 변형 연산도 수행 가능합니다.

     

    package org.example;
    
    import io.reactivex.rxjava3.core.Observable;
    import io.reactivex.rxjava3.functions.BiFunction;
    import io.reactivex.rxjava3.schedulers.Schedulers;
    
    public class Main {
    
      public static void main(String[] args) {
    
        Observable<Integer> dataSource1 = Observable.create(emitter -> {
          for (int i = 0; i < 10; i++) {
            emitter.onNext(i);
          }
          emitter.onComplete();
        });
        Observable<Integer> dataSource2 = Observable.create(emitter -> {
          for (int i = 0; i < 10; i++) {
            emitter.onNext(i);
          }
          emitter.onComplete();
        });
    
        dataSource1
                .subscribeOn(Schedulers.computation())
                .zipWith(dataSource2.subscribeOn(Schedulers.computation()), new BiFunction<Integer, Integer, String>() {
                  @Override
                  public String apply(Integer integer, Integer integer2) throws Throwable {
                    return String.format("%s_%s_%s", integer, integer2, Thread.currentThread().getName());
                  }
                })
                .subscribe(s -> System.out.println(s), t -> System.err.println(t));
    
        while (true) {
        }
      }
    }

     

    <실행 결과>

    0_0_RxComputationThreadPool-2
    1_1_RxComputationThreadPool-2
    2_2_RxComputationThreadPool-2
    3_3_RxComputationThreadPool-2
    4_4_RxComputationThreadPool-2
    5_5_RxComputationThreadPool-2
    6_6_RxComputationThreadPool-2
    7_7_RxComputationThreadPool-2
    8_8_RxComputationThreadPool-2
    9_9_RxComputationThreadPool-2

     

    반응형
Designed by Tistory.