๐ค
๊ฐ๋ 9๏ธโฃ Coroutines Flow
Flow๋ ์์ฐจ์ ์ผ๋ก ๊ฐ์ emitํ๊ณ , (์ ์์ ์ผ๋ก ๋๋ exception์ ๋ด๋ฉด์) complete๋๋ ๋น๋๊ธฐ ๋ฐ์ดํฐ ์คํธ๋ฆผ์ด๋ค.
interface Flow<out T>
flow์์ Intermediate operators
๋ผ๊ณ ๋ถ๋ฆฌ๋ (map, filter, take, zip)๊ณผ ๊ฐ์ ๊ฒ๋ค์ upstream flow์ ์ ์ฉ๋๋ ํจ์์ด๋ฉฐ, ๋ค์ operator๋ค์ด ์ ์ฉ๋ ์ ์๋ downstream flow๋ฅผ ๋ฐํํ๋ ํจ์์ด๋ค. Intermediate operation์ flow์์ ํน์ ์ฝ๋๋ฅผ ์คํํ์ง๋ ์๊ณ , ํด๋น operation ์์ฒด๊ฐ suspending ํจ์์ธ ๊ฒ๋ ์๋๋ค. Intermediate operation์ ๋ฏธ๋์ execution์ ์ํ ์ฐ์ operation์ ์
์
ํด์ฃผ๊ณ ์ฌ๋นจ๋ฆฌ ๋ฆฌํดํด์ฃผ๊ธฐ๋ง ํ๋ค. ์ด๋ฌํ ํน์ฑ์ด ๋ฐ๋ก cold flow
ํ๋กํผํฐ์ ํน์ฑ์ด๋ฉฐ, Intermediate operator๋ cold flow ํ๋กํผํฐ์ ํด๋นํ๋ค.
flow์๋ Terminal operators
๋ผ๊ณ ๋ถ๋ฆฌ๋ suspending ํจ์๋ค(collect, single, reduce, toList)๊ณผ launchIn ์ฐ์ฐ์(์ฃผ์ด์ง scope์์ collection flow๋ฅผ ์์ํด์ค)๊ฐ ์๋ค. Terminal operator๋ upstream flow์ ์ ์ฉ๋๊ณ , ๋ชจ๋ operation์ execution์ ํธ๋ฆฌ๊ฑฐํด์ค๋ค. flow๋ฅผ executionํด์ฃผ๋ ๊ฒ์ ๋ค๋ฅธ ๋ง๋ก flow๋ฅผ collectํ๋ค๊ณ ๋ ํ๋๋ฐ, ์ด ํ์๋ ์ค์ ๋ก blocking ์์ด suspendingํ๋ ๋ฐฉ์์ผ๋ก ์ํ๋๋ค. Terminal operator๋ ์ ์์ ์ผ๋ก complete๋๊ฑฐ๋, exeception ๋๋ฉด์ complete๋๋๋ฐ, *upstream**์์ ๋ชจ๋ flow ์ฐ์ฐ๋ค์ด succesfulํ๋ฉด ์ ์์ ์ผ๋ก complete๋๋ ๊ฑฐ๊ณ failedํ๋ฉด exception๋๋ฉด์ complete๋๋ค. ์์์ ๋งํ terminal operator๋ค ์ค์ ๊ฐ์ฅ ๊ธฐ๋ณธ์ ์ธ operator๋ collect
์ด๋ค.
try{
flow.collect{ value ->
println("Received $value")
}
} catch (e: Exception) {
println("the flow has thrown an exceptoin: %e")
}
๋ํดํธ๋ก flow๋ค์ ์์ฐจ์ ์ด๊ณ , ๊ฐ์ ์ฝ๋ฃจํด ์์์๋ ๋ชจ๋ flow ์ฐ์ฐ๋ค์ ์์ฐจ์ ์ผ๋ก ์คํ๋๋ค. ํ์ง๋ง buffer์ flatMapMerge์ ๊ฐ์ด flow์ concurrency๋ฅผ ๋์
ํ๋๋ก ์ค๊ณ๋ operation๋ค์ ์์ฐจ์ ์ผ๋ก ์คํ๋์ง ์๋๋ค.
Flow
์ธํฐํ์ด์ค๋ cold stream์ธ์ง hot stream์ธ์ง์ ๋ํด์๋ ์ด๋ ํ ์ ๋ณด๋ ๊ฐ์ง๊ณ ์์ง ์๋ค.
๐ Cold stream์ด๋?
โก๏ธ collect๋ ๋๋ง๋ค ๋งค๋ฒ ๋ฐ๋ณต์ ์ผ๋ก collected๋๊ณ ๋์ผํ ์ฝ๋๋ฅผ ์คํํ๋ ๋ฐฉ์
๐ Hot stream์ด๋?
โก๏ธ collect๋ ๋๋ง๋ค ๊ฐ์ running source๋ก๋ถํฐ ๋ค๋ฅธ ๊ฐ๋ค์ emitํ๋ ๋ฐฉ์
๋ณดํต flow๋ cold stream์ด์ง๋ง, hot stream์ธ subtype(ex. SharedFlow)๋ ์๋ค. flow๋ stateIn๊ณผ shareIn ์ฐ์ฐ์๋ฅผ ํตํด hot stream์ผ๋ก ์ ํ๋ ์ ์์ผ๋ฉฐ, produceIn ์ฐ์ฐ์๋ฅผ ํตํด hot channel๋ก ์ ํ๋ ์๋ ์๋ค.
Flow builder
flow๋ฅผ ๋ง๋๋ ค๋ฉด ๋ค์๊ณผ ๊ฐ์ ๋ฐฉ์๋ค์ ์ด์ฉํ ์ ์๋ค.
- flowOf(โฆ) ํจ์ : ์ ํด์ง ํฌ๊ธฐ์ value์ set์ผ๋ก flow๋ฅผ ๋ง๋ค ๋ ์ฌ์ฉํ ์ ์๋ ํจ์
- asFlow() ํ์ฅํจ์ : ๋ค์ํ ํ์ ๋ค์ flow๋ก ์ ํํด์ฃผ๋ ํจ์
- flow {โฆ} ๋น๋ ํจ์ : ์์ฐจ์ ์ธ ํธ์ถ๋๋ ๊ฒ์ด emit ํจ์๊ฐ ๋๋๋ก ์์์ flow๋ฅผ ๋ง๋ค์ด์ฃผ๋ ํจ์
- channelFlow {โฆ} ๋น๋ ํจ์ : ์ ์ฌ์ ์ผ๋ก ๋์ ํธ์ถ๋๋ ๊ฒ์ด send ํจ์๊ฐ ๋๋๋ก ์์์ flow๋ฅผ ๋ง๋ค์ด์ฃผ๋ ํจ์
- MutableStateFlow & MutableSharedFlow : ๋ค์ด๋ ํธ๋ก ์ ๋ฐ์ดํธ๋๋ hot flow์ ์์ฑ์ ํจ์๋ค์ ์ ์ํจ
Flow constraints
Flow ์ธํฐํ์ด์ค๋ฅผ implementํ ๋๋ ์๋์ ๋ช ์๋ ๋ ๊ฐ์ง์ ์ค์ํ ํน์ฑ๋ค์ ๊ณ ๋ คํด์ผ ํ๋ค.
- Context preservation
- Exception transparency
์์ ๋ ํน์ฑ๋ค์ flow์ ๊ด๋ จ๋ ์ฝ๋์ ๋ํ local reasoning์ ํ ์ ์๊ฒํด์ฃผ๊ณ , downstream flow collector๋ค๋ก๋ถํฐ ๋ถ๋ฆฌ๋์ด upstream flow emitter๊ฐ develop๋๋ ๋ฐฉ์์ผ๋ก ์ฝ๋๊ฐ ๋ชจ๋ํ๋๋๋ก ํด์ค๋ค.
(1) Context preservation
flow๋ context preservationํ ํน์ฑ์ ๊ฐ์ง๋ค. flow๋ flow ๊ณ ์ ์ execution ์ปจํ
์คํธ๋ฅผ ์บก์ํํ๋ฉฐ, downstream์ผ๋ก ์ ํ์ํค๊ฑฐ๋ ๋์ถ์ํค์ง ์์ผ๋ฏ๋ก (ํน์ transformation์ด๋ terminal operation)์ execution ์ปจํ
์คํธ์ ๋ํด ์ถ๋ก ํ๊ธฐ๊ฐ ๋ ๊ฐ๋จํ๋ค.
flow์ ์ปจํ
์คํธ๋ฅผ ๋ณ๊ฒฝํ๋ ๋ฐฉ๋ฒ์ ์ค๋ก์ง ํ๋๋ค. โก๏ธ flowOn
์ฐ์ฐ์๋ฅผ ์ฐ๋ฉด ๋๋ค. flowOn ์ฐ์ฐ์๋ upstream context(flowOn ์ฐ์ฐ์ ์์ ์๋ ๋ชจ๋ ๊ฒ๋ค)๋ฅผ ๋ณ๊ฒฝ์ํจ๋ค.
val flowA = flowOf(1, 2, 3)
.map { it + 1 } // ctxA์์ ์คํ๋จ
.flowOn(ctxA) // upstream ์ปจํ
์คํธ๋ฅผ ๋ฐ๊ฟ: flowOf and map
// ์ด์ ์ปจํ
์คํธ๋ฅผ ๋ณด์กดํ๋ flow์ธ flowA๋ฅผ ๋ง๋ค์์ : flowA์ ์ ๋ณด๋ ์ด์ flowA ์์ฒด์ ์บก์ํ๋์ด ์์
val filtered = flowA // ctxA๋ flowA์ ์บก์ ํ๋์ด ์์
.filter { it == 3 } // filter๋ ์์ง์ context๊ฐ ์์ด ํจ์ดํ ์ฐ์ฐ์์
withContext(Dispatchers.Main) {
// ๋ชจ๋ ์บก์ํ๋์ง ์์ ์ฐ์ฐ์๋ค์ Main์์ ์คํ๋ ๊ฑฐ์ : filter and single
val result = filtered.single()
myUi.text = result
}
flow๋ฅผ ๊ตฌํํ ๋๋ ๊ฐ์ ์ฝ๋ฃจํด์์๋ง emit์ ํด์ผ ํ๋ค. ์ด๋ฌํ ์ ์ฝ์ฌํญ์ ๋ํดํธ flow ๋น๋์ ์ํด ์ ์ฉ๋๋ค. flow ๋น๋๋ flow implementation์์ ์ด๋ ํ ์ฝ๋ฃจํด๋ ์์์ํค์ง ์์ ๋ ์ธ ์ ์๋ค. flow์ implemetation์ ๋๋ถ๋ถ์ ๊ฐ๋ฐํ ๋ ๊ฐ๋ฐ์๋ค์ด ์ ์ง๋ฅด๋ ์ค์๋ค์ ์๋ฐฉํด์ค๋ค.
val myFlow = flow {
// GlobalScope.launch { // is prohibited
// launch(Dispatchers.IO) { // is prohibited
// withContext(CoroutineName("myFlow")) // is prohibited
emit(1) // OK
coroutineScope {
emit(2) // OK -- still the same coroutine
}
}
๋ง์ฝ์ flow์ collection์ด๋ emission์ด ์ฌ๋ฌ ์ฝ๋ฃจํด์ผ๋ก ๋ถ๋ฆฌ๋์ด์ผ ํ๋ค๋ฉด, channelFlow
๋ฅผ ์ฐ๋ฉด ๋๋ค. channelFlow๋ ๋ชจ๋ ์ปจํ
์คํธ ๋ณด์กด ์์
๋ค์ ์บก์ํํด์ค๋ค. ๊ทธ๋ฆฌ๊ณ channelFlow์ ๊ตฌ์ฒด์ ์ผ๋ก ์ด๋ป๊ฒ ๊ตฌํํ ์ง์ ๋ํด ์๊ฐํ๋๊ฒ ์๋๋ผ, ๋๋ฉ์ธ์ ํ์ ๋ ๋ฌธ์ ์ ๊ฐ๋ฐ์๊ฐ ์ง์คํ ์ ์๋๋ก ๋์์ค๋ค. ์ด๋ฌํ channelFlow ์์์๋ coroutine builder๋ค์ ์กฐํฉํด์ ์ธ ์ ์๋ค.
โ๏ธ ์ฑ๋ฅ ์ค์ + (๋์ emit & ์ปจํ ์คํธ jump)์ด ์๋ค๋ฉด?
flow ๋น๋ ๋์ ์ coroutineScope ๋๋ supervisorScope๋ฅผ ์ธ ์ ์๋ค.
- ๋ฒ์๊ฐ ์ง์ ๋ primitive๋ CoroutineScope๋ฅผ ์ ๊ณตํ๊ธฐ ์ํด ์ฌ์ฉ๋์ด์ผ ํจ
- withContext(ctx)์ด๋ , launch(ctx)์ ๊ฐ์ด ๋น๋์ ์ธ์๋ ๊ฐ์ emission์ ์ปจํ ์คํธ๋ฅผ ๋ฐ๊พธ๋๊ฑด ์๋จ
- ๋ณ๊ฐ์ ์ปจํ ์คํธ์์ ๋ค๋ฅธ flow๋ฅผ collectํ ์๋ ์์. ๊ทผ๋ฐ ์ด๋ ๊ฒ ํ๋ฉด flowOn ์ฐ์ฐ์๋ฅผ ์ ์ฉํ๋ ๊ฒ๊ณผ ๋๊ฐ์ ๊ฒฐ๊ณผ๊ฐ ๋์ค๊ธฐ๋ ํ๋, flowOn์ด ๋ ํจ์จ์ ์
(2) Exception transparency
emit
์ด๋ emitAll
์ด ์์ธ๋ฅผ ๋์ง ๋, Flow implementation์์๋ ์๋ก์ด ๊ฐ์ emitํ๋ ๊ฒ์ ๋ฉ์ถ๊ณ exception์ผ๋ก finishํด์ค์ผ ํ๋ค. downstream์ด ์คํจํ ํ์ ๊ฐ๋ค์ emitํด์ผ ํ๋ค๋ฉด, catch
์ฐ์ฐ์๋ฅผ ์ฐ๋ฉด ๋๋ค. catch
์ฐ์ฐ์๋ upstream exception๋ง ์ก์์ฃผ๊ณ , ๋ชจ๋ downstream exception์ ๊ทธ๋ฅ ํต๊ณผ์์ผ๋ฒ๋ฆฐ๋ค. ์ด์ ์ ์ฌํ๊ฒ collect ๊ฐ์ terminal ์ฐ์ฐ์๋ค์ ์ฝ๋๋ upstream flow์์ ๋ํ๋ unhandled exception์ throwํ๋ค.
flow { emitData() }
.map { computeOne(it) }
.catch { ... } // catches exceptions in emitData and computeOne
.map { computeTwo(it) }
.collect { process(it) } // throws exceptions from process and computeTwo
๊ฐ์ ์ถ๋ก ์ด finally
๋ธ๋ก์ ๋์ฒดํด์ฃผ๋ onCompletion ์ฐ์ฐ์์๋ ์ ์ฉ๋ ์ ์๋ค. ๋ชจ๋ exception์ ์ฒ๋ฆฌํ๋ Flow์ ์ฐ์ฐ์๋ค์ exception suppression ์์น์ ๋ฐ๋ฅธ๋ค. ๋ง์ฝ์ downstream exception์ด throw๋์์ ๋ upstream flow๊ฐ completion ์ค์ exception์ throwํ๋ค๋ฉด, downstream exception์ upstream exception์ ์ํด ๋์ฒด๋๊ณ suppress๋๋ฉฐ finally ๋ธ๋ก์์ throwํ๋ ๊ฒ๊ณผ ์๋ฏธ์ ๋์ผํ๊ฒ ๋๋ค. ๊ทธ๋ฌ๋ downstream exception์ root cause๋ก ์ฌ๊ธฐ๊ณ uptream์ ์๋ฌด๊ฒ๋ throwํ์ง ์์ ๊ฒ์ฒ๋ผ ์๋ํ๊ธฐ ๋๋ฌธ์, exception์ ํธ๋ค๋งํ๋ ์ฐ์ฐ์๋ค์ operation์ ์ํฅ์ ๋ผ์น์ง๋ ์๋๋ค.
์ ๋๋ก exception transparency๋ฅผ ์งํค์ง ์์ผ๋ฉด, ์ฝ๋์ ๋ํ local reasoning์ ์ ๋๋ก ํ์ง ๋ชปํ๊ณ collect {โฆ} ์์์ exception์ด upstream flow์ ์ํด โcaughtโ๋ ์ ์๊ธฐ ๋๋ฌธ์ ์ฝ๋๋ฅผ ์ถ๋ก ํ๊ธฐ ์ด๋ ค์์ ธ์ ์ด์ํ๊ฒ ์๋ํ๊ฒ ๋ ๊ฒ์ด๋ค.
Flow๋ ๋ฐํ์์ exception transparency๋ฅผ ๊ฒ์ฌํด์ ๋ง์ฝ์ exception์ด ์ด์ ์๋์์ throw๋์๋ค๋ฉด, ๊ฐ์ emitํ๋ ค๊ณ ์๋ํ ๋๋ง๋ค IllegalStateException์ throwํ๋ค.
Reactive streams
Flow๋ Reactive streams์ ํธํ๋๋ค. Flow.asPublisher๊ณผ Publisher.asFlow๋ฅผ ์จ์ reactive stream๊ณผ ์ํธ์ด์ฉํ ์ ์๋ค.
Not stable for inheritance
Flow ์ธํฐํ์ด์ค๋ ๋ค๋ฅธ ์คํ์์ค ๋ผ์ด๋ธ๋ฌ๋ฆฌ์์ ์์ํด์ ์ฌ์ฉํ๊ธฐ์ ์ ํฉํ์ง๋ ์๋ค. ์๋ํ๋ฉด ์๋ก์ด ๋ฉ์๋๋ค์ด ์ถํ์ Flow ์ธํฐํ์ด์ค์ ์ถ๊ฐ๋ ์ ์๊ธฐ ๋๋ฌธ์ด๋ค.
๊ตฌํํ๋ ค๋ฉด flow {โฆ} ๋น๋ ํจ์๋ฅผ ์ฐ๊ฑฐ๋, AbstractFlow๋ฅผ extendํด์ผ ํ๋ค. ๋ ๋ฐฉ๋ฒ์ ๋ชจ๋ ์ปจํ
์คํธ ๋ณด์กด ํน์ฑ์ ์๋ฐํ์ง ์์ผ๋ฉฐ ๊ฐ๋ฐ์๊ฐ concurrency์ ๊ด๋ จ๋ ์ค์๋ฅผ ํ๊ฑฐ๋, ์ผ๊ด์ฑ์๊ฒ flow dispatcher๋ cancellation์ ์ฐ๋ ๊ฒ์ ์๋ฐฉํด์ค ๊ฒ์ด๋ค.
[์ฐธ๊ณ ์ฌ์ดํธ]
Kotlin ๊ณต์ ๋ฌธ์ - Flow