-
RxJava Thread Scheduling 알아보기Language/ReactiveX 2023. 9. 28. 10:55728x90반응형
- 목차
관련된 글들.
https://westlife0615.tistory.com/320
https://westlife0615.tistory.com/2
소개.
Observable Chain 을 구성하는 Observable 들은 Main Thread 에서 수행될 수도 있고, 새로운 쓰레드에서 실행될 수도 있습니다.
이를 Observable 의 Task Scheduling 이라고 부르는데요.
Observable 이 수행해야하는 Task 를 상황에 맞게 적절한 쓰레드에 배치할 수 있습니다.
이에 관한 상세한 내용을 알아보도록 하겠습니다.
Built-In Thread Pool.
RxJava 에서 Observable Chain 의 개별 Observable 을 특정 Thread Pool 의 Thread 자원을 사용하도록 지정할 수 있습니다.
예를 들어,
network IO 처리가 대부분인 Observable A 는 Thread Pool A 에 할당하고
복잡한 계산이 많은 Observable B 은 Thread Pool B 에 할당하는 식으로
Observable 과 Thread Pool 을 연결지을 수 있습니다.
특히 RxJava 에서 제공하는 built-in Thread Pool 들이 있는데요.
그 방식들에 대해서 알아보도록 하겠습니다.
Schedulers.io().
Schedulers.io() 는 IO 에 특화된 작업을 수행하기 위한 Thread Pool 을 제공합니다.
RxJava 가 제공하는 built-in Scheduler 이며,
Network IO, Disk IO, Database request/response 등 같은 IO 관련 Observable 이 할당되도록 설계되었습니다.
만약
- Observable A 는 다른 서비스를 크롤링하는 Http Request 를 많이 수행하는 Task 이다
- Observable B 는 Log 를 수집하는 File IO 를 많이 수행하는 Task 이다
- Observable A 는 MySQL 에 Read/Write 요청이 많은 Task 이다
같은 경우에는 Schedulers.io() 를 활용한 Observable 스케줄링이 필요할 수 있습니다.
IO 처리에 특화된 Thread 의 특성상 keep alive time 의 값을 충분히 길게 주어 Idle Thread 가 쓰레드 풀에서 제거되지 않게 하며,
CPU Core 갯수에 맞추어 쓰레드풀을 구성하기 보단
쓰레드풀의 사이즈를 넉넉히 주어 IO 에 특화된 Thread Pool 을 제공합니다.
즉,
- 사전에 생성된 쓰레드가 많이 준비된 쓰레드 풀이라고 생각하시면 됩니다.
- keep alive time 이 길어서 오랜된 Idle Thread 라고 쉽게 제거되지 않습니다.
기본적인 설정값은 아래와 같습니다.
keepAliveTime : 60000000000
그리고 CoreThreadSize 와 MaximumThreadSize 는 정확한 값을 알 수가 없었는데요.
실험적으로 확인해보았을 때, 특정한 limit 없이 가능한 많이 추가되는 것 같았습니다.
아래 코드는 1만개의 데이터를 비동기로 처리하는 코드인데요.
IO Thread Pool 의 사이즈가 RxCachedThreadScheduler-4860 까지 늘어남을 확인할 수 있었습니다.
그리고 사이즈를 터부니없이 많은 Thread 를 생성하는 경우에
unable to create native thread: possibly out of memory or process/resource limits reached
과 같은 에러가 발생합니다.
아마도 IO Thread Pool 은 limit 없는 Thread Pool 인 것으로 예측됩니다.
package org.example; import io.reactivex.rxjava3.core.Observable; import io.reactivex.rxjava3.core.ObservableSource; import io.reactivex.rxjava3.functions.Function; import io.reactivex.rxjava3.schedulers.Schedulers; public class Main { public static void main(String[] args) { var io = Schedulers.io(); Observable<Integer> observable = Observable.range(1, 100000); observable .flatMap((Function<Integer, ObservableSource<Integer>>) integer -> Observable.just(integer) .subscribeOn(io) .map(value -> { System.out.printf("thread is %s, value is %s \n", Thread.currentThread().getName(), integer); return value; })) .subscribe((value) -> {}); while (true) {} } }
<실행 결과>
// 생략 ... thread is RxCachedThreadScheduler-358, value is 83716 thread is RxCachedThreadScheduler-6252, value is 83702 thread is RxCachedThreadScheduler-6251, value is 83700 thread is RxCachedThreadScheduler-2476, value is 83714 thread is RxCachedThreadScheduler-4858, value is 83713 thread is RxCachedThreadScheduler-2478, value is 83706 thread is RxCachedThreadScheduler-3520, value is 83698 thread is RxCachedThreadScheduler-6249, value is 83699 thread is RxCachedThreadScheduler-3523, value is 83696 thread is RxCachedThreadScheduler-5, value is 83697 thread is RxCachedThreadScheduler-4860, value is 83695 // 생략 ...
Schedulers.computation().
computation Thread Pool 은 CPU Core 연산에 특화된 Task 를 위한 Thread Pool 입니다.
Thread Pool 의 size 는 CPU Core 의 갯수로 맞춰집니다.
IO Waiting 없는 계산에 특화된 Task 를 수행하는 Observable 은 computation Thread Pool 로 스케쥴링되는 것이 좋을 것 같습니다.
아래는 관련된 코드 예시입니다.
package org.example; import io.reactivex.rxjava3.core.Observable; import io.reactivex.rxjava3.core.ObservableSource; import io.reactivex.rxjava3.functions.Function; import io.reactivex.rxjava3.schedulers.Schedulers; public class Main { public static void main(String[] args) { var computation = Schedulers.computation(); Observable<Integer> observable = Observable.range(1, 20); observable .flatMap((Function<Integer, ObservableSource<Integer>>) integer -> Observable.just(integer) .subscribeOn(computation) .map(value -> { System.out.printf("thread is %s, value is %s \n", Thread.currentThread().getName(), integer); return value; })) .subscribe((value) -> {}); while (true) {} } }
<실행 결과>
thread is RxComputationThreadPool-1, value is 1 thread is RxComputationThreadPool-4, value is 4 thread is RxComputationThreadPool-5, value is 5 thread is RxComputationThreadPool-10, value is 10 thread is RxComputationThreadPool-3, value is 3 thread is RxComputationThreadPool-6, value is 6 thread is RxComputationThreadPool-2, value is 2 thread is RxComputationThreadPool-9, value is 9 thread is RxComputationThreadPool-8, value is 8 thread is RxComputationThreadPool-7, value is 7 thread is RxComputationThreadPool-8, value is 18 thread is RxComputationThreadPool-9, value is 19 thread is RxComputationThreadPool-2, value is 12 thread is RxComputationThreadPool-6, value is 16 thread is RxComputationThreadPool-3, value is 13 thread is RxComputationThreadPool-10, value is 20 thread is RxComputationThreadPool-5, value is 15 thread is RxComputationThreadPool-4, value is 14 thread is RxComputationThreadPool-1, value is 11 thread is RxComputationThreadPool-7, value is 17
subscribeOn.
subscribeOn 은 Observable 의 스케줄링을 위한 연산자입니다.
Observable 과 Thread Pool 사이의 관계를 맺는 연산자라고 생각하시면 됩니다.
만약 A Observable 이 B Thread Pool 와 연결되어 있다면,
A Observable 의 실행 로직은 모두 B Thread Pool 의 Thread 에서 실행됩니다.
subscribeOn 은 Observable Chain 에서 Upstream Observable 에 적용되는 연산자입니다.
즉, Producer 와 Consumer 중에서 Producer 에게 적용되는 연산자입니다.
예시와 함께 알아보도록 하겠습니다.
source Observable 에 subscribeOn 적용.
아래 예시는 dataSource 라는 이름의 Observable 에 subscribeOn(Schedulers.io()) 를 적용시킨 예시입니다.
dataSource 에 해당하는 Observable 은 IO Scheduler 의 Thread Pool 에서 실행됨을 확인할 수 있습니다.
package org.example; import io.reactivex.rxjava3.core.Observable; import io.reactivex.rxjava3.schedulers.Schedulers; public class Main { public static void main(String[] args) { Observable<Integer> dataSource = Observable.create(emitter -> { System.out.printf("observable's thread is %s \n", Thread.currentThread().getName()); emitter.onNext(1); emitter.onNext(2); emitter.onNext(3); emitter.onComplete(); }); dataSource .subscribeOn(Schedulers.io()) .subscribe(value -> { System.out.printf("observer's thread is %s, value is %s \n", Thread.currentThread().getName(), value); }); while (true) {} } }
observable's thread is RxCachedThreadScheduler-1 observer's thread is RxCachedThreadScheduler-1, value is 1 observer's thread is RxCachedThreadScheduler-1, value is 2 observer's thread is RxCachedThreadScheduler-1, value is 3
subscribeOn 을 여러번 적용하는 경우엔 어떻게 될까 ?
아래 예시는 subscribeOn 으로 다른 Scheduler 를 설정하는 경우입니다.
subscribeOn 이 중복 적용되는 경우에는 가장 먼저 설정된 Scheduler 가 우선 적용됩니다.
package org.example; import io.reactivex.rxjava3.core.Observable; import io.reactivex.rxjava3.schedulers.Schedulers; public class Main { public static void main(String[] args) { Observable<Integer> dataSource = Observable.create(emitter -> { System.out.printf("observable's thread is %s \n", Thread.currentThread().getName()); emitter.onNext(1); emitter.onNext(2); emitter.onNext(3); emitter.onComplete(); }); dataSource .subscribeOn(Schedulers.io()) .subscribeOn(Schedulers.computation()) .subscribe(value -> { System.out.printf("observer's thread is %s, value is %s \n", Thread.currentThread().getName(), value); }); while (true) {} } }
<실행 결과>
observable's thread is RxCachedThreadScheduler-1 observer's thread is RxCachedThreadScheduler-1, value is 1 observer's thread is RxCachedThreadScheduler-1, value is 2 observer's thread is RxCachedThreadScheduler-1, value is 3
observeOn.
observeOn 은 subscribeOn 과 다르게 downstream Observable 들의 스케줄링을 위한 연산자입니다.
즉, 상위 Producer 의 Consumer 들의 스케줄링을 위한 연산자입니다.
바로 예시를 보여드리도록 하겠습니다.
저는 다양한 케이스를 경험하는 것이 중요하다고 생각합니다.
subscriber 를 observeOn 로 스케줄링하기.
테스트를 위해서 ThreadPool 은 4개 생성하였습니다.
각각의 Pool 이름은 아래와 같습니다.
- pool-1-thread
- pool-2-thread
- pool-3-thread
- pool-4-thread
가장 기본적으로 subscriber 를 pool-1-thread 쓰레드 풀에 할당해보았습니다.
실행결과는 아래와 같습니다.
package org.example; import io.reactivex.rxjava3.core.Observable; import io.reactivex.rxjava3.schedulers.Schedulers; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class Main { public static void main(String[] args) { ExecutorService executorService1 = Executors.newFixedThreadPool(10); ExecutorService executorService2 = Executors.newFixedThreadPool(10); ExecutorService executorService3 = Executors.newFixedThreadPool(10); ExecutorService executorService4 = Executors.newFixedThreadPool(10); Observable<Integer> dataSource = Observable.create(emitter -> { System.out.printf("observable's thread is %s \n", Thread.currentThread().getName()); emitter.onNext(1); emitter.onNext(2); emitter.onNext(3); emitter.onComplete(); }); dataSource .observeOn(Schedulers.from(executorService1)) .subscribe(value -> { System.out.printf("observer's thread is %s, value is %s \n", Thread.currentThread().getName(), value); }); while (true) {} } }
<실행 결과>
observable's thread is main observer's thread is pool-1-thread-1, value is 1 observer's thread is pool-1-thread-1, value is 2 observer's thread is pool-1-thread-1, value is 3
consumer 가 여러개 존재할 경우.
아래의 경우는 map 과 subscribe 연산자가 존재합니다.
map 연산은 pool-1-thread 쓰레드풀에 할당하였고,
subscribe 연산은 pool-2-thread 쓰레드풀에 할당하였습니다.
실행 결과는 아래와 같습니다.
package org.example; import io.reactivex.rxjava3.core.Observable; import io.reactivex.rxjava3.schedulers.Schedulers; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class Main { public static void main(String[] args) { ExecutorService executorService1 = Executors.newFixedThreadPool(10); ExecutorService executorService2 = Executors.newFixedThreadPool(10); ExecutorService executorService3 = Executors.newFixedThreadPool(10); ExecutorService executorService4 = Executors.newFixedThreadPool(10); Observable<Integer> dataSource = Observable.create(emitter -> { System.out.printf("observable's thread is %s \n", Thread.currentThread().getName()); emitter.onNext(1); emitter.onNext(2); emitter.onNext(3); emitter.onComplete(); }); dataSource .observeOn(Schedulers.from(executorService1)) .map((value) -> { System.out.printf("map consumer's thread is %s, value is %s \n", Thread.currentThread().getName(), value); return value; }) .observeOn(Schedulers.from(executorService2)) .subscribe(value -> { System.out.printf("observer's thread is %s, value is %s \n", Thread.currentThread().getName(), value); }); while (true) {} } }
<실행 결과>
observable's thread is main map consumer's thread is pool-1-thread-1, value is 1 map consumer's thread is pool-1-thread-1, value is 2 observer's thread is pool-2-thread-1, value is 1 map consumer's thread is pool-1-thread-1, value is 3 observer's thread is pool-2-thread-1, value is 2 observer's thread is pool-2-thread-1, value is 3
Subscriber 가 2개 이상인 경우.
Subscriber 는 3개 생성해보았습니다.
그리고 Subscriber 모두 thread-1-pool 쓰레드풀에 할당되도록 설정하였습니다.
3개의 Subscriber 은 thread-1-pool 쓰레드풀과 아래와 같이 매칭됩니다.
Subscriber1 -> thread-1-pool-1
Subscriber2 -> thread-1-pool-2
Subscriber3 -> thread-1-pool-3
thread-1-pool 쓰레드풀에서 3개의 쓰레드가 사용되었고,
Subcriber 는 1개의 쓰레드를 사용하고 있습니다.
자세한 결과는 아래와 같습니다.
package org.example; import io.reactivex.rxjava3.core.Observable; import io.reactivex.rxjava3.schedulers.Schedulers; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class Main { public static void main(String[] args) { ExecutorService executorService1 = Executors.newFixedThreadPool(10); ExecutorService executorService2 = Executors.newFixedThreadPool(10); ExecutorService executorService3 = Executors.newFixedThreadPool(10); ExecutorService executorService4 = Executors.newFixedThreadPool(10); Observable<Integer> dataSource = Observable.create(emitter -> { System.out.printf("observable's thread is %s \n", Thread.currentThread().getName()); emitter.onNext(1); emitter.onNext(2); emitter.onNext(3); emitter.onComplete(); }); dataSource .observeOn(Schedulers.from(executorService1)) .subscribe(value -> { System.out.printf("observer's thread is %s, value is %s \n", Thread.currentThread().getName(), value); }); dataSource .observeOn(Schedulers.from(executorService1)) .subscribe(value -> { System.out.printf("observer's thread is %s, value is %s \n", Thread.currentThread().getName(), value); }); dataSource .observeOn(Schedulers.from(executorService1)) .subscribe(value -> { System.out.printf("observer's thread is %s, value is %s \n", Thread.currentThread().getName(), value); }); while (true) {} } }
<실행 결과 >
observable's thread is main observer's thread is pool-1-thread-1, value is 1 observer's thread is pool-1-thread-1, value is 2 observer's thread is pool-1-thread-1, value is 3 observable's thread is main observer's thread is pool-1-thread-2, value is 1 observer's thread is pool-1-thread-2, value is 2 observer's thread is pool-1-thread-2, value is 3 observable's thread is main observer's thread is pool-1-thread-3, value is 1 observer's thread is pool-1-thread-3, value is 2 observer's thread is pool-1-thread-3, value is 3
다양한 케이스.
Observable 은 병렬처리하는 팁.
아래 예시는 데이터 스트림을 병렬로 동시처리하기 위한 방식입니다.
flatMap 과 subscribeOn 를 활용하여 각각의 연산을 새로운 쓰레드에 할당할 수 있습니다.
쓰레드풀의 사이즈에 맞게 최대한 쓰레드풀의 리소스를 활용할 수 있는 방식입니다.
package org.example; import io.reactivex.rxjava3.core.Observable; import io.reactivex.rxjava3.core.ObservableSource; import io.reactivex.rxjava3.functions.Function; import io.reactivex.rxjava3.schedulers.Schedulers; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class Main { public static void main(String[] args) { ExecutorService executorService1 = Executors.newFixedThreadPool(10); ExecutorService executorService2 = Executors.newFixedThreadPool(10); ExecutorService executorService3 = Executors.newFixedThreadPool(10); ExecutorService executorService4 = Executors.newFixedThreadPool(10); Observable<Integer> dataSource = Observable.create(emitter -> { System.out.printf("observable's thread is %s \n", Thread.currentThread().getName()); for (int i = 0; i < 20; i++) emitter.onNext(i); emitter.onComplete(); }); dataSource .flatMap((Function<Integer, ObservableSource<Integer>>) value -> Observable.just(value) .subscribeOn(Schedulers.from(executorService4)) .map((val) -> { System.out.printf("mapper's thread is %s, value is %s \n", Thread.currentThread().getName(), val); return val; }) ) .observeOn(Schedulers.from(executorService1)) .subscribe(value -> { System.out.printf("observer's thread is %s, value is %s \n", Thread.currentThread().getName(), value); }); while (true) {} } }
<실행 결과>
observable's thread is main mapper's thread is pool-4-thread-1, value is 0 mapper's thread is pool-4-thread-2, value is 1 mapper's thread is pool-4-thread-9, value is 8 mapper's thread is pool-4-thread-10, value is 9 mapper's thread is pool-4-thread-2, value is 11 mapper's thread is pool-4-thread-5, value is 4 mapper's thread is pool-4-thread-3, value is 2 mapper's thread is pool-4-thread-6, value is 5 mapper's thread is pool-4-thread-4, value is 3 mapper's thread is pool-4-thread-7, value is 6 mapper's thread is pool-4-thread-4, value is 18 mapper's thread is pool-4-thread-6, value is 17 mapper's thread is pool-4-thread-1, value is 16 mapper's thread is pool-4-thread-3, value is 15 observer's thread is pool-1-thread-1, value is 0 mapper's thread is pool-4-thread-5, value is 14 mapper's thread is pool-4-thread-2, value is 13 mapper's thread is pool-4-thread-10, value is 12
반응형'Language > ReactiveX' 카테고리의 다른 글
RxJava Terminating Operator 알아보기 (0) 2023.10.02 RxJava Combining Operator 알아보기 (0) 2023.10.01 RxJava Observable 종류 알아보기 (Single, Maybe, Completable, Flowable) (0) 2023.10.01 RxJava Observable 알아보기 (0) 2023.09.29 ReactiveX 알아보기 (0) 2019.07.23