본문 바로가기

Language/RxJava

[RxJava] 8장. 테스팅과 Flowable - Flowable 클래스

728x90

Flowable 클래스

  • RxJava 2.x에 새로 도입된 클래스.
  • 배압(backpressure) 이슈를 위해 별도 분리된 클래스.
  • Observable 클래스의 성능을 향상시키기 위해서 도입된 클래스로, 기존의 Observable 클래스에서 배압 처리가 불필요한 경우, 초기 로딩에 약간의 오버헤드가 존재했는데 이것을 제거함.

Flowable의 활용은 기본적으로 Observable과 동일하다.

toObservable(), toFlowable() 함수를 사용하여 Observable과 Flowable을 서로 변환한다.

 

Flowable.just("Hello flowable").subscribe(System.out::println);

Flowable.fromCallable(() -> {
	Thread.sleep(1000);
	return "Done";
}).subscribeOn(Schedulers.io())
	.observeOn(Schedulers.single())
	.subscribe(System.out::println, Throwable::printStackTrace);
    
Thread.sleep(2000);

 

Flowable은 Observable과 사용하는 방식이 거의 일치하기 때문에, 위의 예제에서 Flowable을 Observable로 바꾸어도 정상적으로 동작한다.

 

Observable과 Flowable의 선택 기준

Observable을 사용해야 하는 경우

  • 1,000개 미만의 데이터 흐름.
  • 마우스 이벤트나 터치 이벤트를 다루는 GUI 프로그래밍.
  • 데이터 흐름이 본질적으로 동기 방식이지만, 프로젝트에서 사용하는 플랫폼이 자바 Stream API나 그에 준하는 기능을 제공하지 않을 떄 사용. Observable은 보통 Flowable과 비교했을 때 성능 오버헤드가 낮다.

Flowable을 사용해야 하는 경우

  • 특정 방식으로 생성된 10,000개 이상의 데이터를 처리하는 경우. 메서드 체인에서 데이터 소스에 데이터 개수 제한을 요청해야 한다.
  • 디스크에서 파일을 읽어 들일 경우. 본직적으로 블로킹 I/O 방식을 활용하고, 내가 원하는 만큼 가져오는 방식(pull-based)으로 처리해야 하기 때문.
  • JDBC를 활용해 DB의 쿼리 결과를 가져오는 경우.
  • 네트워크 I/O를 실행하는 경우.

*

디스크에서 파일 읽기, DB 쿼리 가져오기, 네트워크 I/O 등은 차가운 Observable에 속한다.

데이터 발행의 속도와 데이터 처리 속도의 차이가 작다면 Flowable이 아닌 Observable을 사용해도 된다.

즉, 처리 속도 차이가 나더라도 sample, throttle, debounce와 같은 흐름 제어 함수를 활용하여 먼저 처리하는 것이 좋고, 이런 함수로 처리가 불가능할 경우 Flowable 클래스로 전환하는 것이 좋다.

 

Flowable을 활용한 배압 이슈 대응

  • onBackpressureBuffer() : 배압 이슈가 발생했을 때 별도의 버퍼에 저장. Flowable 클래스는 기본적으로 128개의 버퍼가 있다.
  • onBackpressureDrop() : 배압 이슈가 발생했을 때 해당 데이터를 무시.
  • onBackpressureLatest() : 처리할 수 없어 쌓이는 데이터를 무시하면서 최신 데이터만 유지.

위의 3개의 함수를 사용하여 배압 이슈를 대응한다.

onBackpressureBuffer() 함수

public final Flowable<T> onBackpressureBuffer();

public final Flowable<T> onBackpressureBuffer(boolean delayError);

public final Flowable<T> onBackpressureBuffer(int capacity, Action onOverflow);

public final Flowable<T> onBackpressureBuffer
	(long capacity, Action onOverflow, BackpressureOverflowStrategy overflowStrategy);
  • 첫 번째 오버로딩은 기본 값(128개)의 버퍼 개수가 있다.
  • 두 번째 오버로딩은 delayError 여부를 지정할 수 있다. true면 예외가 발생했을 때 버퍼에 쌓인 데이터를 모두 처리할 때 까지 예외를 던지지 않고, false면 예외가 발생했을 때 바로 다운스트림에 예외를 던진다. default 값은 false이다.
  • 세 번째 오버로딩은 capacity 인자로 버퍼의 개수를 지정하고 onOverflow 인자에 버퍼가 넘쳤을 때 실행할 동작을 지정한다.
  • 네 번째 오버로딩은 버퍼가 가득찼을 때 추가로 실행하는 전략을 지정할 수 있다. 지정할 수 있는 전략은 3개이다. 
    • ERROR : MissingBackpressureException 예외를 던지고 데이터 흐름을 중단한다.
    • DROP_LATEST : 버퍼에 쌓여 있는 최근 값을 제거한다.
    • DROP_OLDEST : 버퍼에 쌓여있는 가장 오래된 값을 제거한다.
CommonUtils.exampleStart();

Flowable.range(1, 50_000_000)
	.onBackpressureBuffer(128, () -> {}, BackpressureOverflowStrategy.DROP>OLDEST)
	.oberveOn(Schedulers.computation())
	.subscribe(data -> {
		CommonUtils.sleep(100);
		Log.it(data);
	}, error -> Log.e(err.toString());

onBackpressureDrop() 함수

버퍼가 가득 찼을 때 이후 데이터를 그냥 무시하는 함수.

 

CommonUtils.exampleStart();

Flowable.range(1, 50_000_000)
	.onBackpressureDrop()
	.oberveOn(Schedulers.computation())
	.subscribe(data -> {
		CommonUtils.sleep(100);
		Log.it(data);
	}, error -> Log.e(err.toString());

CommonUtils.sleep(20_000);

 

onBackpressureBuffer 예제에서 onBackpressureBuffer를 onBackpressureDrop 으로만 변경해준 예제이다.

이와 같이 변경하게 되면, 128개의 버퍼에 모든 값이 찼을 때, 데이터를 계산 스케줄러에서 128개의 버퍼에 있는 값들을 모두 출력해 주고 이후의 데이터는 무시한 채 예제가 끝나게 된다.

onBackpressureLatest() 함수

onBackpressureBuffer 함수와 onBackpressureDrop 함수의 기능을 섞은 것으로, 마지막 값을 발행할 수 있도록 해준다.

 

CommonUtils.exampleStart();

Flowable.range(1, 50_000_000)
	.onBackpressureLatest()
	.oberveOn(Schedulers.computation())
	.subscribe(data -> {
		CommonUtils.sleep(100);
		Log.it(data);
	}, error -> Log.e(err.toString());

CommonUtils.sleep(20_000);

 

이 예제 또한 onBackpressureDrop 함수를 onBackpressureLatest()로 변경하기만 한 예제이다.

출력되는 값은 onBackpressureDrop와 마찬가지로 128개의 버퍼에 있는 값을 모두 출력한 다음, 맨 마지막 값을 출력하게 된다.

즉, 1 ~ 128을 출력한 후 50,000,000 값을 마지막으로 출력한 후 예제가 끝나게 된다.

728x90