본문 바로가기

Language/RxJava

[RxJava] 5장. 스케줄러

728x90

스케줄러

  • 비동기로 동작할 수 있도록 이용한다.
  • subscribeOn, observeOn에 스케줄러를 지정하여 데이터의 흐름이 발생하는 스레드와, 처리된 결과를 구독자에게 전달하는 스레드를 분리할 수 있다.
String[] data = {"1", "2", "3"};

Observable<String> source = Observable.fromArray(data)
	.doOnNext(data -> Log.d("Original data = " + data))
	.subscribeOn(Schedulers.newThread())
	.observeOn(Schedulers.newThread())
	.map(data -> "#" + data);

source.subscribe(Log::i);
CommonUtils.sleep(1000);

// 출력 결과
Thread-1 | Original data = 1
Thread-1 | Original data = 2
Thread-1 | Original data = 3
Thread-2 | value = #1
Thread-2 | value = #2
Thread-2 | value = #3

 

subscribeOn, observeOn 에서 각각 새로운 스케줄러를 선언하였기 때문에, 출력 결과에서 데이터 입력과 데이터 발행에 사용되는 스레드가 다른것을 볼 수 있다.

 

여기서, observeOn을 사용하지 않으면, subscribeOn에서 지정한 스레드로 모든 로직을 수행하게 된다.

즉, 모든 출력 값이 Thread-1 에서 처리된다는 것이다.

또한, 별도로 스케줄러를 지정하지 않으면 현재(main) 스레드에서 동작을 실행한다.

 

 

스케줄러의 종류

  • 뉴 스레드 스케줄러(newThread())
  • 싱글 스레드 스케줄러(single())
  • 계산 스케줄러(computation())
  • IO 스케줄러(io())
  • 트램펄린 스케줄러(trampoline())
  • 메인 스레드 스케줄러 > 지원 안함
  • 테스트 스케줄러 > 지원 안함

뉴 스레드 스케줄러

이름처럼 새로운 스레드를 생성한다.

Schedulers.newThread()를 인자로 넣으면 새로운 스레드를 만든다.

subscribeOn, observeOn 함수에서 많이 사용한다.

 

String[] num = {"1","2"};

Observable<String> source1 = Observable.fromArray(num)
	.doOnNext(data -> Log.v("Original data : " + data))
	.map(data -> "#" + data)
	.subscribeOn(Schedulers.newThead())
	.subscribe(Log::i);

CommonUtils.sleep(1000);


Observable<String> source2 = Observable.fromArray(num)
	.doOnNext(doata -> Log.v("Original data : " + data))
	.map(data -> "##" + data)
	.subscribeOn(Schedulers.newThread())
	.subscribeOn(Log::i);
    
CommonUtils.sleep(1000);


// 출력 결과
Thread-1 | Original data = 1
Thread-1 | value = #1
Thread-1 | Original data = 2
Thread-1 | value = #2
Thread-2 | Original data = 1
Thread-2 | value = ##1
Thread-2 | Original data = 2
Thread-2 | value = ##2

 

뉴 스케줄러를 사용했기 때문에 첫 번째 Observable과 두 번째 Observable의 실행 스레드가 다르게 된다.

 

계산 스케줄러

CPU에 대응하는 계산용 스케줄러.

계산 작업을 할 때는 대기 시간 없이 빠르게 결과를 도출하는 것이 중요하다.

내부적으로 스레드 풀을 생성하며, 스레드 개수는 기본적으로 프로세서 개수와 동일하다.

 

Observable<String> source = ... ;

source.map(val -> "#" + val)
	.subscribeOn(Schedulers.computation())
	.subscribe(Log::i);

 

위와 같이 subscribeOn(Schedulers.computation())으로 사용한다.

여러번 구독할 때, 계산 스케줄러를 각각 선언해주면 각각 다른 스레드로 동작하는 것을 볼 수 있다.

 

IO 스케줄러

네트워크상의 요청을 처리하거나 각종 입/출력 작업을 실행하기 위한 스케줄러.

계산 스케줄러와 다른 점은 기본으로 생성되는 스레드 개수가 다르다는 점이다.

 

입/출력 작업은 비동기로 실행되지만 결과를 얻기까지 대기 시간이 긴 편이다.

 

계산 스케줄러는 CPU의 개수만큼 스레드를 생성
IO 스케줄러는 필요할 때마다 스레드를 계속 생성.

 

subscribeOn(Schedulers.io()); 로 사용한다.

 

트램펄린 스케줄러

새로운 스레드를 생성하지 않고 현재 스레드에 무한한 크기의 대기 행렬(Queue)를 생성하는 스케줄러.

새로운 스레드를 생성하지 않는 것과, 대기 행렬을 자동으로 만들어 준다는 것이 다른 스케줄러와 다른 점이다.

 

subscribeOn(Schedulers.trampoline()); 으로 사용한다.

 

싱글 스레드 스케줄러

단일 스레드를 별도로 생성하여 구독 작업을 처리.

생성된 스레드는 여러 번 구독 요청이 와도 공통으로 사용한다는 특징.

 

subscribeOn(Schedulers.single()) 으로 사용한다.

 

Observable<Integer> number = Observable.range(10,3);
Observable<String> chars = Observable.just("a","b","c");

number.subscribeOn(Schedulers.single())
	.subscribe(Log::i);
chars.subscribeOn(Schedulers.single())
	.subscribe(Log::i);
    
// 출력 결과
Thread-1 | value = 10
Thread-1 | value = 11
Thread-1 | value = 12
Thread-1 | value = a
Thread-1 | value = b
Thread-1 | value = c

 

출력 결과에서 볼 수 있듯이, 하나의 스레드를 사용하여 모든 값을 처리한다.

 

Executor 변환 스케줄러

기존에 사용하던 Executor 클래스를 재사용할 때만 한정적으로 사용한다.

 

subscribeOn(Schedulers.from(executor)); 으로 사용한다.

 

스케줄러를 이용한 예제.

CommonUtils.exampleStart();

Observable<String> source = Observable.just(FIRST_URL)
	.subscribeOn(Schedulers.io())
	.map(OKHttpHelper::get)
	.concatWith(Observable.just(SECOND_URL)
		.map(OKHttpHelper::get));

source.subscribe(Log::it);
CommonUtils.sleep(1000);

 

네트워크 통신을 하는 부분에서, 콜백 지옥을 벗어날 수 있는 방법이다.

FIRST_URL에 대한 결과 값에 SECOND_URL의 결과 값을 붙이는 형식으로, 첫 번째 값에 대한 결과가 나오지 않으면 두 번재 값에 대한 로직을 타지 않는다.

이는 첫 번째 값에 대한 콜백을 받고 두 번째 값에 대한 연산을 진행하는 것과 같은 방식으로, 콜백을 사용하지 않고 동일한 기능을 구현할 수 있다.

 

Observable<String> source1 = Observable.just(FIRST_URL)
	.subscribeOn(Schedulers.io())
	.map(OKHttpHelper::get);
    
Observable<String> source2 = Observable.just(SECOND_URL)
	.subscribeOn(Schedulers.io())
	.map(OKHttpHelper::get);
    
Observable.zip(source1, source2, (first, second) -> a + " , " + b)
	.subscribe(Log::it);

// 출력 결과
thread-1 | 315 | value = a , b

 

위의 예제는 concatWith을 사용했다면, 아래의 예제는 zip을 사용하는 방법이다.

concatWith은 첫 번째 Observable과 두 번째 Observable을 순차적으로 진행하여 결과 값을 이어 붙였다면,

zip의 경우 두 Observable을 동시에 실행하므로 더 짧은 수행시간에 같은 결과를 보일 수 있다.

 

 

observeOn() 함수

ObserveOn 함수는 처리된 결과를 구독자에게 전달하는 스레드를 지정할 때 사용한다.

 

subscribeOn 함수는 구독자가 subscribe 함수를 호출했을 때 데이터 흐름을 발행하는 스레드를 지정

observeOn 함수는 처리된 결과를 구독자게에 전달하는 스레드를 지정.

 

subscribeOn 함수는 처음 지정한 스레드를 고정시키므로 다시 subscribeOn 함수를 호출해도 무시한다.
하지만, observeOn 함수는 호출할 때 마다 스레드를 새로 지정해준다.
728x90