ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • ReactiveX 알아보기
    Language/ReactiveX 2019. 7. 23. 08:40
    728x90
    반응형

     

     

    - 목차

     

     

    소개.

    Reactive X 패러다임에 대해서 알아보려고 합니다.

     

    Push and Pull.

    데이터 커뮤니케이션에는 Push 와 Pull 두가지 방식이 있습니다.

     

    Pull.

    Pull 모델에 대해서 이야기하기 이전에 데이터의 생성자소비자가 있다고 가정하겠습니다.

    데이터의 생성자데이터를 생성하는 서비스이고, 소비자는 데이터 생성자가 생성하는 데이터를 소비하는 서비스입니다.

    Pull 모델은 데이터 커뮤니케이션의 주체가 데이터 소비자인 데이터 커뮤니케이션 모델입니다.

     

    아래는 Pull 모델의 간단한 구조도입니다.

    데이터 소비자는 생산자가 생성한 데이터를 가져옵니다.

     

    이러한 방식은 Polling 이라고 하는데요.

    데이터 소비자는 자기 자신의 상황에 따라서 주체적으로 데이터를 생산자로부터 가져옵니다.

    대개 Pull Model 은 동기적인 방식을 취합니다.

    왜냐하면 데이터 소비자는 특정 데이터를 요청한 이후에 조회한 데이터를 활용해서 다른 작업을 진행하기 때문입니다.

    요청의 목적 자체가 요청의 응답 데이터를 활용할 의도인 것이죠.

    그래서 보통 Pull Model 은 동기방식을 취합니다.

    Http Request-Response 모델이 대표적인 예시입니다.

     

    반면, Pull 모델이 비동기방식을 취할 수도 있습니다.

    javascript 의 XMLHttpRequest 기능으로 HTTP 통신을 하거나 java 에서 Future 모델을 쓰는 경우는

    Callback 을 활용한 비동기 방식을 사용합니다.

     

     

    Push.

    Push 모델은 데이터 전송의 주도권이 데이터 생산자에게 있습니다.

    그리고 데이터가 생성될 때마다 데이터 생산자는 데이터 소비자들에게 직접적으로 데이터를 전달합니다.

    아래의 그림처럼 데이터 생산자는 데이터 소비자들의 정보를 알고 있습니다.

    그리고 자신과 연결된 소비자들에게 직접적으로 전달하는 메커니즘을 따릅니다.

    다만, 중간에 Kafka 나 RabbitMQ 같은 메시지 시스템을 사용할 수도 있습니다.

    이때는 데이터 생산자가 데이터 소비자들의 정보를 일일이 알지 않아도 됩니다.

    Direct Data Communication 의 역할을 메시지 시스템이 해주기 때문입니다.

     

    Push 모델에서는 소비자는 구독하는 시점부터 데이터를 소비할 수 있습니다.

    그래서 구독 시점보다 과거의 데이터들은 소비자가 확인할 수 없습니다.

    왜냐하면 데이터 생성자가 데이터를 생성하는 시점에 소비자들에게 데이터를 전달하기 때문입니다.

    이러한 모델은 실시간 데이터 처리 영역에서 사용하기 적합합니다.

     

     

    아래 코드는 Push 방식의 RxJava 예시 코드입니다.

    "observable1.subscribe(consumer);" 로직을 통해서 Consumer 가 Producer 에 등록됩니다.

    이 코드 라인을 통해서 subscribe 이 시작되고, Producer 는 데이터를 내보내기 시작합니다.

    (참고로 Observable 이 Producer 입니다. ReactiveX 에서 Observable 은 Producer 이면서 Data Stream 을 뜻합니다.)

    package org.example;
    
    import io.reactivex.rxjava3.core.Observable;
    import io.reactivex.rxjava3.functions.Consumer;
    import io.reactivex.rxjava3.functions.Predicate;
    
    public class Main {
      public static void main(String[] args) {
        
        Observable<Integer> observable1 = Observable.fromArray(1, 2, 3, 4, 5, 6, 7, 8);
        Predicate<Integer> filter = integer -> integer > 4;
        Observable<Integer> observable2 = observable1.filter(filter);
        Consumer<Integer> consumer = integer -> System.out.print(integer);
    
        System.out.print("first : ");
        observable1.subscribe(consumer);
        System.out.println();
    
        System.out.print("second : ");
        observable2.subscribe(consumer);
        System.out.println();
        
      }
    
    }

     

    observable.subscribe 이후로 Producer 는 Consumer 에게 데이터를 전달하기 시작합니다.

    Producer 가 전달하는 데이터는 Consumer 에게 전달되고, Consumer 는 등록된 데이터 처리 절차에 따라 소비를 시작합니다.

    first : 12345678
    second : 5678

     

    Sync vs Async.

    Sync 또는 Async 방식은 데이터 처리에 관여하는 Thread 의 수와 처리 순서를 보장하는지 여부를 의미합니다.

    쉽게 표현하면 Observable Pipeline 에서 하나의 Observable 을 처리하는 Thread 를 어떻게 할당하는지를 의미합니다.

    예를 들어보겠습니다.

    아래와 같은 Observable Pipeline 이 있습니다.

    데이터 소스 -> "곱하기 2" 하는 Step -> "제곱 2" 하는 Step -> Print Sink

    Sync 방식에선 하나의 Thread 가 Observable Pipeline [데이터 소스 -> "곱하기 2" 하는 Step -> "제곱 2" 하는 Step -> Print Sink] 을 모두 처리합니다.

    그리고 source 의 데이터 순서와 처리 순서가 동일합니다.

     

    source : 1, 2, 3

    process :

     

    1) 1 X 2 => 2 * 2 => print(4)

    2) 2 X 2 => 4 * 2 => print(16)

    3) 3 X 2 => 6 * 2 => print(36)

     

    처럼 데이터 소스의 순서와 처리 순서가 보장됩니다.

     

    반면, Async 방식에선 데이터의 처리 순서가 보장되지 않습니다.

    그리고 하나의 Thread 가 아니라 여러 Thread 가 Observable Pipeline 의 데이터 처리에 참여합니다.

    그래서 데이터 처리 결과를 보장할 수 없습니다.

    다만 여러 Thread 가 참여하기 때문에 데이터의 처리 속도를 향상시킬 수 있습니다.

     

    아래의 코드는 Observable Pipeline 을 동기 방식으로 처리하는 예시입니다.

    Synchronous 방식으로 처리된 결과는 모두 Main Thread 에서 순차적으로 처리됩니다.

    1 ~ 3 까지의 Source 데이터가 하나씩 순차적으로 처리됩니다.

    package org.example;
    
    import io.reactivex.rxjava3.core.Observable;
    import io.reactivex.rxjava3.functions.Consumer;
    import io.reactivex.rxjava3.functions.Function;
    
    public class Main {
      public static void main(String[] args) throws InterruptedException {
    
        Observable<Integer> observable1 = Observable.range(1, 3);
        Function<Integer, Integer> doubleInput = (Integer value) -> {
          System.out.println("doubleInput : " + Thread.currentThread().getName() + " data is " + value);
          return value * 2;
        };
        Function<Integer, Double> squareInput = (Integer value) -> {
          System.out.println("squareInput : " + Thread.currentThread().getName() + " data is " + value);
          return Math.pow(value, 2);
        };
        Consumer<Double> consumer = (Double value) -> {
          System.out.println("consumer : " + Thread.currentThread().getName() + " data is " + value);
          System.out.println();
        };
    
        observable1
                .map(doubleInput)
                .map(squareInput)
                .subscribe(consumer);
    
        Thread.sleep(100000L);
      }
    
    }

     

    doubleInput : main data is 1
    squareInput : main data is 2
    consumer : main data is 4.0
    
    doubleInput : main data is 2
    squareInput : main data is 4
    consumer : main data is 16.0
    
    doubleInput : main data is 3
    squareInput : main data is 6
    consumer : main data is 36.0

     

    아래 예시는 비동기 방식으로 Observable Pipeline 을 처리하는 예시 코드입니다.

    RxJava 에서는 subscribeOn 과 ObservableOn 를 사용하여 비동기처리를 수행합니다.

    실행 결과를 보면  데이터의 처리에 Thread 5개가 활용되었으며,

    - RxComputationThreadPool-2

    - RxComputationThreadPool-7

    - RxComputationThreadPool-8

    - RxComputationThreadPool-9

    - RxComputationThreadPool-10

     

    결과의 순서도 다릅니다.

    데이터 처리에 사용되는 Thread 를 다양해지면서 처리의 속도를 늘릴 수 있습니다.

     

    package org.example;
    
    import io.reactivex.rxjava3.core.Observable;
    import io.reactivex.rxjava3.core.ObservableSource;
    import io.reactivex.rxjava3.functions.Consumer;
    import io.reactivex.rxjava3.functions.Function;
    import io.reactivex.rxjava3.schedulers.Schedulers;
    
    public class Main {
      public static void main(String[] args) throws InterruptedException {
    
        Observable<Integer> observable1 = Observable.range(1, 3);
        Function<Integer, Integer> doubleInput = (Integer value) -> {
          System.out.println("doubleInput : " + Thread.currentThread().getName() + " data is " + value);
          return value * 2;
        };
        Function<Integer, Double> squareInput = (Integer value) -> {
          System.out.println("squareInput : " + Thread.currentThread().getName() + " data is " + value);
          return Math.pow(value, 2);
        };
        Consumer<Double> consumer = (Double value) -> {
          System.out.println("consumer : " + Thread.currentThread().getName() + " data is " + value);
        };
    
        observable1
                .subscribeOn(Schedulers.computation())
                .observeOn(Schedulers.computation())
                .map(doubleInput)
                .observeOn(Schedulers.computation())
                .flatMap((Function<Integer, ObservableSource<Double>>) integer ->
                        Observable.just(integer)
                                .subscribeOn(Schedulers.computation())
                                .observeOn(Schedulers.computation())
                                .map(squareInput))
                .observeOn(Schedulers.computation())
                .subscribe(consumer);
    
        Thread.sleep(100000L);
      }
    
    }

     

    doubleInput : RxComputationThreadPool-2 data is 1
    doubleInput : RxComputationThreadPool-2 data is 2
    doubleInput : RxComputationThreadPool-2 data is 3
    squareInput : RxComputationThreadPool-7 data is 4
    squareInput : RxComputationThreadPool-8 data is 6
    squareInput : RxComputationThreadPool-9 data is 2
    consumer : RxComputationThreadPool-10 data is 16.0
    consumer : RxComputationThreadPool-10 data is 4.0
    consumer : RxComputationThreadPool-10 data is 36.0

     

    Immutable vs Mutable.

    RxJava 는 Observable Pipeline 에서 처리되는 데이터는 Immutable 방식을 취합니다.

    즉, Source 에서 Emit 하는 데이터는 그 다음 Pipeline 에서 Immutable 한 데이터로 변환되기 때문에

    Downstream 에서 수정한 데이터가 Upstream 에 반영되지 않습니다.

     

    RxJava 의 Pipeline 에서 다루어지는 Immutable 데이터에 대한 예시입니다.

     

    package org.example;
    
    import io.reactivex.rxjava3.core.Observable;
    import io.reactivex.rxjava3.functions.Consumer;
    import io.reactivex.rxjava3.functions.Function;
    
    import java.time.Instant;
    import java.time.temporal.ChronoUnit;
    
    public class Main {
      public static void main(String[] args) throws InterruptedException {
    
        Instant instantMinus2Days = Instant.now().truncatedTo(ChronoUnit.DAYS).minus(2, ChronoUnit.DAYS);
        Instant instantMinus1Days = Instant.now().truncatedTo(ChronoUnit.DAYS).minus(1, ChronoUnit.DAYS);
        Instant instantToday = Instant.now().truncatedTo(ChronoUnit.DAYS).minus(0, ChronoUnit.DAYS);
    
        Observable<Instant> observable1 = Observable.fromArray(instantMinus2Days, instantMinus1Days, instantToday);
    
        Function<Instant, Instant> plus1Sec = (Instant value) -> {
          System.out.println("doubleInput : " + Thread.currentThread().getName() + " data is " + value);
          value = value.plus(1, ChronoUnit.SECONDS);
          return value;
        };
        Function<Instant, Instant> plus1Min = (Instant value) -> {
          System.out.println("squareInput : " + Thread.currentThread().getName() + " data is " + value);
          value = value.plus(1, ChronoUnit.MINUTES);
          return value;
        };
        Consumer<Instant> consumer = (Instant value) -> {
          System.out.println("consumer : " + Thread.currentThread().getName() + " data is " + value);
          System.out.println();
        };
    
        observable1
                .map(plus1Sec)
                .map(plus1Min)
                .subscribe(consumer);
    
    
        System.out.println("original data : " + instantMinus2Days);
        System.out.println("original data : " + instantMinus1Days);
        System.out.println("original data : " + instantToday);
    
        Thread.sleep(100000L);
      }
    
    }

     

    doubleInput : main data is 2023-09-27T00:00:00Z
    squareInput : main data is 2023-09-27T00:00:01Z
    consumer : main data is 2023-09-27T00:01:01Z
    
    doubleInput : main data is 2023-09-28T00:00:00Z
    squareInput : main data is 2023-09-28T00:00:01Z
    consumer : main data is 2023-09-28T00:01:01Z
    
    doubleInput : main data is 2023-09-29T00:00:00Z
    squareInput : main data is 2023-09-29T00:00:01Z
    consumer : main data is 2023-09-29T00:01:01Z
    
    original data : 2023-09-27T00:00:00Z
    original data : 2023-09-28T00:00:00Z
    original data : 2023-09-29T00:00:00Z

     

     

    Cold vs Hot.

    https://westlife0615.tistory.com/324

     

    RxJava Hot vs Cold Observable 알아보기

    - 목차 소개. Observable 의 Data Emission 방식에는 두가지가 존재합니다. 하나는 Hot Observable 그리고 다른 하나는 Cold Observable 방식입니다. Cold Observable 은 우리가 알고 있는 기존의 방식입니다. Observable

    westlife0615.tistory.com

     

     

    <추후에 알아볼 내용>

    
    Imperative vs Declarative

     

    반응형
Designed by Tistory.