
SharedFlow란 ?
`SharedFlow`는 이름 그대로 여러 구독자가 하나의 데이터 스트림을 공유할 수 있도록 설계된 `Flow`로, 방출된 값을 모든 구독자에게 전달하는(브로드 캐스팅) 방식으로 동작한다. 즉, 하나의 데이터 흐름을 여러 코루틴이 동시에 관찰할 수 있도록 설계된 Hot Flow다.
SharedFlow의 특징
- `collect` 함수를 호출한 코루틴이 없어도 동작하며 명시적으로 종료시키지 않는 한 절대로 완료되지 않는다.
- 구독자는 기본적으로 `collect` 함수가 호출된 이후에 방출된 값을 수집한다.
- `collect` 함수를 여러번 호출해도 하나의 데이터 스트림이 유지된다.
이런 특징을 가진 SharedFlow의 동작 방식을 시간의 흐름에 따라 시각적으로 표현하면 다음과 같다.

0이 방출되는 시점엔 구독자가 없지만 `SharedFlow`는 이와 상관없이 0을 정상적으로 방출한다. 이는 `SharedFlow`가 구독자 존재 여부와 관계없이 방출자가 값을 방출할 수 있는 `HotFlow`임을 보여준다.
이후 첫 번째 `collect` 함수가 호출되면 그 시점 이후에 방출된 값인 1,2,3이 수집돼 처리된다.
반면 두 번째 `collect` 함수는 1이 방출되고 나서 호출되기 때문에 2와 3만을 수집한다. 이 처럼 SharedFlow는 수집이 시작된 이후에 방출된 값만을 수집할 수 있도록 한다.
마지막으로 `SharedFlow` 데이터 스트림 구조를 살펴보자. `collect` 일시 중단 함수가 두 번 호출되었음에도 불구하고 `SharedFlow`는 하나의 데이터 스트림을 그대로 유지하는 것을 볼 수 있다. 이는 `SharedFlow`의 핵심적인 특징으로, 여러 구독자가 동일한 데이터 스트림을 공유한다.
SharedFlow 만들고 데이터 방출하기
`SharedFlow` 인터페이스에는 캐시를 위한 `replayCache` 프로퍼티와 수집을 위한 `collect` 일시 중단 함수만 정의돼 있으며, 데이터를 방출하기 위한 함수는 존재하지 않는다.
public interface SharedFlow<out T> : Flow<T> {
...
public val replayCache: List<T>
...
override suspend fun collect(collector: FlowCollector<T>): Nothing
}
이는 데이터의 방출하는 역할과 수집하는 역할을 명확히 분리하기 위한 설계 때문이다. `SharedFlow`는 수집만 가능한 불변 인터페이스이며, 데이터를 방출하려면 `SharedFlow`를 확장하는 가변 인터페이스인 `MutableSharedFlow`를 사용해야 한다.
public interface MutableSharedFlow<T> : SharedFlow<T>, FlowCollector<T> {
override suspend fun emit(value: T)
public fun tryEmit(value: T): Boolean
}
public fun <T> MutableSharedFlow(
replay: Int = 0,
extraBufferCapacity: Int = 0,
onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
): MutableSharedFlow<T>
`MutableSharedFlow`는 `emit` 일시 중단 함수와 `tryEmit` 함수를 통해 데이터를 방출할 수 있다.
`MutableSharedFlow` 객체를 생성하고 이를 변수에 할당한 후 데이터를 방출하고 처리하는 예제를 작성해 보자.
fun main() = runBlocking<Unit> {
val sharedFlow: MutableSharedFlow<String> = MutableSharedFlow()
sharedFlow.emit("collect 전에 방출되는 값")
launch {
sharedFlow.collect { value ->
println("$value 처리")
}
}
delay(1000L) // 1000밀리초간 대기
sharedFlow.emit("collect 후에 방출되는 값")
}
이 코드는 다음과 같은 순서대로 동작한다.
- `sharedFlow`에 대해 `emit("collect 전에 방출되는 값")`을 호출해 데이터를 방출한다.
- `launch` 코루틴을 생성해 `sharedFlow`에서 방출된 값을 수집해 출력하도록 한다.
- `delay(1000L)`을 호출해 1000밀리 초 동안 대기한다.
- `sharedFlow.emit("collect 후에 방출되는 값")`을 호출한다.
`sharedFlow`에 대해 `collect`를 호출한 시점 이후에 방출된 값만 `FlowCollector` 에 의해 처리되기 때문에 코드를 실행하면 다음과 같은 결과가 나온다.
// 결과
Collect 후에 방출되는 값 처리
실행 결과에서 알 수 있듯이 collect 전에 방출되는 값은 수집되지 않고 collect 후에 방출되는 값만 `FlowCollector`에 의해 처리된다. 이처럼 `SharedFlow`에 대해 `collect` 함수를 호출하면 기본적으로 호출 이후에 방출된 값만 수집된다.
replayCache
`collect` 일시 중단 함수를 호출하기 이전에 방출된 값을 수집하기 위해 `SharedFlow`는 `replayCache`라는 프로퍼티를 제공하는데, `replayCache`에는 `SharedFlow`에서 방출된 데이터가 저장된다.
public interface SharedFlow<out T> : Flow<T> {
public val replayCache: List<T>
}
`replayCache`에 데이터가 저장된 이후에 `collect` 일시 중단 함수가 호출되면 이곳에 저장된 값들이 `FlowCollect`에 전달되어 `collect` 일시 중단 함수가 호출되기 전의 데이터를 처리할 수 있게 된다.
`replayCache`의 크기는 `MutableSharedFlow`생성 시 `replay` 매개변수를 설정하면 된다. 예를 들어 `collect` 일시 중단 함수가 호출됐을 때 가장 최근에 방출된 데이터 한 개를 전달받고 싶다면 `replay = 1`을 입력하면 된다.
val sharedFlow: MutableSharedFlow<Int> = MutableSharedFlow(replay = 1)
`replayCache`의 동작을 확인하기 위해 다음 코드를 살펴보자
1. replay = 1
fun main() = runBlocking<Unit> {
val sharedFlow: MutableSharedFlow<Int> = MutableSharedFlow(replay = 1)
launch {
sharedFlow.emit(0) // 시작하자마자 0 방출 - replayCache: [0]
delay(1000L)
sharedFlow.emit(1) // 1000밀리초 후에 1 방출 - replayCache: [1]
delay(1000L)
sharedFlow.emit(2) // 2000밀리초 후에 2 방출 - replayCache: [2]
delay(1000L)
sharedFlow.emit(3) // 3000밀리초 후에 3 방출 - replayCache: [3]
}
launch {
delay(500L)
sharedFlow.collect { value -> // 500밀리초에 첫 collect 호출
println("첫 번째 collect를 통해 수집된 값: $value")
}
}
launch {
delay(1500L)
sharedFlow.collect { value -> // 1500밀리초에 둘째 collect 호출
println("두 번째 collect를 통해 수집된 값: $value")
}
}
}
이 코드는 다음과 같이 동작한다
- `MutablesharedFlow`의 `replayCache`의 크기가 1로 설정
- 1000밀리 초 간격으로 0, 1, 2, 3을 순차적으로 방출
- 500밀리 초 시점과 1500밀리 초 시점에 `MutableSharedFlow`에 대해 `collect` 일시 중단 함수 호출
각 단계를 하나씩 시각적으로 표현하면 다음과 같다.

`MutableSharedFlow`에서 각 값이 방출될 때마다 해당 값이 `replayCache`에 저장된다. 예제에서는 `replay`가 1로 설정돼 있으므로 캐시에는 항상 최근에 방출된 값 하나만 유지된다.
따라서 `[0]`이 방출되면 `replayCache`에는 `[0]`이 저장되고, 이후 1, 2, 3이 차례로 방출되면서 캐시는 각각 `[1] -> [2] -> [3]`이 저장되어 다음과 같이 출력된다.
// 결과
첫 번째 collect를 통해 수집된 값: 0
첫 번째 collect를 통해 수집된 값: 1
두 번째 collect를 통해 수집된 값: 1
첫 번째 collect를 통해 수집된 값: 2
두 번째 collect를 통해 수집된 값: 2
첫 번째 collect를 통해 수집된 값: 3
두 번째 collect를 통해 수집된 값: 3
이처럼 캐시가 모두 차면 가장 최근에 방출된 값이 저장되기 때문에 가장 최근에 방출된 값과 캐시에 저장된 값이 같아진다. 다음으로 `replay`를 2로 설정해 보자
1. replay = 2
fun main() = runBlocking<Unit> {
val sharedFlow: MutableSharedFlow<Int> = MutableSharedFlow(replay = 2)
launch {
sharedFlow.emit(0) // 시작하자마자 0 방출 - replayCache: [0]
delay(1000L)
sharedFlow.emit(1) // 1000밀리초 후에 1 방출 - replayCache: [0, 1]
delay(1000L)
sharedFlow.emit(2) // 2000밀리초 후에 2 방출 - replayCache: [1, 2]
delay(1000L)
sharedFlow.emit(3) // 3000밀리초 후에 3 방출 - replayCache: [2, 3]
}
launch {
delay(500L)
sharedFlow.collect { value -> // 500밀리초에 첫 collect 호출
println("첫 번째 collect를 통해 수집된 값: $value")
}
}
launch {
delay(1500L)
sharedFlow.collect { value -> // 1500밀리초에 둘째 collect 호출
println("두 번째 collect를 통해 수집된 값: $value")
}
}
}
`replay=2`로 설정했으므로 `replayCache`에 저장될 수 있는 최대 원소의 수는 두 개가 되어 다음과 같이 첫 번째 collect와 두 번째 collect 모두 0, 1, 2, 3 전부를 수집하는 것을 볼 수 있다.
// 결과
첫 번째 collect를 통해 수집된 값: 0
첫 번째 collect를 통해 수집된 값: 1
두 번째 collect를 통해 수집된 값: 0
두 번째 collect를 통해 수집된 값: 1
첫 번째 collect를 통해 수집된 값: 2
두 번째 collect를 통해 수집된 값: 2
첫 번째 collect를 통해 수집된 값: 3
두 번째 collect를 통해 수집된 값: 3
이 코드의 타임라인을 시각화하면 다음과 같다.

replayCache에 어떤 값이 저장됐는지 직접 확인하고 싶다면 `sharedFlow`의 `replayCache`를 출력해 보 면 된다.
fun main() = runBlocking<Unit> {
val sharedFlow: MutableSharedFlow<Int> = MutableSharedFlow(replay = 2)
sharedFlow.emit(0)
println(sharedFlow.replayCache) // [0] 출력
delay(1000L)
sharedFlow.emit(1)
println(sharedFlow.replayCache) // [0, 1] 출력
delay(1000L)
sharedFlow.emit(2)
println(sharedFlow.replayCache) // [1, 2] 출력
delay(1000L)
sharedFlow.emit(3)
println(sharedFlow.replayCache) // [2, 3] 출력
}
완료되지 않는 SharedFlow
앞서 만든 코드들을 실행하면 프로세스가 종료되지 않고 계속 실행 상태에 머문다. 이는 `SharedFlow`가 완료되지 않고 데이터를 무한히 방출하는 `Flow`이기 때문이다. `SharedFlow`의 수집을 중단하고 싶다면 `SharedFlow` 자체를 종료하는 것이 아니라 수집 중인 코루틴을 취소해야한다.
fun main() = runBlocking<Unit> {
val sharedFlow: MutableSharedFlow<String> = MutableSharedFlow()
val collectJob = launch {
sharedFlow.collect { value ->
println("MutableSharedFlow에서 방출된 값: $value")
}
}
delay(1000L)
collectJob.cancel() // launch 코루틴 취소
}
extraBufferCapacity
`MutableSharedFlow`의 버퍼 크기는 `replay`와 `extraBufferCapacity`을 더해 결정된다.
public fun <T> MutableSharedFlow(
replay: Int = 0,
extraBufferCapacity: Int = 0,
onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
){
val bufferCapacity0 = replay + extraBufferCapacity
}
먼저 다음과 같이 `replay`를 0으로, `extraBufferCapacity`를 1로 두고 코드를 실행해보자.
fun main() = runBlocking<Unit> {
val startTime = System.currentTimeMillis()
// 총 버퍼의 크기 1
val sharedFlow = MutableSharedFlow<String>(
replay = 0,
extraBufferCapacity = 1
)
launch {
sharedFlow.collect { value ->
delay(1000L) // 처리에 1000밀리초 걸림
println("[${getElapsedTime(startTime)}] $value 처리 완료")
}
}
yield()
println("[${getElapsedTime(startTime)}] 데이터1 방출 시작")
sharedFlow.emit("데이터1")
println("[${getElapsedTime(startTime)}] 데이터1 방출 완료")
println("[${getElapsedTime(startTime)}] 데이터2 방출 시작")
sharedFlow.emit("데이터2")
println("[${getElapsedTime(startTime)}] 데이터2 방출 완료")
println("[${getElapsedTime(startTime)}] 데이터3 방출 시작")
sharedFlow.emit("데이터3")
println("[${getElapsedTime(startTime)}] 데이터3 방출 완료")
}
fun getElapsedTime(startTime: Long): String =
"지난 시간: ${System.currentTimeMillis() - startTime}밀리초"
이 경우 데이터2는 버퍼에 저장되기 때문에 데이터 1의 처리 완료를 기다리지 않고 방출 완료된다.
// 결과
[지난 시간: 0밀리초] 데이터1 방출 시작
[지난 시간: 0밀리초] 데이터1 방출 완료
[지난 시간: 0밀리초] 데이터2 방출 시작
[지난 시간: 0밀리초] 데이터2 방출 완료 // 데이터 1의 처리 완료를 기다리지 않고 방출 완료됨
[지난 시간: 0밀리초] 데이터3 방출 시작
[지난 시간: 1000밀리초] 데이터1 처리 완료
[지난 시간: 1000밀리초] 데이터3 방출 완료
[지난 시간: 2000밀리초] 데이터2 처리 완료
[지난 시간: 3000밀리초] 데이터3 처리 완료
다음으로 `replay=1, extraBufferCapacity=1`로 두고 코드를 실행하면 버퍼의 크기가 2가 되기 때문에 모든 데이터가 지연없이 방출되는 것을 확인할 수 있다.
// 결과
[지난 시간: 6밀리초] 데이터1 방출 시작
[지난 시간: 11밀리초] 데이터1 방출 완료
[지난 시간: 11밀리초] 데이터2 방출 시작
[지난 시간: 11밀리초] 데이터2 방출 완료
[지난 시간: 11밀리초] 데이터3 방출 시작
[지난 시간: 14밀리초] 데이터3 방출 완료
[지난 시간: 1018밀리초] 데이터1 처리 완료
[지난 시간: 2025밀리초] 데이터2 처리 완료
[지난 시간: 3028밀리초] 데이터3 처리 완료
`replay`와 `extraBufferCapacity`는 둘 다 `SharedFlow`의 버퍼 공간에 영향을 주지만 그 목적이 다르다.
| 대상 | replay | extraBufferCapacity |
| 목적 | 이전 값을 전달하기 위한 캐시 | 소비 속도가 느린 구독자를 위한 버퍼 공간 |
| 대상 | 새 구독자 | 기존 구독자 |
| 저장되는 값 | 구독 전에 방출된 최근 값 | 구독자가 처리하지 못한 값 |
| 예시 | `replay = 1`이면 새 구독자는 최근 값 1개를 즉시 받음 | `extraBufferCapacity = 1`이면 수집자가 바빠도 값 1개를 추가로 저장 가능 |
이제 남은 것은 방출자가 버퍼에 값을 저장할 수 없는 상황에서 어떤 일이 일어나는지이다. `MutableSharedFlow`는 값을 방출하기 위해 `emit`과 `tryEmit` 함수를 제공한다. 두 함수 모두 값을 방출한다는 목적은 같지만 버퍼가 가득 찼거나 구독자가 값을 처리할 준비가 되지 않은 상황에서 동작 방식이 다르다.
emit vs tryEmit
`emit`은 일시 중단 함수로, 구독자가 있고 버퍼에 더 이상 값을 저장할 공간이 없다면 값을 방출할 수 있는 상태가 될 때까지 호출한 코루틴을 일시 중단하며 이후 구독자가 이전 값을 처리하거나 버퍼에 공간이 생기면 다시 재개되어 값을 방출한다.
구독자가 없는 상황에서 `emit` 함수가 어떻게 동작하는지 살펴보자.
fun main() = runBlocking<Unit> {
val startTime = System.currentTimeMillis()
val sharedFlow = MutableSharedFlow<String>(replay = 0, extraBufferCapacity = 0)
println("[${getElapsedTime(startTime)}] 데이터1 방출 시작")
sharedFlow.emit("데이터1")
println("[${getElapsedTime(startTime)}] 데이터1 방출 완료")
println("[${getElapsedTime(startTime)}] 데이터2 방출 시작")
sharedFlow.emit("데이터2")
println("[${getElapsedTime(startTime)}] 데이터2 방출 완료")
println("[${getElapsedTime(startTime)}] 데이터3 방출 시작")
sharedFlow.emit("데이터3")
println("[${getElapsedTime(startTime)}] 데이터3 방출 완료")
}
새 구독자를 위한 `replay` 캐시도 없고, 느린 구독자를 위한 `extraBufferCapacity` 공간도 없으며 `collect`를 호출하는 구독자도 없다. 하지만 실행해 보면 `emit`은 중단되지 않고 바로 완료된다.
// 결과
[지난 시간: 0밀리초] 데이터1 방출 시작
[지난 시간: 1밀리초] 데이터1 방출 완료
[지난 시간: 1밀리초] 데이터2 방출 시작
[지난 시간: 1밀리초] 데이터2 방출 완료
[지난 시간: 1밀리초] 데이터3 방출 시작
[지난 시간: 1밀리초] 데이터3 방출 완료
`emit`은 기다릴 대상이 없으므로 바로 끝나지만 값을 받을 구독자도 없고 저장 공간도 없기 때문에 데이터는 유실된다. 그렇다면 구독자가 있는 상황에서는 어떨까? 구독자가 값을 처리하는 속도보다 방출 속도가 빠르면 `emit`은 계속 즉시 완료될 수 있을까? 이를 이해하려면 `emit`과 `tryEmit`의 차이를 살펴봐야 한다.
tryEmit
public interface MutableSharedFlow<T> : SharedFlow<T>, FlowCollector<T> {
public fun tryEmit(value: T): Boolean
}
`tryEmit`은 일반 함수로 값을 즉시 방출하고 성공 여부를 `Boolean` 으로 반환한다. 값을 바로 방출할 수 있으면 `true`, 버퍼가 가득 차 있어 즉시 방출할 수 없으면 `false`를 반환한다. `tryEmit`이 `true`를 반환하는 대표적인 경우는 다음 두 가지다.
- 구독자가 없는 경우
- 구독자가 있고 버퍼에 여유 공간이 있는 경우
구독자가 없는 경우에는 값을 기다릴 대상이 없기 때문에 `tryEmit`은 성공한다. 다만 `replay`가 설정돼 있지 않다면 이 값은 저장되지 않고 사라진다. 즉, `true`가 반환됐다고 해서 반드시 어떤 구독자가 그 값을 수집했다는 의미는 아니다. 다음 예제를 살펴보자.
fun main() = runBlocking<Unit> {
val sharedFlow = MutableSharedFlow<String>(
extraBufferCapacity = 1
)
println("구독자가 생기기 전 tryEmit: ${sharedFlow.tryEmit("이벤트")}")
launch {
sharedFlow.collect { value ->
delay(1000L)
println("$value 처리 완료")
}
}
yield()
println("구독자가 생긴 후 첫 번째 tryEmit: ${sharedFlow.tryEmit("데이터1")}")
delay(100L)
println("구독자가 생긴 후 두 번째 tryEmit: ${sharedFlow.tryEmit("데이터2")}")
delay(100L)
println("구독자가 생긴 후 세 번째 tryEmit: ${sharedFlow.tryEmit("데이터3")}")
}
이 코드는 다음과 같이 동작한다.
- `extraBufferCapacity`를 1로 설정한 `SharedFlow`생성
- 구독자가 생기기 전에 `tryEmit`을 통해 데이터를 방출해 그 결과를 출력
- 각 데이터를 처리하는 데 1000밀리초가 걸리는 구독자를 하나 생성하고 100밀리초 간격으로 `tryEmit`을 통해 데이터를 방출
// 결과
구독자가 생기기 전 tryEmit: true
구독자가 생긴 후 첫 번째 tryEmit: true
구독자가 생긴 후 두 번째 tryEmit: true
구독자가 생긴 후 세 번째 tryEmit: false
데이터1 처리 완료
데이터2 처리 완료
결과는 다음과 같다.
- 구독자가 생기기 전에 호출한 `tryEmit`: 구독자가 없으므로 성공
- 구독자가 생긴 뒤 첫 번째 `tryEmit` : 구독자에게 곧바로 전달될 수 있으므로 성공
- 두 번째 `tryEmit`: 구독자가 아직 첫 번째 값을 처리 중이지만 `extraBufferCapacity`가 1이므로 버퍼에 저장되어 성공
- 세 번째 `tryEmit` : 호출하는 시점에는 버퍼에 이미 `데이터2`가 들어 있으므로 더 이상 값을 저장할 공간이 없어 즉시 실패하고 `false`를 반환
같은 상황에서 `emit`을 사용했다면 세 번째 값은 실패하지 않고 버퍼에 공간이 생길 때까지 코루틴이 일시 중단된다.
이 차이가 `emit`과 `tryEmit`을 구분하는 핵심이다. 따라서 반드시 방출되어야 하는 값이라면 `emit`을, 값의 유실을 감수할 수 있다면 `tryEmit`을 사용할 수 있다.
또 하나 주의할 점은 버퍼를 설정하지 않은 `MutableSharedFlow`에서의 `tryEmit` 동작이다. 기본 설정의 `MutableSharedFlow`는 `replay = 0`, `extraBufferCapacity = 0`이므로 구독자가 있는 상태에서는 값을 임시로 저장할 공간이 없다. 따라서 `tryEmit` 함수를 사용할 때는 반드시 버퍼를 설정해야 한다.
버퍼를 설정하지 않은 상태에서 구독자가 있는 `MutableSharedFlow`에 `tryEmit` 함수를 호출하면 항상 방출에 실패한다. 이를 확인하기 위해 다음 코드를 살펴보자
fun main() = runBlocking<Unit> {
val sharedFlow = MutableSharedFlow<String>()
launch {
sharedFlow.collect { value ->
delay(1000L) // 처리에 1000밀리초 걸림
println("$value 처리 완료")
}
}
yield()
println("구독자가 생긴 후 첫 번째 tryEmit: ${sharedFlow.tryEmit("데이터1")}")
delay(100L)
println("구독자가 생긴 후 두 번째 tryEmit: ${sharedFlow.tryEmit("데이터2")}")
delay(100L)
println("구독자가 생긴 후 세 번째 tryEmit: ${sharedFlow.tryEmit("데이터3")}")
}
버퍼가 설정되지 않아 버퍼에 원소를 전달할 수 없어 방출이 실패해 모든 `tryEmit` 함수가 false를 반환하는 것을 볼 수 있다.
// 결과
구독자가 생긴 후 첫 번째 tryEmit: false
구독자가 생긴 후 두 번째 tryEmit: false
구독자가 생긴 후 세 번째 tryEmit: false
참조
코틀린 코루틴 리액티브 프로그래밍: Flow와 Channel
ㄴㅇㄴㅇ
ㅇㄴㄷreplayCache
'KOTLIN' 카테고리의 다른 글
| Kotlin Koog에 대해 알아보자 (0) | 2026.05.25 |
|---|---|
| Kotlin Generic Type System (2) | 2025.04.10 |
| Abstract class vs interface in Kotlin (2) | 2025.04.01 |
| 다시 읽는 Effective Kotlin - Item39. 태그 클래스보다는 클래스 계층을 사용하라 (2) | 2025.03.23 |
| 다시 읽는 Effective Kotlin - Item33. 생성자 대신 팩토리 함수를 사용하라 (2) | 2025.03.02 |
