본문 바로가기

Language/RxJava

[RxJava] 4장. 리액티브 연산자의 활용 4 - 조건 연산자 및 기타 연산자

728x90

조건 연산자

Observable의 흐름을 제어하는 역할.

 

  1. amb() : 둘 중 어느 것이든 먼저 나오는 Observable을 채택
  2. takeUntil(other) : other Observable에서 데이터가 발행되기 전까지만 현재 Observable을 채택
  3. skipUntil(other) : other Observable에서 데이터가 발행될 때 까지 현재 Observable을 무시
  4. all() : Observable에 입력되는 값이 모두 특정 조건에 맞을 때만 true를 발행. 조건에 맞지 않는 값이 있으면 바로 false를 발행.

amb() 함수

여러 개의 Observable 중 하나의 Observable을 선택하는데, 선택 조건은 가장 먼저 데이터를 발행하는 Observable이다.

 

String[] num1 = {"1", "2", "3"};
String[] num2 = {"4", "5", "6"};

List<Observable<String>> sources = Arrays.asList(
	Observable.fromArray(num1)
		.doOnComplete(() -> Log.d("Observable #1 : onComplete")),

	Observable.fromArray(num2)
		.delay(100L, TimeUnit.MILLISECONDS)
		.doOnComplete(() -> Log.d("Observable #2 : onComplete"))
);


Observable.amb(sources)
	.doOnComplete(() -> Log.d("Result : onComplete"))
	.subscribe(Log::i);

CommonUtils.sleep(1000);


// 출력 결과
main | value = 1
main | value = 2
main | value = 3
main | debug = Observable #1 : onComplete
main | debug = Result : onComplete

 

첫 번째 Observable은 바로 발행, 두 번째 Observable은 100ms의 delay를 주었다.

결과를 보면 알 수 있듯이 두 번째 Observable은 무시하고 첫 번째 Observable만 발행하였으며, onComplete 또한 첫 번째 Observable에서 발생 후, 바로 최종 완료의 onComplete가 발생한 것을 볼 수 있다.

 

takeUntil(other) 함수

take() 함수에 조건을 설정할 수 있다.

인자로 받은 Observable에서 어떠한 값을 발행하면 현재 Observable의 데이터 발행을 중단하고 즉시 완료(onComplete)한다.

 

String[] num = {"1", "2", "3", "4"};

Observable<String> source = Observable.fromArray(num)
	.zipWith(Observable.interval(100L, TimeUnit.MILLISECONDS, (val, notUsed) -> val)
	.takeUntil(Observable.timer(300L, TimeUnit.MILLISECONDS));
    
source.subscribe(Log::i);
CommonUtils.sleep(1000);

// 출력 결과
Thread-1 | value = 1
Thread-1 | value = 2

 

takeUntil을 통해 300ms 이후 데이터를 발행하기 때문에, 300ms가 되기 전에 발행 된 값말 출력한다.

 

skipUntil(other) 함수

takeUntil과 정 반대 함수로, 인자로 받은 Observable에서 데이터를 발행할 때 까지 현재 Observable의 데이터 발행을 무시한다.

 

String[] num = {"1", "2", "3", "4"};

Observable<String> source = Observable.fromArray(num)
	.zipWith(Observable.interval(100L, TimeUnit.MILLISECONDS, (val, notUsed) -> val)
	.skipUntil(Observable.timer(300L, TimeUnit.MILLISECONDS));
    
source.subscribe(Log::i);
CommonUtils.sleep(1000);

// 출력 결과
Thread-1 | value = 3
Thread-1 | value = 4

 

takeUntil 의 예제에서 takeUntil를 skipUntil로 변경하기만 한 예제이다.

skipUntil 의 경우 takeUtil에서 출력하지 못한 값들을 출력한다.

 

all() 함수

주어진 조건에 100% 맞을 때만 true를 발행하고 조건에 맞지 않은 데이터가 발행하면 바로 false 값을 발행.

 

String[] num = {"a", "a", "a"};

Single<Boolean> source = Observable.fromArray(num)
	.all(val -> "a".equals(val));

source.subscribe(Log::i);

// 출력 결과

main | value = true

 

 

기타 연산자

delay() 함수

단순하게 인자로 받은 time과 시간 단위 만큼 입력 받은 Observable의 데이터 발행을 지연시켜주는 역할.

계산 스케줄러에서 실행된다.

 

String[] data = {"1", "2", "3"};

Observable<String> source = Observable.fromArray(data)
	.delay(100L, TimeUnit.MILLISECONDS);

source.subscribe(Log::it);
CommonUtils.sleep(1000);

// 출력 결과
Thread-1 | 240 | value = 1
Thread-1 | 240 | value = 2
Thread-1 | 240 | value = 3


/// dealy를 제거할 경우 출력 결과
Thread-1 | 140 | value = 1
Thread-1 | 140 | value = 2
Thread-1 | 140 | value = 3

 

delay의 여부에 따라 출력 결과 자체는 달라지지 않지만, 출력되는 시간에서 delay 만큼의 차이가 있게 된다.

 

 

interval : 주기적으로 Observable에서 값을 발행. 현재 스레드에서 실행
timer : 일정 시간이 지난 후 값을 발행. 계산 스케줄러에서 실행
defer : Callable을 등록해두고 실행을 지연.

timeInterval() 함수

어떤 값을 발행했을 때 이전 값을 발행한 이후 얼마나 시간이 흘렀는지 알려주는 함수.

 

String[] data = {"1", "2", "3"};

CommonUnits.exampleStart();

Observable<Timed<String>> source = Observable.fromArray(data)
	.delay(item -> {
		CommonUtils.doSomething();	// 100ms 미만의 무작위로 스레드에 sleep을 실행
		return Observable.just(item);
	})
	.timeInterval();


source.subscribe(Log::it);
CommonUnits.sleep(1000);


// 출력 결과
main | 100 | value = Timed[time=24, unit=MILLISECONDS, value = 1]
main | 150 | value = Timed[time=50, unit=MILLISECONDS, value = 2]
main | 230 | value = Timed[time=80, unit=MILLISECONDS, value = 3]

 

여기서 첫 번째 값의 time 값은 이전에 발행된 데이터가 없기 때문에 무시해도 된다.

728x90