IT/Kotlin

Coroutine Flow - 4부

물통꿀꿀이 2020. 11. 22. 00:19

Flow에 대해 4부로 계속 알아가보자.

Launching Flow

fun events(): Flow<Int> = (1..3).asFlow().onEach { delay(100) }

fun main() = runBlocking<Unit> {
    events()
        .onEach { event -> println("Event: $event") }
        .collect() // <--- Collecting the flow waits
    println("Done")
}    

지금껏 flow를 설명할 때, 위의 코드처럼 collect를 마지막 operator로 사용했었다.

그러나 전에도 알아봤지만 collect는 downstream으로 들어오는 값을 순차적으로 처리하기 때문에 계속 대기해야 한다. 물론 onEach에서  action을 먼저 한다해도 결과적으로 collect가 마지막에 있기 때문에 별 차이가 없다.

하지만.!!

 

launchIn은 다르다.

fun main() = runBlocking<Unit> {
    events()
        .onEach { event -> println("Event: $event") }
        .launchIn(this) // <--- Launching the flow in a separate coroutine
    println("Done")
}   

collect 대신에 launchIn을 사용하는 것인데, this라는 param에서 대충 짐작할 수 있듯이 다른 코루틴에서 실행하는 것이다.

Done
Event: 1
Event: 2
Event: 3

결과가 collect와 다르게 Done이 먼저 발생한다. 그럼에도 종료되지 않는 이유는 runBlocking이 있기 때문...

위의 간단 예시보다 좀 더 고급스럽게 쓰기 위해서는 launchIn 인터페이스 문서에서 언급한것 처럼 아래 처럼 사용하는 것이다.

flow
    .onEach { value -> updateUi(value) }
    .onCompletion { cause -> updateUi(if (cause == null) "Done" else "Failed") }
    .catch { cause -> LOG.error("Exception: $cause") }
    .launchIn(uiScope)

upstream 혹은 작업중에 발생하는 예외, 완료등을 좀 더 나이스하게 처리 할 수 있다.

Cancellation Checks

그리고 순차적으로 작업 중에 멈춰야 할 일이 발생할 수 있다. 물론 여러 방법이 있을 수 있지만 operator 하나만 사용하면 된다.

fun main() = runBlocking<Unit> {
    (1..5).asFlow().cancellable().collect { value -> 
        if (value == 3) cancel()  
        println(value)
    } 
}

바로 cancellable인데, 해당 operator를 사용하면 내부적으로 stream에 대한 cancellation 검사를 하기 때문에 특정 조건에서 cancel 발생시, stream을 종료 할 수 있다.

 

Reference

- kotlinlang.org/docs/reference/coroutines/flow.html#asynchronous-flow