IT/Kotlin

Coroutine Flow - 2부

물통꿀꿀이 2020. 11. 21. 21:00

지난 번에 포스팅한 1부 에 이어서 계속 알아보려고 한다. 

 

Buffering

flow에서 실행되는 작업이 오래 걸리는 작업일 때, collect를 수행하면 전체적으로 성능이 낮아 질 수 밖에 없다.

(각 작업을 순차적으로 실행하는 flow의 컨셉에 따라...)

fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(100) // pretend we are asynchronously waiting 100 ms
        emit(i) // emit next value
    }
}

fun main() = runBlocking<Unit> { 
    val time = measureTimeMillis {
        simple().collect { value -> 
            delay(300) // pretend we are processing it for 300 ms
            println(value) 
        } 
    }   
    println("Collected in $time ms")
}

예로 위 코드 결과는 불보듯 1200ms 이상이 걸린다. 하나 실행하는데 최소 400ms가 걸리기 때문에..

이를 개선하기 위해 등장한 operator가 Buffering이다.

특별한 것 없이 단순히 buffer().collect()와 같이 Chaining을 걸면 되는데 아래와 같다.

val time = measureTimeMillis {
    simple()
        .buffer() // buffer emissions, don't wait
        .collect { value -> 
            delay(300) // pretend we are processing it for 300 ms
            println(value) 
        } 
}   
println("Collected in $time ms")

내부적으로 buffer를 살펴보면, Capacity가 있는 채널로 값을 emit하고 코루틴을 분리해서 사용한다.

이 말이 어려울 수 있는데 buffer 소개한 페이지를 들어가보면 쉽게 이해 할 수 있게 나와있다.

flowOf("A", "B", "C")
    .onEach  { println("1$it") }
    .collect { println("2$it") }

위 코드를 보면 flow 특징상 같은 코루틴에서 순차적으로 동작한다. (아래 결과 참조)

Q : -->-- [1A] -- [2A] -- [1B] -- [2B] -- [1C] -- [2C] -->--
flowOf("A", "B", "C")
    .onEach  { println("1$it") }
    .buffer()  // <--------------- buffer between onEach and collect
    .collect { println("2$it") }

반면에 buffer를 넣게 되면, 얘기가 달라진다.

P : -->-- [1A] -- [1B] -- [1C] ---------->-- // flowOf(...).onEach { ... }
                            |
                            | channel // buffer()
                           V
Q : -->---------- [2A] -- [2B] -- [2C] -->-- // collect

내부적으로 코루틴이 하나 더 생성되어 값을 channel로 보낸다. 때문에 전보다 조금 성능이 좋아진다.

여기서 P의 처리 속도가 너무 빨라서 channel의 buffer가 꽉 차면 어떨까?? 생각이 들 수도 있다. 이렇게 되면 P는 buffer가 비워질 때까지 (Q가 작업이 끝날 때까지) 잠시 대기한다.

Conflation

conflation은 buffer와 비슷하면서도 조금 다른 기능을 가지고 있다. 한 마디로 정리하면 작업이 오래걸리는 것은 버리고 빠른 것만 취급한다(?) 정도로 이해하면 될 것 같다.

즉, 메인 코루틴에서 작업이 오래 걸릴때 분리된 코루틴에서 진행되어 쌓인 작업들은 다 무시된다. (메인 코루틴이 받을 준비가 될 때, 들어온 작업만 받는다.)

val flow = flow {
    for (i in 1..30) {
        delay(100)
        emit(i)
    }
}
val result = flow.conflate().onEach { delay(1000) }.toList()

그러므로 위의 결과는 1, 10, 20, 30 만 된다.

conflation은 channel을 기준으로 emitter와 collector의 처리 속도가 차이가 있을 때, 성능을 높이는 방법이기도 하다. 하지만 몇몇 값은 drop 된다는 것을 받아들여야 한다. 그렇기 때문에 조금 다른 방법은 xxxLatest를 사용하는 것이다.

 

예를 들어, collectLatest를 보면

val time = measureTimeMillis {
    simple()
        .collectLatest { value -> // cancel & restart on the latest value
            println("Collecting $value") 
            delay(300) // pretend we are processing it for 300 ms
            println("Done $value") 
        } 
}   
println("Collected in $time ms")
Collecting 1
Collecting 2
Collecting 3
Done 3
Collected in 741 ms

결과는 위와 같다. 메소드의 이름처럼 최종 값은 가장 마지막 값이 되겠지만 conflate와 조금 다른점이 있다면, 새로운 값이 emit 될 때마다 내부적으로 계속 재시작을 한다는 것이다.

이 말은 즉, 작업이 오래 걸리는 것이라면 해당 작업을 취소하고 새로운 값으로 작업을 대신한다는 것이다. 때문에 최종 결과값이 마지막 값이 된다.

 

Reference

- kotlinlang.org/docs/reference/coroutines/flow.html#asynchronous-flow