Single 클래스
Observable 클래스는 데이터를 무관하게 발행할 수 있지만, Single 클래스는 오직 1개의 데이터만 발행하도록 한정한다.
데이터 하나가 발행과 동시에 종료된다는 특징을 가지고 있다.
onNext와 onComplete 가 onSuccess로 통합되어 onSuccess와 onError 함수로 구성된다.
Single<String> source = Single.just("Single").subscribe(System.out::println);
Single 클래스는 Observable에서도 사용할 수 있는 다양한 방법이 존재한다.
Observable에서 Single 객체를 생성해서 사용하면 되는데, Observable 특성상 한 개 이상의 데이터 값을 발행할 수 있는데, Single 객체를 생성할 경우 조건에 부합하는 하나의 데이터만 발행하고 onSuccess를 통해 종료되기 때문에 여러 개의 데이터 중 하나의 데이터만 발행된다.
Observable.just("Observable to Single").single("Default").subscribe(System.out::println);
Observable.fromArray(ary).first("Default").subscribe(System.out::println);
두 번째 예시의 경우, fromArray를 통해 여러 개의 데이터를 인자로 받았지만, first 함수를 통해 Single 클래스를 생성하여 사용하기 때문에 첫 번째 데이터만 발행하고 끝나게 된다.
Maybe 클래스
RxJava 2에 처음 도입된 Observable의 특수 형태.
Single에 onComplete 메서드가 추가된 형태로 Single에서 데이터를 하나도 발행하지 않고 완료하는 경우를 추가한 클래스이다.
데이터 발행 개수가 0개 또는 1개인 클래스로 onSuccess, onComplete, onError 3개의 이벤트가 존재하게 된다.
뜨거운 Observable
구독자의 여부와 상관없이 데이터를 발행한다.
즉, 구독자는 Observable에서 발행되는 데이터를 처음부터 모두 수신하는 것을 보장할 수 없는 Observable이다.
뜨거운 Observable은 배압을 고려해야 한다.
배압이란 Observable에서 데이터를 발행하는 속도와 구독자가 처리하는 속도의 차이가 클 때 발생한다.
차가운 Observable
어떤 함수를 호출하여도 구독자가 구독을 해야 데이터를 발행하는 Observable. 즉, subscribe()를 호출해야 데이터를 발행하는 Observable 형태이다.
차가운 Observable은 Subject 객체 / ConnectableObservable 클래스를 활용하여 뜨거운 Observable 객체로 변환할 수 있다.
Subject 클래스
차가운 Observable을 뜨거운 Observable로 바꿔주며,
Observable의 속성(데이터 발행)과 구독자(데이터 처리)의 속성이 모두 있다는 특징을 가지고 있다.
AsyncSubject 클래스
Observable에서 발행한 마지막 데이터를 얻어 올 수 있는 Subject 클래스.
몇 개의 데이터가 발행되던 상관없이, 마지막 데이터만 발행한다.
AsyncSubject<Integer> subject = AsyncSubject.create();
subject.subscribe(date -> System.out.println("#1 : " + data);
subject.onNext(1);
subject.onNext(2);
subject.subscribe(date -> System.out.println("#2 : " + data);
subject.onNext(3);
subject.onNext(4);
subject.subscribe(date -> System.out.println("#3 : " + data);
subject.onComplete();
subject.onNext(5);
subject.subscribe(date -> System.out.println("#4 : " + data);
// 출력 값
#1 : 4
#2 : 4
#3 : 4
#4 : 4
구독하는 타이밍과 상관없이 해당 Observable에서 마지막에 발행된 값을 가져온다.
onComplete() 이벤트가 발생 한 후의 onNext는 무시되기 때문에 5가 아니라 4의 값이 출력값으로 나온다.
BehaviorSubject 클래스
구독자가 구독을 하면 가장 최근 값 혹은 기본 값을 넘겨주는 클래스.
Observable이 데이터를 발행하기 전에 구독하는 경우, 가장 최근 값이 없기 때문에 기본 값을 넘겨주게 된다.
BehaviorSubject<String> subject = BehaviorSubject.createDefault("default");
subject.subscribe(data -> System.out.println("#1 : " + data);
subject.onNext("1");
subject.onNext("2");
subject.subscribe(data -> System.out.println("#2 : " + data);
subject.onNext("3");
subject.onComplete();
// 출력 값
#1 : default
#1 : 1
#1 : 2
#2 : 2
#1 : 3
#2 : 3
PublishSubject 클래스
가장 평범한 Subject 클래스로, 구독자가 subscribe()를 호출하면 값을 발행하기 시작한다.
기본 값은 없으며 구독한 이후에 Observable에서 발행한 데이터를 구독자는 그대로 전달받는다.
PublishSubject<String> subject = PublishSubject.create();
subject.subscribe(data -> System.out.println("#1 : " + data);
subject.onNext("1");
subject.onNext("2");
subject.subscribe(data -> System.out.println("#2 : " + data);
subject.onNext("3");
subject.onComplete();
// 출력 값
#1 : 1
#1 : 2
#1 : 3
#2 : 3
2번째 구독자가 구독한 이후에 발행된 값은 3밖에 없기 때문에 2번째 구독자는 3의 값만 출력한다.
ReplaySubject 클래스
구독자가 새로 생기면 항상 데이터의 처음부터 끝까지 발행하는 것을 보장해 주는 클래스로, 사용 시 주의해야 한다.
메모리 누수가 발생할 가능성을 염두에 두고 사용해야 한다.
ReplaySubject<String> subject = ReplaySubject.create();
subject.subscribe(data -> System.out.println("#1 : " + data);
subject.onNext("1");
subject.onNext("2");
subject.subscribe(data -> System.out.println("#2 : " + data);
subject.onNext("3");
subject.onComplete();
// 출력 값
#1 : 1
#1 : 2
#2 : 1
#2 : 2
#1 : 3
#2 : 3
ConnectableObservable 클래스
차가운 Observable을 뜨거운 Observable로 변환하는 클래스.
원 데이터 하나를 여러 구독자에게 동시에 전달할 때 사용.
publish() 함수를 호출하여 ConnectableObservable 객체를 생성하기 때문에 subscribe()를 호출해도 아무런 동작이 일어나지 않고, connect() 함수를 호출한 시점부터 subscribe() 함수를 호출한 구독자에게 데이터를 발행한다.
즉, publish() 함수가 connect()를 호출할 때 가지 데이터를 발행하지 않도록 블로킹해 준다.
String[] value = {"1", "2", "3"};
Observable<String> src = Observable.interval(100L, TimeUnit.MILLISECONDS)
.map(Long::intValue)
.map(i -> value[i]) // value의 i 번째 값을 가져온다.
.take(value.length); // value 배열의 길이만큼의 값을 가져온다.
ConnectableObservable<String> source = src.publish();
source.subscribe(data -> System.out.println("#1 : " + data);
source.subscribe(data -> System.out.println("#2 : " + data);
source.connect();
CommonUtils.sleep(250);
source.subscribe(data -> System.out.println("#3 : " + data);
// 출력 값
#1 : 1
#2 : 1
#1 : 2
#2 : 2
#1 : 3
#2 : 3
#3 : 3
100ms마다 값을 하나씩 발행하고, 3번째 구독자가 구독하기까지 250ms만큼 지연시간이 있기 때문에 3번째 구독자는 300ms에 발행하는 데이터 "3"만 출력이 가능하다.
'Language > RxJava' 카테고리의 다른 글
[RxJava] 4장. 리액티브 연산자의 활용 2 - 변환 연산자 (0) | 2020.06.05 |
---|---|
[RxJava] 4장. 리액티브 연산자의 활용 1 - 생성 연산자 (0) | 2020.06.05 |
[RxJava] 3장. 리액티브 연산자 입문 (0) | 2020.06.03 |
[RxJava] 2장. Observable 처음 만들기 -1 (0) | 2020.06.03 |
[RxJava] 1장. 리액티브 프로그래밍 소개 (0) | 2020.06.03 |