ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • RxJava Thread Scheduling 알아보기
    Language/ReactiveX 2023. 9. 28. 10:55
    728x90
    반응형

     

    - 목차

     

     

    관련된 글들.

    https://westlife0615.tistory.com/320

     

    RxJava Observable 종류 알아보기 (Single, Maybe, Completable, Flowable)

    - 목차 관련된 글 https://westlife0615.tistory.com/317 RxJava Observable 알아보기 - 목차 관련된 글 https://westlife0615.tistory.com/2 ReactiveX 알아보기 - 목차 소개. Reactive X 패러다임에 대해서 알아보려고 합니다. Pus

    westlife0615.tistory.com

    https://westlife0615.tistory.com/2

     

    ReactiveX 알아보기

    - 목차 소개. Reactive X 패러다임에 대해서 알아보려고 합니다. Push and Pull. 데이터 커뮤니케이션에는 Push 와 Pull 두가지 방식이 있습니다. Pull. Pull 모델에 대해서 이야기하기 이전에 데이터의 생성

    westlife0615.tistory.com

     

    소개.

    Observable Chain 을 구성하는 Observable 들은 Main Thread 에서 수행될 수도 있고, 새로운 쓰레드에서 실행될 수도 있습니다.

    이를 Observable 의 Task Scheduling 이라고 부르는데요.

    Observable 이 수행해야하는 Task 를 상황에 맞게 적절한 쓰레드에 배치할 수 있습니다.

    이에 관한 상세한 내용을 알아보도록 하겠습니다.

     

    Built-In Thread Pool.

    RxJava 에서 Observable Chain 의 개별 Observable 을 특정 Thread Pool 의 Thread 자원을 사용하도록 지정할 수 있습니다.

    예를 들어,

    network IO 처리가 대부분인 Observable AThread Pool A 에 할당하고

    복잡한 계산이 많은 Observable BThread Pool B 에 할당하는 식으로

    ObservableThread Pool 을 연결지을 수 있습니다.

     

    특히 RxJava 에서 제공하는 built-in Thread Pool 들이 있는데요.

    그 방식들에 대해서 알아보도록 하겠습니다.

    Schedulers.io().

    Schedulers.io()IO 에 특화된 작업을 수행하기 위한 Thread Pool 을 제공합니다.

    RxJava 가 제공하는 built-in Scheduler 이며,

    Network IO, Disk IO, Database request/response 등 같은 IO 관련 Observable 이 할당되도록 설계되었습니다.

    만약

    - Observable A다른 서비스를 크롤링하는 Http Request 를 많이 수행하는 Task 이다

    - Observable BLog 를 수집하는 File IO 를 많이 수행하는 Task 이다

    - Observable AMySQL 에 Read/Write 요청이 많은 Task 이다

    같은 경우에는 Schedulers.io() 를 활용한 Observable 스케줄링이 필요할 수 있습니다.

     

    IO 처리에 특화된 Thread 의 특성상 keep alive time 의 값을 충분히 길게 주어 Idle Thread 가 쓰레드 풀에서 제거되지 않게 하며,

    CPU Core 갯수에 맞추어 쓰레드풀을 구성하기 보단

    쓰레드풀의 사이즈를 넉넉히 주어 IO 에 특화된 Thread Pool 을 제공합니다.

    즉,

    - 사전에 생성된 쓰레드가 많이 준비된 쓰레드 풀이라고 생각하시면 됩니다.

    - keep alive time 이 길어서 오랜된 Idle Thread 라고 쉽게 제거되지 않습니다.

     

    기본적인 설정값은 아래와 같습니다.

    keepAliveTime : 60000000000

     

    그리고 CoreThreadSize 와 MaximumThreadSize 는 정확한 값을 알 수가 없었는데요.

    실험적으로 확인해보았을 때, 특정한 limit 없이 가능한 많이 추가되는 것 같았습니다.

     

    아래 코드는 1만개의 데이터를 비동기로 처리하는 코드인데요.

    IO Thread Pool 의 사이즈가 RxCachedThreadScheduler-4860 까지 늘어남을 확인할 수 있었습니다.

    그리고 사이즈를 터부니없이 많은 Thread 를 생성하는 경우에

    unable to create native thread: possibly out of memory or process/resource limits reached

    과 같은 에러가 발생합니다.

    아마도 IO Thread Pool 은 limit 없는 Thread Pool 인 것으로 예측됩니다.

     

    package org.example;
    
    import io.reactivex.rxjava3.core.Observable;
    import io.reactivex.rxjava3.core.ObservableSource;
    import io.reactivex.rxjava3.functions.Function;
    import io.reactivex.rxjava3.schedulers.Schedulers;
    
    public class Main {
    
      public static void main(String[] args) {
    
        var io = Schedulers.io();
    
        Observable<Integer> observable = Observable.range(1, 100000);
        observable
                .flatMap((Function<Integer, ObservableSource<Integer>>) integer -> Observable.just(integer)
                        .subscribeOn(io)
                        .map(value -> {
                          System.out.printf("thread is %s, value is %s \n", Thread.currentThread().getName(), integer);
                          return value;
                        }))
                .subscribe((value) -> {});
    
        while (true) {}
    
      }
    
    
    }

     

    <실행 결과>

    // 생략 ...
    thread is RxCachedThreadScheduler-358, value is 83716 
    thread is RxCachedThreadScheduler-6252, value is 83702 
    thread is RxCachedThreadScheduler-6251, value is 83700 
    thread is RxCachedThreadScheduler-2476, value is 83714 
    thread is RxCachedThreadScheduler-4858, value is 83713 
    thread is RxCachedThreadScheduler-2478, value is 83706 
    thread is RxCachedThreadScheduler-3520, value is 83698 
    thread is RxCachedThreadScheduler-6249, value is 83699 
    thread is RxCachedThreadScheduler-3523, value is 83696 
    thread is RxCachedThreadScheduler-5, value is 83697 
    thread is RxCachedThreadScheduler-4860, value is 83695 
    // 생략 ...

     

     

    Schedulers.computation().

    computation Thread Pool 은 CPU Core 연산에 특화된 Task 를 위한 Thread Pool 입니다.

    Thread Pool 의 size 는 CPU Core 의 갯수로 맞춰집니다.

    IO Waiting 없는 계산에 특화된 Task 를 수행하는 Observable 은 computation Thread Pool 로 스케쥴링되는 것이 좋을 것 같습니다.

    아래는 관련된 코드 예시입니다.

     

    package org.example;
    
    import io.reactivex.rxjava3.core.Observable;
    import io.reactivex.rxjava3.core.ObservableSource;
    import io.reactivex.rxjava3.functions.Function;
    import io.reactivex.rxjava3.schedulers.Schedulers;
    
    public class Main {
    
      public static void main(String[] args) {
    
        var computation = Schedulers.computation();
    
        Observable<Integer> observable = Observable.range(1, 20);
        observable
                .flatMap((Function<Integer, ObservableSource<Integer>>) integer -> Observable.just(integer)
                        .subscribeOn(computation)
                        .map(value -> {
                          System.out.printf("thread is %s, value is %s \n", Thread.currentThread().getName(), integer);
                          return value;
                        }))
                .subscribe((value) -> {});
    
        while (true) {}
    
      }
    
    
    }

    <실행 결과>

    thread is RxComputationThreadPool-1, value is 1 
    thread is RxComputationThreadPool-4, value is 4 
    thread is RxComputationThreadPool-5, value is 5 
    thread is RxComputationThreadPool-10, value is 10 
    thread is RxComputationThreadPool-3, value is 3 
    thread is RxComputationThreadPool-6, value is 6 
    thread is RxComputationThreadPool-2, value is 2 
    thread is RxComputationThreadPool-9, value is 9 
    thread is RxComputationThreadPool-8, value is 8 
    thread is RxComputationThreadPool-7, value is 7 
    thread is RxComputationThreadPool-8, value is 18 
    thread is RxComputationThreadPool-9, value is 19 
    thread is RxComputationThreadPool-2, value is 12 
    thread is RxComputationThreadPool-6, value is 16 
    thread is RxComputationThreadPool-3, value is 13 
    thread is RxComputationThreadPool-10, value is 20 
    thread is RxComputationThreadPool-5, value is 15 
    thread is RxComputationThreadPool-4, value is 14 
    thread is RxComputationThreadPool-1, value is 11 
    thread is RxComputationThreadPool-7, value is 17

     

    subscribeOn.

    subscribeOn 은 Observable 의 스케줄링을 위한 연산자입니다.

    Observable 과 Thread Pool 사이의 관계를 맺는 연산자라고 생각하시면 됩니다.

    만약 A Observable 이 B Thread Pool 와 연결되어 있다면,

    A Observable 의 실행 로직은 모두 B Thread Pool 의 Thread 에서 실행됩니다.

     

    subscribeOn 은 Observable Chain 에서 Upstream Observable 에 적용되는 연산자입니다.

    즉, Producer 와 Consumer 중에서 Producer 에게 적용되는 연산자입니다.

    예시와 함께 알아보도록 하겠습니다.

     

     

    source Observable 에 subscribeOn 적용.

     

    아래 예시는 dataSource 라는 이름의 Observable 에 subscribeOn(Schedulers.io()) 를 적용시킨 예시입니다.

    dataSource 에 해당하는 Observable 은 IO Scheduler 의 Thread Pool 에서 실행됨을 확인할 수 있습니다.

     

    package org.example;
    
    import io.reactivex.rxjava3.core.Observable;
    import io.reactivex.rxjava3.schedulers.Schedulers;
    
    public class Main {
    
      public static void main(String[] args) {
    
        Observable<Integer> dataSource = Observable.create(emitter -> {
          System.out.printf("observable's thread is %s \n", Thread.currentThread().getName());
          emitter.onNext(1);
          emitter.onNext(2);
          emitter.onNext(3);
          emitter.onComplete();
        });
    
        dataSource
                .subscribeOn(Schedulers.io())
                .subscribe(value -> {
                  System.out.printf("observer's thread is %s, value is %s \n", Thread.currentThread().getName(), value);
                });
    
        while (true) {}
      }
    
    
    }

     

    observable's thread is RxCachedThreadScheduler-1 
    observer's thread is RxCachedThreadScheduler-1, value is 1 
    observer's thread is RxCachedThreadScheduler-1, value is 2 
    observer's thread is RxCachedThreadScheduler-1, value is 3

     

    subscribeOn 을 여러번 적용하는 경우엔 어떻게 될까 ?

    아래 예시는 subscribeOn 으로 다른 Scheduler 를 설정하는 경우입니다.

    subscribeOn 이 중복 적용되는 경우에는 가장 먼저 설정된 Scheduler 가 우선 적용됩니다.

     

    package org.example;
    
    import io.reactivex.rxjava3.core.Observable;
    import io.reactivex.rxjava3.schedulers.Schedulers;
    
    public class Main {
    
      public static void main(String[] args) {
    
        Observable<Integer> dataSource = Observable.create(emitter -> {
          System.out.printf("observable's thread is %s \n", Thread.currentThread().getName());
          emitter.onNext(1);
          emitter.onNext(2);
          emitter.onNext(3);
          emitter.onComplete();
        });
    
        dataSource
                .subscribeOn(Schedulers.io())
                .subscribeOn(Schedulers.computation())
                .subscribe(value -> {
                  System.out.printf("observer's thread is %s, value is %s \n", Thread.currentThread().getName(), value);
                });
    
        while (true) {}
      }
    
    
    }

     

    <실행 결과>

    observable's thread is RxCachedThreadScheduler-1 
    observer's thread is RxCachedThreadScheduler-1, value is 1 
    observer's thread is RxCachedThreadScheduler-1, value is 2 
    observer's thread is RxCachedThreadScheduler-1, value is 3

     

     

    observeOn.

     

    observeOn 은 subscribeOn 과 다르게 downstream Observable 들의 스케줄링을 위한 연산자입니다.

    즉, 상위 Producer 의 Consumer 들의 스케줄링을 위한 연산자입니다.

     

    바로 예시를 보여드리도록 하겠습니다.

    저는 다양한 케이스를 경험하는 것이 중요하다고 생각합니다.

     

    subscriber 를 observeOn 로 스케줄링하기.

    테스트를 위해서 ThreadPool 은 4개 생성하였습니다.

    각각의 Pool 이름은 아래와 같습니다.

    - pool-1-thread

    - pool-2-thread

    - pool-3-thread

    - pool-4-thread

     

    가장 기본적으로 subscriber 를 pool-1-thread 쓰레드 풀에 할당해보았습니다.

    실행결과는 아래와 같습니다.

    package org.example;
    
    import io.reactivex.rxjava3.core.Observable;
    import io.reactivex.rxjava3.schedulers.Schedulers;
    
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    public class Main {
    
      public static void main(String[] args) {
    
        ExecutorService executorService1 = Executors.newFixedThreadPool(10);
        ExecutorService executorService2 = Executors.newFixedThreadPool(10);
        ExecutorService executorService3 = Executors.newFixedThreadPool(10);
        ExecutorService executorService4 = Executors.newFixedThreadPool(10);
    
        Observable<Integer> dataSource = Observable.create(emitter -> {
          System.out.printf("observable's thread is %s \n", Thread.currentThread().getName());
          emitter.onNext(1);
          emitter.onNext(2);
          emitter.onNext(3);
          emitter.onComplete();
        });
    
        dataSource
                .observeOn(Schedulers.from(executorService1))
                .subscribe(value -> {
                  System.out.printf("observer's thread is %s, value is %s \n", Thread.currentThread().getName(), value);
                });
    
        while (true) {}
      }
    
    
    }

     

    <실행 결과>

    observable's thread is main 
    observer's thread is pool-1-thread-1, value is 1 
    observer's thread is pool-1-thread-1, value is 2 
    observer's thread is pool-1-thread-1, value is 3

     

     

    consumer 가 여러개 존재할 경우.

    아래의 경우는 map 과 subscribe 연산자가 존재합니다.

    map 연산은 pool-1-thread 쓰레드풀에 할당하였고,

    subscribe 연산은 pool-2-thread 쓰레드풀에 할당하였습니다.

    실행 결과는 아래와 같습니다.

     

    package org.example;
    
    import io.reactivex.rxjava3.core.Observable;
    import io.reactivex.rxjava3.schedulers.Schedulers;
    
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    public class Main {
    
      public static void main(String[] args) {
    
        ExecutorService executorService1 = Executors.newFixedThreadPool(10);
        ExecutorService executorService2 = Executors.newFixedThreadPool(10);
        ExecutorService executorService3 = Executors.newFixedThreadPool(10);
        ExecutorService executorService4 = Executors.newFixedThreadPool(10);
    
        Observable<Integer> dataSource = Observable.create(emitter -> {
          System.out.printf("observable's thread is %s \n", Thread.currentThread().getName());
          emitter.onNext(1);
          emitter.onNext(2);
          emitter.onNext(3);
          emitter.onComplete();
        });
    
        dataSource
                .observeOn(Schedulers.from(executorService1))
                .map((value) -> {
                  System.out.printf("map consumer's thread is %s, value is %s \n", Thread.currentThread().getName(), value);
                  return value;
                })
                .observeOn(Schedulers.from(executorService2))
                .subscribe(value -> {
                  System.out.printf("observer's thread is %s, value is %s \n", Thread.currentThread().getName(), value);
                });
    
        while (true) {}
      }
    
    
    }

     

    <실행 결과>

    observable's thread is main 
    map consumer's thread is pool-1-thread-1, value is 1 
    map consumer's thread is pool-1-thread-1, value is 2 
    observer's thread is pool-2-thread-1, value is 1 
    map consumer's thread is pool-1-thread-1, value is 3 
    observer's thread is pool-2-thread-1, value is 2 
    observer's thread is pool-2-thread-1, value is 3

     

     

    Subscriber 가 2개 이상인 경우.

    Subscriber 는 3개 생성해보았습니다.

    그리고 Subscriber 모두 thread-1-pool 쓰레드풀에 할당되도록 설정하였습니다.

     

    3개의 Subscriber 은 thread-1-pool 쓰레드풀과 아래와 같이 매칭됩니다.

     

    Subscriber1 -> thread-1-pool-1

    Subscriber2 -> thread-1-pool-2

    Subscriber3 -> thread-1-pool-3

     

    thread-1-pool 쓰레드풀에서 3개의 쓰레드가 사용되었고,

    Subcriber 는 1개의 쓰레드를 사용하고 있습니다.

    자세한 결과는 아래와 같습니다.

     

    package org.example;
    
    import io.reactivex.rxjava3.core.Observable;
    import io.reactivex.rxjava3.schedulers.Schedulers;
    
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    public class Main {
    
      public static void main(String[] args) {
    
        ExecutorService executorService1 = Executors.newFixedThreadPool(10);
        ExecutorService executorService2 = Executors.newFixedThreadPool(10);
        ExecutorService executorService3 = Executors.newFixedThreadPool(10);
        ExecutorService executorService4 = Executors.newFixedThreadPool(10);
    
        Observable<Integer> dataSource = Observable.create(emitter -> {
          System.out.printf("observable's thread is %s \n", Thread.currentThread().getName());
          emitter.onNext(1);
          emitter.onNext(2);
          emitter.onNext(3);
          emitter.onComplete();
        });
    
        dataSource
                .observeOn(Schedulers.from(executorService1))
                .subscribe(value -> {
                  System.out.printf("observer's thread is %s, value is %s \n", Thread.currentThread().getName(), value);
                });
        dataSource
                .observeOn(Schedulers.from(executorService1))
                .subscribe(value -> {
                  System.out.printf("observer's thread is %s, value is %s \n", Thread.currentThread().getName(), value);
                });
        dataSource
                .observeOn(Schedulers.from(executorService1))
                .subscribe(value -> {
                  System.out.printf("observer's thread is %s, value is %s \n", Thread.currentThread().getName(), value);
                });
    
        while (true) {}
      }
    
    
    }

     

    <실행 결과 >

    observable's thread is main 
    observer's thread is pool-1-thread-1, value is 1 
    observer's thread is pool-1-thread-1, value is 2 
    observer's thread is pool-1-thread-1, value is 3 
    observable's thread is main 
    observer's thread is pool-1-thread-2, value is 1 
    observer's thread is pool-1-thread-2, value is 2 
    observer's thread is pool-1-thread-2, value is 3 
    observable's thread is main 
    observer's thread is pool-1-thread-3, value is 1 
    observer's thread is pool-1-thread-3, value is 2 
    observer's thread is pool-1-thread-3, value is 3

     

     

    다양한 케이스.

     

    Observable 은 병렬처리하는 팁.

    아래 예시는 데이터 스트림을 병렬로 동시처리하기 위한 방식입니다.

     

    flatMap 과 subscribeOn 를 활용하여 각각의 연산을 새로운 쓰레드에 할당할 수 있습니다.

    쓰레드풀의 사이즈에 맞게 최대한 쓰레드풀의 리소스를 활용할 수 있는 방식입니다.

     

    package org.example;
    
    import io.reactivex.rxjava3.core.Observable;
    import io.reactivex.rxjava3.core.ObservableSource;
    import io.reactivex.rxjava3.functions.Function;
    import io.reactivex.rxjava3.schedulers.Schedulers;
    
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    public class Main {
    
      public static void main(String[] args) {
    
        ExecutorService executorService1 = Executors.newFixedThreadPool(10);
        ExecutorService executorService2 = Executors.newFixedThreadPool(10);
        ExecutorService executorService3 = Executors.newFixedThreadPool(10);
        ExecutorService executorService4 = Executors.newFixedThreadPool(10);
    
        Observable<Integer> dataSource = Observable.create(emitter -> {
          System.out.printf("observable's thread is %s \n", Thread.currentThread().getName());
          for (int i = 0; i < 20; i++) emitter.onNext(i);
          emitter.onComplete();
        });
    
        dataSource
                .flatMap((Function<Integer, ObservableSource<Integer>>) value -> Observable.just(value)
                        .subscribeOn(Schedulers.from(executorService4))
                        .map((val) -> {
                          System.out.printf("mapper's thread is %s, value is %s \n", Thread.currentThread().getName(), val);
                          return val;
                        })
    
                )
                .observeOn(Schedulers.from(executorService1))
                .subscribe(value -> {
                  System.out.printf("observer's thread is %s, value is %s \n", Thread.currentThread().getName(), value);
                });
    
        while (true) {}
      }
    
    
    }

     

    <실행 결과>

    observable's thread is main 
    mapper's thread is pool-4-thread-1, value is 0 
    mapper's thread is pool-4-thread-2, value is 1 
    mapper's thread is pool-4-thread-9, value is 8 
    mapper's thread is pool-4-thread-10, value is 9 
    mapper's thread is pool-4-thread-2, value is 11 
    mapper's thread is pool-4-thread-5, value is 4 
    mapper's thread is pool-4-thread-3, value is 2 
    mapper's thread is pool-4-thread-6, value is 5 
    mapper's thread is pool-4-thread-4, value is 3 
    mapper's thread is pool-4-thread-7, value is 6 
    mapper's thread is pool-4-thread-4, value is 18 
    mapper's thread is pool-4-thread-6, value is 17 
    mapper's thread is pool-4-thread-1, value is 16 
    mapper's thread is pool-4-thread-3, value is 15 
    observer's thread is pool-1-thread-1, value is 0 
    mapper's thread is pool-4-thread-5, value is 14 
    mapper's thread is pool-4-thread-2, value is 13 
    mapper's thread is pool-4-thread-10, value is 12
    반응형
Designed by Tistory.