Observable Chain 을 구성하는 Observable 들은 Main Thread 에서 수행될 수도 있고, 새로운 쓰레드에서 실행될 수도 있습니다.
이를 Observable 의 Task Scheduling 이라고 부르는데요.
Observable 이 수행해야하는 Task 를 상황에 맞게 적절한 쓰레드에 배치할 수 있습니다.
이에 관한 상세한 내용을 알아보도록 하겠습니다.
Built-In Thread Pool.
RxJava 에서 Observable Chain 의 개별 Observable 을 특정 Thread Pool 의 Thread 자원을 사용하도록 지정할 수 있습니다.
예를 들어,
network IO 처리가 대부분인 Observable A 는 Thread Pool A 에 할당하고
복잡한 계산이 많은 Observable B 은 Thread Pool B 에 할당하는 식으로
Observable 과 Thread Pool 을 연결지을 수 있습니다.
특히 RxJava 에서 제공하는 built-in Thread Pool 들이 있는데요.
그 방식들에 대해서 알아보도록 하겠습니다.
Schedulers.io() 는 IO 에 특화된 작업을 수행하기 위한 Thread Pool 을 제공합니다.
RxJava 가 제공하는 built-in Scheduler 이며,
Network IO, Disk IO, Database request/response 등 같은 IO 관련 Observable 이 할당되도록 설계되었습니다.
- Observable A 는 다른 서비스를 크롤링하는 Http Request 를 많이 수행하는 Task 이다
- Observable B 는 Log 를 수집하는 File IO 를 많이 수행하는 Task 이다
- Observable A 는 MySQL 에 Read/Write 요청이 많은 Task 이다
같은 경우에는 Schedulers.io() 를 활용한 Observable 스케줄링이 필요할 수 있습니다.
IO 처리에 특화된 Thread 의 특성상 keep alive time 의 값을 충분히 길게 주어 Idle Thread 가 쓰레드 풀에서 제거되지 않게 하며,
CPU Core 갯수에 맞추어 쓰레드풀을 구성하기 보단
쓰레드풀의 사이즈를 넉넉히 주어 IO 에 특화된 Thread Pool 을 제공합니다.
- 사전에 생성된 쓰레드가 많이 준비된 쓰레드 풀이라고 생각하시면 됩니다.
- keep alive time 이 길어서 오랜된 Idle Thread 라고 쉽게 제거되지 않습니다.
기본적인 설정값은 아래와 같습니다.
keepAliveTime : 60000000000
그리고 CoreThreadSize 와 MaximumThreadSize 는 정확한 값을 알 수가 없었는데요.
실험적으로 확인해보았을 때, 특정한 limit 없이 가능한 많이 추가되는 것 같았습니다.
아래 코드는 1만개의 데이터를 비동기로 처리하는 코드인데요.
IO Thread Pool 의 사이즈가 RxCachedThreadScheduler-4860 까지 늘어남을 확인할 수 있었습니다.
그리고 사이즈를 터부니없이 많은 Thread 를 생성하는 경우에
unable to create native thread: possibly out of memory or process/resource limits reached
과 같은 에러가 발생합니다.
아마도 IO Thread Pool 은 limit 없는 Thread Pool 인 것으로 예측됩니다.
package org.example; import io.reactivex.rxjava3.core.Observable; import io.reactivex.rxjava3.core.ObservableSource; import io.reactivex.rxjava3.functions.Function; import io.reactivex.rxjava3.schedulers.Schedulers; public class Main { public static void main(String[] args) { var io = Schedulers.io(); Observable<Integer> observable = Observable.range(1, 100000); observable .flatMap((Function<Integer, ObservableSource<Integer>>) integer -> Observable.just(integer) .subscribeOn(io) .map(value -> { System.out.printf("thread is %s, value is %s \n", Thread.currentThread().getName(), integer); return value; })) .subscribe((value) -> {}); while (true) {} } }
<실행 결과>
// 생략 ... thread is RxCachedThreadScheduler-358, value is 83716 thread is RxCachedThreadScheduler-6252, value is 83702 thread is RxCachedThreadScheduler-6251, value is 83700 thread is RxCachedThreadScheduler-2476, value is 83714 thread is RxCachedThreadScheduler-4858, value is 83713 thread is RxCachedThreadScheduler-2478, value is 83706 thread is RxCachedThreadScheduler-3520, value is 83698 thread is RxCachedThreadScheduler-6249, value is 83699 thread is RxCachedThreadScheduler-3523, value is 83696 thread is RxCachedThreadScheduler-5, value is 83697 thread is RxCachedThreadScheduler-4860, value is 83695 // 생략 ...
computation Thread Pool 은 CPU Core 연산에 특화된 Task 를 위한 Thread Pool 입니다.
Thread Pool 의 size 는 CPU Core 의 갯수로 맞춰집니다.
IO Waiting 없는 계산에 특화된 Task 를 수행하는 Observable 은 computation Thread Pool 로 스케쥴링되는 것이 좋을 것 같습니다.
아래는 관련된 코드 예시입니다.
package org.example; import io.reactivex.rxjava3.core.Observable; import io.reactivex.rxjava3.core.ObservableSource; import io.reactivex.rxjava3.functions.Function; import io.reactivex.rxjava3.schedulers.Schedulers; public class Main { public static void main(String[] args) { var computation = Schedulers.computation(); Observable<Integer> observable = Observable.range(1, 20); observable .flatMap((Function<Integer, ObservableSource<Integer>>) integer -> Observable.just(integer) .subscribeOn(computation) .map(value -> { System.out.printf("thread is %s, value is %s \n", Thread.currentThread().getName(), integer); return value; })) .subscribe((value) -> {}); while (true) {} } }
<실행 결과>
thread is RxComputationThreadPool-1, value is 1 thread is RxComputationThreadPool-4, value is 4 thread is RxComputationThreadPool-5, value is 5 thread is RxComputationThreadPool-10, value is 10 thread is RxComputationThreadPool-3, value is 3 thread is RxComputationThreadPool-6, value is 6 thread is RxComputationThreadPool-2, value is 2 thread is RxComputationThreadPool-9, value is 9 thread is RxComputationThreadPool-8, value is 8 thread is RxComputationThreadPool-7, value is 7 thread is RxComputationThreadPool-8, value is 18 thread is RxComputationThreadPool-9, value is 19 thread is RxComputationThreadPool-2, value is 12 thread is RxComputationThreadPool-6, value is 16 thread is RxComputationThreadPool-3, value is 13 thread is RxComputationThreadPool-10, value is 20 thread is RxComputationThreadPool-5, value is 15 thread is RxComputationThreadPool-4, value is 14 thread is RxComputationThreadPool-1, value is 11 thread is RxComputationThreadPool-7, value is 17
subscribeOn 은 Observable 의 스케줄링을 위한 연산자입니다.
Observable 과 Thread Pool 사이의 관계를 맺는 연산자라고 생각하시면 됩니다.
만약 A Observable 이 B Thread Pool 와 연결되어 있다면,
A Observable 의 실행 로직은 모두 B Thread Pool 의 Thread 에서 실행됩니다.
subscribeOn 은 Observable Chain 에서 Upstream Observable 에 적용되는 연산자입니다.
즉, Producer 와 Consumer 중에서 Producer 에게 적용되는 연산자입니다.
예시와 함께 알아보도록 하겠습니다.
source Observable 에 subscribeOn 적용.
아래 예시는 dataSource 라는 이름의 Observable 에 subscribeOn(Schedulers.io()) 를 적용시킨 예시입니다.
dataSource 에 해당하는 Observable 은 IO Scheduler 의 Thread Pool 에서 실행됨을 확인할 수 있습니다.
package org.example; import io.reactivex.rxjava3.core.Observable; import io.reactivex.rxjava3.schedulers.Schedulers; public class Main { public static void main(String[] args) { Observable<Integer> dataSource = Observable.create(emitter -> { System.out.printf("observable's thread is %s \n", Thread.currentThread().getName()); emitter.onNext(1); emitter.onNext(2); emitter.onNext(3); emitter.onComplete(); }); dataSource .subscribeOn(Schedulers.io()) .subscribe(value -> { System.out.printf("observer's thread is %s, value is %s \n", Thread.currentThread().getName(), value); }); while (true) {} } }
observable's thread is RxCachedThreadScheduler-1 observer's thread is RxCachedThreadScheduler-1, value is 1 observer's thread is RxCachedThreadScheduler-1, value is 2 observer's thread is RxCachedThreadScheduler-1, value is 3
subscribeOn 을 여러번 적용하는 경우엔 어떻게 될까 ?
아래 예시는 subscribeOn 으로 다른 Scheduler 를 설정하는 경우입니다.
subscribeOn 이 중복 적용되는 경우에는 가장 먼저 설정된 Scheduler 가 우선 적용됩니다.
package org.example; import io.reactivex.rxjava3.core.Observable; import io.reactivex.rxjava3.schedulers.Schedulers; public class Main { public static void main(String[] args) { Observable<Integer> dataSource = Observable.create(emitter -> { System.out.printf("observable's thread is %s \n", Thread.currentThread().getName()); emitter.onNext(1); emitter.onNext(2); emitter.onNext(3); emitter.onComplete(); }); dataSource .subscribeOn(Schedulers.io()) .subscribeOn(Schedulers.computation()) .subscribe(value -> { System.out.printf("observer's thread is %s, value is %s \n", Thread.currentThread().getName(), value); }); while (true) {} } }
<실행 결과>
observable's thread is RxCachedThreadScheduler-1 observer's thread is RxCachedThreadScheduler-1, value is 1 observer's thread is RxCachedThreadScheduler-1, value is 2 observer's thread is RxCachedThreadScheduler-1, value is 3
observeOn 은 subscribeOn 과 다르게 downstream Observable 들의 스케줄링을 위한 연산자입니다.
즉, 상위 Producer 의 Consumer 들의 스케줄링을 위한 연산자입니다.
바로 예시를 보여드리도록 하겠습니다.
저는 다양한 케이스를 경험하는 것이 중요하다고 생각합니다.
subscriber 를 observeOn 로 스케줄링하기.
테스트를 위해서 ThreadPool 은 4개 생성하였습니다.
각각의 Pool 이름은 아래와 같습니다.
- pool-1-thread
- pool-2-thread
- pool-3-thread
- pool-4-thread
가장 기본적으로 subscriber 를 pool-1-thread 쓰레드 풀에 할당해보았습니다.
실행결과는 아래와 같습니다.
package org.example; import io.reactivex.rxjava3.core.Observable; import io.reactivex.rxjava3.schedulers.Schedulers; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class Main { public static void main(String[] args) { ExecutorService executorService1 = Executors.newFixedThreadPool(10); ExecutorService executorService2 = Executors.newFixedThreadPool(10); ExecutorService executorService3 = Executors.newFixedThreadPool(10); ExecutorService executorService4 = Executors.newFixedThreadPool(10); Observable<Integer> dataSource = Observable.create(emitter -> { System.out.printf("observable's thread is %s \n", Thread.currentThread().getName()); emitter.onNext(1); emitter.onNext(2); emitter.onNext(3); emitter.onComplete(); }); dataSource .observeOn(Schedulers.from(executorService1)) .subscribe(value -> { System.out.printf("observer's thread is %s, value is %s \n", Thread.currentThread().getName(), value); }); while (true) {} } }
<실행 결과>
observable's thread is main observer's thread is pool-1-thread-1, value is 1 observer's thread is pool-1-thread-1, value is 2 observer's thread is pool-1-thread-1, value is 3
consumer 가 여러개 존재할 경우.
아래의 경우는 map 과 subscribe 연산자가 존재합니다.
map 연산은 pool-1-thread 쓰레드풀에 할당하였고,
subscribe 연산은 pool-2-thread 쓰레드풀에 할당하였습니다.
실행 결과는 아래와 같습니다.
package org.example; import io.reactivex.rxjava3.core.Observable; import io.reactivex.rxjava3.schedulers.Schedulers; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class Main { public static void main(String[] args) { ExecutorService executorService1 = Executors.newFixedThreadPool(10); ExecutorService executorService2 = Executors.newFixedThreadPool(10); ExecutorService executorService3 = Executors.newFixedThreadPool(10); ExecutorService executorService4 = Executors.newFixedThreadPool(10); Observable<Integer> dataSource = Observable.create(emitter -> { System.out.printf("observable's thread is %s \n", Thread.currentThread().getName()); emitter.onNext(1); emitter.onNext(2); emitter.onNext(3); emitter.onComplete(); }); dataSource .observeOn(Schedulers.from(executorService1)) .map((value) -> { System.out.printf("map consumer's thread is %s, value is %s \n", Thread.currentThread().getName(), value); return value; }) .observeOn(Schedulers.from(executorService2)) .subscribe(value -> { System.out.printf("observer's thread is %s, value is %s \n", Thread.currentThread().getName(), value); }); while (true) {} } }
<실행 결과>
observable's thread is main map consumer's thread is pool-1-thread-1, value is 1 map consumer's thread is pool-1-thread-1, value is 2 observer's thread is pool-2-thread-1, value is 1 map consumer's thread is pool-1-thread-1, value is 3 observer's thread is pool-2-thread-1, value is 2 observer's thread is pool-2-thread-1, value is 3
Subscriber 가 2개 이상인 경우.
Subscriber 는 3개 생성해보았습니다.
그리고 Subscriber 모두 thread-1-pool 쓰레드풀에 할당되도록 설정하였습니다.
3개의 Subscriber 은 thread-1-pool 쓰레드풀과 아래와 같이 매칭됩니다.
Subscriber1 -> thread-1-pool-1
Subscriber2 -> thread-1-pool-2
Subscriber3 -> thread-1-pool-3
thread-1-pool 쓰레드풀에서 3개의 쓰레드가 사용되었고,
Subcriber 는 1개의 쓰레드를 사용하고 있습니다.
자세한 결과는 아래와 같습니다.
package org.example; import io.reactivex.rxjava3.core.Observable; import io.reactivex.rxjava3.schedulers.Schedulers; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class Main { public static void main(String[] args) { ExecutorService executorService1 = Executors.newFixedThreadPool(10); ExecutorService executorService2 = Executors.newFixedThreadPool(10); ExecutorService executorService3 = Executors.newFixedThreadPool(10); ExecutorService executorService4 = Executors.newFixedThreadPool(10); Observable<Integer> dataSource = Observable.create(emitter -> { System.out.printf("observable's thread is %s \n", Thread.currentThread().getName()); emitter.onNext(1); emitter.onNext(2); emitter.onNext(3); emitter.onComplete(); }); dataSource .observeOn(Schedulers.from(executorService1)) .subscribe(value -> { System.out.printf("observer's thread is %s, value is %s \n", Thread.currentThread().getName(), value); }); dataSource .observeOn(Schedulers.from(executorService1)) .subscribe(value -> { System.out.printf("observer's thread is %s, value is %s \n", Thread.currentThread().getName(), value); }); dataSource .observeOn(Schedulers.from(executorService1)) .subscribe(value -> { System.out.printf("observer's thread is %s, value is %s \n", Thread.currentThread().getName(), value); }); while (true) {} } }
<실행 결과 >
observable's thread is main observer's thread is pool-1-thread-1, value is 1 observer's thread is pool-1-thread-1, value is 2 observer's thread is pool-1-thread-1, value is 3 observable's thread is main observer's thread is pool-1-thread-2, value is 1 observer's thread is pool-1-thread-2, value is 2 observer's thread is pool-1-thread-2, value is 3 observable's thread is main observer's thread is pool-1-thread-3, value is 1 observer's thread is pool-1-thread-3, value is 2 observer's thread is pool-1-thread-3, value is 3
다양한 케이스.
Observable 은 병렬처리하는 팁.
아래 예시는 데이터 스트림을 병렬로 동시처리하기 위한 방식입니다.
flatMap 과 subscribeOn 를 활용하여 각각의 연산을 새로운 쓰레드에 할당할 수 있습니다.
쓰레드풀의 사이즈에 맞게 최대한 쓰레드풀의 리소스를 활용할 수 있는 방식입니다.
package org.example; import io.reactivex.rxjava3.core.Observable; import io.reactivex.rxjava3.core.ObservableSource; import io.reactivex.rxjava3.functions.Function; import io.reactivex.rxjava3.schedulers.Schedulers; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class Main { public static void main(String[] args) { ExecutorService executorService1 = Executors.newFixedThreadPool(10); ExecutorService executorService2 = Executors.newFixedThreadPool(10); ExecutorService executorService3 = Executors.newFixedThreadPool(10); ExecutorService executorService4 = Executors.newFixedThreadPool(10); Observable<Integer> dataSource = Observable.create(emitter -> { System.out.printf("observable's thread is %s \n", Thread.currentThread().getName()); for (int i = 0; i < 20; i++) emitter.onNext(i); emitter.onComplete(); }); dataSource .flatMap((Function<Integer, ObservableSource<Integer>>) value -> Observable.just(value) .subscribeOn(Schedulers.from(executorService4)) .map((val) -> { System.out.printf("mapper's thread is %s, value is %s \n", Thread.currentThread().getName(), val); return val; }) ) .observeOn(Schedulers.from(executorService1)) .subscribe(value -> { System.out.printf("observer's thread is %s, value is %s \n", Thread.currentThread().getName(), value); }); while (true) {} } }
<실행 결과>
observable's thread is main mapper's thread is pool-4-thread-1, value is 0 mapper's thread is pool-4-thread-2, value is 1 mapper's thread is pool-4-thread-9, value is 8 mapper's thread is pool-4-thread-10, value is 9 mapper's thread is pool-4-thread-2, value is 11 mapper's thread is pool-4-thread-5, value is 4 mapper's thread is pool-4-thread-3, value is 2 mapper's thread is pool-4-thread-6, value is 5 mapper's thread is pool-4-thread-4, value is 3 mapper's thread is pool-4-thread-7, value is 6 mapper's thread is pool-4-thread-4, value is 18 mapper's thread is pool-4-thread-6, value is 17 mapper's thread is pool-4-thread-1, value is 16 mapper's thread is pool-4-thread-3, value is 15 observer's thread is pool-1-thread-1, value is 0 mapper's thread is pool-4-thread-5, value is 14 mapper's thread is pool-4-thread-2, value is 13 mapper's thread is pool-4-thread-10, value is 12
