조건 연산자
Observable의 흐름을 제어하는 역할.
- amb() : 둘 중 어느 것이든 먼저 나오는 Observable을 채택
- takeUntil(other) : other Observable에서 데이터가 발행되기 전까지만 현재 Observable을 채택
- skipUntil(other) : other Observable에서 데이터가 발행될 때 까지 현재 Observable을 무시
- all() : Observable에 입력되는 값이 모두 특정 조건에 맞을 때만 true를 발행. 조건에 맞지 않는 값이 있으면 바로 false를 발행.
amb() 함수
여러 개의 Observable 중 하나의 Observable을 선택하는데, 선택 조건은 가장 먼저 데이터를 발행하는 Observable이다.
String[] num1 = {"1", "2", "3"};
String[] num2 = {"4", "5", "6"};
List<Observable<String>> sources = Arrays.asList(
Observable.fromArray(num1)
.doOnComplete(() -> Log.d("Observable #1 : onComplete")),
Observable.fromArray(num2)
.delay(100L, TimeUnit.MILLISECONDS)
.doOnComplete(() -> Log.d("Observable #2 : onComplete"))
);
Observable.amb(sources)
.doOnComplete(() -> Log.d("Result : onComplete"))
.subscribe(Log::i);
CommonUtils.sleep(1000);
// 출력 결과
main | value = 1
main | value = 2
main | value = 3
main | debug = Observable #1 : onComplete
main | debug = Result : onComplete
첫 번째 Observable은 바로 발행, 두 번째 Observable은 100ms의 delay를 주었다.
결과를 보면 알 수 있듯이 두 번째 Observable은 무시하고 첫 번째 Observable만 발행하였으며, onComplete 또한 첫 번째 Observable에서 발생 후, 바로 최종 완료의 onComplete가 발생한 것을 볼 수 있다.
takeUntil(other) 함수
take() 함수에 조건을 설정할 수 있다.
인자로 받은 Observable에서 어떠한 값을 발행하면 현재 Observable의 데이터 발행을 중단하고 즉시 완료(onComplete)한다.
String[] num = {"1", "2", "3", "4"};
Observable<String> source = Observable.fromArray(num)
.zipWith(Observable.interval(100L, TimeUnit.MILLISECONDS, (val, notUsed) -> val)
.takeUntil(Observable.timer(300L, TimeUnit.MILLISECONDS));
source.subscribe(Log::i);
CommonUtils.sleep(1000);
// 출력 결과
Thread-1 | value = 1
Thread-1 | value = 2
takeUntil을 통해 300ms 이후 데이터를 발행하기 때문에, 300ms가 되기 전에 발행 된 값말 출력한다.
skipUntil(other) 함수
takeUntil과 정 반대 함수로, 인자로 받은 Observable에서 데이터를 발행할 때 까지 현재 Observable의 데이터 발행을 무시한다.
String[] num = {"1", "2", "3", "4"};
Observable<String> source = Observable.fromArray(num)
.zipWith(Observable.interval(100L, TimeUnit.MILLISECONDS, (val, notUsed) -> val)
.skipUntil(Observable.timer(300L, TimeUnit.MILLISECONDS));
source.subscribe(Log::i);
CommonUtils.sleep(1000);
// 출력 결과
Thread-1 | value = 3
Thread-1 | value = 4
takeUntil 의 예제에서 takeUntil를 skipUntil로 변경하기만 한 예제이다.
skipUntil 의 경우 takeUtil에서 출력하지 못한 값들을 출력한다.
all() 함수
주어진 조건에 100% 맞을 때만 true를 발행하고 조건에 맞지 않은 데이터가 발행하면 바로 false 값을 발행.
String[] num = {"a", "a", "a"};
Single<Boolean> source = Observable.fromArray(num)
.all(val -> "a".equals(val));
source.subscribe(Log::i);
// 출력 결과
main | value = true
기타 연산자
delay() 함수
단순하게 인자로 받은 time과 시간 단위 만큼 입력 받은 Observable의 데이터 발행을 지연시켜주는 역할.
계산 스케줄러에서 실행된다.
String[] data = {"1", "2", "3"};
Observable<String> source = Observable.fromArray(data)
.delay(100L, TimeUnit.MILLISECONDS);
source.subscribe(Log::it);
CommonUtils.sleep(1000);
// 출력 결과
Thread-1 | 240 | value = 1
Thread-1 | 240 | value = 2
Thread-1 | 240 | value = 3
/// dealy를 제거할 경우 출력 결과
Thread-1 | 140 | value = 1
Thread-1 | 140 | value = 2
Thread-1 | 140 | value = 3
delay의 여부에 따라 출력 결과 자체는 달라지지 않지만, 출력되는 시간에서 delay 만큼의 차이가 있게 된다.
interval : 주기적으로 Observable에서 값을 발행. 현재 스레드에서 실행
timer : 일정 시간이 지난 후 값을 발행. 계산 스케줄러에서 실행
defer : Callable을 등록해두고 실행을 지연.
timeInterval() 함수
어떤 값을 발행했을 때 이전 값을 발행한 이후 얼마나 시간이 흘렀는지 알려주는 함수.
String[] data = {"1", "2", "3"};
CommonUnits.exampleStart();
Observable<Timed<String>> source = Observable.fromArray(data)
.delay(item -> {
CommonUtils.doSomething(); // 100ms 미만의 무작위로 스레드에 sleep을 실행
return Observable.just(item);
})
.timeInterval();
source.subscribe(Log::it);
CommonUnits.sleep(1000);
// 출력 결과
main | 100 | value = Timed[time=24, unit=MILLISECONDS, value = 1]
main | 150 | value = Timed[time=50, unit=MILLISECONDS, value = 2]
main | 230 | value = Timed[time=80, unit=MILLISECONDS, value = 3]
여기서 첫 번째 값의 time 값은 이전에 발행된 데이터가 없기 때문에 무시해도 된다.
'Language > RxJava' 카테고리의 다른 글
[RxJava] 6장. 안드로이드의 RxJava 활용 1 - RxAndroid 소개 (0) | 2020.06.11 |
---|---|
[RxJava] 5장. 스케줄러 (0) | 2020.06.09 |
[RxJava] 4장. 리액티브 연산자의 활용 3 - 결합 연산자 (0) | 2020.06.08 |
[RxJava] 4장. 리액티브 연산자의 활용 2 - 변환 연산자 (0) | 2020.06.05 |
[RxJava] 4장. 리액티브 연산자의 활용 1 - 생성 연산자 (0) | 2020.06.05 |