본문 바로가기

KOTLIN

[Kotlin] Coroutine Flow

🚨Trouble Shooting

 기존 진행하던 프로젝트를 리팩토링 하는 과정에서 two-way Binding을 사용하던 도중 Live Data의 타입을 변경할 수 있는 Transformation.map Operator가 import가 안되는 issue가 발생했습니다.

Transformations.map 
AAC( Android Architecture Components) Library Utility Method
Live Data를 사용자가 원하는 형태로 변환


 

원인을 찾던 도중 androidx.navigation Library 2.6 버전부터 LiveData 대신 Kotlin의 StateFlow를 사용하도록 권장하고 있다는 것을 알게 되었습니다. 즉, 버전이 2.6 이상인 경우 Transformations.map 함수를 사용하는 것을 중단하고, StateFlow를 사용하도록 변경해야 한다고 합니다. 그래서 State Flow에 대해 알아보기 전에 먼저 Flow에 대해 알아보겠습니다.

 

No More Transformations.map in Kotlin

In my experience when I update the version here, I got a missing function error with red bold text in "Transformations.map" //Jetpack Component implementation 'androidx.

www.linkedin.com

💻 참고 영상


 

Flow와 관련하여 좋은 영상을 발견하여 첨부합니다.

출처 : https://www.youtube.com/watch?v=D8rUDoYCZlo&t=1375s


📕 Flow란 ?

Coroutine Flow는 Coroutine 상에서 Reactive Programming을 지원하기 위한 Data Stream 입니다.

📌 Reactive Programming 

데이터가 변경될 때 이벤트를 발생시켜 데이터를 계속해서 전달하도록 만드는 프로그래밍 방식입니다. Reactive Programming에는 하나의 데이터를 발행하는 발행자가 있고 중간 연산자를 통해 전달받은 데이터를 가공하고 이를 소비하는 데이터의 소비자에게 지속적으로 데이터를 전달합니다. 

이 과정을 데이터 스트림이라고 하며 데이터 스트림은 세가지로 구성되며 이 세가지가 flow의 핵심 구성요소 입니다.

  • Producer(생산자)
  • Intermediary(중간 연산자)
  • Consumer(소비자)

예제를 통해 각 생산자, 중간 연산자, 소비자가 어떤 역할을 하는지 알아보겠습니다.

👷‍♂️ Producer(생산자)

Producer는 flow { } 블록 내부에서 emit( ) 메소드를 사용해 데이터를 생성합니다.

안드로이드에서 생산자가 가져오는 데이터의 대표적인 Data Source는 두가지입니다.

  • 서버에서 Retrofit이나 OkHttp 등을 통해 REST API를 이용해 가져오는 데이터
  • Room, SqlLite같은 Local Database에서 가져오는 데이터
interface Flow<out T>{
    suspend fun collect(collector: FlowCollector<T>)
}

public fun interface FlowCollector<in T> {
    public suspend fun emit(value: T)
}

 

Flow interface는 내부적으로 suspend Function 형태의 collect 추상 메소드를 가지고 있어 동작시 Blocking 되지 않고 suspend(일시 중단) 됩니다. 

  • collect( ) : 데이터를 수집하는 Producer의 역할
  • FlowCollector( ) : Producer에게 데이터를 수집하여 전달하고 데이터를 발행
  • emit( ) :  value 매개변수로 전달된 데이터를 Producer에게 전달
fun main(){
    val integers: Flow<Int> = flow{// Flow 블록 선언
        for (i in 1..10){
            delay(100) // suspend
            emit(i) // suspend, Producer가 데이터 발행
        }
}
  • integers 라는 1부터 10까지의 정수를 생성하는 Coroutine Flow를 생성했습니다.
  • delay()는 코루틴의 일시정지 함수로 Thread.sleep()과는 다르게 Blocking 되지 않고 suspend 됩니다.
  • emit : Producer가 데이터를 생성합니다.

👨‍💻 Intermediary (중간 연산자)

주요 메소드

  • map : 데이터를 사용자가 원하는 형태로 가공할 수 있습니다.
  • filter : 조건에 맞지 않는 데이터를 제거합니다.
  • transform : 현재 Flow를 다른 Flow로 변환합니다.
  • take : 주어진 수 만큼의 Data를 가져옵니다.
  • zip : 두 개 이상의 Flow를 결합해 새로운 Flow를 생성합니다.
  • onEach : 각 데이터 마다 수행할 작업을 정의합니다.

fun main(){
    val integers: Flow<Int> = flow{// Flow 블록 선언
        for (i in 1..10){
            delay(100) // suspend
            emit(i) // suspend, Producer가 데이터 발행
        }
    }

    runBlocking {

        integers
            .filter {
                println("Filter $it")
                it % 2 == 0
            }
            .map {
                println("Map $it")
                "string $it"
            }
    }
}
  • runBlocking { ... } : 코루틴을 블록하는 빌더로 메인 함수에서 suspend 함수를 호출하기 위해 사용했습니다.
  • filter{ ... } : Producer에게 전달 받은 데이터를 필터링합니다.
  • map{ ... } : 데이터를 가공하는 역할을 수행합니다. 현재 코드에선 정수 형태의 데이터를 문자열로 변환합니다.

👨‍💻 Consumer (소비자)

📖 주요 메소드

  • collect : Flow의 모든 이벤트를 소비합니다.
  • toList : Flow의 모든 데이터를 리스트로 수집합니다.
  • toSet : Flow의 모든 데이터를 세트로 수집합니다.
  • first : 첫 번쨰 데이터를 반환합니다.
  • reduce : Flow의 이벤트를 축소해 하나의 결과를 생성합니다.
  • fold : 초기값을 기반으로 Flow의 이벤트를 축소하여 하나의 결과를 생성합니다.
  • onCompletion : Flow의 실행이 완료되면 수행할 작업을 정의합니다.

fun main(){
    val integers: Flow<Int> = flow{// Flow 블록 선언
        for (i in 1..10){
            delay(100) // suspend
            emit(i) // suspend, Producer가 데이터 발행
        }
    }

    runBlocking {

        integers
            .filter {
                println("Filter $it")
                it % 2 == 0
            }
            .map {
                println("Map $it")
                "string $it"
            }
            .collect{
                println("Collect $it")
            }
    }
}
  • collect{ ... } : 전달된 데이터를 소비합니다. 

실행 결과


📕 결론

Flow는 비동기 데이터 스트림을 생성하고 처리할 수 있어 대량의 데이터를 처리하는데 유리하고 데이터가 처리되기 전까지 suspend 상태를 유지하므로 메모리를 효율적으로 사용할 수 있습니다.

 

Github