서론
프로젝트에서 API 호출 시 발생하는 네트워크 에러처리에 대한 클래스를 StateFlow에서 Channel에서 변경하게 되었습니다. 기존에 사용하던 StateFlow는 중복된 에러를 방출하지 않는다는 문제가 있었습니다. 단순히 API 호출 실패로 인해 화면 이동이 안된다거나, 특정 동작이 반복될 가능성이 매우 희박한 곳에선 해당 문제를 발견하지 못했지만 이후 북마크 버튼을 클릭같이 단발적으로 반복해서 발생하는 이벤트에 대해선 문제가 발생했습니다.
Channel 뜯어보기
public fun <E> Channel(
capacity: Int = RENDEZVOUS,
onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND,
onUndeliveredElement: ((E) -> Unit)? = null
): Channel<E>
Channel은 3가지 인자를 전달할 수 있습니다.
capacity는 채널의 데이터를 저장할 버퍼의 용량, onBufferOverflow는 버퍼가 가득 찼을 때의 동작을 지정할 수 있습니다. 결과적으로 Channel은 capacity와 onBufferOverFlow을 조합해 다양한 활용이 가능합니다.
Capacity
1. UNLIMITE
- Int로 표현할 수 있는 최대 값만큼 버퍼를 생성
- 데이터 소비 유무에 상관 없이 데이터를 생성하기 때문에 OOM(Out Of Memory)가 발생할 수 있습니다.
2. RENDEZVOUS
- 버퍼가 0개 즉, 버퍼가 존재하지 않음
- 데이터를 소비해야만 다음 데이터가 생성되는 형태로 쉽게 말해 생산과 소비가 교차로 일어납니다.
3. BUFFERED
- 고정된 크기의 버퍼를 사용해 데이터를 소비하지 않을 시 버퍼의 크기 만큼 데이터를 캐싱합니다.
- 주석을 살펴보면 Channel 생성 시 기본 버퍼 크기를 사용하도록 설정할 수 있습니다.
val channel1 = Channel<Int>(64) // 동일한 동작
val channel2 = Channel<Int>(Channel.BUFFERED) // 위와 같은 결과
4. CONFLATED
- 버퍼가 1개인 채널
- 이미 버퍼에 데이터가 존재한 상태에서 새로운 데이터를 생산할 경우 새로운 값으로 교체됩니다.
BufferOverFlow
SUSPEND
- 데이터를 더이상 생산하지 않고 일시 중단
DROP_OLDEST
- 가장 오래된 데이터를 제거하고 새로운 데이터를 생산(First-in First-out, FIFO)
DROP_LATEST
- 가장 최신 데이터를 제거하고 새로운 데이터를 생산(Last In, First Out, LIFO)
onUndeliveredElement
- 원소가 정상적으로 처리되지 않을 때 호출되는 콜백
- 보통 채널이 닫히거나 취소되었음을 의미하지만 send, receive, hasNext 등이 에러를 던질 때도 발생
- 보통 채널에서 자원을 닫을 때 사용
Capacity와 onBufferOverflow를 조합해동작하는 예시를 살펴보겠습니다.
1. RENDEZVOUS + SUSPEND
- onBufferOverflow가 기본값 SUSPEND인 경우 버퍼가 없는 Rendezvous 채널을 반환합니다.
fun main(): Unit = runBlocking {
val rendezvousChannel = Channel<Int>()
launch {
val sendTime = System.currentTimeMillis()
rendezvousChannel.send(1)
println("[${System.currentTimeMillis() - sendTime}ms] rendezvousChannel.send(1)")
println("끝났다!")
}
delay(2000)
val receiveTime = System.currentTimeMillis()
rendezvousChannel.receive()
println("[${System.currentTimeMillis() - receiveTime}ms] rendezvousChannel.receive()")
}
기본적으로 Rendezvous 채널은 receive로 구독하기 전까지 Suspend 되기 때문에 종료를 알리는 Prinlnt이 약 2초 후 실행됩니다.
fun main(): Unit = runBlocking {
val rendezvousChannel = Channel<Int>()
val startTime = System.nanoTime()
launch {
repeat(5) { value ->
val sendTime = (System.nanoTime() - startTime) / 1_000_000_000.0
rendezvousChannel.send(value)
println("[${String.format("%.3f", sendTime)}s] send($value)")
}
}
delay(2000)
launch {
repeat(5) {
val receiveTime = (System.nanoTime() - startTime) / 1_000_000_000.0
val received = rendezvousChannel.receive()
println("[${String.format("%.3f", receiveTime)}s] receive($received)")
}
}
}
앞서 Rendezvous에 대해 알아본 것 처럼 생산과 소비가 교차로 일어나는 것을 확인할 수 있습니다.
2. BUFFERED(5)
fun main(): Unit = runBlocking {
val dropOldestChannel = Channel<Int>(5)
val startTime = System.nanoTime()
launch {
repeat(5) { value ->
val sendTime = (System.nanoTime() - startTime) / 1_000_000_000.0
dropOldestChannel.send(value)
println("[${String.format("%.3f", sendTime)}s] send($value)")
}
}
delay(2500)
launch {
repeat(5) {
val receiveTime = (System.nanoTime() - startTime) / 1_000_000_000.0
val received = dropOldestChannel.receive()
println("[${String.format("%.3f", receiveTime)}s] receive($received)")
}
}
}
버퍼에 충분한 크기를 주었다면 데이터가 모두 캐싱되어 유실되는 값 없이 전달되는 것을 확인할 수 있습니다.
3. BUFFERED(1) + DROP_OLDEST
fun main(): Unit = runBlocking {
val dropOldestChannel = Channel<Int>(1, BufferOverflow.DROP_OLDEST)
val startTime = System.nanoTime()
launch {
repeat(5) { value ->
val sendTime = (System.nanoTime() - startTime) / 1_000_000_000.0
dropOldestChannel.send(value)
println("[${String.format("%.3f", sendTime)}s] send($value)")
}
}
delay(2000)
launch {
repeat(1) {
val receiveTime = (System.nanoTime() - startTime) / 1_000_000_000.0
val received = dropOldestChannel.receive()
println("[${String.format("%.3f", receiveTime)}s] receive($received)")
}
}
}
Channel(1, BufferOverflow.DROP_OLDEST)는 버퍼 크기가 1인 채널로, 새로운 값이 들어오면 가장 오래된 값이 삭제되며 가장 최근 값이 reveive 됩니다.
4. BUFFERED(1) + DROP_LATEST
fun main(): Unit = runBlocking {
val dropOldestChannel = Channel<Int>(1, BufferOverflow.DROP_LATEST)
val startTime = System.nanoTime()
launch {
repeat(5) { value ->
val sendTime = (System.nanoTime() - startTime) / 1_000_000_000.0
dropOldestChannel.send(value)
println("[${String.format("%.3f", sendTime)}s] send($value)")
}
}
delay(2000)
launch {
repeat(1) {
val receiveTime = (System.nanoTime() - startTime) / 1_000_000_000.0
val received = dropOldestChannel.receive()
println("[${String.format("%.3f", receiveTime)}s] receive($received)")
}
}
}
해당 채널도 버퍼 크기가 1인 채널로, 새로운 값이 들어오면 가장 최근 값이 삭제되며 가장 처음 들어온 값이 reveive 됩니다.
마무리
채널을 처음 접했을 땐 너무 어렵게 느껴져서 손대지 조차 못했었는데 계속 보다보니 매력적인 녀석인 것 같습니다. 실제로 최근 채널을 사용해서 ViewModel의 One Time Event 를 다루는 방식을 많이 사용하는 것 같은데 SharedFlow에 비교했을 때 replay 를 사용하는 방식과 유사한 것 같습니다. 무엇보다 소비하기 전 까지 값을 캐싱하는 방식이 BUFFERED 하나만 전달하면 된다니 보면 볼수록 괜찮은 녀석일지도 ...? 다음 포스팅에선 실제로 채널을 사용해 이벤트를 처리하는 방식을 소개해보겠습니다.
'KOTLIN' 카테고리의 다른 글
다시 읽는 Effective Kotlin - Item39. 태그 클래스보다는 클래스 계층을 사용하라 (2) | 2025.03.23 |
---|---|
다시 읽는 Effective Kotlin - Item33. 생성자 대신 팩토리 함수를 사용하라 (2) | 2025.03.02 |
Kotlin Value Class With Project Valhalla (4) | 2024.11.22 |
[Kotlin]Sealed Class란 무엇일까 ? (0) | 2024.03.22 |
Coroutine SharedFlow (0) | 2024.03.09 |