결합 연산자
여러개의 Observable을 내가 원하는 Observable로 결합하는데 사용하는 연산자.
- zip : Observable에서 데이터를 모두 새로 발행했을 때 그것을 합해준다.
- combineLatest : 처음에 각 Observable에서 데이터를 발행한 후에는 어디에서 값을 발행하던 항상 최신 값으로 갱신.
- merge : 최신 데이터 여부와 상관없이 각 Observable에서 발행하는 데이터를 그대로 출력.
- concat : 입력된 Observable을 Observable 단위로 이어 붙여준다.
zip() 함수
각각의 Observable을 모두 확용해 2개 혹은 그 이상의 Observable을 결합한다.
최대 9개의 Observable을 결합할 수 있다.
A, B 2개의 Observable을 결합한다면, A, B Observable의 발행이 모두 끝나야 결합이 가능하다. 그 전에는 결합할 수 없다.
String[] shape = {"A", "B", "C"};
String[] color = {"1-C", "2-A", "3-B"};
Observable<String> source = Observable.zip(
Observable.fromArray(shape).map(Shape::getSuffix) , // 모양을 가져온다
Observable.fromArray(color).map(Shape::getColor) , // 색을 가져온다.
(suffix , color) -> color + suffix);
source.subscribe(Log::i);
// 출력 결과
main | value = 1-A
main | value = 2-B
main | value = 3-C
2개의 Observable을 선언하고, getSuffix, getColor 함수를 통하여 각 배열에서 원하는 값만 map으로 가져온다.
그 후, 두 값을 더하여 출력 결과와 같은 값을 구해낸다.
Observable<Integer> source = Observable.zip(
Observable.just(100, 200, 300) ,
Observable.just(10, 20, 30) ,
Observable.just(1, 2, 3),
Observable.interval(200L, TimeUnit.MILLISECONDS) ,
(a, b, c, time) -> a + b + c );
CommonUtils.exampleStart();
source.subscribe(Log::it);
CommonUtils.sleep(1000);
// 출력 결과
Thread-1 | 200 | value = 111
Thread-1 | 400 | value = 222
Thread-1 | 600 | value = 333
와 같이 3개 이상의 Observable을 zip으로 결합할 수 있으며, interval 함수를 사용하여 출력 값에 딜레이를 줄 수 있다.
출력 된 결과는 interval 함수에서 주어진대로 200ms 만큼의 delay를 주고 출력이 되며, 각 자릿수를 더하여 출력하기 때문에 111, 222, 333이 결과 값으로 출력되게 된다.
*
zip 연산자를 사용할 때 필요한 부분에서 여러 개의 Observable로 나누어서 생각하여 적용하는 것이 중요.
또한, index++ 와 같은 함수 사용시 부수효과가 발생하기 때문에, Pair 클래스를 사용하여 해결하면 된다.
zipWith()
zip 함수와 동일하지만 Observable을 다양한 함수와 조합하면서 틈틈히 호출할 수 있는 장점이 있는 함수.
Observable<Integer> source = Observable.zip(
Observable.just(100, 200, 300) ,
Observable.just(10, 20, 30) ,
(a, b) -> a + b)
.zipWith(Observable.just(1, 2, 3) , (ab, c) -> ab + c);
source.subscribe(Log::i);
// 출력 결과
main | value = 111
main | value = 222
main | value = 333
2개의 Observable을 zip으로 결합하고 zipWith을 사용하여 세 번째 Observable을 결합하였다.
combineLatest() 함수
2개 이상의 Observable을 기반으로 Observable의 각각의 값이 변경될 때 갱신해주는 함수.
각 Observable은 최초의 발행을 모두 수행해야 구독자에게 발행되지만, 그 이후에는 Observable 중 어느 하나라도 갱신되면 구독자에게 발행한다.
입력할 수 있는 Observable 인자의 개수는 최대 9개 이다.
String[] shape = {"A", "B", "C"};
String[] num = {"1", "2", "3", "4"};
Observable<String> source = Observable.combineLatest(
Observable.fromArray(num)
.zipWith(Observable.interval(100L, TimeUnit.MILLISECONDS) ,
(num , notUsed) -> num) ,
Observable.fromArray(shape)
.zipWith(Observable.interval(150L, 200L, TimeUnit.MILLISECONDS) ,
(shape , notUsed) -> shape) ,
(v1, v2) -> v1 + v2);
source.subscribe(Log::i);
CommonUtils.sleep(1000);
// 출력 결과
Thread-2 | value = 1A
Thread-1 | value = 2A
Thread-1 | value = 3A
Thread-2 | value = 3B
Thread-1 | value = 4B
Thread-2 | value = 4C
출력 결과를 보면, 150ms 일 때 두 번째 Observable에서 값을 발행하기 때문에 Thread-2에서 값이 발행됨을 볼 수 있다. 그 후에 200ms, 300ms에서 첫 번째 Observable에서 값을 발행해서 2,3으로 늘어나며
350ms에서는 A가 B로 갱신, 400ms는 3이 4로, 550에 B가 C로 갱신되면서 모든 값의 발행이 끝이 난다.
merge() 함수
먼저 입력되는 데이터를 그대로 발행.
Observable<String> source = Observable.merge(source1, source2);
source.subscribe(Log::it);
CommonUtils.sleep(1000);
merge를 통해 2개의 Observable을 합치며, 각 Observable에서 입력되는 순서대로 데이터를 발행한다.
즉, source1, source2에서 각각 151ms, 100ms 의 간격을 두고 출력한다면 다음과 같은 결과 값이 나올 것이다.
Thread-2 | 100 | value = "2번째 스레드"
Thread-1 | 151 | value = "1번째 스레드"
Thread-1 | 200 | value = "1번째 스레드"
Thread-1 | 300 | value = "1번째 스레드"
Thread-2 | 302 | value = "2번째 스레드"
Thread-1 | 400 | value = "1번째 스레드"
concat() 함수
2개 이상의 Observable을 이어 붙여주는 함수.
첫 번째 Observable에서 onComplete 이벤트가 발생해야 두 번째 Observable을 구독한다.
즉, 첫 번째 Observable에서 onComplete 이벤트가 발생하지 않으면 두 번째 Observable은 영원히 대기상태가 되어 메모리 누수의 위험을 가진다.
따라서, 반드시 완료(onComplete)될 수 있게 해주어야 한다.
결합 가능한 Observable의 수는 최대 4개 이다.
String[] num1 = {"1", "2", "3"};
String[] num2 = {"4", "5", "6"};
Observable<String> source1 = Observable.fromArray(num1)
.doOnComplete(() -> Log.d("onComplete"));
Observable<String> source2 = Observable.interval(100L, TimeUnit.MILLISECONDS)
.map(Long::intValue)
.map(index -> num2[index])
.take(num2.length)
.doOnComplete(() -> Log.d("onComplete"));
Observable<String> source = Observable.concat(source1, source2)
.doOnComplete(() -> Log.d("onComplete"));
source.subscribe(Log::i);
CommonUtils.Sleep(1000);
// 출력 결과
main | value = 1
main | value = 2
main | value = 3
main | debug = onComplete // source1 발행 완료
Thread-1 | value = 4
Thread-1 | value = 5
Thread-1 | value = 6
Thread-1 | debug = onComplete // source2 발행 완료
Thread-1 | debug = onComplete // source 발행 완료
*
doOnNext(), doOnComplete(), doOnError() 함수를 사용하여 Observable의 중간 상태를 확인할 수 있다.
이는 부수 효과를 발생시키는 단점을 가지고 있지만, 유지보수면을 생각하면 추가하여 확인할 수 있는 부분을 만드는 것이 더 좋을 수 있다.
'Language > RxJava' 카테고리의 다른 글
[RxJava] 5장. 스케줄러 (0) | 2020.06.09 |
---|---|
[RxJava] 4장. 리액티브 연산자의 활용 4 - 조건 연산자 및 기타 연산자 (0) | 2020.06.08 |
[RxJava] 4장. 리액티브 연산자의 활용 2 - 변환 연산자 (0) | 2020.06.05 |
[RxJava] 4장. 리액티브 연산자의 활용 1 - 생성 연산자 (0) | 2020.06.05 |
[RxJava] 3장. 리액티브 연산자 입문 (0) | 2020.06.03 |