IT/Kotlin

Coroutine Flow - 1부

물통꿀꿀이 2020. 11. 21. 20:17

코틀린에서 제공하는 코루틴 중 Flow에 대해 살펴보려고 한다.

 

Flow는 이미 명명된 이름에서 짐작할 수 있듯이, 간단히 언급하면 Sequence의 비동기 버전 정도로 이해하면 될 것 같다.

좀 더 자세히 보면, Sequence는 Stream 단위의 데이터를 하나 처리하고 다음 하나 방식이지만, Flow는 Stream의 각 데이터 결과를 기다리지 않고 작업한다. (물론 값은 순차적으로 가져온다.)

fun simple(): Flow<Int> = flow {
    println("Flow started")
    for (i in 1..3) {
        delay(100)
        emit(i)
    }
}

fun main() = runBlocking<Unit> {
    println("Calling simple function...")
    simple().collect { value -> println(value) }
}

그럼 Flow를 사용한 코드를 작성해보면, 위와 같다. 

크게 이해하기 어려운 부분은 없으나 flow는 Flow Builder이고 emit은 yield와 같다고 생각하면 된다.

(추가로 Flow Builder를 사용하기 때문에 suspend는 생략할 수 있다.)

 

코드를 실행해보면 알겠지만, 결과를 알기전에 Flow의 특징을 살펴보면,

첫째로 Cold 이다. 

둘째로 마지막 operator가 호출될 때, 작업이 시작된다.

2가지로 나누었지만 사실 Cold의 특성을 모두 띄고 있다. (마지막 operator 실행 이후 순차적으로 작업이 시작되기 때문에)

 

이를 바탕으로 결과를 보면 단순히 1,2,3이지만 해당 결과가 비동기 Cold Stream 작업으로 나온 것임을 알 수 있다.

 

Context

Flow는 코루틴을 호출한 컨텍스트에서 동작한다. 이해하기 쉽게 코드를 보자.

fun simple(): Flow<Int> = flow {
    log("Started simple flow")
    for (i in 1..3) {
        emit(i)
    }
}

fun main() = runBlocking<Unit> {
    simple().collect { value -> log("Collected $value") }
}
[main] Started simple flow
[main] Collected 1
[main] Collected 2
[main] Collected 3

위의 코드 실행 결과를 보면, main에서 동작한 코루틴이라는 것을 확인 할 수 있다. 즉, 다른 설정 없이 flow를 실행하면 호출자의 Context(위 코드는 메인 스레드)에서 실행된다.

 

그런데 만약, flow를 다른 Context에서 실행하려고 한다면??

fun simple(): Flow<Int> = flow {
    // The WRONG way to change context for CPU-consuming code in flow builder
    kotlinx.coroutines.withContext(Dispatchers.Default) {
        for (i in 1..3) {
            Thread.sleep(100) // pretend we are computing it in CPU-consuming way
            emit(i) // emit next value
        }
    }
}

fun main() = runBlocking<Unit> {
    simple().collect { value -> println(value) }
}
Exception in thread "main" java.lang.IllegalStateException: Flow invariant is violated:
Flow was collected in [BlockingCoroutine{Active}@252a8098, BlockingEventLoop@6e355149],
but emission happened in [DispatchedCoroutine{Active}@7e7b2411, Dispatchers.Default].
Please refer to 'flow' documentation or use 'flowOn' instead
at kotlinx.coroutines.flow.internal.SafeCollector_commonKt.checkContext(SafeCollector.common.kt:84)
at kotlinx.coroutines.flow.internal.SafeCollector.checkContext(SafeCollector.kt:84)
at kotlinx.coroutines.flow.internal.SafeCollector.emit(SafeCollector.kt:70)
at kotlinx.coroutines.flow.internal.SafeCollector.emit(SafeCollector.kt:55)

위와 같은 에러가 발생한다. (코드 상에서 withContext는 현재 Context를 전환하는 메소드이다.)

해당 에러의 원인은 flow가 Context를 보호하려는 속성때문이다. 그래서 다른 기존 Context가 아닌 다른 Context에 값을 내보낼 수 없다.

flowOn

그렇다면 flow는 다른 Context에서 사용할 수 없는 것일까? 

fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        Thread.sleep(100) // pretend we are computing it in CPU-consuming way
        log("Emitting $i")
        emit(i) // emit next value
    }
}.flowOn(Dispatchers.Default) // RIGHT way to change context for CPU-consuming code in flow builder

fun main() = runBlocking<Unit> {
    simple().collect { value ->
        log("Collected $value") 
    } 
} 

flowOn을 사용하면 된다.

[DefaultDispatcher-worker-1] Emitting 1
[main] Collected 1
[DefaultDispatcher-worker-1] Emitting 2
[main] Collected 2
[DefaultDispatcher-worker-1] Emitting 3
[main] Collected 3

결과를 보면 flow의 실행 Context가 메인 스레드와 다르다는 것을 알 수 있다. 즉, Dispatchers의 정의된 값을 바탕으로 추가 코루틴에서 작업을 실행한다.

 

Reference

- kotlinlang.org/docs/reference/coroutines/flow.html#asynchronous-flow