본문 바로가기

카테고리 없음

DevFest Android in Korea 2024 코루틴 방탈출

 

DevFest Android in Korea 2024에 다녀왔다. 어느덧 4번째 컨퍼런스인데 이번 컨퍼런스가 유난히 잼있고 유익했었다. 사실 컨퍼런스의 내용을 온전히 이해한건 오늘이 처음인데 게임 개발자에서 안드로이드 개발자가 되신 연사자분의 스토리부터 컴포즈와 파이어 베이스를 활용해 서버 드리븐으로 앱을 개발하는 방법 등 정말 흥미롭고 다채로운 내용이 많았다. 이벤트로 코루틴 방탈출을 준비해주셨는데 문제는 이거 하느라 정작 가장 듣고 싶었던 현우님의 세션을 듣지 못했다(말이 방탈출이겠지 했는데 진짜 탈출 못할줄 몰랐다). 분명 정세영님의 코틀린 - 코루틴의 정석을 정독하며 한번씩 다 접했던 내용이었는데 ... 그래서 문제를 다시 풀면서 개념을 정리해봐야겠다.

🪑 Step 0

class Step0 {

    @Test
    fun `연습 문제`() = runTest {
        val job = launch(Dispatchers.IO) {
            repeat(1_000_000) {
                Thread.sleep(1_000)
                println("Not cancelled yet :(")

                // TODO: 이곳에 한 줄만 추가하여 코루틴이 정상적으로 취소되도록 만들어보세요.
                ensureActive()
            }
        }
        delay(100)
        job.cancelAndJoin()
        assertThat(job.isCancelled).isTrue()
    }
}

 

📌 코루틴 취소와 순차성

1. cancel

Job 객체에 cancel을 호출하면 코루틴을 즉시 취소되지 않는다. cancel은 Job 객체 내부의 취소 확인용 플래그를 "취소 요청됨"으로 변경해 코루틴이 취소되어야 한다는 것만 알린다. 이후 미래의 어느 시점에 코루틴의 취소 요청 여부를 체크하고 취소한다.

 

2. cancelAndJoin

코루틴 취소 이후에 실행하려는 로직이 있다면 코루틴은 취소의 순차성 보장을 위해 cancelAndJoin을 제공한다.  하지만 이 cancel과 cancelAndJoin 모두 코루틴을 바로 취소시키지 않는다. 이들은 Job 객체 내부에 있는 취소 확인용 플래그를 바꾸기만 하며 코루틴이 이 플래그를 확인하는 시점에 취소된다. 즉, 코루틴이 취소를 확인할 수 있는 시점이 없다면 취소되지 않는다.

📌 코루틴의 취소 확인 시점

코루틴은 일시 중단 지점이나 코루틴이 실행을 대기하는 시점에 취소를 확인한다. 이 시점을 만들 수 있는 세 가지 방법은 다음과 같다.

 

1. delay를 사용한 취소

delay는 일시 중단(suspend) 함수로 파라미터로 전달받은 시간 만큼 호출부의 코루틴을 일시 중단 한다. 하지만 이 방법은 작업을 특정 시간동안 강제로 일시 중단 시킨다는 점에서 효율적이지 않다.

 

2. yield를 사용한 취소

직역하면 "양보"라는 뜻으로 코루틴이 자신이 사용하던 스레드를 양보한다. 스레드 사용을 양보한다는 것은 스레드 사용을 중단하는 의미로 yield를 호출한 코루틴이 일시 중단되며 이 시점에 취소 여부를 체크한다. 하지만 이 방법 또한 스레드 사용이 양보되면서 일시 중단되는 문제가 있다. 코루틴이 경량 스레드라고 하더라도 일시 중단되는 것은 작업을 비효율적으로 만든다.

 

3. CoroutineScope.isActive를 사용한 취소

CoroutineScope는 코루틴이 활성화됐는지 확인할 수 있는 Boolean 타입의 프로퍼티인 isActivite를 제공한다. 코루틴에 취소가 요청되면 isActivity 프로퍼티의 값은 false로 바뀌며 취소되도록 만들 수 있다. 다만 이 방법 또한 취소 상태를 매번 확인해야 한다. 예를 들어 다음 코드 처럼 반복문의 조건으로 지정해줄 수 있다. 

fun main() = runBlocking {
    val whileJob: Job = launch(Dispatchers.Default) {

        // 코루틴 취소 요청 -> isActive가 false로 변경
        // 코루틴을 잠시 멈추지 않고 스레드 사용을 양보하지 않아 효율적
        while(this.isActive) {
            println("작업 중")
        }
    }
    delay(100L)
    whileJob.cancel()
}

 

4. ensureActivity를 사용한 취소

이번 문제를 통해 처음 알게된 메소드다. 현재 코루틴의 범위가 활성 상태인지 확인하는 함수로 해당 작업(Job)이 더 이상 활성 상태가 아니라면 CancellationException을 던진다. 

🪑 Step 1

class Step1Solution {

    @Test
    fun `종류별 Scope`() = runTest {
        // given
        val actual: StringBuilder = StringBuilder()

        // when
        val deferred = async {
            delay(500)
            actual.append(1)
        }
        launch {
            delay(200)
            actual.append(2)
        }
        coroutineScope {
            launch {
                delay(300)
                actual.append(3)
            }
            actual.append(4)
        }
        deferred.await()
        actual.append(5)

        // then
        val expected = "42315"

        // assert문 수정하지 마세요!
        assertHashcode(actual, expected)
    }
}

 

1. launch 

결과값이 없는 Job 객체를 반환하는 코루틴 빌더로 내부 delay와는 별개로 다음 작업을 수행할 수 있다.

 

2. async & wait

Deferred 객체를 반환하는 코루틴 빌더다. Deffered<T> 타입의 객체를 반환해 Job과 같이 코루틴을 추상화한 객체지만 코루틴으로부터 생성된 결괏값을 감싸는 기능을 추가지며 아래 코드와 같이 코루틴의 결과로 <T> 타입을 반환한다. 이 결괏값 수신 대기를 위해 await 함수를 제공하는데 await의 대상이 된 Deferred 코루틴이 실행 완료될 때까지 await 함수를 호출한 코루틴을 일시 중단한다.

fun main() = runBlocking {
    // 1. 시작 시간 기록
    val startTime = System.currentTimeMillis()

    // 2. 플랫폼1에서 등록한 관람객 목록을 가져오는 코루틴
    val participantDeferred1: Deferred<Array<String>> = async(Dispatchers.IO) {
        delay(1000L)
        return@async arrayOf("James","Jason")
    }

    // 3. 결과가 수신 될 때까지 대기
    val participants1 = participantDeferred1.await()
}

 

3. coroutineScope

블록 내부의 모든 코루틴이 완료될 때 까지 현재 스레드를 차단한다.

 

앞서 살펴본 내용을 바탕으로 결과를 분석해보자.

 

deffered는 5초 delay되어 append하지 않는다. 여기서 혼동된 부분이 있다.

await를 호출하지 않았기 때문에 append되지 않은 것인가 ? delay로 인해 append되지 않은 것인가 ?

 

나는 당연히 1이 먼저 추가될 것으로 예상했다. 이유는 async 코루틴과 coroutinScope 모두 실행 즉시 실행되기 때문에 코드는 위에서 아래로 하향식으로 실행된다는 점을 생각, 위에 있는 async 코루틴이 먼저 실행될 것으로 예상했다. 하지만 몇 번을 실행해도 4가 먼저 출력되었다. 타이밍 문제일수도 있다는 생각에 100번 정도 광클하면서 반복해봤는데 모두 같은 결과였다. 내 스스론 답을 찾지 못했고 GDG 단톡방에 질문을 여쭈었다.

 

늦은 시간에 질문 드리는 것도 죄송했는데 너무 감사하게도 답변을 주셨다. 놀랍게도 각 빌더로 생성된 코루틴의 실행 속도가 원인이었다. 실행 속도를 측정해볼 생각은 꿈에도 못했는데 깜짝 놀랐다. 

코루틴의 실행 속도 : coroutineScope > launch > async
  • 다시 돌아와서 coroutineScope 내의 작업 중 delay가 없어 4는 즉시 실행된다.
    • "4"
  • 2를 추가하는 launch 코루틴의 0.2ms delay가 종료되어 2가 추가된다. 
    • "42"
  • coroutineScope 내부의 3을 추가하는 launch 코루틴의 0.3ms의 delay가 종료되어 3이 추가된다. 
    • "423"
  • 1을 추가하는 async 코루틴이 0.3ms의 delay가 종료되어 1이 추가된다. 
    • "4231"
  • await()를 호출해 내부 작업이 완료될 때 까지 스레드를 일시 중단 하지만 내부 작업이 완료된 상태이기 때문에 async 코루틴을 종료한다.
  • 모든 작업이 완료된 후에 5가 추가된다.
    • "42315"

🪑 Step 2

class Step2 {

    @Test
    fun `SharedFlow replay`() = runTest {
        // given
        val actual: StringBuilder = StringBuilder()
        val sharedFlow = MutableSharedFlow<Int>(replay = 1)

        // when
        val emitterJob = launch {
            sharedFlow.emit(1)
            delay(100)
            sharedFlow.emit(2)
            delay(100)
            sharedFlow.emit(3)
            delay(100)
            sharedFlow.emit(4)
        }

        val collectorJob1 = launch {
            sharedFlow.collect(actual::append)
        }

        delay(150)
        val collectorJob2 = launch {
            sharedFlow.collect(actual::append)
        }

        emitterJob.join()
        collectorJob1.cancelAndJoin()
        collectorJob2.cancelAndJoin()

        // then
        val expected = "1223344" // TODO: 결과값 예상
        /*
            TODO: 간단한 풀이과정 작성
         */

        // assert문 수정하지 마세요!
        assertHashcode(actual, expected)
    }
}

 

SharedFlow

여러 구독자가 동시에 같은 데이터를 공유할 수 있도록 설계된 Flow로 replay = n은 SharedFlow가 최근에 방출한 n개의 값을 새로 구독하는 수집자에게 재생하도록 설정한다. replay = 1로 설정하면 SharedFlow는 가장 최근에 방출된 1개의 값을 새로 구독하는 수집자에게 제공한다.

 

이 때 collectorJob1과 collectorJob2은 같은 SharedFlow를 구독하기 때문에 동일한 값을 수집하지만 collectorJob2은 동일한 값을 구독하는 collectorJob1보다 150ms 후에 실행된다. 이 때 replay = 1 로 선언되어 있기 때문에 항상 최신 값만을 전달하기 때문에 150ms dalay된 후 구독을 시작하는 collectorJob2에선 1이 유실된다.

🪑 Step 3

class Step3Solution {

    @Test
    fun `코루틴 예외 전파`() = runTest {
        // given
        val actual: StringBuilder = StringBuilder()

        // when
        val job = launch {
            try {
                launch {
                    delay(150)
                    actual.append(1)
                }
                supervisorScope {
                    val deferred = async {
                        delay(100)
                        throw RuntimeException("E2")
                    }
                    launch {
                        delay(200)
                        throw RuntimeException("E3")
                    }
                    deferred.await()
                }
            } catch (e: Exception) {
                actual.append(e.message)
            }
        }
        job.join()

        // then
        val expected = "E21"

        // assert문 수정하지 마세요!
        assertHashcode(actual, expected)
    }
}

 

코루틴의 기본 구조로 코루틴 안에 launch, async 등의 코루틴 빌더를 통해 자식 코루틴을 생성할 수 있으며 최상위 코루틴은 부모 코루틴이 된다. 이 때 자식 코루틴에서 예외가 발생하면 이 예외가 부모 코루틴에 까지 전달되는데, 이러한 구조를 코루틴의 구조화된 동시성이라 한다. 이러한 구조를 깨고 자식 코루틴에서 발생한 예외를 부모 코루틴에게 까지 전달하지 않도록 할 때 사용되는 것이 바로 supervisorScope다. 하지만 asnyc 빌더에서 발생한 예외는 supervisorScope를 사용해도 부모 코루틴까지 전달된다. 그렇기 때문에 예외 메시지로 전달된 "E2"가 먼저 추가되고 1을 추가하는 launch 코루틴이 실행되 "EC1"이 출력된다.

🪑 Step 4

class Step4Solution {

    @OptIn(ExperimentalCoroutinesApi::class)
    @Test
    fun `StateFlow와 SharedFlow`() = runTest {
        // given
        val actual: StringBuilder = StringBuilder()

        val a = MutableStateFlow(1)
        val b = MutableStateFlow(true)
        val c = MutableSharedFlow<Boolean>()

        // when
        val collectorJob = launch {
            a
                .flatMapLatest { b.filter { it } }
                .flatMapLatest { c.filter { it } }
                .onEach { actual.append(it) }
                .collect()
        }
        val emitterJob = launch {
            delay(100)
            c.emit(true)
            b.value = false
            a.value = 10
            c.emit(false)
            b.value = true
            a.value = 5
        }
        emitterJob.join()
        collectorJob.cancelAndJoin()

        // then
        val expected = "true"

        // assert문 수정하지 마세요!
        assertHashcode(actual, expected)
    }
}

flatMapLatest

flow를 최신 데이터만을 사용해 새로운 Flow로 변환할 수 있도록 만든다. flow에서 발행된 데이터를 변환하는 도중 새로운 데이터가 발행된 경우, 변환 로직을 취소하고 새로운 데이터를 사용해 변환을 수행한다. 비슷한 함수로 collectLatest를 생각하면 이해가 쉬울 것 같다.

 

collectorJob

  • a.flatMapLatest { b.filter { it } }:
    • a의 값을 기준으로 flatMapLatest 연산자를 사용해 b를 필터링한다.
    • a의 값이 변경될 때마다 가장 최근의 b 흐름만 유지한다.
    • b.filter { it }는 b가 true일 때만 흐름을 유지하도록 필터링한다.
  • .flatMapLatest { c.filter { it } }:
    • b의 값이 true인 경우에만 c의 흐름으로 넘어간다.
    • c.filter { it }는 c가 true일 때만 값을 방출하도록 필터링한다.
  • .onEach { actual.append(it) }:
    • c에서 필터된 값이 있을 때마다 actual에 값을 추가한다.

emitterJob

  • c.emit(true): c에 true 값을 방출한다.
  • b.value = false: b의 값을 false로 변경한다. 이는 이후의 흐름에서 b가 false일 때 c의 값을 수집하지 않도록 만든다.
  • a.value = 10: a의 값을 10으로 변경한다. 이로 인해 flatMapLatest가 재실행된다.
  • c.emit(false): c에 false 값을 방출한다.
  • b.value = true: b의 값을 다시 true로 변경한다.
  • a.value = 5: a의 값을 5로 변경합니다.

마무리

주최자분께서 시작전 이런 말씀을 하셨다. 그 동안 코루틴을 그냥 viewModelScope 열고 디스패처 달고 사용하지 않았냐고... ㅎㅎㅎㅎ 정곡을 제대로 찔렸는데 진짜 문제 풀면서 하나도 모르겠더라. 이번 기회에 다시한번 코루틴을 공부해야겠다고 느겼다. 사실 이전에 조세영님이 운영하시는 채팅방에서 인프런에 코루틴 강의를 오픈하셨는데 채팅방 인원 대상으로 한정 인원으로 강의를 무료로 제공해주셔서 냅다 달려가서 바로 받았다. 코틀린 코루틴의 정석은 내가 정말 보고 싶어서 예약 구매까지 했던 책인데 돌아보니 한번 정독한걸론 택도 없는 것 같다. 온라인 강의도 무료로 받았으니 더욱 열심히 해야겠다. :) 

방탈출 프로젝트 GitHub