본문 바로가기

Language/RxJava

[RxJava] 7장. 디버깅과 예외 처리 3 - 흐름 제어

728x90

흐름 제어

Observable이 데이터를 발행하는 속도와 옵서버가 데이터를 받아서 처리하는 속도 사이의 차이가 발생할 때 사용하는 함수.

주로 데이터를 받아서 처리하는 속도가 발행되는 속도보다 느릴 경우에 발생한다.

 

sample() 함수

특정 시간 동안 가장 최근에 발행된 데이터만 걸러준다.

즉, 아무리 많은 데이터가 들어와도 해당 시간 구간의 마지막 데이터만 발행하고 나머지는 무시한다.

 

String[] data = {"1", "2", "3", "4", "5"};

// 시간 측정용
CommonUtils.exampleStart();

Observable<String> source100 = Observable.fromArray(data)
	.take(4)
	.zipWith(Observable.interval(100L, TimeUnit.MILLISECONDS), (a,b) -> a);
    
Observable<String> source300 = Observable.juest(data[4])
	.zipWith(Observable.interval(300L, TimeUnit.MILLISECONDS), (a,b) -> a);
    
Observable<String> source = Observable.concat(source100, source300)
	.sample(300L, TimeUnit.MILLISECONDS);
    
Source.subscribe(Log::it);
CommonUtils.sleep(1000);

// 출력 결과
Thread-1 | 232 | value = 2
Thread-1 | 532 | value = 4

 

100ms 지연 후 4개의 데이터를 발행하고, 마지막 데이터를 300ms 지연 후 발행하는 Observable 2개를 concat으로 결합한 후 300ms 간격으로 샘플링 하는 예제이다.

data의 값인 1,2,3,4는 각 100ms 차이로 발행되고, 5는 4가 발행 수 300ms 후에 발행되게 된다.

 

3이 발행되는 시간은 332ms인데, 300ms 간격으로 샘플링 하였기 때문에 첫 번째 출력 값은 3이 아닌 2가 되는 것이다.

마찬가지로, 5가 발행되는 시간은 832ms인데, 두 번째 출력되는 시간은 600ms이기 때문에 가장 마지막에 발행한 4를 출력하게 되는 것이다.

 

여기서, 위의 예제처럼 sample 함수의 실행이 끝나기 전에 Observable이 종료되는 경우, 마지막 값을 발행하려면 emitLast 인자를 true로 설정해주면 된다.

 

Observable<String> source2 = Observable.concat(source100, source300)
	.sample(300L, TimeUnit.MILLISECONDS, true);
    
// 출력 결과
Thread-1 | 232 | value = 2
Thread-1 | 532 | value = 4
Thread-3 | 632 | value = 5

 

 

buffer() 함수

sample은 특정 시간 간격을 기준으로 가장 최근에 발행된 데이터만 넘겨주고 무시하는 반면,

buffer() 함수는 일정 시간 동안 데이터를 모아두었다가 한꺼번에 발행해준다.

count 값을 인자로 받아 count 값 만큼의 데이터를 모아서 List<T>에 한번에 발행한다.

 

String[] data = {"1", "2", "3", "4", "5", "6"};

// 시간 측정용
CommonUtils.exampleStart();

Observable<String> source100 = Observable.fromArray(data)
	.take(3)
	.zipWith(Observable.interval(100L, TimeUnit.MILLISECONDS), (a,b) -> a);
    
Observable<String> source300 = Observable.juest(data[3])
	.zipWith(Observable.interval(300L, TimeUnit.MILLISECONDS), (a,b) -> a);
    
Observable<String> source200 = Observable.juest(data[4],data[4])
	.zipWith(Observable.interval(200L, TimeUnit.MILLISECONDS), (a,b) -> a);
    
Observable<String> source = Observable.concat(source100, source300, source200)
	.buffer(3);
    
source.subscribe(Log::it);
CommonUtils.sleep(1000);

// 출력 결과
Thread-1 | 503 | value = [1, 2, 3]
Thread-3 | 1203 | value = [4, 5, 6]

 

100ms 간격으로 3개, 300ms 간격으로 1개, 200ms 간격으로 2개의 데이터를 발행시킨 후 buffer를 사용해 3개씩 모아서 발행시키는 예제이다.

처음 3개는 100ms 간격으로 발행되어 503ms 시점에 발행되었고, 그 이후 3개의 데이터는 발행되는데 총 700ms 이후에 완료되기 때문에 1203ms 시점에 두 번째 데이터를 모아서 발행시킨다.

 

여기서, buffer 함수에 두 번째 인자를 넣어서 skip할 데이터의 개수를 설정할 수 있다.

 

...

Observable<String> source = Observable.concat(source100, source300, source200)
	.buffer(2,3);
    
...

// 출력 결과
Thread-1 | 403 | value = [1, 2]
Thread-3 | 1003 | value = [4, 5]

count가 2이고 skip이 3이면 2개의 데이터를 모으고 1개의 데이터를 스킵한다는 것이다.

즉, skip은 count 값 보다 항상 커야하며, count 개수만큼 데이터를 모으고 skip - count 개수만큼 데이터를 스킵한다.

 

 

throttleFirst()와 throttleLast() 함수

throttleFirst주어진 조건에서 가장 먼저 입력된 값을 발행하고, throttleLast주어진 조건에서 가장 마지막에 입력된 값을 발행한다.

여기서, throttleFirst와 throttleLast는 정반대의 의미가 아니다.

throttleFirst는 조건에 부합하는 데이터가 입력된 후 일정 시간 동안 다른 데이터가 발행되지 못하도록 방지하지만,

throttleLast는 sample 함수처럼 고정된 시간 간격 안에서 조건에 부합하는 마지막 데이터만 발행한다.

 

throttleFirst 함수

계산 스케줄러에서 실행된다. 즉, 비동기로 동작하도록 설계.

시간 간격, 시간 단위를 인자로 받아서 사용한다. ( .throttleFirst(200L, TimeUnit.MILLISECONDS) )

 

Observable<String> source = Observable.concat(source100, source300, source200)
	.throttleFirst(200L, TimeUnit.MILLISECONDS)
	.subscribe(Log::it);
    
CommonUtils.sleep(1000);


// 출력 결과
Thread-1 | 100 | value = 1
Thread-1 | 300 | value = 3
Thread-2 | 600 | value = 4
Thread-3 | 800 | value = 5
Thread-3 | 1000 | value = 6

 

위의 buffer 함수와 같은 Observable을 사용한다고 생각하고 예제를 보자.

100ms 간격으로 3개, 300ms 간격으로 1개, 200ms 간격으로 2개의 데이터가 발행된다.

각각 100, 200, 300, 600, 800, 1000ms 시점에 데이터가 발행된다고 생각하고 200ms 간격으로 끊어서 생각해보면 된다.

100, 200을 발행 후 첫 번째 값인 100을 출력하는 것을 제외하고는 모두 해당 간격에 하나의 데이터만 발행되어 발행되는 시점의 시간으로 출력되게 된다.

 

throttleLast 함수

sample 함수와 기본 개념은 동일하다.

 

 

window() 함수

groupBy 함수와 개념적으로 비슷하다. groupBy 함수는 특정 조건에 맞는 입력값들을 그룹화해 별도의 Observable을 병렬로 만든다.

window 함수는 throttleFirst나 sample 함수처럼 내가 처리할 수 있는 일부의 값들만 받아들일 수 있다.

즉, 흐름 제어 기능에 groupBy 함수와 비슷한 Observable 분리 기능을 갖추었다고 생각하면 된다.

 

window 함수는 두 가지 원형이 존재한다.

  • public final Observable<Observable<T>> window(long count)
    • 현재 스레드를 그대로 사용한다.
    • 입력받은 count 개수만큼 데이터가 발행되면 새로운 Observable을 생성한다.
  • public final Observable<Observable<T>> window(long timespan, long timeskip, TimeUnit unit)
    • 어떤 필터링 작업을 수행해야 하기 때문에 계산 스케줄러를 사용한다.

첫 번째 원형의 경우, 현재 스레드를 그대로 사용하며 입력 받은 count 갯수만큼 데이터가 발행되면 새로운 Observable을 생성한다는 의미이다.

두 번째 원형의 경우, 어떤 필터링 작업을 수행해야 하기 때문에 현재 스레드가 아닌 계산 스케줄러를 사용해야 한다.

 

String[] data = {"1", "2", "3", "4", "5", "6"};

// 시간 측정용
CommonUtils.exampleStart();

Observable<String> source100 = Observable.fromArray(data)
	.take(3)
	.zipWith(Observable.interval(100L, TimeUnit.MILLISECONDS), (a,b) -> a);
    
Observable<String> source300 = Observable.juest(data[3])
	.zipWith(Observable.interval(300L, TimeUnit.MILLISECONDS), (a,b) -> a);
    
Observable<String> source200 = Observable.juest(data[4],data[4])
	.zipWith(Observable.interval(200L, TimeUnit.MILLISECONDS), (a,b) -> a);
    
Observable<String> source = Observable.concat(source100, source300, source200)
	.window(3);
    
source.subscribe(observable -> {
	Log.dt("new Observable");
	observable.subscribe(Log::it);
});

CommonUtils.sleep(1000);
COmmonUtils.exampleComplete();

// 출력 결과
Thread-1 | 301 | debug = new Observable
Thread-1 | 303 | value = 1
Thread-1 | 403 | value = 2
Thread-1 | 503 | value = 3
Thread-2 | 806 | debug = new Observable
Thread-2 | 806 | value = 4
Thread-2 | 1006 | value = 5
Thread-2 | 1206 | value = 6

 

buffer에서 사용한 예제를 그대로 가져왔으며, buffer 대신 window를 사용하였다.

하나의 Observable 당 3개의 데이터를 발행하기 때문에 3개의 데이터 발행 후 새로운 Observable을 생성한다.

 

 

debounce() 함수

빠르게 연속 이벤트를 처리하는 흐름 제어 함수.

계산 스케줄러에서 동작.

어떤 이벤트가 입력되고 timeout에서 지정한 시간 동안 추가 이벤트가 발생하지 않으면 마지막 이벤트를 최종적으로 발행한다.

 

String[] data = {"1", "2", "3", "4"};

Observable<String> source = Observable.concat(
	Observable.timer(100L, TimeUnit.MILLISECONDS).map(val -> data[0]),
	Observable.timer(300L, TimeUnit.MILLISECONDS).map(val -> data[1]),
	Observable.timer(100L, TimeUnit.MILLISECONDS).map(val -> data[2]),
	Observable.timer(300L, TimeUnit.MILLISECONDS).map(val -> data[3])
	.debounce(200L, TimeUnit.MILLISECONDS);
    
source.subscribe(Log::i);
CommonUnits.sleep(1000);


// 출력 결과
Thread-1 | value = 1
Thread-1 | value = 3
Thread-1 | value = 4

 

데이터가 발행되는 시간은 100ms, 400ms, 500ms, 800ms 이며 debound의 시간 간격은 200ms이다.

따라서 첫 번째 데이터가 발행된 시점으로부터 200ms 내에 추가적인 데이터 발행이 없으므로 1이 그대로 발행된다.

하지만, 두 번째 데이터 2가 발행된 시점으로 부터 100ms 이후에 3이 발행되므로 2는 무시되고 3이 발행되게 된다.

728x90