본문 바로가기

Language/RxJava

[RxJava] 2장. Observable 처음 만들기 -1

728x90

Observable 클래스.

  • 옵서버 패턴을 구현.
  • 객체의 상태 변화를 관찰하는 관찰자 목록을 객체에 등록하여 상태에 변화가 있을 때마다 메서드를 호출.
  • 라이프 사이클은 존재하지 않음.

Observable은 세 가지 알림을 구독자에게 전달한다.

  1. onNext() : Observable이 데이터 발행을 알림.
  2. onComplete() : 모든 데이터의 발행을 완료했음을 알림. 단 한 번 만 발생하여 해당 이벤트가 발생한 후에 onNext()는 발생하면 안 된다. 마블 다이어그램에서 파이프(|)로 표시된다.
  3. onError() : Observable에서 어떤 이유로 에러가 발생했음을 알림. 발생 후에 onNext(), onComplete()는 발생하지 않음. 즉, onError 호출 시 Observable의 실행을 종료. 마블 다이어 그램에서 X로 표시.

 

Just() 함수

데이터를 발생하는 가장 쉬운 방법.

 

Observable.just(1,2).subscribe(System.out::println);

 

와 같이 사용되며, 다양한 Type이 인자로 들어갈 수 있다.

인자로 넣은 데이터를 차례로 발행하며, 인자는 최소 1개에서 최대 10개까지 발행 가능.

또한, 인자는 모두 같은 타입으로만 발행해야 한다.

subscribe를 호출해야 실제 데이터 발행이 시작된다.

subscribe() 함수

just() 함수 등을 통해 데이터 흐름을 정의한 후 subscribe()를 호출해야 실제로 데이터가 발행.

데이터가 발행되는 시점을 조절할 수 있다.

subscribe는 인자가 0~3개까지 올 수 있다.

  • 인자가 0개 : onError 이벤트 발생 시, onErrorNotImplementedException을 던진다.
  • 인자가 1개 : onNext 이벤트 처리. onError 발생 시 위의 Exception을 던진다.
  • 인자가 2개 : onNext, onError 이벤트 처리
  • 인자가 3개 : onNext, onError, onComplete 이벤트 모두 처리.

dispose() 함수

Observable에게 더 이상 데이터를 발행하지 않도록 구독을 해지하는 함수.

Observable이 onComplete를 호출했을 때, 자동으로 dispose()를 호출하여 구독 관계를 끊는다.

즉, onComplete가 정상적으로 발생했다면 dispose는 명시적으로 선언하지 않아도 된다.

isDisposed()를 통해 정상적으로 구독을 해지했는지 확인할 수 있다.

return 값이 true 이면 정상적으로 해지, false 이면 해지하지 못했다는 것이다.

create() 함수

just() 와 다르게 onNext, onComplete, onError 같은 알림을 개발자가 직접 호출해야 한다.

 

Observable<Integer> source = Observable.create(
    (ObservableEmitter<Integer> emit) -> { 
        emit.onNext(1); 
        emit.onNext(2);
        emit.onComplete();
    });
source.subscribe(System.out::println);

 

이 코드는 위의 Just() 함수에서 설명했던 것과 동일한 동작을 하는 코드이다.

하지만, create로 선언하였기 때문에 onNext, onComplete 같은 알림을 직접 작성해서 사용하였다.

Observable.create() 함수 사용 시 주의할 점.

  1. Observable이 구독 해지되었을 때, 등록된 콜백을 모두 해제하지 않으면 메모리 누수(Memory Leak)이 발생한다.
  2. 구독자가 구독하는 동안에만 onNext, onComplete를 호출해야 한다.
  3. 에러 발생 시, 오직 onError 이벤트로만 에러를 처리해야 한다.
  4. 배압(back Pressure)을 직접 처리해야 한다.

단일 데이터가 아닌 다중 데이터를 처리하기 위해서는 fromXXX() 함수를 사용한다.

fromArray() 함수

배열에 들어있는 데이터를 처리할 때 사용하는 함수이다.

 

Integer[] ary = {1, 2, 3};
Observable<Integer> source = Observable.fromArray(ary).subscribe(System.out::println);

 

여기서 반드시 int가 아닌 Integer로 사용해야 한다.

int 배열을 사용하는 경우, int형 값 1,2,3이 나오는 것이 아닌 해당 배열의 주소 값이 출력되게 된다.

따라서, int형 배열을 사용하는 경우 toIntegerArray()를 사용해야 한다.

 

int[] ary = {1, 2, 3};
Observable<Integer> source = Observable.fromArray(toIntegerArray(ary))
        .subscribe(System.out::println);

 

fromIterable() 함수

Iterable 인터페이스를 구현한 클래스에서 Observable을 사용하는 경우.

Iterable 인터페이스를 구현하는 대표적은 클래스는 ArrayList, ArrayBlockingQueue, HashSet, LinkedList, Stack, Vector 등이 존재한다.

사용하는 방법은 위의 fromArray와 동일하게 사용하되, 객체 생성만 해당 클래스에 적합하게 생성해 주면 된다.

 

List<String> sample = new ArrayList<>();
sample.add("sample1");
sample.add("sample2");
sample.add("sample3");

Observable<String> source = Observable.fromIterable(sample).subscribe(System.out::println);

 

fromCallable() 함수

RxJava에서 제공하는 비동기 클래스 중 하나인 Callable 인터페이스를 사용한 Observable이다.

Callable 인터페이스는 비동기 실행 후 결과를 반환하는 call() 메서드를 정의해서 사용할 수 있다.

즉, 인자가 없으나 Return 타입은 존재하는 인터페이스란 것이다.

 

Callable<String> call = () -> {
    Thread.sleep(5000);
    return "sleep 5sec";
};

Observable<String> source = Observable.fromCallable(call).subscribe(System.out::println);

 

fromFuture() 함수

비동기 계산의 결과를 구할 때 사용.

Executor 인터페이스를 구현한 클래스에 Callable 객체를 인자로 넣어 Future 객체를 반환.

get() 호출 시, Callable 객체에서 구현한 계산 결과가 나올 때까지 블로킹 된다.

 

Future<String> future = Executors.newSingleThreadExecutor().submit(() -> {
        Thread.sleep(1000);
        return "future";
});

Observable<String> source = Observable.fromFuture(future).subscribe(System.out::println);
728x90