ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • RxJava Hot vs Cold Observable 알아보기
    Language/ReactiveX 2023. 10. 2. 11:11
    728x90
    반응형

    - 목차

     

     

    관련된 글

    https://westlife0615.tistory.com/317

     

    RxJava Observable 알아보기

    - 목차 관련된 글 https://westlife0615.tistory.com/2 ReactiveX 알아보기 - 목차 소개. Reactive X 패러다임에 대해서 알아보려고 합니다. Push and Pull. 데이터 커뮤니케이션에는 Push 와 Pull 두가지 방식이 있습니

    westlife0615.tistory.com

     

     

    소개.

    Observable 의 Data Emission 방식에는 두가지가 존재합니다. 

    하나는 Hot Observable 그리고 다른 하나는 Cold Observable 방식입니다.

    Cold Observable 은 우리가 알고 있는 기존의 방식입니다.

    Observable 의 subscribe 함수를 통해서 Consumer 를 등록하고, 

    Observable 은 Consumer 가 등록된 이후에 Data Emission 을 시작합니다.

     

    Hot Observable 은 동작방식이 조금 다릅니다.

    Subscriber 의 등록과 관계없이 Data Emission 을 시작합니다.

    이를 Broadcasting 이라고도 하는데요.

    데이터의 전달을 담당하는 Source 영역은 구독하는 Consumer 의 존재와 상관없이 데이터를 송출합니다.

    데이터의 Loss 나 목적지 여부는 신경쓰지 않는 구조입니다.

     

    ColdHot 이라고 이름붙인 이유는 온도와는 크게 상관없습니다.

    그저 데이터의 전달방식이 얼마나 다른 요인들과 Independent 하고 active 한지에 따라 Hot 이나 Cold 인지를 나누는 기준인 것 같네요.

     

    Hot Observable.

    Hot Observable 은 Observable Chain 의 다른 요인들과 독립적으로 Data Emission 을 수행합니다.

    Subscriber 가 있든 없든, 데이터의 목적지가 있든 없든지 개의치 않고 Data Emission 을 수행합니다. 

    RxJava 에서 Hot Observable 을 구현하는 class 들이 따로 존재합니다.

     

    - PublishSubject

    - Connectable

     

     

    PublishSubject.

     

    PublishSubject 클래스를 사용하여 Hot Observable 을 구현할 수 있습니다.

    Observable 이 데이터 스트림을 내장하고 있는 구조인 것과 반대로

    PublishSubject 는 외부 코드에서 직접 Data Emission 을 구현합니다.

    이를 Subject Pattern 이라고 합니다.

     

    Observable 이 데이터 생산을 책임지고 Observer 가 데이터의 소비를 책임지는 구조와 달리

    Subject 는 Subject API 를 통해서 Observable 와 Observer 역할을 모두 수행합니다.

    데이터를 생성하고 소비하는 역할을 모두 수행할 수 있습니다.

     

    아래는 관련된 예시입니다.

    PublishSubject 는 1 부터 20까지의 Integer 를 Emit 합니다.

    1 부터 10까지의 데이터를 Emit 하는 과정에는 Subscriber 가 없습니다.

    그리고 11부터 20까지의 데이터를 Emit 하는 과정에는 Subscriber 가 존재합니다.

    Subject Pattern 상 Subscriber 의 존재와 무관하게 Data Emission 이 실행됩니다.

    그래서 아래의 코드에선 11 부터 20 까지의 데이터만 소비할 수 있습니다.

     

    package org.example;
    
    import io.reactivex.rxjava3.subjects.PublishSubject;
    
    public class Main {
    
      public static void main(String[] args) {
    
        PublishSubject<Integer> subject = PublishSubject.create();
        for (int i = 1; i <= 10; i++) {
          subject.onNext(i);
        }
        subject.subscribe((value) -> {
          System.out.println(String.format("thread : %s, value : %s", Thread.currentThread().getName(), value));
        });
        for (int i = 11; i <= 20; i++) {
          subject.onNext(i);
        }
    
      }
    }

     

    <실행 결과>

    thread : main, value : 11
    thread : main, value : 12
    thread : main, value : 13
    thread : main, value : 14
    thread : main, value : 15
    thread : main, value : 16
    thread : main, value : 17
    thread : main, value : 18
    thread : main, value : 19

     

     

    BehaviorSubject.

    BehaviorSubject 는 PublishSubject 와 유사합니다.

    차이점은 마지막으로 생성한 데이터를 기억하는지 여부입니다.

    PublishSubject 는 "Fire and Forget" 방식으로 데이터를 생성합니다.

    반면 BehaviorSubject 는 생성했던 마지막 데이터를 기억해두었다가 새로운 Subscriber 가 생기면,

    마지막 데이터부터 Emission 을 시작합니다.

     

    아래 예시는 BehaviorSubject 가 마지막으로 생성했던 데이터를 기억해두었다가,

    새로운 Subscriber 의 initialValue 로 전달하는 과정을 코드로 작성하였습니다.

     

    1 부터 5 까지의 데이터가 onNext 되는 동안에는 어떠한 Subscriber 도 존재하지 않습니다.

    6 부터 10 까지의 데이터를 onNext 하는 동안 하나의 Subscriber 가 존재합니다.

    이때, 데이터의 Consume 이 수행됩니다.

     

    11 부터 15 까지의 데이터를 onNext 하는 동안에는 2개의 Subscriber 들이 존재합니다.

    새롭게 생성된 Subscriber 는 observeOn 연산자를 통해서 "RxCachedThreadScheduler-1" 쓰레드에서 작업이 수행되도록 설정하였습니다.

    그리고 value 를 10 부터 15까지 consume 하는 것을 확인할 수 있습니다.

    BehaviorSubject 는 10 이라는 latest value 를 기억해두었다가 새로운 Subscriber 에게 전달합니다.

     

    package org.example;
    
    import io.reactivex.rxjava3.schedulers.Schedulers;
    import io.reactivex.rxjava3.subjects.BehaviorSubject;
    
    public class Main {
    
      public static void main(String[] args) {
    
        BehaviorSubject<Integer> subject = BehaviorSubject.createDefault(0);
        for (int i = 1; i <= 5; i++) {
          subject.onNext(i);
        }
        subject.subscribe((value) -> {
                  System.out.println(String.format("thread : %s, value : %s", Thread.currentThread().getName(), value));
                });
        for (int i = 6; i <= 10; i++) {
          subject.onNext(i);
        }
    
        subject
                .observeOn(Schedulers.io())
                .subscribe((value) -> {
                  System.out.println(String.format("thread : %s, value : %s", Thread.currentThread().getName(), value));
                });
    
        for (int i = 11; i <= 15; i++) {
          subject.onNext(i);
        }
    
        while (true) {
        }
    
    
      }
    }

     

    <실행 결과>

    thread : main, value : 5
    thread : main, value : 6
    thread : main, value : 7
    thread : main, value : 8
    thread : main, value : 9
    thread : main, value : 10
    thread : main, value : 11
    thread : RxCachedThreadScheduler-1, value : 10
    thread : main, value : 12
    thread : RxCachedThreadScheduler-1, value : 11
    thread : main, value : 13
    thread : RxCachedThreadScheduler-1, value : 12
    thread : main, value : 14
    thread : RxCachedThreadScheduler-1, value : 13
    thread : RxCachedThreadScheduler-1, value : 14
    thread : main, value : 15
    thread : RxCachedThreadScheduler-1, value : 15

     

    ReplaySubejct.

    ReplaySubjectPublishSubject 와 유사합니다.

    차이점은 데이터의 Replay 여부입니다.

    반면 ReplaySubject 는 생성했던 데이터를 기억해두었다가 새로운 Subscriber 가 생기면,

    첫 데이터부터 Emission 을 Replay 합니다.

    카프카의 Consumer 로 비유하지면 offset 을 "earliest" 으로 설정하는 경우와 비슷합니다.

     

    관련된 예시입니다.

    실행 결과는 보시면 ReplaySubject 의 역할을 한눈에 파악하실 수 있을 겁니다.

    Subscriber 가 등록되기 이전의 모든 데이터들이 Replay 되어 소비됩니다.

     

    package org.example;
    
    import io.reactivex.rxjava3.schedulers.Schedulers;
    import io.reactivex.rxjava3.subjects.ReplaySubject;
    
    public class Main {
    
      public static void main(String[] args) {
    
        ReplaySubject<Integer> subject = ReplaySubject.create();
        for (int i = 1; i <= 5; i++) {
          subject.onNext(i);
        }
        subject.subscribe((value) -> {
                  System.out.println(String.format("thread : %s, value : %s", Thread.currentThread().getName(), value));
                });
        for (int i = 6; i <= 10; i++) {
          subject.onNext(i);
        }
    
        subject
                .observeOn(Schedulers.io())
                .subscribe((value) -> {
                  System.out.println(String.format("thread : %s, value : %s", Thread.currentThread().getName(), value));
                });
    
        for (int i = 11; i <= 15; i++) {
          subject.onNext(i);
        }
    
        while (true) {
        }
    
    
      }
    }

     

    <실행 결과>

     thread : main, value : 1
    thread : main, value : 2
    thread : main, value : 3
    thread : main, value : 4
    thread : main, value : 5
    thread : main, value : 6
    thread : main, value : 7
    thread : main, value : 8
    thread : main, value : 9
    thread : main, value : 10
    thread : main, value : 11
    thread : RxCachedThreadScheduler-1, value : 1
    thread : main, value : 12
    thread : RxCachedThreadScheduler-1, value : 2
    thread : main, value : 13
    thread : RxCachedThreadScheduler-1, value : 3
    thread : main, value : 14
    thread : RxCachedThreadScheduler-1, value : 4
    thread : main, value : 15
    thread : RxCachedThreadScheduler-1, value : 5
    thread : RxCachedThreadScheduler-1, value : 6
    thread : RxCachedThreadScheduler-1, value : 7
    thread : RxCachedThreadScheduler-1, value : 8
    thread : RxCachedThreadScheduler-1, value : 9
    thread : RxCachedThreadScheduler-1, value : 10
    thread : RxCachedThreadScheduler-1, value : 11
    thread : RxCachedThreadScheduler-1, value : 12
    thread : RxCachedThreadScheduler-1, value : 13
    thread : RxCachedThreadScheduler-1, value : 14
    thread : RxCachedThreadScheduler-1, value : 15

     

     

    ConnectableObservable.

    ConnectableObservable 은 Cold Observable 을 Hot Observable 로 변경할 수 있는 클래스입니다.

    Subject Pattern 과 달리 Observable 과 Observer 의 존재를 각각 유지하며, Observable 의 Data Emission 시점을 결정할 수 있습니다.

     

    관련된 예시를 먼저 살펴보겠습니다.

     

    아래 예시는 ConnectableObservable 을 생성하는 예시입니다.

    Observable 의 publish 함수를 통해서 ConnectableObservable 을 생성할 수 있습니다.

    아래의 예시에선 어떠한 데이터도 consume 되지 않습니다.

    왜냐하면 ConnectableObservable 은 connect 함수를 통해서 ConnectableObservable 를 실행시켜야하는 구조입니다.

    connect 함수를 통해서 Hot Observable 방식의 Broadcasting 이 시작됩니다.

     

    package org.example;
    
    import io.reactivex.rxjava3.core.Observable;
    import io.reactivex.rxjava3.observables.ConnectableObservable;
    
    public class Main {
    
      public static void main(String[] args) {
    
        Observable<Integer> observable = Observable.range(1, 10);
        ConnectableObservable<Integer> connectableObservable = observable.publish();
        connectableObservable.subscribe((value) -> {
          System.out.println(String.format("thread is %s, value is %s", Thread.currentThread().getName(), value));
        });
    
      }
    }

     

    Connect 이후에 Consume 가능함.

     

    아래 예시는 connect 함수를 사용하여 ConnectableObservable 을 활성화시킨 케이스입니다.

    connect 이후에 subscriber 의 consume 이 시작됩니다.

    package org.example;
    
    import io.reactivex.rxjava3.core.Observable;
    import io.reactivex.rxjava3.observables.ConnectableObservable;
    
    public class Main {
    
      public static void main(String[] args) {
    
        Observable<Integer> observable = Observable.range(1, 10);
        ConnectableObservable<Integer> connectableObservable = observable.publish();
        connectableObservable.subscribe((value) -> {
          System.out.println(String.format("thread is %s, value is %s", Thread.currentThread().getName(), value));
        });
        connectableObservable.connect();
    
      }
    }

     

    <실행 결과>

    thread is main, value is 1
    thread is main, value is 2
    thread is main, value is 3
    thread is main, value is 4
    thread is main, value is 5
    thread is main, value is 6
    thread is main, value is 7
    thread is main, value is 8
    thread is main, value is 9
    thread is main, value is 10

     

    Replay 가 가능할까?

     

    connect 이후에 생성된 Subscriber 가 과거의 데이터를 Consume 을 할 수 없습니다.

    왜냐하면 기본적으로 Replay 되지 않습니다.

     

    package org.example;
    
    import io.reactivex.rxjava3.core.Observable;
    import io.reactivex.rxjava3.observables.ConnectableObservable;
    
    public class Main {
    
      public static void main(String[] args) {
    
        Observable<Integer> observable = Observable.range(1, 10);
        ConnectableObservable<Integer> connectableObservable = observable.publish();
        connectableObservable.subscribe((value) -> {
          System.out.println(String.format("thread is %s, value is %s", Thread.currentThread().getName(), value));
        });
        connectableObservable.connect();
    
        connectableObservable.subscribe((value) -> {
          System.out.println("it can not do replay!");
        });
      }
    }

     

    <실행 결과>

    thread is main, value is 1
    thread is main, value is 2
    thread is main, value is 3
    thread is main, value is 4
    thread is main, value is 5
    thread is main, value is 6
    thread is main, value is 7
    thread is main, value is 8
    thread is main, value is 9
    thread is main, value is 10

     

     

    Broadcasting.

     

    Hot Observable 은 기본적으로 Broadcasting 을 위한 Observable 입니다.

    여러 Subscriber 들에게 데이터를 제공합니다.

    Cold Observable 를 여러 Subscriber 들이 Consume 하게 되면

    Observable 의 불필요한 로직의 수행이 중복됩니다.

    이게 무슨 의미냐하면

    하나의 Observable 에 여러 Subscriber 가 붙게 되면,

    여러 Subscriber 에게 데이터를 전송하기 위해서 Observable 이 Subscriber 갯수만큼 데이터를 전송하게 됩니다.

    즉, Observable 의 Data Emission 로직이 중복으로 수행됩니다.

     

    Cold Observable 에 여러 Subscriber 가 연결될 때.

    Cold Observable 방식에서 여러 Subscriber 가 연결될 때,

    Observable 을 데이터 생성 로직을 중복으로 수행합니다.

    아래의 "here is unnecassary logic!" 문자열이 중복으로 수행됨을 알 수 있습니다.

    만약 이러한 로직이 Database 조회나 heavy 한 계산을 해야하는 경우라면, 퍼포먼스에 악영향을 끼칩니다.

    말그대로 불필요한 과정이죠.

     

    package org.example;
    
    import io.reactivex.rxjava3.core.Observable;
    
    public class Main {
    
      public static void main(String[] args) {
    
        Observable<Integer> observable = Observable.create(emitter -> {
          System.out.println("here is unnecessary logic!");
          emitter.onNext(1);
          emitter.onComplete();
        });
        
        for (int i = 1; i <= 5; i++) {
          observable.subscribe((value) -> {
            System.out.println(String.format("thread is %s, value is %s", Thread.currentThread().getName(), value));
          });
        }
      }
    }

     

    here is unnecessary logic!
    thread is main, value is 1
    here is unnecessary logic!
    thread is main, value is 1
    here is unnecessary logic!
    thread is main, value is 1
    here is unnecessary logic!
    thread is main, value is 1
    here is unnecessary logic!
    thread is main, value is 1

     

    Hot Observable 에 여러 Subscriber 가 연결될 때.

    ConnectableObservable 을 통해서 Hot Observable 을 구현하였습니다.

    5개의 Subscriber 가 존재하구요.

    아래 코드를 실행하면, Observable 의 emit 관련 로직이 1회만 실행됨을 알 수 있습니다.

    "Fire and Forget" 방식으로 모든 Consumer 들에게 데이터를 Broadcasting 합니다.

     

    package org.example;
    
    import io.reactivex.rxjava3.core.Observable;
    import io.reactivex.rxjava3.observables.ConnectableObservable;
    
    public class Main {
    
      public static void main(String[] args) {
    
        Observable<Integer> observable = Observable.create(emitter -> {
          System.out.println("here is unnecessary logic!");
          emitter.onNext(1);
          emitter.onComplete();
        });
        ConnectableObservable<Integer> connectableObservable = observable.publish();
        for (int i = 1; i <= 5; i++) {
          connectableObservable.subscribe((value) -> {
            System.out.println(String.format("thread is %s, value is %s", Thread.currentThread().getName(), value));
          });
        }
    
        connectableObservable.connect();
      }
    }

     

    <실행 결과>

    here is unnecessary logic!
    thread is main, value is 1
    thread is main, value is 1
    thread is main, value is 1
    thread is main, value is 1
    thread is main, value is 1

    반응형
Designed by Tistory.