ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • RxJava Observable 알아보기
    Language/ReactiveX 2023. 9. 29. 10:10
    728x90
    반응형

    - 목차

     

    관련된 글

    https://westlife0615.tistory.com/2

     

    ReactiveX 알아보기

    - 목차 소개. Reactive X 패러다임에 대해서 알아보려고 합니다. Push and Pull. 데이터 커뮤니케이션에는 Push 와 Pull 두가지 방식이 있습니다. Pull. Pull 모델에 대해서 이야기하기 이전에 데이터의 생성

    westlife0615.tistory.com

     

    소개.

    Reactive Programming 에서 다루는 대상은 데이터 또는 이벤트의 흐름입니다.

    데이터 또는 이벤트의 흐름은 다른 표현으로는 Observable Pipeline 라고 부르는데요.

    ObservableReactive Programming 에서 다루는 중요한 대상인 만큼 이번 글에서 다뤄보려고 합니다.

     

    Observable 이란.

    Observable데이터 스트림이자 Reactive Programming 에서 Data Producer 에 해당합니다.

    즉, Observable 는 데이터를 생성하는 영역입니다.

    Observable 에 의해서 생성된 데이터는 데이터를 처리하는 Operator 들에 의해서 처리되는데요.

    Source 에 해당하는 Observable 과 수많은 Operator 들이 합쳐진 모양을 Observable Pipeline 이라고 합니다.

     

     

    Observable 생성하는 법

     

    from iterable to Observable.

    list 자료구조로부터 손쉽게 Observable 을 생성할 수 있습니다.

    아래가 그 예시입니다.

     

    - Observable.fromArray

    - Observable.fromIterable

    - Observable.fromStream

    - Observable.range

     

    Observable 의 Builder 함수들을 통해서 손쉽게 Observable 을 생성할 수 있습니다.

    위 함수들은 직관적으로 Observable 을 생성할 수 있습니다.

     

    package org.example;
    
    import io.reactivex.rxjava3.core.Observable;
    import io.reactivex.rxjava3.functions.Consumer;
    
    import java.util.Arrays;
    import java.util.List;
    
    public class Main {
      public static void main(String[] args) {
    
        Consumer<Integer> consumer = System.out::print;
    
        List<Integer> list = Arrays.asList(1, 2, 3, 4, 5, 6, 7);
        Observable observable1 = Observable.fromIterable(list);
        observable1.subscribe(consumer);
        System.out.println();
    
        Observable observable2 = Observable.fromArray(1, 2, 3, 4, 5, 6, 7);
        observable2.subscribe(consumer);
        System.out.println();
    
        Observable observable3 = Observable.fromStream(list.stream());
        observable3.subscribe(consumer);
        System.out.println();
    
        Observable observable4 = Observable.range(1, 7);
        observable4.subscribe(consumer);
        System.out.println();
    
      }
    
    }

     

    <실행 결과>

    1234567
    1234567
    1234567
    1234567

     

    from Callable to Observable.

    Callable Interface 구현체로부터 Observable 을 생성할 수도 있습니다.

    아래 예시는 1 ~ 7 의 데이터 스트림을 처리하는 Observable Pipeline 을 fromCallable 로 구현한 예시인데요.

    이러한 상황에서는 fromCallable 이 크게 유용하진 않습니다.

     

    package org.example;
    
    import io.reactivex.rxjava3.core.Observable;
    import io.reactivex.rxjava3.core.ObservableSource;
    import io.reactivex.rxjava3.functions.Consumer;
    import io.reactivex.rxjava3.functions.Function;
    
    import java.util.List;
    import java.util.stream.IntStream;
    
    public class Main {
      public static void main(String[] args) {
    
        Consumer<Integer> consumer = System.out::print;
    
        Observable<Integer> observable1 = Observable.fromCallable(() -> 1);
        Observable<Integer> observable2 = Observable.fromCallable(() -> 2);
        Observable<Integer> observable3 = Observable.fromCallable(() -> 3);
        Observable<Integer> observable4 = Observable.fromCallable(() -> 4);
        Observable<Integer> observable5 = Observable.fromCallable(() -> 5);
        Observable<Integer> observable6 = Observable.fromCallable(() -> 6);
        Observable<Integer> observable7 = Observable.fromCallable(() -> 7);
        Observable<Integer> mergedObservable = Observable.mergeArray(observable1,observable2,observable3,observable4,observable5,observable6,observable7);
        mergedObservable.subscribe(consumer);
        System.out.println();
    
        Observable<List<Integer>> observableList = Observable.fromCallable(() -> IntStream.range(1, 8).boxed().toList());
        observableList
                .flatMap((Function<List<Integer>, ObservableSource<Integer>>) integers -> Observable.fromStream(integers.stream()))
                .subscribe(consumer);
        System.out.println();
    
      }
    
    }

     

    <실행 결과>

    1234567
    1234567

     

     

    fromCallable 빌더 함수의 적절한 활용법은 IO 와 Computation 로직에 적용하는 것입니다.

    Network IO, Disk IO, 복잡한 Computation 로직이 그 예시입니다.

    예를 들어보겠습니다.

     

     

    아래 예시는 naver, kakao, google, daum 의 웹사이트를 http get 요청으로 조회하는 Observable Pipeline 입니다.

    아래 예시는 여러가지 Database CRUD 요청이나 외부 서버와의 통신으로 확장할 수 있습니다.

    주목할 점은 observeOn(Schedulers.io()) 를 사용하는 점입니다.

    observeOn(Schedulers.io()) 를 통해서 http get 요청을 수행하는 로직을 비동기적으로 처리할 수 있습니다.

    package org.example;
    
    import io.reactivex.rxjava3.core.Observable;
    import io.reactivex.rxjava3.core.ObservableSource;
    import io.reactivex.rxjava3.functions.Consumer;
    import io.reactivex.rxjava3.functions.Function;
    import io.reactivex.rxjava3.schedulers.Schedulers;
    
    import java.io.BufferedReader;
    import java.io.InputStreamReader;
    import java.net.URL;
    import java.net.URLConnection;
    
    public class Main {
    
      public static void main(String[] args) throws InterruptedException {
    
        Consumer<Object> consumer = System.out::print;
        Function<String, String> httpGetRequest = (url) -> {
          System.out.println(Thread.currentThread().getName());
          URLConnection connection = (new URL(url)).openConnection();
          StringBuilder result = new StringBuilder();
          try (BufferedReader in = new BufferedReader(new InputStreamReader(connection.getInputStream()))) {
            result.append(in.readLine());
          }
          return result.toString();
        };
    
        Observable<String> urlObservable = Observable.fromArray(
                "https://naver.com",
                "https://google.com",
                "https://daum.net",
                "https://kakao.com"
                );
    
        urlObservable
                .flatMap((Function<String, ObservableSource<?>>) s -> Observable.just(s).observeOn(Schedulers.io()).map(httpGetRequest))
                .subscribe(consumer);
        System.out.println();
    
        Thread.sleep(100000L);
    
      }
    
    }

     

     

    kakao -> daum -> google -> naver 순서대로 데이터가 흘러가지만,

    Http Get 요청을 수행하는 단계에선 병렬적으로 처리됩니다.

     

     

     

    from Future to Observable.

     

    Java Future 또는 CompletableFuture 으로 Observable 을 생성할 수 있습니다.

    Future 는 미래에 완료될 예정인 비동기 작업을 의미하는데요.

    그래서 Future 를 활용하여 Observable 을 생성하는 방법인 fromFuture 은

    위에서 설명한 fromCallable 과 사용 방식이 유사합니다.

     

     

    < 아래 링크는 Java Future 에 대한 설명글입니다. >

    https://westlife0615.tistory.com/318

     

    Java Future 알아보기

    - 목차 소개. java 의 Future 는 비동기 처리를 수행하도록 돕는 대상이자 java class 입니다. Future 는 javascript 의 Promise 와 유사한 행동양식을 보이며, 미래에 완료될 Task 를 의미합니다. Main Thread 에서

    westlife0615.tistory.com

     

     

    package org.example;
    
    import io.reactivex.rxjava3.core.Observable;
    import io.reactivex.rxjava3.functions.Consumer;
    
    import java.util.concurrent.CompletableFuture;
    
    public class Main {
    
      public static void main(String[] args) {
    
        Consumer<Integer> consumer = System.out::print;
        CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> 1);
        CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> 2);
        CompletableFuture<Integer> future3 = CompletableFuture.supplyAsync(() -> 3);
        CompletableFuture<Integer> future4 = CompletableFuture.supplyAsync(() -> 4);
        CompletableFuture<Integer> future5 = CompletableFuture.supplyAsync(() -> 5);
        CompletableFuture<Integer> future6 = CompletableFuture.supplyAsync(() -> 6);
        CompletableFuture<Integer> future7 = CompletableFuture.supplyAsync(() -> 7);
    
        Observable.mergeArray(
                        Observable.fromFuture(future1),
                        Observable.fromFuture(future2),
                        Observable.fromFuture(future3),
                        Observable.fromFuture(future4),
                        Observable.fromFuture(future5),
                        Observable.fromFuture(future6),
                        Observable.fromFuture(future7)
        ).subscribe(consumer);
    
      }
    
    }

     

    <실행 결과>

    1234567

     

     

    반응형
Designed by Tistory.