-
RxJava Combining Operator 알아보기Language/ReactiveX 2023. 10. 1. 17:50728x90반응형
- 목차
관련된 글
https://westlife0615.tistory.com/317
소개.
RxJava 의 결합 연산자에 대해서 알아보려고 합니다.
mergeWith.
mergeWith 는 두개 이상의 데이터 소스를 하나의 데이터 소스로 결합하는 결합 연산자입니다.
merge Synchronously.
아래의 예시는 동기적인 방식으로 두 소스코드를 결합한 예시입니다.
dataSource1 과 dataSource2 에 해당하는 두 Observable 을 결합한 예시이며,
두 Observable 모두 Main Thread 에서 실행됩니다.
그래서 실행 결과 또한 dataSource1 의 subscribe 이 먼저 실행되고, 그 후에 dataSource2 의 subscribe 가 실행됩니다.
package org.example; import io.reactivex.rxjava3.core.Observable; public class Main { public static void main(String[] args) { Observable<Integer> dataSource1 = Observable.range(1, 10); Observable<Integer> dataSource2 = Observable.create(emitter -> { for (int i = 0; i < 10; i++) { emitter.onNext(i + 100); } emitter.onComplete(); }); dataSource1 .mergeWith(dataSource2) .subscribe(s -> System.out.println(s), t -> System.err.println(t)); while(true) {} } }
1 2 3 4 5 6 7 8 9 10 100 101 102 103 104 105 106 107 108 109
merge Asynchronously.
subscribeOn 연산자를 통해서 Observable 의 Data 처리를 비동기적으로 처리할 수 있습니다.
아래 예시는 두 dataSource Observable 을 개별적인 Worker Thread 에서 실행되게 처리한 코드인데요.
실행결과를 보면 Multi Threaded 구조로 동시 실행됨을 파악할 수 있습니다.
mergeWith 와 subscribeOn 연산자들을 적용하여 데이터 흐름의 순서대로 둘 이상의 DataSource 를 결합할 수 있습니다.
package org.example; import io.reactivex.rxjava3.core.Observable; import io.reactivex.rxjava3.schedulers.Schedulers; public class Main { public static void main(String[] args) { Observable<String> dataSource1 = Observable.create(emitter -> { for (int i = 0; i < 50; i++) { emitter.onNext(String.format("$dataSource1_%s", i)); } emitter.onComplete(); }); Observable<String> dataSource2 = Observable.create(emitter -> { for (int i = 0; i < 50; i++) { emitter.onNext(String.format("$dataSource2_%s", i)); } emitter.onComplete(); }); dataSource1 .subscribeOn(Schedulers.computation()) .mergeWith(dataSource2.subscribeOn(Schedulers.computation())) .subscribe(s -> System.out.println(s), t -> System.err.println(t)); while(true) {} } }
<실행 결과>
// 생략 ... $dataSource1_39 $dataSource1_40 $dataSource1_41 $dataSource1_42 $dataSource1_43 $dataSource2_0 $dataSource1_44 $dataSource1_45 $dataSource1_46 $dataSource2_1 $dataSource1_47 $dataSource2_2 $dataSource1_48 $dataSource1_49 $dataSource2_3 $dataSource2_4 $dataSource2_5 $dataSource2_6' // 생략 ...
데이터 소스의 타입이 다를 경우.
데이터 소스의 타입이 다른 경우에는 결합이 불가능합니다.
Map 연산자를 동원하여 타입을 통일시켜야합니다.
dataSource1 이 Observable<Integer> 타입이며, map 연산을 통해 Observable<String> 으로 변환하는 코드입니다.
package org.example; import io.reactivex.rxjava3.core.Observable; import io.reactivex.rxjava3.schedulers.Schedulers; public class Main { public static void main(String[] args) { Observable<Integer> dataSource1 = Observable.create(emitter -> { for (int i = 0; i < 50; i++) { emitter.onNext(i); } emitter.onComplete(); }); Observable<String> dataSource2 = Observable.create(emitter -> { for (int i = 0; i < 50; i++) { emitter.onNext(String.format("$dataSource2_%s", i)); } emitter.onComplete(); }); dataSource1 .map(integer -> String.format("$dataSource1_%s", integer)) .subscribeOn(Schedulers.computation()) .mergeWith(dataSource2.subscribeOn(Schedulers.computation())) .subscribe(s -> System.out.println(s), t -> System.err.println(t)); while(true) {} } }
concatWith.
concatWith Operator 는 2개 이상의 Observable 의 순서를 보장합니다.
2개의 Observable 이 concatWith 로 결합된다면 첫번째 Observable 이 Completion 된 이후에 두번째 Observable 의 data emission 이 시작됩니다.
아래는 concatWith 의 예시 코드입니다.
dataSource1 과 dataSource2 Observable 는 각각 다른 Thread 에서 동작하는 비동기 작업이지만,
dataSource1 의 모든 Data Emission 이 완료된 이후에 dataSource2 의 Data Emission 이 시작됩니다.
즉, Observable 의 Sequence 가 보장됩니다.
package org.example; import io.reactivex.rxjava3.core.Observable; import io.reactivex.rxjava3.schedulers.Schedulers; public class Main { public static void main(String[] args) { Observable<String> dataSource1 = Observable.create(emitter -> { for (int i = 0; i < 50; i++) { emitter.onNext(String.format("$dataSource1_%s_%s", i, Thread.currentThread().getName())); } emitter.onComplete(); }); Observable<String> dataSource2 = Observable.create(emitter -> { for (int i = 0; i < 50; i++) { emitter.onNext(String.format("$dataSource2_%s_%s", i, Thread.currentThread().getName())); } emitter.onComplete(); }); dataSource1 .subscribeOn(Schedulers.computation()) .concatWith(dataSource2.subscribeOn(Schedulers.computation())) .subscribe(s -> System.out.println(s), t -> System.err.println(t)); while(true) {} } }
// 생략 ... $dataSource1_45_RxComputationThreadPool-1 $dataSource1_46_RxComputationThreadPool-1 $dataSource1_47_RxComputationThreadPool-1 $dataSource1_48_RxComputationThreadPool-1 $dataSource1_49_RxComputationThreadPool-1 $dataSource2_0_RxComputationThreadPool-2 $dataSource2_1_RxComputationThreadPool-2 $dataSource2_2_RxComputationThreadPool-2 $dataSource2_3_RxComputationThreadPool-2 // 생략 ...
zipWith.
zipWith 결합 연산자는 두개의 Observable 에서 emit 되는 Data 를 결합합니다.
단, 서로 순서가 같은 Data 끼리 결합되는 구조로 몇 가지 제약사항이 있습니다.
DataSource1, DataSource2 의 data 갯수가 다른 경우.
DataSource1 이 emit 하는 데이터의 수가 1개이고, DataSource2 가 emit 하는 데이터의 수가 100개라고 가정하겠습니다.
이때, zipWith 연산자에 의해서 Emit 되는 데이터의 갯수는 1개입니다.
왜냐하면 두 Observable 사이에 pairing 될 수 있는 데이터는 1개 이기 때문입니다.
DataSource2 의 나머지 99개의 데이터는 DataSource1 의 1개의 데이터와 쌍을 지을 수 없기 때문에 ZipWith 이후의 파이프라인을 통과할 수 없습니다.
package org.example; import io.reactivex.rxjava3.core.Observable; import io.reactivex.rxjava3.schedulers.Schedulers; public class Main { public static void main(String[] args) { Observable<String> dataSource1 = Observable.create(emitter -> { for (int i = 0; i < 1; i++) { emitter.onNext(String.format("$dataSource1_%s_%s", i, Thread.currentThread().getName())); } emitter.onComplete(); }); Observable<String> dataSource2 = Observable.create(emitter -> { for (int i = 0; i < 100; i++) { emitter.onNext(String.format("$dataSource2_%s_%s", i, Thread.currentThread().getName())); } emitter.onComplete(); }); dataSource1 .subscribeOn(Schedulers.computation()) .zipWith(dataSource2.subscribeOn(Schedulers.computation()), (s, s2) -> s + s2) .subscribe(s -> System.out.println(s), t -> System.err.println(t)); while (true) { } } }
<실행 결과>
$dataSource1_0_RxComputationThreadPool-1$dataSource2_0_RxComputationThreadPool-2
zipWith 는 BiFunction 으로 반드시 하나의 output 을 가진다.
두 Observable 인 DataSource1 과 DataSource2 는 zipWith 의 결합 연산자에 의해서 하나의 output 으로 변형됩니다.
이때, BiFunction 함수형 인터페이스가 사용됩니다.
이 과정에서 형변환도 가능합니다.
아래 예시는 Observable<Integer> 타입의 dataSource1 과 dataSource2 가 Observable<String> 으로 결합 및 변환되는 과정입니다.
zipWith 를 통해서 결합 뿐만 아니라 Map/FlatMap 와 같은 변형 연산도 수행 가능합니다.
package org.example; import io.reactivex.rxjava3.core.Observable; import io.reactivex.rxjava3.functions.BiFunction; import io.reactivex.rxjava3.schedulers.Schedulers; public class Main { public static void main(String[] args) { Observable<Integer> dataSource1 = Observable.create(emitter -> { for (int i = 0; i < 10; i++) { emitter.onNext(i); } emitter.onComplete(); }); Observable<Integer> dataSource2 = Observable.create(emitter -> { for (int i = 0; i < 10; i++) { emitter.onNext(i); } emitter.onComplete(); }); dataSource1 .subscribeOn(Schedulers.computation()) .zipWith(dataSource2.subscribeOn(Schedulers.computation()), new BiFunction<Integer, Integer, String>() { @Override public String apply(Integer integer, Integer integer2) throws Throwable { return String.format("%s_%s_%s", integer, integer2, Thread.currentThread().getName()); } }) .subscribe(s -> System.out.println(s), t -> System.err.println(t)); while (true) { } } }
<실행 결과>
0_0_RxComputationThreadPool-2 1_1_RxComputationThreadPool-2 2_2_RxComputationThreadPool-2 3_3_RxComputationThreadPool-2 4_4_RxComputationThreadPool-2 5_5_RxComputationThreadPool-2 6_6_RxComputationThreadPool-2 7_7_RxComputationThreadPool-2 8_8_RxComputationThreadPool-2 9_9_RxComputationThreadPool-2
반응형'Language > ReactiveX' 카테고리의 다른 글
RxJava Flowable 알아보기 (0) 2023.10.02 RxJava Terminating Operator 알아보기 (0) 2023.10.02 RxJava Observable 종류 알아보기 (Single, Maybe, Completable, Flowable) (0) 2023.10.01 RxJava Observable 알아보기 (0) 2023.09.29 RxJava Thread Scheduling 알아보기 (0) 2023.09.28