본문 바로가기

Language/RxJava

[RxJava] 4장. 리액티브 연산자의 활용 2 - 변환 연산자

728x90

변환 연산자

만들어진 데이터 흐름을 원하는 대로 변형할 수 있는 연산자.

 

concatMap() 함수

flatMap() 과 매우 비슷하다. 

flatMap의 경우 인터리빙(interleaving) 현상이 발생할 수 있기 때문에 데이터의 처리가 순차적으로 진행되지 않을 가능성이 있다.

하지만, concatMap의 경우 먼저 들어온 데이터 순서대로 처리하여 결과를 낼 수 있도록 보장해준다.

 

String[] num = {"1", "2", "3"};

Observable<String> source = Observable.interval(100L, TimeUnit.MILLISECONDS)
	.map(Long::intValue) // interval에서 발행되는 값은 Long 타입이므로, Integer 객체로 변환
	.map(index -> num[index])
	.take(num.length)
	.concatMap(number -> Observable.interval(200L, TimeUnit.MILLISECONDS)
		.map(notUsed -> "#" + number)
		.take(2))
	.subscribe(Log::it);

CommonUnits.sleep(1000);

// 출력 결과
Thread-1 | 200 | value = #1
Thread-1 | 400 | value = #1
Thread-2 | 600 | value = #2
Thread-2 | 800 | value = #2
Thread-3 | 1000 | value = #3
Thread-3 | 1200 | value = #3

 

interval을 사용하여 100ms 마다 Long 타입 값을 0부터 순차적으로 발행한다.

발행한 Long 타입 값을 Integer 객체로 변경하여 index 값으로 사용해 map 으로 해당 index 위치에 있는 배열 값을 발행한다.

take가 num 객체의 길이이기 때문에 num 배열에 저장 된 3개의 값을 모두 발행한다.

 

concatMap을 사용해서 200ms 마다 해당 값을 #을 붙여서 발행하는데, take가 2이기 때문에 2번씩 발행한다.

 

따라서, 출력 결과에서 볼 수 있는 것 처럼 같은 숫자가 2번씩 출력되며 발행되는 시간은 concatMap에서 선언한 interval의 시간 값 처럼 200ms 차이가 나게 된다.

 

위의 예제에서 알 수 있듯이 순차적으로 값이 발행되는 것을 볼 수 있다.

 

하지만, 위의 예제에서 concatMap을 flatMap으로 바꾸면 출력 값이 위처럼 순차적으로 나오는 것을 보장할 수 없다.

 

	.flatMap(number -> Observable.interval(200L, TimeUnit.MILLISECONDS)
	.map(notUsed -> "#" + number)
	.take(2))
        

// 출력 값
Thread-1 | 200 | value = #1
Thread-2 | 300 | value = #2
Thread-1 | 400 | value = #1
Thread-1 | 400 | value = #3
Thread-2 | 500 | value = #2
Thread-3 | 600 | value = #3

 

스레드의 번호도 일정하지 않고, 값 모두 순차적이지 못하다.

 

모든 값을 발행할 때 까지의 걸리는 시간은 flatMap이 짧은데, interleaving을 허용하기 때문이다.

 

 

switchMap() 함수

concatMap 함수가 interleaving이 발생할 수 있는 상황에서의 동작 순서를 보장해준다면, switchMap 함수는 순서를 보장하기 위해 기존에 진행 중이던 작업을 바로 중단한다.

또한, 여러 개의 값이 발행되었을 때 마지막에 들어온 값만 처리하고 싶을 때 사용한다. 이는 중간에 끊기더라도 마지막 데이터의 처리는 보장하기 때문이다.

 

String[] num = {"1", "2", "3"};

Observable<String> source = Observable.interval(100L, TimeUnit.MILLISECONDS)
	.map(Long::intValue) // interval에서 발행되는 값은 Long 타입이므로, Integer 객체로 변환
	.map(index -> num[index])
	.take(num.length)
	.doOnNext(Log::dt) // 중간 결과를 확인할 수 있도록 로그를 발행
	.swtichMap(number -> Observable.interval(200L, TimeUnit.MILLISECONDS)
		.map(notUsed -> "#" + number)
		.take(2))
	.subscribe(Log::it);

CommonUnits.sleep(1000);

// 출력 결과
Thread-1 | 200 | debug = 1
Thread-1 | 300 | debug = 2
Thread-1 | 400 | debug = 3
Thread-3 | 600 | value = #3
Thread-3 | 800 | value = #3

 

concatMap에서 사용한 예제에서 concatMap을 switchMap으로 바꾸고, doOnNext를 추가하여 중간에 로그를 발행시켜 진행 순서를 확인할 수 있게 하였다.

100ms 간격으로 num에 있는 값들이 발행된다. 하지만, debug = 1 을 발행하고 #1을 발행하기까지는 200ms 의 지연시간이 존재한다.

즉, 200ms 에 1이 발행되었다면, #1은 400ms에 발행되어야 정상적인 로직이지만, 300ms에 2의 값이 발행된다.

중간에 다른 값이 발행되면 기존에 진행중이던 1의 작업을 중단하고 2를 #2로 발행하기 위한 작업을 진행한다.

2도 마찬가지로 500ms가 되기전에 400ms에 4가 발행되므로 2의 값 또한 같은 이유로 작업이 중단되므로 600ms가 되어야 #3이 발행되게 된다.

또한, take(2) 가 있으므로 800ms에 한번 더 #3이 발행되고 해당 함수가 끝나게 된다.

위의 예제에서 확인할 수 있는 것 처럼 switchMap을 사용하면 마지막에 들어온 데이터의 처리는 보장되게 된다.

 

 

 

groupBy() 함수

groupBy 함수는 어떤 기준으로 단일 Observable을 여러 개로 이루어진 Observable 그룹으로 만든다.

 

String[] value = {"1a", "1b", "2a", "2b"};

Observable<GroupObservable<String, String>> source =
	Observable.fromArray(value).groupBy(CustomUtils::getValue);
    
source.subscribe(obj ->
	{obj.subscribe(
		val -> System.out.println("Group : "+ obj.getKey() + " , Value : " + val));
	});
    

// CustomUtils.getValue()

public static String getValue(String object) {
	if(object == null || obejct.equals("")) return "No Type";
    if(object.endsWith("a")) return "A type";
    if(object.endsWith("b")) return "A type";
    return "Default";
}


// 출력 값
Group : A Type , Value : 1a
Group : B Type , Value : 1b
Group : A Type , Value : 2a
Group : B Type , Value : 2b

 

위의 getValue() 메서드는 임의로 구현한 것이다.

groupBy 메서드는 Observable 그룹을 만드는데, GroupedObservable을 인자로 넣어서 사용하면 된다.

GroupedObservable은 Observable과 동일하지만, getKey() 메서드를 기본적으로 제공하며, 해당 메서드를 통하여 구분된 그룹을 알 수 있다.

 

String 배열 value 값을 Observable에 넣고, getValue에서 return 되는 값에 Observable 그룹을 나눈다.

getValue에서는 String 배열의 a,b를 판단하여 그룹을 나누고, Observable을 구독하여 group과 해당 값을 출력하면 제대로 그룹핑이 되었는지 확인할 수 있다.

 

주로, filter를 통해 구하고자 하는 값들만 필터링하여 발행하는 형태로 사용한다.

 

source.subscribe(obj ->
	{obj.filter(val -> obj.getKey().equals("A Type"))
		.subscribe(
			val -> System.out.println("Group : "+ obj.getKey() + " , Value : " + val));
	});

 

 

Map() : 1개의 데이터를 다른 값이나 다른 타입으로 변환
flatMap() : 1개의 값을 받아서 여러 개의 데이터(Observable)로 확장
groupBy() : 여러 개의 값들을 받아서 어떤 기준에 맞는 새로운 Observable 다수를 생성

scan() 함수

reduce() 함수와 비슷하다. reduce()는 모든 데이터가 입력된 후 마지막 1개의 데이터만을 구독자에게 발행한다.

반면, scan() 함수는 실행할 때 마다 입력 값에 맞는 중간 결과 및 최종 결과를 구독자에게 발행한다.

 

String[] number = {"1", "2", "3"};

Observable<String> source = Observable.fromArray(number)
	.scan((num1, num2) -> num2 + "(" + num1 + ")")
	.subscribe(Log::i);
    
// 출력 결과
main | value = 1
main | value = 2(1)
main | value = 3(2(1))
728x90