본문 바로가기

Android/Network

[Android] SSE를 사용하여 실시간 통신하는 방법.

728x90

회사에서 업무를 진행하다, SSE를 사용하여 통신을 해야 하는 케이스가 추가되었다.

 

Websocket를 통해서 실시간 통신을 구현해 본 경험은 있지만,

SSE를 사용해 본 경험은 없었기 때문에 이를 적용하기 위한 과정과 적용하는 간단한 샘플 코드를 작성해보고자 한다.


우선,

SSE란 무엇인가부터 알아보자.

 

SSE란,

Server-Sent Events의 약자로
서버에서 클라이언트로 단방향 실시간 데이터를 전송하는 웹 기술이다.

 

즉,

서버에서 클라이언트로 일방통행 통로를 하나 뚫어주고, 그 통로를 통해서 계속해서 데이터를 전달할 수 있다는 것이다.

 

필자가 맨 처음, WebSocket은 구현해 봤지만 SSE는 처음이다.라고 말한 이유는

WebSocket도 실시간 데이터를 전송하는 기술이지만, 이는 양방향 통신이 가능한 기술이기 때문이다.

 

그럼 그냥 더 상위 버전으로 보이는 WebSocket을 사용하면 되는데 왜 SSE를 사용하나? 싶겠지만

당연하다시피 각 기술마다의 장단점이 존재하고, 단방향 통신만 지원해도 되는 기능에 한해서는 SSE를 사용하는 것이 더 효율적이기 때문이다.

 

필자도 마찬가지로, 양방향 통신이 필요하지 않고 일방적으로 데이터를 수신하여 처리해야 하는,

실시간 알림이나 뉴스 피드와 같은 기능이 추가가 되어 이를 처음 사용하여 구현해 보았다.

 

SSE는 HTTP 프로토콜 유지하면서 데이터를 전송하는 기술이므로 우리가 늘 사용하던 okHttp를 사용하고 있기 때문에 해당 기능을 okHttp에서 지원하는지부터 확인해 보았다.

 

okHttp github에 들어가면 다음과 같은 라이브러리를 사용하여 구현할 수 있다고 나와있다.

testImplementation("com.squareup.okhttp3:okhttp-sse:4.12.0")
Experimental support for server-sent events. API is not considered stable and may change at any time.

 

이와 같은 문구와 함께 말이다.

즉, 해당 SSE 제공 API는 실험적인 API 이므로 언제나 변경될 수 있다는 것이다.

 

그래서 해당 라이브러리를 사용하여 어떻게 구현을 해야 하는가? 하고 찾아보다가, 해당 라이브러리를 사용하여 보다 편하게 SSE 구현을 도와주는 라이브러리를 찾았고, 해당 라이브러리를 사용하여 구현하는 방식으로 처리하였다.

물론 직접 okHttp의 라이브러리를 사용하여 구현하는 것이 좋겠지만, 시간 상 보다 빠르고 간편하게 구현할 수 있는 방법을 선택하게 되었다. 

implementation("com.launchdarkly:okhttp-eventsource:3.0.0")

 

4.1.1 버전이 가장 최신 버전이지만, 3.0.0 버전 이후로 변경된 함수를 사용하는데 필자는 정상적으로 원하는 기능이 동작하지 않아서, 이전 버전인 3.0.0 버전을 사용하였다.

 

3.0.0 버전과 4.1.1 버전의 큰 차이점은,

4.0.0 버전부터 

com.launchdarkly.eventsource.background

 

해당 패키지를 사용하는 것으로 변경되어

EventHandler -> BackgroundEventHandler,

EventSource -> BackgroundEventSource

로 변경이 되었고, 그에 따라 몇 가지 함수의 사용 방법이 변경되었다.

 

필자는 4.1.1 버전에서 동작을 하도록 수정을 하였으나, 생각보다 자주 문제가 발생하여 이를 해결하기보다는 버전을 조금 낮춰서 사용하는 것으로 수정하였다.

물론, 다른 라이브러리였다면 문제가 발생하는 지점을 더 깊게 찾아보고 수정해 사용하는 게 좋았겠지만, SSE 자체가 실험적인 라이브러리기 때문에 다음 버전이 나오게 되면 많은 버전이 변경될 수 있을 것이라 생각하고 우선 버전을 낮춰서 사용하는 것으로 선택하였다.

 

이제 위에 선언한 라이브러리 버전을 사용하여 구현을 해보도록 하자.

구현하는 방법을 찾아보면 SSE 기능은 3가지 단계로 작업이 이루어진다.

  1. 데이터를 전달해 줄 서버와 연결
  2. 데이터를 수신받음
  3. 서버와의 연결 해제

okHttp를 사용한 통신이기 때문에 당연하다시피 이와 같은 http 통신 flow를 따를 것이다.

 

그럼 우선 1번인 서버와의 연결하는 로직을 구현해 보도록 하자.

EventSource.Builder(
    eventHandler,
    URI.create(SERVER_URL)
)
    .reconnectTime(3, TimeUnit.SECONDS)
    .build()
    .start()

 

서버와의 연결은 아주 간단하다.

EventSource Class는 라이브러리에서 제공해 주는 클래스로, 인스턴스는 항상 EventSource.Builder를 사용하여 구성되고 생성된다고 한다.

 

관련한 내용은 해당 클래스에 들어가면 이렇게 자세히 나와있으니 한 번쯤 읽어보는 것을 추천한다.

어떻게 사용하는 것인지 자세히 나와있다.

 

해당 클라이언트는 생성될 때 비활성화 된 상태로 생성되므로, start를 통해 실행시켜주어야 한다는 것과 마찬가지로, builder를 통해 생성하고, 마지막에 start를 통해 실행시켜 주면 된다.

 

어떻게 생성하는지 알았으니, connect 로직을 추가해 보도록 하자.

val startConnection = remember {
    {
        coroutineScope.launch(Dispatchers.IO) {
            try {
                eventSourceHolder.value =
                    EventSource.Builder(
                        eventHandler,
                        URI.create("https://stream.wikimedia.org/v2/stream/recentchange")
                    )
                        .reconnectTime(3, TimeUnit.SECONDS)
                        .build()
                eventSourceHolder.value?.start()
            } catch (e: Exception) {
                Log.e("SSE", "Error starting connection: ${e.message}")
                withContext(Dispatchers.Main) {
                    uiState.value = uiState.value.copy(
                        messageList = uiState.value.messageList +
                                SSEMessage.Error("연결 시작 에러: ${e.message}")
                    )
                }
            }
        }
    }
}

 

사용한 URL은 위키피디아의 실시간 업데이트를 받아올 수 있는 SSE Server이다.

SSE 예제를 만들 때 오픈된 API를 사용하려고 검색해 보았는데, 사용할 수 있는 데이터가 있어서 사용했을 뿐이니, 필요에 따라 url 주소를 변경하여 사용하길 바란다.

 

우선, 해당 Connect 로직을 별도의 함수로 빼내어 사용한 것이 아닌 remember를 사용한 변수에 선언한 것을 볼 수 있을 것이다.

이거는 선택 사항이긴 하지만, 필자는 remember를 사용하여 reComposition이 발생할 때 상태를 그대로 유지하기 위해서 이와 같이 사용하였다.

remember 내부에 선언된 람다 함수는 다른 데이터에 영향을 받지 않도록 구현을 했기 때문에 startConnection은 변경되지 않는다. 그렇기 때문에 해당 데이터는 재구성이 발생하더라도 데이터를 유지하게 되어, 동일한 연결을 유지할 수 있는 것이다.

 

물론, 해당 부분을 함수로 빼내어 사용해도 전혀 다른 점은 없다.

fun startSSEConnection(
    eventSourceHolder: MutableState<EventSource?>,
    eventHandler: EventHandler,
    coroutineScope: CoroutineScope,
    uiState: MutableState<SSEUIState>
) {
    coroutineScope.launch(Dispatchers.IO) {
        try {
            eventSourceHolder.value =
                EventSource.Builder(
                    eventHandler,
                    URI.create("https://stream.wikimedia.org/v2/stream/recentchange")
                )
                    .reconnectTime(3, TimeUnit.SECONDS)
                    .build()
            eventSourceHolder.value?.start()
        } catch (e: Exception) {
            Log.e("SSE", "Error starting connection: ${e.message}")
            withContext(Dispatchers.Main) {
                uiState.value = uiState.value.copy(
                    messageList = uiState.value.messageList +
                            SSEMessage.Error("연결 시작 에러: ${e.message}")
                )
            }
        }
    }
}

 

이와 같이 함수로 따로 빼내어 선언한 후, 해당 부분을 동일하게 호출해 주면 된다.

 

필자가 remember를 사용한 변수로 선언하여 사용한 이유는, 단순히 이렇게 선언하여 사용한 적이 많이 없었기 때문에 한번 해보았고, 이게 되네?라는 느낌을 받았기 때문에 남겨둔 정도의 가벼움이기 때문에 편하게 선언하여 사용하면 될 것이다.

 

여기서 eventHandler 부분은 우리가 통신하는 동안 데이터를 받을 때의 처리를 넣어주면 되므로, 해당 부분은 지금은 넘어가도록 하자.

 

이렇게 선언한 다음에, 필요할 때 해당 변수 혹은 함수를 호출하면 된다.

startConnection()

 

혹은

startSSEConnection(
    eventSourceHolder,
    eventHandler,
    coroutineScope,
    uiState
)

 

이렇게 말이다.

 

이렇게 연결을 해보았으니,

우선 비슷하고 간단한 연결을 끊는 부분을 구현해 보도록 하자.

 

연결을 끊으려고 해 보니, EventSource.Builder 객체가 필요하고, 해당 객체에서 단순하게 .close만 선언해 주면 되었다.

그런데 필자는 connect를 할 때 추가적으로 접근이 가능한 변수를 선언하지 않고 사용했기 때문에 Builder 객체를 우선 remember 변수에 넣어주도록 하자.

val eventSourceHolder = remember { mutableStateOf<EventSource?>(null) }

...

eventSourceHolder.value = EventSource.Builder(...)

 

이와 같은 형식으로 말이다.

 

빌더를 eventSourceHolder라는 변수에 넣었으므로, 해당 변수를 사용하여 연결을 끊도록 하자.

val closeConnection = remember {
    {
        coroutineScope.launch(Dispatchers.IO) {
            try {
                eventSourceHolder.value?.close()
                eventSourceHolder.value = null
            } catch (e: Exception) {
                Log.e("SSE", "Error closing connection: ${e.message}")
            }
        }
    }
}

 

startConnection과 마찬가지로 closeConnection을 선언하여 내부 람다 함수에 로직을 구현해 준다.

아주 간단하게, eventSourceHolder.value?.close() 를 통해 연결을 끊기만 해 주면 된다.

 

시작과 끝 지점을 작업했으므로, 중간에 데이터 수신을 위한 eventHandler를 구현해 보자.

public Builder(EventHandler handler, URI uri) {
  this(handler, uri == null ? null : HttpUrl.get(uri));
}

 

연결을 위해 사용했던 EventSource의 Builder에는 다음과 같이 EventHandler를 인자로 받아주고 있다.

그리고 그 EventHandler를 다시 확인해 보면, onOpen, onClosed, onMessage, onComment, onError라는 함수를 구현할 수 있게 되어있는 것을 확인할 수 있다.

 

즉, 5가지 함수를 override 하는 EventHandler 객체를 구현하여 사용하면 된다는 것이다.

val eventHandler = remember {
    object : EventHandler {
        // 연결이 성공적으로 되었을 때 호출
        override fun onOpen() {

        }

        // 연결이 종료되었을 때 호출
        override fun onClosed() {

        }

        // 서버로부터 메시지를 수신했을 때 호출
        override fun onMessage(event: String, messageEvent: MessageEvent) {

        }

        // 서버로부터 주석을 수신했을 때 호출
        override fun onComment(comment: String) {

        }

        // 에러가 발생했을 때 호출
        override fun onError(t: Throwable) {

        }
    }
}

 

주석을 보면 알 수 있듯이, 해당하는 경우 해당 함수로 데이터를 받아올 수 있게 된다.

 

필자는 MVI Pattern에 대해서 공부하고 적용해보고 있었기 때문에, MVI 패턴을 사용하여 해당 데이터를 저장해 보도록 하겠다.

sealed class SSEMessage {
    data class Connected(val message: String = "연결됨") : SSEMessage()
    data class Comment(val message: String) : SSEMessage()
    data class Disconnected(val message: String = "연결 종료") : SSEMessage()
    data class Error(val message: String) : SSEMessage()
}

 

EventHandler를 사용하여 메시지를 받기 때문에, Message 객체를 Sealed class로 선언하였고,

data class SSEUIState(
    val messageList: List<SSEMessage> = emptyList(),
    val isConnected: Boolean = false
)

 

data class로 State 객체를 선언해 주었다.

 

MVI Pattern은 보통 Event에 따라서 Sealed class를 선언하고, 각 이벤트가 발생했을 때 UIState 객체를 통해 데이터를 변경해주어야 한다.

그런데 지금과 같은 경우 조금 섞여있는 것을 알 수 있다.

UIState에 들어가는 MessageList 인데 sealed class 객체가 들어가 있기 때문이다. 하지만 사용하는 것에는 큰 차이가 없으므로 동일하게 사용해 주도록 한다.

override fun onOpen() {
    coroutineScope.launch {
        uiState.value = uiState.value.copy(
            isConnected = true,
            messageList = uiState.value.messageList + SSEMessage.Connected()
        )
    }
}

// 연결이 종료되었을 때 호출
override fun onClosed() {
    coroutineScope.launch {
        uiState.value = uiState.value.copy(
            isConnected = false,
            messageList = uiState.value.messageList + SSEMessage.Disconnected()
        )
    }
}

// 서버로부터 주석을 수신했을 때 호출
override fun onComment(comment: String) {
    coroutineScope.launch {
        uiState.value = uiState.value.copy(
            messageList = uiState.value.messageList +
                    SSEMessage.Comment("주석: $comment")
        )
    }
}

// 에러가 발생했을 때 호출
override fun onError(t: Throwable) {
    coroutineScope.launch {
        uiState.value = uiState.value.copy(
            isConnected = false,
            messageList = uiState.value.messageList +
                    SSEMessage.Error("에러: ${t.message}")
        )
    }
}

 

List형태의 데이터이기 때문에, 간단하게 현재 messageList에 추가되는 데이터를 리스트로 추가해주기만 하면 된다.

override fun onMessage(event: String, messageEvent: MessageEvent) {
    coroutineScope.launch {
        try {
            val firstChar = messageEvent.data.firstOrNull()?.toString() ?: ""
            uiState.value = uiState.value.copy(
                collectedChars = uiState.value.collectedChars + firstChar
            )
        } catch (e: Exception) {
            uiState.value = uiState.value.copy(
                statusMessages = listOf("파싱 에러: ${e.message}")
            )
        }
    }
}

 

수신하는 메시지와 같은 경우, 한 번에 정말로 많은 데이터가 계속해서 들어오기 때문에 실시간으로 데이터를 수신한다는 것을 보여주기에는 불필요해서 첫 번째 한 글자만 가져오도록 데이터를 수정하였다.

 

여기서 전체 데이터를 확인하고 싶다면, firstChar를 사용하지 않고, messageEvent.data 데이터를 확인해 보면 될 것이다.

 

이렇게 간단하게 구현이 되었다면, 이를 보여줄 UI를 그려주도록 하자.

LazyColumn(
    modifier = Modifier
        .fillMaxWidth()
        .weight(1f)
) {
    items(uiState.value.messageList) { message ->
        when (message) {
            is SSEMessage.Connected -> StatusMessageCard(
                message = message.message,
                backgroundColor = Color.Gray
            )

            is SSEMessage.Comment -> StatusMessageCard(
                message = message.message,
                backgroundColor = Color.White
            )

            is SSEMessage.CollectedChars -> CollectedCharsCard(chars = message.chars)
            is SSEMessage.Disconnected -> StatusMessageCard(
                message = message.message,
                backgroundColor = Color.Cyan
            )

            is SSEMessage.Error -> StatusMessageCard(
                message = message.message,
                backgroundColor = Color.Red
            )
        }
    }
}

 

uiState.value.messageList는 수신되는 데이터에 따라 리스트가 계속해서 추가되기 때문에 실시간으로 LazyColumn의 리스트가 증가되는 것을 확인할 수 있을 것이다.

또한 색상도 이벤트 별로 다르게 표현하여 쉽게 확인할 수 있도록 구현을 해보았다만,

생각보다 데이터가 들어오는 속도가 너무 빨라서, 일부 코드를 수정하여 통신 중에 들어오는 데이터는 하나의 카드로만 보여주도록 코드를 수정한 후, 결과 화면을 보여주도록 하겠다.


이렇게 간단하게 SSE에 대한 구현을 해보았다.

라이브러리를 이전 버전을 사용하긴 했지만, 생각보다 정말 간단하게 SSE 구현을 할 수 있었던 것 같다.

지금까지 업무를 진행하면서 WebSocket을 구현해야 한다 라는 이야기는 많이 들었는데, SSE를 구현해서 사용해야 한다는 이야기는 처음 들어서 지금까지 한 번도 구현해 본 적 없었구나라는 것을 이제야 알게 되었다.

 

이번에 구현을 하면서 http 통신, SSE, Websocket에 대해서 다시 한번 공부하여 개념을 정리하고 구현해보면서 다시한번 네트워크 통신에 대해서 생각하게 되었다.

 

구현을 하고, 글을 쓰다 보니 생각보다 데이터가 많아서 코드를 개선하여 영상을 녹화하여 올렸는데, SSE와는 연관 없는 그냥 단순한 데이터 로직이 변경된 것이라서 해당 부분은 추가하지 않았지만, github에는 올려두었으니 필요하다면 확인하길 바란다.

 

해당 게시글에 사용한 예제는 Github에 올려두었다.

https://github.com/HeeGyeong/ComposeSample

 

GitHub - HeeGyeong/ComposeSample: This project provides various examples needed to actually use Jetpack Compose.

This project provides various examples needed to actually use Jetpack Compose. - HeeGyeong/ComposeSample

github.com

728x90