본문 바로가기

Language/RxJava

[RxJava] 4장. 리액티브 연산자의 활용 1 - 생성 연산자

728x90

생성 연산자 : just(), fromXXX(), create() ...

변환 연산자 : map(), flatMap(), groupBy(), buffer() ...

필터 연산자 : filter(), take(), skip() ...

결합 연산자 : zip(), concat(), merge(), combineLatest()

조건 연산자 : takeUntil(), all() ...

에러 처리 연산자 : onErrorReturn(), retry() ...

기타 연산자 : subscribe(), reduce(), observeOn() ...

생성 연산자

  • 데이터 흐름을 만드는 연산자.
  • Observable을 만드는 연산자라고 생각하면 된다.

 

interval() 함수

일정 시간 간격으로 데이터 흐름을 생성하며, 현재 스레드가 아닌 계산 스케줄러에서 실행된다.

주어진 시간 간격으로 0부터 1씩 증가하는 Long 객체를 발행한다.

* Long 객체는 long type 이 아닌 rapper class인 Long 객체를 의미한다.

 

// 원형 1
Observable<Long> interval1(long period, TimeUnit unit)

// 원형 2
Observable<Long> interval2(long initialDelay, long period, TimeUnit unit)

 

interval 함수는 2가지 형태가 존재한다.

unit 의 시간 기준으로 period 만큼 쉬었다가 데이터를 발행하며, 두번째 형태에서의 initialDelay 값을 줌으로써 최초 지연 시간을 조절할 수 있다.

 

// 원형 1
Observable<Long> interval1(100L, TimeUnit.MILLISECONDS)

// 원형 2
Observable<Long> interval1(0L, 100L, TimeUnit.MILLISECONDS)

 

위와 같은 형태로 interval을 사용한다고 했을 때,

두 경우 모두 100ms 의 Delay를 가지고 데이터를 발행하는 것은 똑같으나, 첫 번째 값을 발행되는 시간이 두 번째 타입이 첫 번째 타입보다 100ms 빠르게 된다.

 

즉, initialDelay 값을 설정하지 않으면 첫 번째 데이터도 period 만큼 지연 후에 발행이 되며, 해당 값을 설정하면 첫 번째 발행되는 값에 한정하여 지연시간이 period가 아닌 initiaDelay 만큼의 지연 후에 데이터가 발행되게 된다.

 

interval 함수는 기본적으로 영원히 지속되기 때문에 폴링 용도로 많이 사용한다.

 

* 폴링 (Polling) 이란 ?

하나의 장치(또는 프로그램)가 충돌 회피 또는 동기화 처리 등을 목적으로 다른 장치(또는 프로그램)의 상태를 주기적으로 검사하여 일정한 조건을 만족할 때 송수신 등의 자료 처리를 하는 방식.

 

Observable<Long> source = Observable.interval(100L, TimeUnit.MILLISECONS)
	.map(data -> data + 1)
	.take(3)
	.subscribe(Log::it);
    
CommonUtils.sleep(1000);

// 출력 결과
TreadNmae | 100 | value = 1
TreadNmae | 200 | value = 3
TreadNmae | 300 | value = 2

 

Log 클래스의 it 메서드의 출력 방식

스레드 이름 | 발행 시간 | value = 값

형태로 출력한다. 그에 따라 출력 결과가 위와 같이 나오게 된다.

 

timer() 함수

interval() 과 유사하지만 한 번만 실행되는 함수이며, 발행되는 데이터는 interva() 함수의 첫 번째 값인 0L을 발행한다.

일정 시간 후 한 개의 데이터를 발행하고 onComplete() 이벤트가 발생한다.

 

Obseravle<String> source = Observable.timer(300L, TimeUnit.MILLISECONDS)
	.map(notUsed -> {
		return new SimpleDataFormat("yyyy/MM/dd HH:mm:ss").format(new Date());
	}).subscribe(Log::it);
    
CommonUtils.sleep(1000);

 

map 에서 변수를 사용하지 않기 때문에 변수 이름을 notUsed로 명시.

300ms 가 지난 후 현재 시간을 출력하도록 구현한 코드이다.

 

range() 함수

주어진 값(n) 부터 m개의 Integer 객체를 발행한다.

interval, timer의 경우 Long 객체를 발행했지만, range의 경우 Integer 객체를 발행한다.

또한, interval, timer는 계산 스케줄러에서 실행되지만 range는 현재 스레드에서 실행된다.

따라서, range 함수는 반복문(for, while문)을 대체할 수 있다.

 

Observable<Integer> source = Observable.range(1,10)
	.filter(num -> num % 2 == 0)
	.subscribe(Log::i);

// 계산 스케줄러가 아닌 현재 스레드에서 실행되기 때문에 sleep은 사용하지 않는다.
// CommonUtils.sleep(1000);


// 출력 결과
main | value = 2
main | value = 4
main | value = 6
main | value = 8
main | value = 10

 

intervalRange() 함수

interval 과 range를 혼합해놓은 함수.

interval 처럼 일정한 시간 간격으로 값을 출력 하지만, range 처럼 시작 숫자(n)로 부터 m개 만큼의 값만 생성하고 onComplete를 발생한다.

즉, interval 처럼 무한하게 값을 발행하지 않는다.

 

Observable<Long> intervalRange(
	long start, long count, long initialDelay, long period, TimeUnit unit) { ... }

 

intervalRange의 인자는 interval의 인자 3개와 range의 인자 2개를 합친 5개의 인자를 받아서 사용한다.

각 인자의 이름에 따라 interval, range와 동일하게 동작한다.

 

* interval 함수에 take 함수를 조합하면 intervalRange 함수를 만들 수 있다.

take 함수는 n개 만큼의 값만 발행시키는 역할을 하므로 range와 동일하게 동작하도록 구현이 가능하다.

 

Observable<Long> inter = Observable.interval(100L, TimeUnit.MILLISECONDS)
	.map(num -> num+1)
	.take(5)
	.subscribe(Log::i);
    

Observable<Long> interR = Observable.intervalRange(1, 5, 100L, 100L, TimeUnit.MILLISECONDS)
	.subscribe(Log::i);

 

위 두개의 Observable의 발행 값은 동일하다.

 

defer() 함수

timer() 와 비슷하지만, 데이터 흐름 생성을 구독자가 subscribe() 를 호출할 때 까지 미룰 수 있다.

timer는 계산 스케줄러이지만 defer는 현재 스레드에서 실행된다.

 

구독자가 subscribe를 호출하면 그때 해당 데이터 흐름을 그대로 발행하는 차가운 Observable 이다.
즉, defer 함수를 활용하면 subscribe 함수를 호출할 때의 상황을 반영하여 데이터 흐름의 생성을 지연하는 효과를 보여준다.

repeat() 함수

단순히 반복 실행을 하는 함수.

주로 서버가 잘 살아있는지 확인 할 때 사용한다. (ping 혹은 heart beat이라고 부른다.)

 

String[] number = {"1", "2", "3"};

Observable<String> source = observable.fromArray(number)
	.repeat(3)
	.doOnComplete(() -> Log.d("onComplete"))
	.subscribe(Log::i);
    
// 출력 결과
main | value = 1
main | value = 2
main | value = 3
main | value = 1
main | value = 2
main | value = 3
main | value = 1
main | value = 2
main | value = 3
main | debug = onComplete

 

repeat(N) 에서 인자(N)를 입력하지 않으면 영원히 반복 실행을 하며, 입력한 인자(N) 만큼 반복 실행 후 onComplete가 호출 된다.

 

doOnComplete는 이름 그대로 onComplete가 실행 되었을 때 호출되는 함수이다.

728x90