android, coroutine,

Flow in Android

Ella Ella Follow Dec 28, 2021 ยท 8 mins read
Flow in Android
Share this

๐Ÿค–

๊ฐœ๋… ๐Ÿ”Ÿ Flow in Android

Flow๋Š” ์ˆœ์ฐจ์ ์œผ๋กœ ์—ฌ๋Ÿฌ ๊ฐ’์„ emitํ•˜๋Š” ๋ฐ์ดํ„ฐ์ŠคํŠธ๋ฆผ์ด๋ฉฐ, ๋‹จ ํ•˜๋‚˜์˜ ๊ฐ’๋งŒ ๋ฐ˜ํ™˜ํ•˜๋Š” suspending ํ•จ์ˆ˜์™€๋Š” ๋‹ค๋ฅด๋‹ค. flow๋ฅผ ์“ฐ๋ฉด DB์—์„œ ์‹ค์‹œ๊ฐ„ ์—†๋ฐ์ดํŠธํ•˜๋Š” ๊ฐ’๋“ค์„ ์ˆ˜์‹ ํ•  ์ˆ˜ ์žˆ๋‹ค.
flow๋Š” ์ฝ”๋ฃจํ‹ด์„ ๊ธฐ๋ฐ˜์œผ๋กœ ๋นŒ๋“œ๋˜๋ฉฐ, ๋น„๋™๊ธฐ์‹์œผ๋กœ ๊ณ„์‚ฐํ•  ์ˆ˜ ์žˆ๋Š” ๋ฐ์ดํ„ฐ ์ŠคํŠธ๋ฆผ์ด๋‹ค. flow์—์„œ emitํ•˜๋Š” ๊ฐ’๋“ค์€ ๋™์ผํ•œ ํƒ€์ž…์ด์—ฌ์•ผ ํ•œ๋‹ค. ์˜ˆ๋ฅผ ๋“ค์–ด์„œ Flow<Int>๋Š” Int ๊ฐ’๋งŒ์„ ๋‚ด๋ณด๋‚ด๋Š” flow๊ฐ€ ๋œ๋‹ค.
๊ฐ’ ์‹œํ€€์Šค๋ฅผ ์ƒ์„ฑํ•˜๋Š” Iterator์™€ ์œ ์‚ฌํ•˜์ง€๋งŒ, flow๋Š” suspending ํ•จ์ˆ˜๋ฅผ ์ด์šฉํ•˜์—ฌ ๊ฐ’์„ ๋น„๋™๊ธฐ๋กœ ๋งŒ๋“ค์–ด์„œ ์“ด๋‹ค. flow๋ฅผ ์“ฐ๋ฉด ๊ธฐ๋ณธ ์Šค๋ ˆ๋“œ๋ฅผ blockํ•˜์ง€ ์•Š๊ณ ๋„ ๋„คํŠธ์›Œํฌ ์š”์ฒญ์„ ํ•ด์„œ ๋‹ค์Œ ๊ฐ’์„ ๋งŒ๋“ค์–ด๋‚ผ ์ˆ˜ ์žˆ๋‹ค.

Data Stream์˜ ์„ธ ๊ฐ€์ง€ ์š”์†Œ : ์ƒ์‚ฐ์ž + ์ค‘๊ฐœ์ž(option) + ์†Œ๋น„์ž

  • ์ƒ์‚ฐ์ž(Producer) : ์ŠคํŠธ๋ฆผ์— ์ถ”๊ฐ€๋˜๋Š” ๋ฐ์ดํ„ฐ๋ฅผ ์ƒ์‚ฐํ•จ. flow์—์„œ ๋น„๋™๊ธฐ์ ์œผ๋กœ ๋ฐ์ดํ„ฐ๋ฅผ ์ƒ์‚ฐํ•จ
  • ์ค‘๊ฐœ์ž(Intermediaries, option) : ์ŠคํŠธ๋ฆผ์— ๋‚ด๋ณด๋‚ด๋Š” ๊ฐ๊ฐ์˜ ๊ฐ’์„ ์ˆ˜์ •ํ•˜๊ฑฐ๋‚˜, ์ŠคํŠธ๋ฆผ ์ž์ฒด๋ฅผ ์ˆ˜์ •ํ•จ
  • ์†Œ๋น„์ž(Consumer) : ์ŠคํŠธ๋ฆผ์˜ ๊ฐ’์„ ์‚ฌ์šฉํ•จ

์•ˆ๋“œ๋กœ์ด๋“œ์—์„œ ์ผ๋ฐ˜์ ์œผ๋กœ repository๋Š” UI ๋ฐ์ดํ„ฐ์˜ Producer๋‹ค. ๊ทธ๋ฆฌ๊ณ  UI๋Š” ์ตœ์ข…์ ์œผ๋กœ ๋ฐ์ดํ„ฐ๋ฅผ ํ‘œ์‹œํ•˜๋Š” Consumer๋‹ค. UI ๋ ˆ์ด์–ด๋Š” User Input Event์˜ Producer์ด๋ฉฐ, ๊ณ„์ธต ๊ตฌ์กฐ์˜ ๋‹ค๋ฅธ ๋ ˆ์ด์–ด๊ฐ€ input event๋ฅผ ์‚ฌ์šฉํ•˜๊ธฐ๋„ ํ•œ๋‹ค. Producer์™€ Consumer ์‚ฌ์ด์— ์žˆ๋Š” ๋ ˆ์ด์–ด๋Š” ์ผ๋ฐ˜์ ์œผ๋กœ ์ค‘๊ฐœ์ž์˜ ์—ญํ• ์„ ํ•˜๋Š”๋ฐ, ๋‹ค์Œ ๋ ˆ์ด์–ด์˜ ์š”๊ตฌ์‚ฌํ•ญ์— ๋งž์ถฐ์ฃผ๊ธฐ ์œ„ํ•ด์„œ ๋ฐ์ดํ„ฐ ์ŠคํŠธ๋ฆผ์„ ์ˆ˜์ •ํ•ด์ค€๋‹ค.

๐ŸŒŸ Flow ๋งŒ๋“ค์–ด๋ณด๊ธฐ

Flow๋ฅผ ๋งŒ๋“ค๋ ค๋ฉด ์•ž์„œ ํฌ์ŠคํŒ…ํ–ˆ๋˜ ๋‚ด์šฉ์—์„œ ๋ณธ ๋ฐ”์™€ ๊ฐ™์ด Flow Builder API๋ฅผ ์จ์•ผํ•œ๋‹ค. flow ๋นŒ๋” ํ•จ์ˆ˜๋Š” emit ํ•จ์ˆ˜๋ฅผ ์ด์šฉํ•˜์—ฌ ์ƒˆ๋กœ์šด ๊ฐ’์„ ๋ฐ์ดํ„ฐ ์ŠคํŠธ๋ฆผ์— ๋‚ด๋ณด๋‚ด๋Š” ์ƒˆ๋กœ์šด flow๋ฅผ ๋งŒ๋“ค์–ด์ค€๋‹ค.
๋ฐ‘์˜ ์ฝ”๋“œ์—์„œ DataSource๋Š” ๊ณ ์ •๋œ ๊ฐ„๊ฒฉ์œผ๋กœ ์ตœ์‹  ๋‰ด์Šค๋ฅผ ์ž๋™์œผ๋กœ ๊ฐ€์ ธ์˜จ๋‹ค. suspending ํ•จ์ˆ˜๋Š” ๋‹จ์ผ ๊ฐ’์„ ๋ฐ˜ํ™˜ํ•  ๋ฟ, ์—ฌ๋Ÿฌ ๊ฐ’์„ ๋ฐ˜ํ™˜ํ•  ์ˆ˜ ์—†์œผ๋ฏ€๋กœ DataSource๊ฐ€ ์ด๋Ÿฌํ•œ ์š”๊ตฌ์‚ฌํ•ญ์„ ์ถฉ์กฑํ•˜๋Š” flow๋ฅผ ๋งŒ๋“ค์–ด์„œ ๋ฐ˜ํ™˜ํ•œ๋‹ค. ์ด์™€ ๊ฐ™์€ ๊ฒฝ์šฐ์—์„œ DataSource๋Š” Producer ์—ญํ• ์„ ํ•œ๋‹ค.

class NewsRemoteDataSource(
    private val newsApi: NewsApi,
    private val refreshIntervalMs: Long = 5000
) {
    val latestNews: Flow<List<ArticleHeadline>> = flow {
        while(true) {
            val latestNews = newsApi.fetchLatestNews()
            emit(latestNews) // flow๋กœ request ๊ฒฐ๊ณผ๋ฅผ ๋‚ด๋ณด๋ƒ„
            delay(refreshIntervalMs) // refreshIntervalMs๋งŒํผ ์ฝ”๋ฃจํ‹ด์„ suspendํ•จ
        }
    }
}

// suspend ํ•จ์ˆ˜๋กœ ๋„คํŠธ์›Œํฌ ์š”์ฒญ์„ ๋งŒ๋“ค์–ด๋†“์€ ์ธํ„ฐํŽ˜์ด์Šค
interface NewsApi {
    suspend fun fetchLatestNews(): List<ArticleHeadline>
}

flow ๋นŒ๋”๋Š” ์ฝ”๋ฃจํ‹ด ๋‚ด์—์„œ ์‹คํ–‰๋˜๋ฉฐ ์•„๋ž˜์™€ ๊ฐ™์€ ์ œํ•œ์‚ฌํ•ญ์ด ์ ์šฉ๋œ๋‹ค.

  • flow๋Š” ์ˆœ์ฐจ์ ์ž„. Producer๋Š” ์ฝ”๋ฃจํ‹ด์— ์žˆ์œผ๋ฏ€๋กœ, suspending ํ•จ์ˆ˜๋ฅผ ํ˜ธ์ถœํ•˜๋ฉด producer๋Š” ์ •์ง€ ํ•จ์ˆ˜๊ฐ€ ๋ฐ˜ํ™˜๋  ๋•Œ๊นŒ์ง€ suspend ์ƒํƒœ๋กœ ์œ ์ง€๋จ. ์œ„์˜ ์ฝ”๋“œ์—์„œ Producer๋Š” fetchLatestNews ๋„คํŠธ์›Œํฌ ์š”์ฒญ์ด ์™„๋ฃŒ๋  ๋•Œ๊นŒ์ง€ ์ •์ง€ํ•จ. ์™„๋ฃŒ๋œ ํ›„์—์•ผ ๊ฒฐ๊ณผ๋ฅผ ์ŠคํŠธ๋ฆผ์— ๋‚ด๋ณด๋‚ธ๋‹ค.
  • flow ๋นŒ๋”์—์„œ ์ƒ์‚ฐ์ž๊ฐ€ ๋‹ค๋ฅธ CoroutineContext์˜ ๊ฐ’์„ emitํ•  ์ˆ˜ ์—†์Œ. ๋”ฐ๋ผ์„œ ์ƒˆ ์ฝ”๋ฃจํ‹ด์„ ๋”ฐ๋กœ ๋งŒ๋“ค๊ฑฐ๋‚˜, withContext๋กœ ๋‹ค๋ฅธ CoroutineContext์—์„œ emitํ•ด๋ดค์ž ์†Œ์šฉ ์—†์Œ. ์ด๋Ÿฌํ•œ ๊ฒฝ์šฐ์—๋Š” ๋‹ค๋ฅธ callbackFlow์™€ ๊ฐ™์€ flow ๋นŒ๋”๋ฅผ ์จ์•ผํ•จ

โœ๏ธ Stream ์ˆ˜์ •ํ•ด๋ณด๊ธฐ

์ค‘๊ฐœ์ž(Intermediaries)๋Š” ์ค‘๊ฐ„ ์—ฐ์‚ฐ์ž(intermediate operator)๋ฅผ ์‚ฌ์šฉํ•จ์œผ๋กœ์จ, ๊ฐ’์„ ์†Œ๋น„ํ•˜์ง€ ์•Š๊ณ ๋„ ๋ฐ์ดํ„ฐ ์ŠคํŠธ๋ฆผ์„ ์ˆ˜์ •ํ•  ์ˆ˜ ์žˆ๋‹ค. intermediate ์—ฐ์‚ฐ์ž๋Š” ๋ฐ์ดํ„ฐ ์ŠคํŠธ๋ฆผ์— ์ ์šฉ๋˜๋Š” ๊ฒฝ์šฐ, ๊ฐ’์ด ๋‚˜์ค‘์— ์‚ฌ์šฉ๋˜๊ธฐ ์ „๊นŒ์ง€๋Š” ์‹คํ–‰๋˜์ง€ ์•Š๋Š” work chain์„ ์„ค์ •ํ•ด์ฃผ๋Š” ํ•จ์ˆ˜์ด๋‹ค.
์•„๋ž˜์˜ ์ฝ”๋“œ์—์„œ Repository ๋ ˆ์ด์–ด๋Š” ์ค‘๊ฐ„ ์—ฐ์‚ฐ์ž map ์—ฐ์‚ฐ์ž๋ฅผ ์‚ฌ์šฉํ•˜์—ฌ ๋ฐ์ดํ„ฐ๊ฐ€ View์— ํ‘œ์‹œ๋˜๋„๋ก ๋ณ€ํ™˜ํ•˜๊ณ  ์žˆ๋‹ค.

class NewsRepository(
    private val newsRemoteDataSource: NewsRemoteDataSource,
    private val userData: UserData
) {
    /**
     * Returns the favorite latest news applying transformations on the flow.
     * These operations are lazy and don't trigger the flow. They just transform
     * the current value emitted by the flow at that point in time.
     */
    val favoriteLatestNews: Flow<List<ArticleHeadline>> =
        newsRemoteDataSource.latestNews
            // Intermediate operation to filter the list of favorite topics
            .map { news -> news.filter { userData.isFavoriteTopic(it) } }
            // Intermediate operation to save the latest news in the cache
            .onEach { news -> saveInCache(news) }
}

intermediate ์—ฐ์‚ฐ์ž๋Š” ์‹œ๊ฐ„ ๊ฐ„๊ฒฉ์„ ๋‘๊ณ  ์ˆœ์ฐจ์ ์œผ๋กœ ์ ์šฉํ•  ์ˆ˜ ์žˆ์–ด์„œ, item๋“ค์„ flow์— ๋‚ด๋ณด๋‚ผ ๋•Œ ๋Š๋ฆฌ๊ฒŒ ์‹คํ–‰๋˜๋Š” work chain์„ ๊ตฌ์„ฑํ•ด์ค€๋‹ค. ์ŠคํŠธ๋ฆผ์— intermediate ์—ฐ์‚ฐ์ž๋ฅผ ์ ์šฉํ•˜๋Š” ๊ฒƒ๋งŒ์œผ๋กœ๋Š” flow๊ฐ€ collect ๋˜์ง€ ์•Š๋Š”๋‹ค.

๐Ÿฅ flow์—์„œ collectํ•ด๋ณด๊ธฐ

ํ„ฐ๋ฏธ๋„ ์—ฐ์‚ฐ์ž(terminal operator)๋ฅผ ์‚ฌ์šฉํ•˜๋ฉด, โ€˜๊ฐ’๋“ค์„ ์ˆ˜์‹ ํ•˜๊ธฐ ์œ„ํ•ด listeningโ€™์„ ์‹œ์ž‘ํ•˜๋Š” flow๋ฅผ ํŠธ๋ฆฌ๊ฑฐํ•ด์ค€๋‹ค. collect์„ ์“ฐ๋ฉด emitํ•  ๋•Œ ์ŠคํŠธ๋ฆผ์˜ ๋ชจ๋“  ๊ฐ’๋“ค์„ ๊ฐ€์ ธ์˜ฌ ์ˆ˜ ์žˆ๋‹ค.
collect๋Š” suspending ํ•จ์ˆ˜์ด๋ฏ€๋กœ, ์ฝ”๋ฃจํ‹ด ๋‚ด์—์„œ ์‹คํ–‰๋˜์–ด์•ผ ํ•œ๋‹ค. ์ƒˆ๋กœ์šด ๊ฐ’์—์„œ ํ˜ธ์ถœ๋˜๋Š” ๋งค๊ฐœ๋ณ€์ˆ˜์—๋Š” ๋žŒ๋‹ค๋ฅผ ์“ด๋‹ค. collect๋Š” suspending ํ•จ์ˆ˜๋ผ์„œ collect๋ฅผ ํ˜ธ์ถœํ•˜๋Š” ์ฝ”๋ฃจํ‹ด์€ flow๊ฐ€ ์ข…๋ฃŒ๋  ๋•Œ๊นŒ์ง€ ์ •์ง€๋  ์ˆ˜ ์žˆ๋‹ค.
์œ„์˜ ์ฝ”๋“œ์—์„œ repository์˜ ๋ฐ์ดํ„ฐ๋ฅผ ์‚ฌ์šฉํ•˜๋Š” ViewModel์„ ๊ฐ„๋‹จํžˆ ๊ตฌํ˜„ํ•ด๋ณด๋ฉด ๋‹ค์Œ๊ณผ ๊ฐ™๋‹ค.

class LatestNewsViewModel(
    private val newsRepository: NewsRepository
) : ViewModel() {

    init {
        viewModelScope.launch {
            // collect๋ฅผ ํ†ตํ•ด flow๋ฅผ ํŠธ๋ฆฌ๊ฑฐํ•˜์—ฌ element๋“ค์„ ์†Œ๋น„ํ•œ๋‹ค.
            newsRepository.favoriteLatestNews.collect { favoriteNews ->
                // ์—ฌ๊ธฐ์„œ ์ตœ์‹  favorite news์— ๋Œ€ํ•œ View๋ฅผ ์—…๋ฐ์ดํŠธํ•˜๋ฉด ๋จ
            }
        }
    }
}

flow๋ฅผ collectํ•˜๋ฉด ์ผ์ •ํ•œ ์‹œ๊ฐ„ ๊ฐ„๊ฒฉ์œผ๋กœ ์ตœ์‹  ๋‰ด์Šค๋ฅผ ์ƒˆ๋กœ๊ณ ์นจํ•˜์—ฌ ๋„คํŠธ์›Œํฌ ์š”์ฒญ์˜ ๊ฒฐ๊ณผ๋ฅผ emitํ•˜๋Š” producer๊ฐ€ ํŠธ๋ฆฌ๊ฑฐ๋œ๋‹ค. producer๋Š” while(true) ๋ฃจํ”„๋กœ ํ•ญ์ƒ ํ™œ์„ฑ ์ƒํƒœ๊ฐ€ ์œ ์ง€๋˜๋ฏ€๋กœ Viewmodel์ด ์‚ญ์ œ๋˜์–ด viewModelScope๊ฐ€ ์ทจ์†Œ๋˜๋ฉด ๋ฐ์ดํ„ฐ ์ŠคํŠธ๋ฆผ์ด ์ข…๋ฃŒ๋œ๋‹ค.

โš ๏ธ flow์˜ collect๊ฐ€ ์ค‘์ง€๋˜๋Š” ๊ฒฝ์šฐ?

  • collect๋œ ์ฝ”๋ฃจํ‹ด์ด ์ทจ์†Œ๋œ ๊ฒฝ์šฐ, flow์˜ collect๊ฐ€ ์ค‘์ง€๋˜๊ณ  ๊ธฐ๋ณธ producer๋„ ์ค‘์ง€๋จ
  • producer๊ฐ€ ํ•ญ๋ชฉ emit์„ ๋ชจ๋‘ ์™„๋ฃŒํ•œ ๊ฒฝ์šฐ, ๋ฐ์ดํ„ฐ ์ŠคํŠธ๋ฆผ์ด ์ข…๋ฃŒ๋˜๊ณ  collect๋ฅผ ํ˜ธ์ถœํ•œ ์ฝ”๋ฃจํ‹ด์ด ๋‹ค์‹œ ์‹คํ–‰ํ•˜๊ธฐ ์‹œ์ž‘ํ•จ
    ๋‹ค๋ฅธ intermediate ์—ฐ์‚ฐ์ž๊ฐ€ ์ง€์ •๋˜์ง€ ์•Š์œผ๋ฉด, flow๋Š” coldํ•˜๊ณ  lazyํ•˜๋‹ค. ์ฆ‰, flow์—์„œ terminal ์—ฐ์‚ฐ์ž๊ฐ€ ํ˜ธ์ถœ๋  ๋•Œ๋งˆ๋‹ค, producer ์ฝ”๋“œ๊ฐ€ ์‹คํ–‰๋œ๋‹ค. ์—ฌ๋Ÿฌ consumer๊ฐ€ ๋™์‹œ์— collectํ•˜๋Š” ๊ฒฝ์šฐ์— flow๋ฅผ ์ตœ์ ํ™”์‹œํ‚ค๊ณ  ๊ณต์œ ํ•˜๊ฒŒ ํ•˜๋ ค๋ฉด shareIn์—ฐ์‚ฐ์ž๋ฅผ ์‚ฌ์šฉํ•˜๋ฉด ๋œ๋‹ค.

๐Ÿคš Unexpected exception ์บ์น˜ํ•˜๊ธฐ

ํƒ€์‚ฌ ๋ผ์ด๋ธŒ๋Ÿฌ๋ฆฌ์—์„œ producer๋ฅผ ๊ตฌํ˜„ํ•˜๋Š” ๊ฒฝ์šฐ์— unexpected exception์ด ๋ฐœ์ƒํ•  ์ˆ˜ ์žˆ๋‹ค. ์ด๋Ÿฌํ•œ exception์„ ์ฒ˜๋ฆฌํ•˜๋ ค๋ฉด intermediate ์—ฐ์‚ฐ์ž์ธ catch๋ฅผ ์จ์•ผ ํ•œ๋‹ค.

class LatestNewsViewModel(
    private val newsRepository: NewsRepository
) : ViewModel() {

    init {
        viewModelScope.launch {
            newsRepository.favoriteLatestNews
                // ์ค‘๊ฐ„ ์—ฐ์‚ฐ์ž์ธ catch operator. ๋งŒ์•ฝ์— exception์ด throw๋˜๋ฉด 
                // catchํ•˜๊ณ  UI๋ฅผ ์—…๋ฐ์ดํŠธ ํ•จ
                .catch { exception -> notifyError(exception) }
                .collect { favoriteNews ->
                    // ์—ฌ๊ธฐ์„œ ์ตœ์‹  favorite news์— ๋Œ€ํ•œ View๋ฅผ ์—…๋ฐ์ดํŠธํ•˜๋ฉด ๋จ
                }
        }
    }
}

catch๋ฅผ ์“ฐ์ง€ ์•Š์•˜๋˜ ์ด์ „์˜ ์ฝ”๋“œ์—์„œ๋Š” exception์ด ๋ฐœ์ƒํ–ˆ์„ ๋•Œ, ์ƒˆ element๊ฐ€ ์ˆ˜์‹ ๋˜์ง€ ์•Š์œผ๋ฏ€๋กœ collect ๋žŒ๋‹ค๊ฐ€ ํ˜ธ์ถœ๋˜์ง€ ์•Š๋Š”๋‹ค. catch๋ฅผ ํ†ตํ•ด flow์— ํ•ญ๋ชฉ์„ emitํ•  ์ˆ˜ ์žˆ๋‹ค. ๋ฐ‘์˜ ์ฝ”๋“œ์—์„œ repository ๋ ˆ์ด์–ด๋Š” ์บ์‹œ๋œ ๊ฐ’์„ emitํ•˜๊ณ  ์žˆ๋‹ค.

class NewsRepository(...) {
    val favoriteLatestNews: Flow<List<ArticleHeadline>> =
        newsRemoteDataSource.latestNews
            .map { news -> news.filter { userData.isFavoriteTopic(it) } }
            .onEach { news -> saveInCache(news) }
            // If an error happens, emit the last cached values
            .catch { exception -> emit(lastCachedNews()) }
}

์œ„์˜ ์ฝ”๋“œ์—์„œ exception์ด ๋ฐœ์ƒํ•˜๋ฉด collect ๋žŒ๋‹ค๊ฐ€ ํ˜ธ์ถœ๋˜์–ด์„œ exception์œผ๋กœ ์ธํ•œ ์ƒˆ ํ•ญ๋ชฉ์ด ์ŠคํŠธ๋ฆผ์— ์ „์†ก๋œ๋‹ค.

๋‹ค๋ฅธ CoroutineContext์—์„œ ์‹คํ–‰ํ•˜๊ธฐ

๊ธฐ๋ณธ์ ์œผ๋กœ flow ๋นŒ๋”์˜ producer๋Š” collectํ•˜๋Š” ์ฝ”๋ฃจํ‹ด์˜ CoroutineContext์—์„œ ์‹คํ–‰๋œ๋‹ค. ๋‹ค๋ฅธ CoroutineContext์—์„œ ๊ฐ’์„ emitํ•  ์ˆ˜๊ฐ€ ์—†์œผ๋ฉฐ, ๊ทธ๋Ÿด ๊ฒฝ์šฐ ๋™์ž‘์ด ์ด์ƒํ•˜๊ฒŒ ๋Œ์•„๊ฐˆ ์ˆ˜๋„ ์žˆ๋‹ค. ์˜ˆ๋ฅผ ๋“ค์–ด์„œ, ์œ„์˜ ์˜ˆ์ œ์—์„œ repository ๋ ˆ์ด์–ด๋Š” viewModelScope๊ฐ€ ์‚ฌ์šฉํ•˜๋Š” Dispatchers.Main์—์„œ ์ž‘์—…์„ ์‹คํ–‰ํ•˜๋ฉด ์•ˆ๋œ๋‹ค.
flow์˜ CoroutineContext๋ฅผ ๋ณ€๊ฒฝํ•˜๋ ค๋ฉด intermediate ์—ฐ์‚ฐ์ž์ธ flowOn๋ฅผ ์จ์•ผํ•œ๋‹ค. flowOn์€ upstream flow์˜ CoroutineContext๋ฅผ ๋ณ€๊ฒฝํ•ด์ค€๋‹ค. ์ฆ‰, flowOn ์ „์— ์žˆ๋Š” producer์™€ intermediate ์—ฐ์‚ฐ์ž์— ์ ์šฉ๋œ๋‹ค. downstream flow(flowOn ์ดํ›„์˜ intermediate ์—ฐ์‚ฐ์ž์™€ producer)๋Š” flowOn์— ์˜ํ–ฅ์„ ๋ฐ›์ง€ ์•Š์œผ๋ฉฐ, flow์—์„œ collectํ•˜๋Š”๋ฐ ์‚ฌ์šฉํ•˜๋Š” CoroutineContext์—์„œ ์‹คํ–‰๋œ๋‹ค. flowOn ์—ฐ์‚ฐ์ž๊ฐ€ ์—ฌ๋Ÿฌ๊ฐœ๊ฐ€ ์žˆ๋‹ค๋ฉด, ๊ฐ ์—ฐ์‚ฐ์ž๋Š” ํ˜„์žฌ ์œ„์น˜์—์„œ upstream์„ ๋ณ€๊ฒฝํ•œ๋‹ค.

class NewsRepository(
    private val newsRemoteDataSource: NewsRemoteDataSource,
    private val userData: UserData,
    private val defaultDispatcher: CoroutineDispatcher
) {
    val favoriteLatestNews: Flow<List<ArticleHeadline>> =
        newsRemoteDataSource.latestNews
            .map { news -> // defaultDispatcher์—์„œ ์‹คํ–‰๋จ
                news.filter { userData.isFavoriteTopic(it) }
            }
            .onEach { news -> // defaultDispatcher์—์„œ ์‹คํ–‰๋จ
                saveInCache(news)
            }
            // flowOn์ด upstream flow โ†‘ ์— ์˜ํ–ฅ์„ ๋ฏธ์นจ
            .flowOn(defaultDispatcher)
            // downstream flow โ†“ ๋Š” ์˜ํ–ฅ x
            .catch { exception -> // consumer์˜ context์—์„œ ์‹คํ–‰๋จ
                emit(lastCachedNews())
            }
}  

์œ„์˜ ์ฝ”๋“œ์—์„œ onEach์™€ map ์—ฐ์‚ฐ์ž๋Š” defaultDispatcher๋ฅผ ์‚ฌ์šฉํ•˜๋Š”๋ฐ ๋ฐ˜ํ•ด, catch ์—ฐ์‚ฐ์ž์™€ ์†Œ๋น„์ž๋Š” viewModelScope์— ์‚ฌ์šฉ๋˜๋Š” Dispatchers.Main์—์„œ ์‹คํ–‰๋œ๋‹ค.
DataSource ๋ ˆ์ด์–ด๋Š” I/O ์ž‘์—…์„ ์ˆ˜ํ–‰ํ•˜๋ฏ€๋กœ, I/O ์ž‘์—…์— ์ตœ์ ํ™”๋œ ๋””์ŠคํŒจ์ฒ˜์ธ IO Dispatcher๋ฅผ ์‚ฌ์šฉํ•ด์•ผ ํ•œ๋‹ค.

class NewsRemoteDataSource(
    ...,
    private val ioDispatcher: CoroutineDispatcher
) {
    val latestNews: Flow<List<ArticleHeadline>> = flow {
        // Executes on the IO dispatcher
        ...
    }
        .flowOn(ioDispatcher)
}

Jetpack ๋ผ์ด๋ธŒ๋Ÿฌ๋ฆฌ์˜ flow

Flow๋Š” Jetpack ๋ผ์ด๋ธŒ๋Ÿฌ๋ฆฌ์— ํ†ตํ•ฉ๋˜๋ฉฐ ์‹ค์‹œ๊ฐ„์œผ๋กœ ๋ฐ์ดํŠธ๋ฅผ ์—…๋ฐ์ดํŠธํ•ด์•ผ ํ•˜๊ฑฐ๋‚˜, ๋ฌด์ œํ•œ ๋ฐ์ดํ„ฐ ์ŠคํŠธ๋ฆผ์ด ํ•„์š”ํ•  ๋•Œ ๋งค์šฐ ์ ํ•ฉํ•˜๋‹ค.
Flow with Room์„ ์‚ฌ์šฉํ•˜๋ฉด DB ๋ณ€๊ฒฝ ์•Œ๋ฆผ์„ ๋ฐ›์„ ์ˆ˜ ์žˆ๋‹ค. DAO๋ฅผ ์‚ฌ์šฉํ•˜๋Š” ๊ฒฝ์šฐ์— ์‹ค์‹œ๊ฐ„ ์—…๋ฐ์ดํŠธ๋ฅผ ๋ฐ›์œผ๋ ค๋ฉด, Flow ํƒ€์ž…์„ ๋ฐ˜ํ™˜ํ•˜๋ฉด ๋œ๋‹ค.

@Dao
abstract class ExampleDao {
    @Query("SELECT * FROM Example")
    abstract fun getExamples(): Flow<List<Example>>
}

์ด๋ ‡๊ฒŒ ์ •์˜ํ•ด๋†“์œผ๋ฉด Example ํ…Œ์ด๋ธ”์ด ๋ณ€๊ฒฝ๋  ๋•Œ๋งˆ๋‹ค DB์˜ ์ƒˆํ•ญ๋ชฉ์ด ํฌํ•จ๋œ ์ƒˆ ๋ชฉ๋ก์ด emit๋œ๋‹ค.

์ฝœ๋ฐฑ ๊ธฐ๋ฐ˜ API๋ฅผ flow๋กœ ๋ณ€ํ™˜ํ•˜๊ธฐ

callbackFlow๋Š” ์ฝœ๋ฐฑ ๊ธฐ๋ฐ˜ API๋ฅผ flow๋กœ ๋ณ€ํ™˜ํ•  ์ˆ˜ ์žˆ๋Š” flow ๋นŒ๋”์ด๋‹ค. ์˜ˆ๋ฅผ ๋“ค์–ด์„œ Firebase Firestore Android API๋Š” ์ฝœ๋ฐฑ์„ ์‚ฌ์šฉํ•˜๋Š”๋ฐ, ์ด API๋ฅผ flow๋กœ ๋ณ€ํ™˜ํ•˜๊ณ , Firebase DB ์—…๋ฐ์ดํŠธ๋ฅผ ์ˆ˜์‹  ๋Œ€๊ธฐํ•˜๋ ค๋ฉด ์•„๋ž˜์™€ ๊ฐ™์€ ์ฝ”๋“œ๋ฅผ ์ด์šฉํ•˜๋ฉด ๋œ๋‹ค.

class FirestoreUserEventsDataSource(
    private val firestore: FirebaseFirestore
) {
    // Method to get user events from the Firestore database
    fun getUserEvents(): Flow<UserEvents> = callbackFlow {

        // Reference to use in Firestore
        var eventsCollection: CollectionReference? = null
        try {
            eventsCollection = FirebaseFirestore.getInstance()
                .collection("collection")
                .document("app")
        } catch (e: Throwable) {
            // If Firebase cannot be initialized, close the stream of data
            // flow consumers will stop collecting and the coroutine will resume
            close(e)
        }

        // Registers callback to firestore, which will be called on new events
        val subscription = eventsCollection?.addSnapshotListener { snapshot, _ ->
            if (snapshot == null) { return@addSnapshotListener }
            // Sends events to the flow! Consumers will get the new events
            try {
                offer(snapshot.getEvents())
            } catch (e: Throwable) {
                // Event couldn't be sent to the flow
            }
        }

        // The callback inside awaitClose will be executed when the flow is
        // either closed or cancelled.
        // In this case, remove the callback from Firestore
        awaitClose { subscription?.remove() }
    }
}

flow ๋นŒ๋”์™€๋Š” ๋‹ฌ๋ฆฌ callbackFlow์—์„œ๋Š” send ํ•จ์ˆ˜๋ฅผ ์‚ฌ์šฉํ•˜๋ฉด ๋‹ค๋ฅธ CoroutineContext์—์„œ ๊ฐ’์„ emit ํ•  ์ˆ˜ ์žˆ๋‹ค. ๊ทธ๋ฆฌ๊ณ  offer ํ•จ์ˆ˜๋ฅผ ์‚ฌ์šฉํ•˜๋ฉด ๊ฐ’์„ ์ฝ”๋ฃจํ‹ด์˜ ์™ธ๋ถ€๋กœ emitํ•  ์ˆ˜ ์žˆ๋‹ค.
callbackFlow๋Š” ๊ฐœ๋…์ƒ blocking queue์™€ ๋งค์šฐ ์œ ์‚ฌํ•œ channel์„ ์‚ฌ์šฉํ•œ๋‹ค. channel์€ capacity(๋ฒ„ํผ๋ง์ด ๊ฐ€๋Šฅํ•œ element์˜ ์ตœ๋Œ€ ๊ฐœ์ˆ˜)๋กœ ๊ตฌ์„ฑ๋œ๋‹ค. callbackFlow์—์„œ ์ƒ์„ฑ๋œ ์ฑ„๋„์˜ ๊ธฐ๋ณธ capacity๋Š” element 64๊ฐœ์ด๋‹ค. ์ „์ฒด ์ฑ„๋„์— ์ƒˆ๋กœ์šด element๋ฅผ ์ถ”๊ฐ€ํ•˜๋Š” ๊ฒฝ์šฐ (1) send๋Š” ์ƒˆ๋กœ์šด element๋ฅผ ์œ„ํ•œ ๊ณต๊ฐ„์ด ์ƒ๊ธธ ๋•Œ๊นŒ์ง€ producer๋ฅผ ์ •์ง€ํ•œ๋‹ค. ๋ฐ˜๋ฉด์— (2) offer๋Š” ์ฑ„๋„์— element๋ฅผ ์ถ”๊ฐ€ํ•˜์ง€ ์•Š๊ณ  ์ฆ‰์‹œ false๋ฅผ ๋ฐ˜ํ™˜ํ•ด๋ฒ„๋ฆฐ๋‹ค.

[์ฐธ๊ณ  ์‚ฌ์ดํŠธ]
์•ˆ๋“œ๋กœ์ด๋“œ ๊ณต์‹ ๋ฌธ์„œ - Flow

Join Newsletter
Get the latest news right in your inbox. We never spam!
Ella
Written by Ella Follow
Android Developer, love to explore new ideas and write on my morning coffee!