https://westlife0615.tistory.com/2 ReactiveX 알아보기 - 목차 소개. Reactive X 패러다임에 대해서 알아보려고 합니다. Push and Pull. 데이터 커뮤니케이션에는 Push 와 Pull 두가지 방식이 있습니
Observable 의 Data Emission 방식에는 두가지가 존재합니다.
하나는 Hot Observable 그리고 다른 하나는 Cold Observable 방식입니다.
Cold Observable 은 우리가 알고 있는 기존의 방식입니다.
Observable 의 subscribe 함수를 통해서 Consumer 를 등록하고,
Observable 은 Consumer 가 등록된 이후에 Data Emission 을 시작합니다.
Hot Observable 은 동작방식이 조금 다릅니다.
Subscriber 의 등록과 관계없이 Data Emission 을 시작합니다.
이를 Broadcasting 이라고도 하는데요.
데이터의 전달을 담당하는 Source 영역은 구독하는 Consumer 의 존재와 상관없이 데이터를 송출합니다.
데이터의 Loss 나 목적지 여부는 신경쓰지 않는 구조입니다.
Cold 와 Hot 이라고 이름붙인 이유는 온도와는 크게 상관없습니다.
그저 데이터의 전달방식이 얼마나 다른 요인들과 Independent 하고 active 한지에 따라 Hot 이나 Cold 인지를 나누는 기준인 것 같네요.
Hot Observable.
Hot Observable 은 Observable Chain 의 다른 요인들과 독립적으로 Data Emission 을 수행합니다.
Subscriber 가 있든 없든, 데이터의 목적지가 있든 없든지 개의치 않고 Data Emission 을 수행합니다.
RxJava 에서 Hot Observable 을 구현하는 class 들이 따로 존재합니다.
- PublishSubject
- Connectable
PublishSubject 클래스를 사용하여 Hot Observable 을 구현할 수 있습니다.
Observable 이 데이터 스트림을 내장하고 있는 구조인 것과 반대로
PublishSubject 는 외부 코드에서 직접 Data Emission 을 구현합니다.
이를 Subject Pattern 이라고 합니다.
Observable 이 데이터 생산을 책임지고 Observer 가 데이터의 소비를 책임지는 구조와 달리
Subject 는 Subject API 를 통해서 Observable 와 Observer 역할을 모두 수행합니다.
데이터를 생성하고 소비하는 역할을 모두 수행할 수 있습니다.
아래는 관련된 예시입니다.
PublishSubject 는 1 부터 20까지의 Integer 를 Emit 합니다.
1 부터 10까지의 데이터를 Emit 하는 과정에는 Subscriber 가 없습니다.
그리고 11부터 20까지의 데이터를 Emit 하는 과정에는 Subscriber 가 존재합니다.
Subject Pattern 상 Subscriber 의 존재와 무관하게 Data Emission 이 실행됩니다.
그래서 아래의 코드에선 11 부터 20 까지의 데이터만 소비할 수 있습니다.
package org.example; import io.reactivex.rxjava3.subjects.PublishSubject; public class Main { public static void main(String[] args) { PublishSubject<Integer> subject = PublishSubject.create(); for (int i = 1; i <= 10; i++) { subject.onNext(i); } subject.subscribe((value) -> { System.out.println(String.format("thread : %s, value : %s", Thread.currentThread().getName(), value)); }); for (int i = 11; i <= 20; i++) { subject.onNext(i); } } }
<실행 결과>
thread : main, value : 11 thread : main, value : 12 thread : main, value : 13 thread : main, value : 14 thread : main, value : 15 thread : main, value : 16 thread : main, value : 17 thread : main, value : 18 thread : main, value : 19
BehaviorSubject 는 PublishSubject 와 유사합니다.
차이점은 마지막으로 생성한 데이터를 기억하는지 여부입니다.
PublishSubject 는 "Fire and Forget" 방식으로 데이터를 생성합니다.
반면 BehaviorSubject 는 생성했던 마지막 데이터를 기억해두었다가 새로운 Subscriber 가 생기면,
마지막 데이터부터 Emission 을 시작합니다.
아래 예시는 BehaviorSubject 가 마지막으로 생성했던 데이터를 기억해두었다가,
새로운 Subscriber 의 initialValue 로 전달하는 과정을 코드로 작성하였습니다.
1 부터 5 까지의 데이터가 onNext 되는 동안에는 어떠한 Subscriber 도 존재하지 않습니다.
6 부터 10 까지의 데이터를 onNext 하는 동안 하나의 Subscriber 가 존재합니다.
이때, 데이터의 Consume 이 수행됩니다.
11 부터 15 까지의 데이터를 onNext 하는 동안에는 2개의 Subscriber 들이 존재합니다.
새롭게 생성된 Subscriber 는 observeOn 연산자를 통해서 "RxCachedThreadScheduler-1" 쓰레드에서 작업이 수행되도록 설정하였습니다.
그리고 value 를 10 부터 15까지 consume 하는 것을 확인할 수 있습니다.
BehaviorSubject 는 10 이라는 latest value 를 기억해두었다가 새로운 Subscriber 에게 전달합니다.
package org.example; import io.reactivex.rxjava3.schedulers.Schedulers; import io.reactivex.rxjava3.subjects.BehaviorSubject; public class Main { public static void main(String[] args) { BehaviorSubject<Integer> subject = BehaviorSubject.createDefault(0); for (int i = 1; i <= 5; i++) { subject.onNext(i); } subject.subscribe((value) -> { System.out.println(String.format("thread : %s, value : %s", Thread.currentThread().getName(), value)); }); for (int i = 6; i <= 10; i++) { subject.onNext(i); } subject .observeOn(Schedulers.io()) .subscribe((value) -> { System.out.println(String.format("thread : %s, value : %s", Thread.currentThread().getName(), value)); }); for (int i = 11; i <= 15; i++) { subject.onNext(i); } while (true) { } } }
<실행 결과>
thread : main, value : 5 thread : main, value : 6 thread : main, value : 7 thread : main, value : 8 thread : main, value : 9 thread : main, value : 10 thread : main, value : 11 thread : RxCachedThreadScheduler-1, value : 10 thread : main, value : 12 thread : RxCachedThreadScheduler-1, value : 11 thread : main, value : 13 thread : RxCachedThreadScheduler-1, value : 12 thread : main, value : 14 thread : RxCachedThreadScheduler-1, value : 13 thread : RxCachedThreadScheduler-1, value : 14 thread : main, value : 15 thread : RxCachedThreadScheduler-1, value : 15
ReplaySubject 는 PublishSubject 와 유사합니다.
차이점은 데이터의 Replay 여부입니다.
반면 ReplaySubject 는 생성했던 데이터를 기억해두었다가 새로운 Subscriber 가 생기면,
첫 데이터부터 Emission 을 Replay 합니다.
카프카의 Consumer 로 비유하지면 offset 을 "earliest" 으로 설정하는 경우와 비슷합니다.
관련된 예시입니다.
실행 결과는 보시면 ReplaySubject 의 역할을 한눈에 파악하실 수 있을 겁니다.
Subscriber 가 등록되기 이전의 모든 데이터들이 Replay 되어 소비됩니다.
package org.example; import io.reactivex.rxjava3.schedulers.Schedulers; import io.reactivex.rxjava3.subjects.ReplaySubject; public class Main { public static void main(String[] args) { ReplaySubject<Integer> subject = ReplaySubject.create(); for (int i = 1; i <= 5; i++) { subject.onNext(i); } subject.subscribe((value) -> { System.out.println(String.format("thread : %s, value : %s", Thread.currentThread().getName(), value)); }); for (int i = 6; i <= 10; i++) { subject.onNext(i); } subject .observeOn(Schedulers.io()) .subscribe((value) -> { System.out.println(String.format("thread : %s, value : %s", Thread.currentThread().getName(), value)); }); for (int i = 11; i <= 15; i++) { subject.onNext(i); } while (true) { } } }
<실행 결과>
thread : main, value : 1 thread : main, value : 2 thread : main, value : 3 thread : main, value : 4 thread : main, value : 5 thread : main, value : 6 thread : main, value : 7 thread : main, value : 8 thread : main, value : 9 thread : main, value : 10 thread : main, value : 11 thread : RxCachedThreadScheduler-1, value : 1 thread : main, value : 12 thread : RxCachedThreadScheduler-1, value : 2 thread : main, value : 13 thread : RxCachedThreadScheduler-1, value : 3 thread : main, value : 14 thread : RxCachedThreadScheduler-1, value : 4 thread : main, value : 15 thread : RxCachedThreadScheduler-1, value : 5 thread : RxCachedThreadScheduler-1, value : 6 thread : RxCachedThreadScheduler-1, value : 7 thread : RxCachedThreadScheduler-1, value : 8 thread : RxCachedThreadScheduler-1, value : 9 thread : RxCachedThreadScheduler-1, value : 10 thread : RxCachedThreadScheduler-1, value : 11 thread : RxCachedThreadScheduler-1, value : 12 thread : RxCachedThreadScheduler-1, value : 13 thread : RxCachedThreadScheduler-1, value : 14 thread : RxCachedThreadScheduler-1, value : 15
ConnectableObservable 은 Cold Observable 을 Hot Observable 로 변경할 수 있는 클래스입니다.
Subject Pattern 과 달리 Observable 과 Observer 의 존재를 각각 유지하며, Observable 의 Data Emission 시점을 결정할 수 있습니다.
관련된 예시를 먼저 살펴보겠습니다.
아래 예시는 ConnectableObservable 을 생성하는 예시입니다.
Observable 의 publish 함수를 통해서 ConnectableObservable 을 생성할 수 있습니다.
아래의 예시에선 어떠한 데이터도 consume 되지 않습니다.
왜냐하면 ConnectableObservable 은 connect 함수를 통해서 ConnectableObservable 를 실행시켜야하는 구조입니다.
connect 함수를 통해서 Hot Observable 방식의 Broadcasting 이 시작됩니다.
package org.example; import io.reactivex.rxjava3.core.Observable; import io.reactivex.rxjava3.observables.ConnectableObservable; public class Main { public static void main(String[] args) { Observable<Integer> observable = Observable.range(1, 10); ConnectableObservable<Integer> connectableObservable = observable.publish(); connectableObservable.subscribe((value) -> { System.out.println(String.format("thread is %s, value is %s", Thread.currentThread().getName(), value)); }); } }
Connect 이후에 Consume 가능함.
아래 예시는 connect 함수를 사용하여 ConnectableObservable 을 활성화시킨 케이스입니다.
connect 이후에 subscriber 의 consume 이 시작됩니다.
package org.example; import io.reactivex.rxjava3.core.Observable; import io.reactivex.rxjava3.observables.ConnectableObservable; public class Main { public static void main(String[] args) { Observable<Integer> observable = Observable.range(1, 10); ConnectableObservable<Integer> connectableObservable = observable.publish(); connectableObservable.subscribe((value) -> { System.out.println(String.format("thread is %s, value is %s", Thread.currentThread().getName(), value)); }); connectableObservable.connect(); } }
<실행 결과>
thread is main, value is 1 thread is main, value is 2 thread is main, value is 3 thread is main, value is 4 thread is main, value is 5 thread is main, value is 6 thread is main, value is 7 thread is main, value is 8 thread is main, value is 9 thread is main, value is 10
Replay 가 가능할까?
connect 이후에 생성된 Subscriber 가 과거의 데이터를 Consume 을 할 수 없습니다.
왜냐하면 기본적으로 Replay 되지 않습니다.
package org.example; import io.reactivex.rxjava3.core.Observable; import io.reactivex.rxjava3.observables.ConnectableObservable; public class Main { public static void main(String[] args) { Observable<Integer> observable = Observable.range(1, 10); ConnectableObservable<Integer> connectableObservable = observable.publish(); connectableObservable.subscribe((value) -> { System.out.println(String.format("thread is %s, value is %s", Thread.currentThread().getName(), value)); }); connectableObservable.connect(); connectableObservable.subscribe((value) -> { System.out.println("it can not do replay!"); }); } }
<실행 결과>
thread is main, value is 1 thread is main, value is 2 thread is main, value is 3 thread is main, value is 4 thread is main, value is 5 thread is main, value is 6 thread is main, value is 7 thread is main, value is 8 thread is main, value is 9 thread is main, value is 10
Hot Observable 은 기본적으로 Broadcasting 을 위한 Observable 입니다.
여러 Subscriber 들에게 데이터를 제공합니다.
Cold Observable 를 여러 Subscriber 들이 Consume 하게 되면
Observable 의 불필요한 로직의 수행이 중복됩니다.
이게 무슨 의미냐하면
하나의 Observable 에 여러 Subscriber 가 붙게 되면,
여러 Subscriber 에게 데이터를 전송하기 위해서 Observable 이 Subscriber 갯수만큼 데이터를 전송하게 됩니다.
즉, Observable 의 Data Emission 로직이 중복으로 수행됩니다.
Cold Observable 에 여러 Subscriber 가 연결될 때.
Cold Observable 방식에서 여러 Subscriber 가 연결될 때,
Observable 을 데이터 생성 로직을 중복으로 수행합니다.
아래의 "here is unnecassary logic!" 문자열이 중복으로 수행됨을 알 수 있습니다.
만약 이러한 로직이 Database 조회나 heavy 한 계산을 해야하는 경우라면, 퍼포먼스에 악영향을 끼칩니다.
말그대로 불필요한 과정이죠.
package org.example; import io.reactivex.rxjava3.core.Observable; public class Main { public static void main(String[] args) { Observable<Integer> observable = Observable.create(emitter -> { System.out.println("here is unnecessary logic!"); emitter.onNext(1); emitter.onComplete(); }); for (int i = 1; i <= 5; i++) { observable.subscribe((value) -> { System.out.println(String.format("thread is %s, value is %s", Thread.currentThread().getName(), value)); }); } } }
here is unnecessary logic! thread is main, value is 1 here is unnecessary logic! thread is main, value is 1 here is unnecessary logic! thread is main, value is 1 here is unnecessary logic! thread is main, value is 1 here is unnecessary logic! thread is main, value is 1
Hot Observable 에 여러 Subscriber 가 연결될 때.
ConnectableObservable 을 통해서 Hot Observable 을 구현하였습니다.
5개의 Subscriber 가 존재하구요.
아래 코드를 실행하면, Observable 의 emit 관련 로직이 1회만 실행됨을 알 수 있습니다.
"Fire and Forget" 방식으로 모든 Consumer 들에게 데이터를 Broadcasting 합니다.
package org.example; import io.reactivex.rxjava3.core.Observable; import io.reactivex.rxjava3.observables.ConnectableObservable; public class Main { public static void main(String[] args) { Observable<Integer> observable = Observable.create(emitter -> { System.out.println("here is unnecessary logic!"); emitter.onNext(1); emitter.onComplete(); }); ConnectableObservable<Integer> connectableObservable = observable.publish(); for (int i = 1; i <= 5; i++) { connectableObservable.subscribe((value) -> { System.out.println(String.format("thread is %s, value is %s", Thread.currentThread().getName(), value)); }); } connectableObservable.connect(); } }
<실행 결과>
here is unnecessary logic! thread is main, value is 1 thread is main, value is 1 thread is main, value is 1 thread is main, value is 1 thread is main, value is 1
