-
RxJava Hot vs Cold Observable 알아보기Language/ReactiveX 2023. 10. 2. 11:11728x90반응형
- 목차
관련된 글
https://westlife0615.tistory.com/317
소개.
Observable 의 Data Emission 방식에는 두가지가 존재합니다.
하나는 Hot Observable 그리고 다른 하나는 Cold Observable 방식입니다.
Cold Observable 은 우리가 알고 있는 기존의 방식입니다.
Observable 의 subscribe 함수를 통해서 Consumer 를 등록하고,
Observable 은 Consumer 가 등록된 이후에 Data Emission 을 시작합니다.
Hot Observable 은 동작방식이 조금 다릅니다.
Subscriber 의 등록과 관계없이 Data Emission 을 시작합니다.
이를 Broadcasting 이라고도 하는데요.
데이터의 전달을 담당하는 Source 영역은 구독하는 Consumer 의 존재와 상관없이 데이터를 송출합니다.
데이터의 Loss 나 목적지 여부는 신경쓰지 않는 구조입니다.
Cold 와 Hot 이라고 이름붙인 이유는 온도와는 크게 상관없습니다.
그저 데이터의 전달방식이 얼마나 다른 요인들과 Independent 하고 active 한지에 따라 Hot 이나 Cold 인지를 나누는 기준인 것 같네요.
Hot Observable.
Hot Observable 은 Observable Chain 의 다른 요인들과 독립적으로 Data Emission 을 수행합니다.
Subscriber 가 있든 없든, 데이터의 목적지가 있든 없든지 개의치 않고 Data Emission 을 수행합니다.
RxJava 에서 Hot Observable 을 구현하는 class 들이 따로 존재합니다.
- PublishSubject
- Connectable
PublishSubject.
PublishSubject 클래스를 사용하여 Hot Observable 을 구현할 수 있습니다.
Observable 이 데이터 스트림을 내장하고 있는 구조인 것과 반대로
PublishSubject 는 외부 코드에서 직접 Data Emission 을 구현합니다.
이를 Subject Pattern 이라고 합니다.
Observable 이 데이터 생산을 책임지고 Observer 가 데이터의 소비를 책임지는 구조와 달리
Subject 는 Subject API 를 통해서 Observable 와 Observer 역할을 모두 수행합니다.
데이터를 생성하고 소비하는 역할을 모두 수행할 수 있습니다.
아래는 관련된 예시입니다.
PublishSubject 는 1 부터 20까지의 Integer 를 Emit 합니다.
1 부터 10까지의 데이터를 Emit 하는 과정에는 Subscriber 가 없습니다.
그리고 11부터 20까지의 데이터를 Emit 하는 과정에는 Subscriber 가 존재합니다.
Subject Pattern 상 Subscriber 의 존재와 무관하게 Data Emission 이 실행됩니다.
그래서 아래의 코드에선 11 부터 20 까지의 데이터만 소비할 수 있습니다.
package org.example; import io.reactivex.rxjava3.subjects.PublishSubject; public class Main { public static void main(String[] args) { PublishSubject<Integer> subject = PublishSubject.create(); for (int i = 1; i <= 10; i++) { subject.onNext(i); } subject.subscribe((value) -> { System.out.println(String.format("thread : %s, value : %s", Thread.currentThread().getName(), value)); }); for (int i = 11; i <= 20; i++) { subject.onNext(i); } } }
<실행 결과>
thread : main, value : 11 thread : main, value : 12 thread : main, value : 13 thread : main, value : 14 thread : main, value : 15 thread : main, value : 16 thread : main, value : 17 thread : main, value : 18 thread : main, value : 19
BehaviorSubject.
BehaviorSubject 는 PublishSubject 와 유사합니다.
차이점은 마지막으로 생성한 데이터를 기억하는지 여부입니다.
PublishSubject 는 "Fire and Forget" 방식으로 데이터를 생성합니다.
반면 BehaviorSubject 는 생성했던 마지막 데이터를 기억해두었다가 새로운 Subscriber 가 생기면,
마지막 데이터부터 Emission 을 시작합니다.
아래 예시는 BehaviorSubject 가 마지막으로 생성했던 데이터를 기억해두었다가,
새로운 Subscriber 의 initialValue 로 전달하는 과정을 코드로 작성하였습니다.
1 부터 5 까지의 데이터가 onNext 되는 동안에는 어떠한 Subscriber 도 존재하지 않습니다.
6 부터 10 까지의 데이터를 onNext 하는 동안 하나의 Subscriber 가 존재합니다.
이때, 데이터의 Consume 이 수행됩니다.
11 부터 15 까지의 데이터를 onNext 하는 동안에는 2개의 Subscriber 들이 존재합니다.
새롭게 생성된 Subscriber 는 observeOn 연산자를 통해서 "RxCachedThreadScheduler-1" 쓰레드에서 작업이 수행되도록 설정하였습니다.
그리고 value 를 10 부터 15까지 consume 하는 것을 확인할 수 있습니다.
BehaviorSubject 는 10 이라는 latest value 를 기억해두었다가 새로운 Subscriber 에게 전달합니다.
package org.example; import io.reactivex.rxjava3.schedulers.Schedulers; import io.reactivex.rxjava3.subjects.BehaviorSubject; public class Main { public static void main(String[] args) { BehaviorSubject<Integer> subject = BehaviorSubject.createDefault(0); for (int i = 1; i <= 5; i++) { subject.onNext(i); } subject.subscribe((value) -> { System.out.println(String.format("thread : %s, value : %s", Thread.currentThread().getName(), value)); }); for (int i = 6; i <= 10; i++) { subject.onNext(i); } subject .observeOn(Schedulers.io()) .subscribe((value) -> { System.out.println(String.format("thread : %s, value : %s", Thread.currentThread().getName(), value)); }); for (int i = 11; i <= 15; i++) { subject.onNext(i); } while (true) { } } }
<실행 결과>
thread : main, value : 5 thread : main, value : 6 thread : main, value : 7 thread : main, value : 8 thread : main, value : 9 thread : main, value : 10 thread : main, value : 11 thread : RxCachedThreadScheduler-1, value : 10 thread : main, value : 12 thread : RxCachedThreadScheduler-1, value : 11 thread : main, value : 13 thread : RxCachedThreadScheduler-1, value : 12 thread : main, value : 14 thread : RxCachedThreadScheduler-1, value : 13 thread : RxCachedThreadScheduler-1, value : 14 thread : main, value : 15 thread : RxCachedThreadScheduler-1, value : 15
ReplaySubejct.
ReplaySubject 는 PublishSubject 와 유사합니다.
차이점은 데이터의 Replay 여부입니다.
반면 ReplaySubject 는 생성했던 데이터를 기억해두었다가 새로운 Subscriber 가 생기면,
첫 데이터부터 Emission 을 Replay 합니다.
카프카의 Consumer 로 비유하지면 offset 을 "earliest" 으로 설정하는 경우와 비슷합니다.
관련된 예시입니다.
실행 결과는 보시면 ReplaySubject 의 역할을 한눈에 파악하실 수 있을 겁니다.
Subscriber 가 등록되기 이전의 모든 데이터들이 Replay 되어 소비됩니다.
package org.example; import io.reactivex.rxjava3.schedulers.Schedulers; import io.reactivex.rxjava3.subjects.ReplaySubject; public class Main { public static void main(String[] args) { ReplaySubject<Integer> subject = ReplaySubject.create(); for (int i = 1; i <= 5; i++) { subject.onNext(i); } subject.subscribe((value) -> { System.out.println(String.format("thread : %s, value : %s", Thread.currentThread().getName(), value)); }); for (int i = 6; i <= 10; i++) { subject.onNext(i); } subject .observeOn(Schedulers.io()) .subscribe((value) -> { System.out.println(String.format("thread : %s, value : %s", Thread.currentThread().getName(), value)); }); for (int i = 11; i <= 15; i++) { subject.onNext(i); } while (true) { } } }
<실행 결과>
thread : main, value : 1 thread : main, value : 2 thread : main, value : 3 thread : main, value : 4 thread : main, value : 5 thread : main, value : 6 thread : main, value : 7 thread : main, value : 8 thread : main, value : 9 thread : main, value : 10 thread : main, value : 11 thread : RxCachedThreadScheduler-1, value : 1 thread : main, value : 12 thread : RxCachedThreadScheduler-1, value : 2 thread : main, value : 13 thread : RxCachedThreadScheduler-1, value : 3 thread : main, value : 14 thread : RxCachedThreadScheduler-1, value : 4 thread : main, value : 15 thread : RxCachedThreadScheduler-1, value : 5 thread : RxCachedThreadScheduler-1, value : 6 thread : RxCachedThreadScheduler-1, value : 7 thread : RxCachedThreadScheduler-1, value : 8 thread : RxCachedThreadScheduler-1, value : 9 thread : RxCachedThreadScheduler-1, value : 10 thread : RxCachedThreadScheduler-1, value : 11 thread : RxCachedThreadScheduler-1, value : 12 thread : RxCachedThreadScheduler-1, value : 13 thread : RxCachedThreadScheduler-1, value : 14 thread : RxCachedThreadScheduler-1, value : 15
ConnectableObservable.
ConnectableObservable 은 Cold Observable 을 Hot Observable 로 변경할 수 있는 클래스입니다.
Subject Pattern 과 달리 Observable 과 Observer 의 존재를 각각 유지하며, Observable 의 Data Emission 시점을 결정할 수 있습니다.
관련된 예시를 먼저 살펴보겠습니다.
아래 예시는 ConnectableObservable 을 생성하는 예시입니다.
Observable 의 publish 함수를 통해서 ConnectableObservable 을 생성할 수 있습니다.
아래의 예시에선 어떠한 데이터도 consume 되지 않습니다.
왜냐하면 ConnectableObservable 은 connect 함수를 통해서 ConnectableObservable 를 실행시켜야하는 구조입니다.
connect 함수를 통해서 Hot Observable 방식의 Broadcasting 이 시작됩니다.
package org.example; import io.reactivex.rxjava3.core.Observable; import io.reactivex.rxjava3.observables.ConnectableObservable; public class Main { public static void main(String[] args) { Observable<Integer> observable = Observable.range(1, 10); ConnectableObservable<Integer> connectableObservable = observable.publish(); connectableObservable.subscribe((value) -> { System.out.println(String.format("thread is %s, value is %s", Thread.currentThread().getName(), value)); }); } }
Connect 이후에 Consume 가능함.
아래 예시는 connect 함수를 사용하여 ConnectableObservable 을 활성화시킨 케이스입니다.
connect 이후에 subscriber 의 consume 이 시작됩니다.
package org.example; import io.reactivex.rxjava3.core.Observable; import io.reactivex.rxjava3.observables.ConnectableObservable; public class Main { public static void main(String[] args) { Observable<Integer> observable = Observable.range(1, 10); ConnectableObservable<Integer> connectableObservable = observable.publish(); connectableObservable.subscribe((value) -> { System.out.println(String.format("thread is %s, value is %s", Thread.currentThread().getName(), value)); }); connectableObservable.connect(); } }
<실행 결과>
thread is main, value is 1 thread is main, value is 2 thread is main, value is 3 thread is main, value is 4 thread is main, value is 5 thread is main, value is 6 thread is main, value is 7 thread is main, value is 8 thread is main, value is 9 thread is main, value is 10
Replay 가 가능할까?
connect 이후에 생성된 Subscriber 가 과거의 데이터를 Consume 을 할 수 없습니다.
왜냐하면 기본적으로 Replay 되지 않습니다.
package org.example; import io.reactivex.rxjava3.core.Observable; import io.reactivex.rxjava3.observables.ConnectableObservable; public class Main { public static void main(String[] args) { Observable<Integer> observable = Observable.range(1, 10); ConnectableObservable<Integer> connectableObservable = observable.publish(); connectableObservable.subscribe((value) -> { System.out.println(String.format("thread is %s, value is %s", Thread.currentThread().getName(), value)); }); connectableObservable.connect(); connectableObservable.subscribe((value) -> { System.out.println("it can not do replay!"); }); } }
<실행 결과>
thread is main, value is 1 thread is main, value is 2 thread is main, value is 3 thread is main, value is 4 thread is main, value is 5 thread is main, value is 6 thread is main, value is 7 thread is main, value is 8 thread is main, value is 9 thread is main, value is 10
Broadcasting.
Hot Observable 은 기본적으로 Broadcasting 을 위한 Observable 입니다.
여러 Subscriber 들에게 데이터를 제공합니다.
Cold Observable 를 여러 Subscriber 들이 Consume 하게 되면
Observable 의 불필요한 로직의 수행이 중복됩니다.
이게 무슨 의미냐하면
하나의 Observable 에 여러 Subscriber 가 붙게 되면,
여러 Subscriber 에게 데이터를 전송하기 위해서 Observable 이 Subscriber 갯수만큼 데이터를 전송하게 됩니다.
즉, Observable 의 Data Emission 로직이 중복으로 수행됩니다.
Cold Observable 에 여러 Subscriber 가 연결될 때.
Cold Observable 방식에서 여러 Subscriber 가 연결될 때,
Observable 을 데이터 생성 로직을 중복으로 수행합니다.
아래의 "here is unnecassary logic!" 문자열이 중복으로 수행됨을 알 수 있습니다.
만약 이러한 로직이 Database 조회나 heavy 한 계산을 해야하는 경우라면, 퍼포먼스에 악영향을 끼칩니다.
말그대로 불필요한 과정이죠.
package org.example; import io.reactivex.rxjava3.core.Observable; public class Main { public static void main(String[] args) { Observable<Integer> observable = Observable.create(emitter -> { System.out.println("here is unnecessary logic!"); emitter.onNext(1); emitter.onComplete(); }); for (int i = 1; i <= 5; i++) { observable.subscribe((value) -> { System.out.println(String.format("thread is %s, value is %s", Thread.currentThread().getName(), value)); }); } } }
here is unnecessary logic! thread is main, value is 1 here is unnecessary logic! thread is main, value is 1 here is unnecessary logic! thread is main, value is 1 here is unnecessary logic! thread is main, value is 1 here is unnecessary logic! thread is main, value is 1
Hot Observable 에 여러 Subscriber 가 연결될 때.
ConnectableObservable 을 통해서 Hot Observable 을 구현하였습니다.
5개의 Subscriber 가 존재하구요.
아래 코드를 실행하면, Observable 의 emit 관련 로직이 1회만 실행됨을 알 수 있습니다.
"Fire and Forget" 방식으로 모든 Consumer 들에게 데이터를 Broadcasting 합니다.
package org.example; import io.reactivex.rxjava3.core.Observable; import io.reactivex.rxjava3.observables.ConnectableObservable; public class Main { public static void main(String[] args) { Observable<Integer> observable = Observable.create(emitter -> { System.out.println("here is unnecessary logic!"); emitter.onNext(1); emitter.onComplete(); }); ConnectableObservable<Integer> connectableObservable = observable.publish(); for (int i = 1; i <= 5; i++) { connectableObservable.subscribe((value) -> { System.out.println(String.format("thread is %s, value is %s", Thread.currentThread().getName(), value)); }); } connectableObservable.connect(); } }
<실행 결과>
here is unnecessary logic! thread is main, value is 1 thread is main, value is 1 thread is main, value is 1 thread is main, value is 1 thread is main, value is 1
반응형'Language > ReactiveX' 카테고리의 다른 글
RxJava Flowable 알아보기 (0) 2023.10.02 RxJava Terminating Operator 알아보기 (0) 2023.10.02 RxJava Combining Operator 알아보기 (0) 2023.10.01 RxJava Observable 종류 알아보기 (Single, Maybe, Completable, Flowable) (0) 2023.10.01 RxJava Observable 알아보기 (0) 2023.09.29