스케줄러
- 비동기로 동작할 수 있도록 이용한다.
- subscribeOn, observeOn에 스케줄러를 지정하여 데이터의 흐름이 발생하는 스레드와, 처리된 결과를 구독자에게 전달하는 스레드를 분리할 수 있다.
String[] data = {"1", "2", "3"};
Observable<String> source = Observable.fromArray(data)
.doOnNext(data -> Log.d("Original data = " + data))
.subscribeOn(Schedulers.newThread())
.observeOn(Schedulers.newThread())
.map(data -> "#" + data);
source.subscribe(Log::i);
CommonUtils.sleep(1000);
// 출력 결과
Thread-1 | Original data = 1
Thread-1 | Original data = 2
Thread-1 | Original data = 3
Thread-2 | value = #1
Thread-2 | value = #2
Thread-2 | value = #3
subscribeOn, observeOn 에서 각각 새로운 스케줄러를 선언하였기 때문에, 출력 결과에서 데이터 입력과 데이터 발행에 사용되는 스레드가 다른것을 볼 수 있다.
여기서, observeOn을 사용하지 않으면, subscribeOn에서 지정한 스레드로 모든 로직을 수행하게 된다.
즉, 모든 출력 값이 Thread-1 에서 처리된다는 것이다.
또한, 별도로 스케줄러를 지정하지 않으면 현재(main) 스레드에서 동작을 실행한다.
스케줄러의 종류
- 뉴 스레드 스케줄러(newThread())
- 싱글 스레드 스케줄러(single())
- 계산 스케줄러(computation())
- IO 스케줄러(io())
- 트램펄린 스케줄러(trampoline())
- 메인 스레드 스케줄러 > 지원 안함
- 테스트 스케줄러 > 지원 안함
뉴 스레드 스케줄러
이름처럼 새로운 스레드를 생성한다.
Schedulers.newThread()를 인자로 넣으면 새로운 스레드를 만든다.
subscribeOn, observeOn 함수에서 많이 사용한다.
String[] num = {"1","2"};
Observable<String> source1 = Observable.fromArray(num)
.doOnNext(data -> Log.v("Original data : " + data))
.map(data -> "#" + data)
.subscribeOn(Schedulers.newThead())
.subscribe(Log::i);
CommonUtils.sleep(1000);
Observable<String> source2 = Observable.fromArray(num)
.doOnNext(doata -> Log.v("Original data : " + data))
.map(data -> "##" + data)
.subscribeOn(Schedulers.newThread())
.subscribeOn(Log::i);
CommonUtils.sleep(1000);
// 출력 결과
Thread-1 | Original data = 1
Thread-1 | value = #1
Thread-1 | Original data = 2
Thread-1 | value = #2
Thread-2 | Original data = 1
Thread-2 | value = ##1
Thread-2 | Original data = 2
Thread-2 | value = ##2
뉴 스케줄러를 사용했기 때문에 첫 번째 Observable과 두 번째 Observable의 실행 스레드가 다르게 된다.
계산 스케줄러
CPU에 대응하는 계산용 스케줄러.
계산 작업을 할 때는 대기 시간 없이 빠르게 결과를 도출하는 것이 중요하다.
내부적으로 스레드 풀을 생성하며, 스레드 개수는 기본적으로 프로세서 개수와 동일하다.
Observable<String> source = ... ;
source.map(val -> "#" + val)
.subscribeOn(Schedulers.computation())
.subscribe(Log::i);
위와 같이 subscribeOn(Schedulers.computation())으로 사용한다.
여러번 구독할 때, 계산 스케줄러를 각각 선언해주면 각각 다른 스레드로 동작하는 것을 볼 수 있다.
IO 스케줄러
네트워크상의 요청을 처리하거나 각종 입/출력 작업을 실행하기 위한 스케줄러.
계산 스케줄러와 다른 점은 기본으로 생성되는 스레드 개수가 다르다는 점이다.
입/출력 작업은 비동기로 실행되지만 결과를 얻기까지 대기 시간이 긴 편이다.
계산 스케줄러는 CPU의 개수만큼 스레드를 생성
IO 스케줄러는 필요할 때마다 스레드를 계속 생성.
subscribeOn(Schedulers.io()); 로 사용한다.
트램펄린 스케줄러
새로운 스레드를 생성하지 않고 현재 스레드에 무한한 크기의 대기 행렬(Queue)를 생성하는 스케줄러.
새로운 스레드를 생성하지 않는 것과, 대기 행렬을 자동으로 만들어 준다는 것이 다른 스케줄러와 다른 점이다.
subscribeOn(Schedulers.trampoline()); 으로 사용한다.
싱글 스레드 스케줄러
단일 스레드를 별도로 생성하여 구독 작업을 처리.
생성된 스레드는 여러 번 구독 요청이 와도 공통으로 사용한다는 특징.
subscribeOn(Schedulers.single()) 으로 사용한다.
Observable<Integer> number = Observable.range(10,3);
Observable<String> chars = Observable.just("a","b","c");
number.subscribeOn(Schedulers.single())
.subscribe(Log::i);
chars.subscribeOn(Schedulers.single())
.subscribe(Log::i);
// 출력 결과
Thread-1 | value = 10
Thread-1 | value = 11
Thread-1 | value = 12
Thread-1 | value = a
Thread-1 | value = b
Thread-1 | value = c
출력 결과에서 볼 수 있듯이, 하나의 스레드를 사용하여 모든 값을 처리한다.
Executor 변환 스케줄러
기존에 사용하던 Executor 클래스를 재사용할 때만 한정적으로 사용한다.
subscribeOn(Schedulers.from(executor)); 으로 사용한다.
스케줄러를 이용한 예제.
CommonUtils.exampleStart();
Observable<String> source = Observable.just(FIRST_URL)
.subscribeOn(Schedulers.io())
.map(OKHttpHelper::get)
.concatWith(Observable.just(SECOND_URL)
.map(OKHttpHelper::get));
source.subscribe(Log::it);
CommonUtils.sleep(1000);
네트워크 통신을 하는 부분에서, 콜백 지옥을 벗어날 수 있는 방법이다.
FIRST_URL에 대한 결과 값에 SECOND_URL의 결과 값을 붙이는 형식으로, 첫 번째 값에 대한 결과가 나오지 않으면 두 번재 값에 대한 로직을 타지 않는다.
이는 첫 번째 값에 대한 콜백을 받고 두 번째 값에 대한 연산을 진행하는 것과 같은 방식으로, 콜백을 사용하지 않고 동일한 기능을 구현할 수 있다.
Observable<String> source1 = Observable.just(FIRST_URL)
.subscribeOn(Schedulers.io())
.map(OKHttpHelper::get);
Observable<String> source2 = Observable.just(SECOND_URL)
.subscribeOn(Schedulers.io())
.map(OKHttpHelper::get);
Observable.zip(source1, source2, (first, second) -> a + " , " + b)
.subscribe(Log::it);
// 출력 결과
thread-1 | 315 | value = a , b
위의 예제는 concatWith을 사용했다면, 아래의 예제는 zip을 사용하는 방법이다.
concatWith은 첫 번째 Observable과 두 번째 Observable을 순차적으로 진행하여 결과 값을 이어 붙였다면,
zip의 경우 두 Observable을 동시에 실행하므로 더 짧은 수행시간에 같은 결과를 보일 수 있다.
observeOn() 함수
ObserveOn 함수는 처리된 결과를 구독자에게 전달하는 스레드를 지정할 때 사용한다.
subscribeOn 함수는 구독자가 subscribe 함수를 호출했을 때 데이터 흐름을 발행하는 스레드를 지정
observeOn 함수는 처리된 결과를 구독자게에 전달하는 스레드를 지정.
subscribeOn 함수는 처음 지정한 스레드를 고정시키므로 다시 subscribeOn 함수를 호출해도 무시한다.
하지만, observeOn 함수는 호출할 때 마다 스레드를 새로 지정해준다.
'Language > RxJava' 카테고리의 다른 글
[RxJava] 6장. 안드로이드의 RxJava 활용 2 - RecyclerView 클래스 (0) | 2020.06.11 |
---|---|
[RxJava] 6장. 안드로이드의 RxJava 활용 1 - RxAndroid 소개 (0) | 2020.06.11 |
[RxJava] 4장. 리액티브 연산자의 활용 4 - 조건 연산자 및 기타 연산자 (0) | 2020.06.08 |
[RxJava] 4장. 리액티브 연산자의 활용 3 - 결합 연산자 (0) | 2020.06.08 |
[RxJava] 4장. 리액티브 연산자의 활용 2 - 변환 연산자 (0) | 2020.06.05 |