๐ค
๊ฐ๋ ๐ Flow in Android
Flow๋ ์์ฐจ์ ์ผ๋ก ์ฌ๋ฌ ๊ฐ์ emitํ๋ ๋ฐ์ดํฐ์คํธ๋ฆผ์ด๋ฉฐ, ๋จ ํ๋์ ๊ฐ๋ง ๋ฐํํ๋ suspending ํจ์์๋ ๋ค๋ฅด๋ค. flow๋ฅผ ์ฐ๋ฉด DB์์ ์ค์๊ฐ ์๋ฐ์ดํธํ๋ ๊ฐ๋ค์ ์์ ํ ์ ์๋ค.
flow๋ ์ฝ๋ฃจํด์ ๊ธฐ๋ฐ์ผ๋ก ๋น๋๋๋ฉฐ, ๋น๋๊ธฐ์์ผ๋ก ๊ณ์ฐํ ์ ์๋ ๋ฐ์ดํฐ ์คํธ๋ฆผ์ด๋ค. flow์์ emitํ๋ ๊ฐ๋ค์ ๋์ผํ ํ์
์ด์ฌ์ผ ํ๋ค. ์๋ฅผ ๋ค์ด์ Flow<Int>
๋ Int ๊ฐ๋ง์ ๋ด๋ณด๋ด๋ flow๊ฐ ๋๋ค.
๊ฐ ์ํ์ค๋ฅผ ์์ฑํ๋ Iterator
์ ์ ์ฌํ์ง๋ง, flow๋ suspending ํจ์๋ฅผ ์ด์ฉํ์ฌ ๊ฐ์ ๋น๋๊ธฐ๋ก ๋ง๋ค์ด์ ์ด๋ค. flow๋ฅผ ์ฐ๋ฉด ๊ธฐ๋ณธ ์ค๋ ๋๋ฅผ blockํ์ง ์๊ณ ๋ ๋คํธ์ํฌ ์์ฒญ์ ํด์ ๋ค์ ๊ฐ์ ๋ง๋ค์ด๋ผ ์ ์๋ค.
Data Stream์ ์ธ ๊ฐ์ง ์์ : ์์ฐ์ + ์ค๊ฐ์(option) + ์๋น์
- ์์ฐ์(Producer) : ์คํธ๋ฆผ์ ์ถ๊ฐ๋๋ ๋ฐ์ดํฐ๋ฅผ ์์ฐํจ. flow์์ ๋น๋๊ธฐ์ ์ผ๋ก ๋ฐ์ดํฐ๋ฅผ ์์ฐํจ
- ์ค๊ฐ์(Intermediaries, option) : ์คํธ๋ฆผ์ ๋ด๋ณด๋ด๋ ๊ฐ๊ฐ์ ๊ฐ์ ์์ ํ๊ฑฐ๋, ์คํธ๋ฆผ ์์ฒด๋ฅผ ์์ ํจ
- ์๋น์(Consumer) : ์คํธ๋ฆผ์ ๊ฐ์ ์ฌ์ฉํจ
์๋๋ก์ด๋์์ ์ผ๋ฐ์ ์ผ๋ก repository๋ UI ๋ฐ์ดํฐ์ Producer๋ค. ๊ทธ๋ฆฌ๊ณ UI๋ ์ต์ข ์ ์ผ๋ก ๋ฐ์ดํฐ๋ฅผ ํ์ํ๋ Consumer๋ค. UI ๋ ์ด์ด๋ User Input Event์ Producer์ด๋ฉฐ, ๊ณ์ธต ๊ตฌ์กฐ์ ๋ค๋ฅธ ๋ ์ด์ด๊ฐ input event๋ฅผ ์ฌ์ฉํ๊ธฐ๋ ํ๋ค. Producer์ Consumer ์ฌ์ด์ ์๋ ๋ ์ด์ด๋ ์ผ๋ฐ์ ์ผ๋ก ์ค๊ฐ์์ ์ญํ ์ ํ๋๋ฐ, ๋ค์ ๋ ์ด์ด์ ์๊ตฌ์ฌํญ์ ๋ง์ถฐ์ฃผ๊ธฐ ์ํด์ ๋ฐ์ดํฐ ์คํธ๋ฆผ์ ์์ ํด์ค๋ค.
๐ Flow ๋ง๋ค์ด๋ณด๊ธฐ
Flow๋ฅผ ๋ง๋ค๋ ค๋ฉด ์์ ํฌ์คํ
ํ๋ ๋ด์ฉ์์ ๋ณธ ๋ฐ์ ๊ฐ์ด Flow Builder API๋ฅผ ์จ์ผํ๋ค. flow ๋น๋ ํจ์๋ emit
ํจ์๋ฅผ ์ด์ฉํ์ฌ ์๋ก์ด ๊ฐ์ ๋ฐ์ดํฐ ์คํธ๋ฆผ์ ๋ด๋ณด๋ด๋ ์๋ก์ด flow๋ฅผ ๋ง๋ค์ด์ค๋ค.
๋ฐ์ ์ฝ๋์์ DataSource
๋ ๊ณ ์ ๋ ๊ฐ๊ฒฉ์ผ๋ก ์ต์ ๋ด์ค๋ฅผ ์๋์ผ๋ก ๊ฐ์ ธ์จ๋ค. suspending ํจ์๋ ๋จ์ผ ๊ฐ์ ๋ฐํํ ๋ฟ, ์ฌ๋ฌ ๊ฐ์ ๋ฐํํ ์ ์์ผ๋ฏ๋ก DataSource๊ฐ ์ด๋ฌํ ์๊ตฌ์ฌํญ์ ์ถฉ์กฑํ๋ flow๋ฅผ ๋ง๋ค์ด์ ๋ฐํํ๋ค. ์ด์ ๊ฐ์ ๊ฒฝ์ฐ์์ DataSource๋ Producer ์ญํ ์ ํ๋ค.
class NewsRemoteDataSource(
private val newsApi: NewsApi,
private val refreshIntervalMs: Long = 5000
) {
val latestNews: Flow<List<ArticleHeadline>> = flow {
while(true) {
val latestNews = newsApi.fetchLatestNews()
emit(latestNews) // flow๋ก request ๊ฒฐ๊ณผ๋ฅผ ๋ด๋ณด๋
delay(refreshIntervalMs) // refreshIntervalMs๋งํผ ์ฝ๋ฃจํด์ suspendํจ
}
}
}
// suspend ํจ์๋ก ๋คํธ์ํฌ ์์ฒญ์ ๋ง๋ค์ด๋์ ์ธํฐํ์ด์ค
interface NewsApi {
suspend fun fetchLatestNews(): List<ArticleHeadline>
}
flow
๋น๋๋ ์ฝ๋ฃจํด ๋ด์์ ์คํ๋๋ฉฐ ์๋์ ๊ฐ์ ์ ํ์ฌํญ์ด ์ ์ฉ๋๋ค.
- flow๋ ์์ฐจ์ ์. Producer๋ ์ฝ๋ฃจํด์ ์์ผ๋ฏ๋ก, suspending ํจ์๋ฅผ ํธ์ถํ๋ฉด producer๋ ์ ์ง ํจ์๊ฐ ๋ฐํ๋ ๋๊น์ง suspend ์ํ๋ก ์ ์ง๋จ. ์์ ์ฝ๋์์ Producer๋
fetchLatestNews
๋คํธ์ํฌ ์์ฒญ์ด ์๋ฃ๋ ๋๊น์ง ์ ์งํจ. ์๋ฃ๋ ํ์์ผ ๊ฒฐ๊ณผ๋ฅผ ์คํธ๋ฆผ์ ๋ด๋ณด๋ธ๋ค. flow
๋น๋์์ ์์ฐ์๊ฐ ๋ค๋ฅธCoroutineContext
์ ๊ฐ์emit
ํ ์ ์์. ๋ฐ๋ผ์ ์ ์ฝ๋ฃจํด์ ๋ฐ๋ก ๋ง๋ค๊ฑฐ๋,withContext
๋ก ๋ค๋ฅธCoroutineContext
์์ emitํด๋ดค์ ์์ฉ ์์. ์ด๋ฌํ ๊ฒฝ์ฐ์๋ ๋ค๋ฅธcallbackFlow
์ ๊ฐ์ flow ๋น๋๋ฅผ ์จ์ผํจ
โ๏ธ Stream ์์ ํด๋ณด๊ธฐ
์ค๊ฐ์(Intermediaries)๋ ์ค๊ฐ ์ฐ์ฐ์(intermediate operator
)๋ฅผ ์ฌ์ฉํจ์ผ๋ก์จ, ๊ฐ์ ์๋นํ์ง ์๊ณ ๋ ๋ฐ์ดํฐ ์คํธ๋ฆผ์ ์์ ํ ์ ์๋ค. intermediate ์ฐ์ฐ์๋ ๋ฐ์ดํฐ ์คํธ๋ฆผ์ ์ ์ฉ๋๋ ๊ฒฝ์ฐ, ๊ฐ์ด ๋์ค์ ์ฌ์ฉ๋๊ธฐ ์ ๊น์ง๋ ์คํ๋์ง ์๋ work chain์ ์ค์ ํด์ฃผ๋ ํจ์์ด๋ค.
์๋์ ์ฝ๋์์ Repository ๋ ์ด์ด๋ ์ค๊ฐ ์ฐ์ฐ์ map
์ฐ์ฐ์๋ฅผ ์ฌ์ฉํ์ฌ ๋ฐ์ดํฐ๊ฐ View
์ ํ์๋๋๋ก ๋ณํํ๊ณ ์๋ค.
class NewsRepository(
private val newsRemoteDataSource: NewsRemoteDataSource,
private val userData: UserData
) {
/**
* Returns the favorite latest news applying transformations on the flow.
* These operations are lazy and don't trigger the flow. They just transform
* the current value emitted by the flow at that point in time.
*/
val favoriteLatestNews: Flow<List<ArticleHeadline>> =
newsRemoteDataSource.latestNews
// Intermediate operation to filter the list of favorite topics
.map { news -> news.filter { userData.isFavoriteTopic(it) } }
// Intermediate operation to save the latest news in the cache
.onEach { news -> saveInCache(news) }
}
intermediate ์ฐ์ฐ์๋ ์๊ฐ ๊ฐ๊ฒฉ์ ๋๊ณ ์์ฐจ์ ์ผ๋ก ์ ์ฉํ ์ ์์ด์, item๋ค์ flow์ ๋ด๋ณด๋ผ ๋ ๋๋ฆฌ๊ฒ ์คํ๋๋ work chain์ ๊ตฌ์ฑํด์ค๋ค. ์คํธ๋ฆผ์ intermediate ์ฐ์ฐ์๋ฅผ ์ ์ฉํ๋ ๊ฒ๋ง์ผ๋ก๋ flow๊ฐ collect ๋์ง ์๋๋ค.
๐ฅ flow์์ collectํด๋ณด๊ธฐ
ํฐ๋ฏธ๋ ์ฐ์ฐ์(terminal operator
)๋ฅผ ์ฌ์ฉํ๋ฉด, โ๊ฐ๋ค์ ์์ ํ๊ธฐ ์ํด listeningโ์ ์์ํ๋ flow๋ฅผ ํธ๋ฆฌ๊ฑฐํด์ค๋ค. collect
์ ์ฐ๋ฉด emitํ ๋ ์คํธ๋ฆผ์ ๋ชจ๋ ๊ฐ๋ค์ ๊ฐ์ ธ์ฌ ์ ์๋ค.
collect
๋ suspending ํจ์์ด๋ฏ๋ก, ์ฝ๋ฃจํด ๋ด์์ ์คํ๋์ด์ผ ํ๋ค. ์๋ก์ด ๊ฐ์์ ํธ์ถ๋๋ ๋งค๊ฐ๋ณ์์๋ ๋๋ค๋ฅผ ์ด๋ค. collect๋ suspending ํจ์๋ผ์ collect๋ฅผ ํธ์ถํ๋ ์ฝ๋ฃจํด์ flow๊ฐ ์ข
๋ฃ๋ ๋๊น์ง ์ ์ง๋ ์ ์๋ค.
์์ ์ฝ๋์์ repository์ ๋ฐ์ดํฐ๋ฅผ ์ฌ์ฉํ๋ ViewModel
์ ๊ฐ๋จํ ๊ตฌํํด๋ณด๋ฉด ๋ค์๊ณผ ๊ฐ๋ค.
class LatestNewsViewModel(
private val newsRepository: NewsRepository
) : ViewModel() {
init {
viewModelScope.launch {
// collect๋ฅผ ํตํด flow๋ฅผ ํธ๋ฆฌ๊ฑฐํ์ฌ element๋ค์ ์๋นํ๋ค.
newsRepository.favoriteLatestNews.collect { favoriteNews ->
// ์ฌ๊ธฐ์ ์ต์ favorite news์ ๋ํ View๋ฅผ ์
๋ฐ์ดํธํ๋ฉด ๋จ
}
}
}
}
flow๋ฅผ collectํ๋ฉด ์ผ์ ํ ์๊ฐ ๊ฐ๊ฒฉ์ผ๋ก ์ต์ ๋ด์ค๋ฅผ ์๋ก๊ณ ์นจํ์ฌ ๋คํธ์ํฌ ์์ฒญ์ ๊ฒฐ๊ณผ๋ฅผ emitํ๋ producer๊ฐ ํธ๋ฆฌ๊ฑฐ๋๋ค. producer๋ while(true) ๋ฃจํ๋ก ํญ์ ํ์ฑ ์ํ๊ฐ ์ ์ง๋๋ฏ๋ก Viewmodel์ด ์ญ์ ๋์ด viewModelScope
๊ฐ ์ทจ์๋๋ฉด ๋ฐ์ดํฐ ์คํธ๋ฆผ์ด ์ข
๋ฃ๋๋ค.
โ ๏ธ flow์ collect๊ฐ ์ค์ง๋๋ ๊ฒฝ์ฐ?
- collect๋ ์ฝ๋ฃจํด์ด ์ทจ์๋ ๊ฒฝ์ฐ, flow์ collect๊ฐ ์ค์ง๋๊ณ ๊ธฐ๋ณธ producer๋ ์ค์ง๋จ
- producer๊ฐ ํญ๋ชฉ emit์ ๋ชจ๋ ์๋ฃํ ๊ฒฝ์ฐ, ๋ฐ์ดํฐ ์คํธ๋ฆผ์ด ์ข
๋ฃ๋๊ณ collect๋ฅผ ํธ์ถํ ์ฝ๋ฃจํด์ด ๋ค์ ์คํํ๊ธฐ ์์ํจ
๋ค๋ฅธ intermediate ์ฐ์ฐ์๊ฐ ์ง์ ๋์ง ์์ผ๋ฉด, flow๋cold
ํ๊ณlazy
ํ๋ค. ์ฆ, flow์์ terminal ์ฐ์ฐ์๊ฐ ํธ์ถ๋ ๋๋ง๋ค, producer ์ฝ๋๊ฐ ์คํ๋๋ค. ์ฌ๋ฌ consumer๊ฐ ๋์์ collectํ๋ ๊ฒฝ์ฐ์ flow๋ฅผ ์ต์ ํ์ํค๊ณ ๊ณต์ ํ๊ฒ ํ๋ ค๋ฉดshareIn
์ฐ์ฐ์๋ฅผ ์ฌ์ฉํ๋ฉด ๋๋ค.
๐ค Unexpected exception ์บ์นํ๊ธฐ
ํ์ฌ ๋ผ์ด๋ธ๋ฌ๋ฆฌ์์ producer๋ฅผ ๊ตฌํํ๋ ๊ฒฝ์ฐ์ unexpected exception์ด ๋ฐ์ํ ์ ์๋ค. ์ด๋ฌํ exception์ ์ฒ๋ฆฌํ๋ ค๋ฉด intermediate ์ฐ์ฐ์์ธ catch
๋ฅผ ์จ์ผ ํ๋ค.
class LatestNewsViewModel(
private val newsRepository: NewsRepository
) : ViewModel() {
init {
viewModelScope.launch {
newsRepository.favoriteLatestNews
// ์ค๊ฐ ์ฐ์ฐ์์ธ catch operator. ๋ง์ฝ์ exception์ด throw๋๋ฉด
// catchํ๊ณ UI๋ฅผ ์
๋ฐ์ดํธ ํจ
.catch { exception -> notifyError(exception) }
.collect { favoriteNews ->
// ์ฌ๊ธฐ์ ์ต์ favorite news์ ๋ํ View๋ฅผ ์
๋ฐ์ดํธํ๋ฉด ๋จ
}
}
}
}
catch๋ฅผ ์ฐ์ง ์์๋ ์ด์ ์ ์ฝ๋์์๋ exception์ด ๋ฐ์ํ์ ๋, ์ element๊ฐ ์์ ๋์ง ์์ผ๋ฏ๋ก collect
๋๋ค๊ฐ ํธ์ถ๋์ง ์๋๋ค. catch
๋ฅผ ํตํด flow์ ํญ๋ชฉ์ emitํ ์ ์๋ค. ๋ฐ์ ์ฝ๋์์ repository ๋ ์ด์ด๋ ์บ์๋ ๊ฐ์ emitํ๊ณ ์๋ค.
class NewsRepository(...) {
val favoriteLatestNews: Flow<List<ArticleHeadline>> =
newsRemoteDataSource.latestNews
.map { news -> news.filter { userData.isFavoriteTopic(it) } }
.onEach { news -> saveInCache(news) }
// If an error happens, emit the last cached values
.catch { exception -> emit(lastCachedNews()) }
}
์์ ์ฝ๋์์ exception์ด ๋ฐ์ํ๋ฉด collect
๋๋ค๊ฐ ํธ์ถ๋์ด์ exception์ผ๋ก ์ธํ ์ ํญ๋ชฉ์ด ์คํธ๋ฆผ์ ์ ์ก๋๋ค.
๋ค๋ฅธ CoroutineContext์์ ์คํํ๊ธฐ
๊ธฐ๋ณธ์ ์ผ๋ก flow
๋น๋์ producer๋ collectํ๋ ์ฝ๋ฃจํด์ CoroutineContext์์ ์คํ๋๋ค. ๋ค๋ฅธ CoroutineContext์์ ๊ฐ์ emitํ ์๊ฐ ์์ผ๋ฉฐ, ๊ทธ๋ด ๊ฒฝ์ฐ ๋์์ด ์ด์ํ๊ฒ ๋์๊ฐ ์๋ ์๋ค. ์๋ฅผ ๋ค์ด์, ์์ ์์ ์์ repository ๋ ์ด์ด๋ viewModelScope๊ฐ ์ฌ์ฉํ๋ Dispatchers.Main์์ ์์
์ ์คํํ๋ฉด ์๋๋ค.
flow์ CoroutineContext๋ฅผ ๋ณ๊ฒฝํ๋ ค๋ฉด intermediate ์ฐ์ฐ์์ธ flowOn
๋ฅผ ์จ์ผํ๋ค. flowOn
์ upstream flow์ CoroutineContext๋ฅผ ๋ณ๊ฒฝํด์ค๋ค. ์ฆ, flowOn
์ ์ ์๋ producer์ intermediate ์ฐ์ฐ์์ ์ ์ฉ๋๋ค. downstream flow(flowOn ์ดํ์ intermediate ์ฐ์ฐ์์ producer)๋ flowOn
์ ์ํฅ์ ๋ฐ์ง ์์ผ๋ฉฐ, flow์์ collectํ๋๋ฐ ์ฌ์ฉํ๋ CoroutineContext์์ ์คํ๋๋ค. flowOn
์ฐ์ฐ์๊ฐ ์ฌ๋ฌ๊ฐ๊ฐ ์๋ค๋ฉด, ๊ฐ ์ฐ์ฐ์๋ ํ์ฌ ์์น์์ upstream์ ๋ณ๊ฒฝํ๋ค.
class NewsRepository(
private val newsRemoteDataSource: NewsRemoteDataSource,
private val userData: UserData,
private val defaultDispatcher: CoroutineDispatcher
) {
val favoriteLatestNews: Flow<List<ArticleHeadline>> =
newsRemoteDataSource.latestNews
.map { news -> // defaultDispatcher์์ ์คํ๋จ
news.filter { userData.isFavoriteTopic(it) }
}
.onEach { news -> // defaultDispatcher์์ ์คํ๋จ
saveInCache(news)
}
// flowOn์ด upstream flow โ ์ ์ํฅ์ ๋ฏธ์นจ
.flowOn(defaultDispatcher)
// downstream flow โ ๋ ์ํฅ x
.catch { exception -> // consumer์ context์์ ์คํ๋จ
emit(lastCachedNews())
}
}
์์ ์ฝ๋์์ onEach
์ map
์ฐ์ฐ์๋ defaultDispatcher๋ฅผ ์ฌ์ฉํ๋๋ฐ ๋ฐํด, catch
์ฐ์ฐ์์ ์๋น์๋ viewModelScope์ ์ฌ์ฉ๋๋ Dispatchers.Main์์ ์คํ๋๋ค.
DataSource ๋ ์ด์ด๋ I/O ์์
์ ์ํํ๋ฏ๋ก, I/O ์์
์ ์ต์ ํ๋ ๋์คํจ์ฒ์ธ IO Dispatcher๋ฅผ ์ฌ์ฉํด์ผ ํ๋ค.
class NewsRemoteDataSource(
...,
private val ioDispatcher: CoroutineDispatcher
) {
val latestNews: Flow<List<ArticleHeadline>> = flow {
// Executes on the IO dispatcher
...
}
.flowOn(ioDispatcher)
}
Jetpack ๋ผ์ด๋ธ๋ฌ๋ฆฌ์ flow
Flow๋ Jetpack
๋ผ์ด๋ธ๋ฌ๋ฆฌ์ ํตํฉ๋๋ฉฐ ์ค์๊ฐ์ผ๋ก ๋ฐ์ดํธ๋ฅผ ์
๋ฐ์ดํธํด์ผ ํ๊ฑฐ๋, ๋ฌด์ ํ ๋ฐ์ดํฐ ์คํธ๋ฆผ์ด ํ์ํ ๋ ๋งค์ฐ ์ ํฉํ๋ค.
Flow with Room
์ ์ฌ์ฉํ๋ฉด DB ๋ณ๊ฒฝ ์๋ฆผ์ ๋ฐ์ ์ ์๋ค. DAO๋ฅผ ์ฌ์ฉํ๋ ๊ฒฝ์ฐ์ ์ค์๊ฐ ์
๋ฐ์ดํธ๋ฅผ ๋ฐ์ผ๋ ค๋ฉด, Flow ํ์
์ ๋ฐํํ๋ฉด ๋๋ค.
@Dao
abstract class ExampleDao {
@Query("SELECT * FROM Example")
abstract fun getExamples(): Flow<List<Example>>
}
์ด๋ ๊ฒ ์ ์ํด๋์ผ๋ฉด Example
ํ
์ด๋ธ์ด ๋ณ๊ฒฝ๋ ๋๋ง๋ค DB์ ์ํญ๋ชฉ์ด ํฌํจ๋ ์ ๋ชฉ๋ก์ด emit๋๋ค.
์ฝ๋ฐฑ ๊ธฐ๋ฐ API๋ฅผ flow๋ก ๋ณํํ๊ธฐ
callbackFlow
๋ ์ฝ๋ฐฑ ๊ธฐ๋ฐ API๋ฅผ flow๋ก ๋ณํํ ์ ์๋ flow ๋น๋์ด๋ค. ์๋ฅผ ๋ค์ด์ Firebase Firestore Android API๋ ์ฝ๋ฐฑ์ ์ฌ์ฉํ๋๋ฐ, ์ด API๋ฅผ flow๋ก ๋ณํํ๊ณ , Firebase DB ์
๋ฐ์ดํธ๋ฅผ ์์ ๋๊ธฐํ๋ ค๋ฉด ์๋์ ๊ฐ์ ์ฝ๋๋ฅผ ์ด์ฉํ๋ฉด ๋๋ค.
class FirestoreUserEventsDataSource(
private val firestore: FirebaseFirestore
) {
// Method to get user events from the Firestore database
fun getUserEvents(): Flow<UserEvents> = callbackFlow {
// Reference to use in Firestore
var eventsCollection: CollectionReference? = null
try {
eventsCollection = FirebaseFirestore.getInstance()
.collection("collection")
.document("app")
} catch (e: Throwable) {
// If Firebase cannot be initialized, close the stream of data
// flow consumers will stop collecting and the coroutine will resume
close(e)
}
// Registers callback to firestore, which will be called on new events
val subscription = eventsCollection?.addSnapshotListener { snapshot, _ ->
if (snapshot == null) { return@addSnapshotListener }
// Sends events to the flow! Consumers will get the new events
try {
offer(snapshot.getEvents())
} catch (e: Throwable) {
// Event couldn't be sent to the flow
}
}
// The callback inside awaitClose will be executed when the flow is
// either closed or cancelled.
// In this case, remove the callback from Firestore
awaitClose { subscription?.remove() }
}
}
flow
๋น๋์๋ ๋ฌ๋ฆฌ callbackFlow
์์๋ send
ํจ์๋ฅผ ์ฌ์ฉํ๋ฉด ๋ค๋ฅธ CoroutineContext
์์ ๊ฐ์ emit ํ ์ ์๋ค. ๊ทธ๋ฆฌ๊ณ offer
ํจ์๋ฅผ ์ฌ์ฉํ๋ฉด ๊ฐ์ ์ฝ๋ฃจํด์ ์ธ๋ถ๋ก emitํ ์ ์๋ค.
callbackFlow
๋ ๊ฐ๋
์ blocking queue
์ ๋งค์ฐ ์ ์ฌํ channel์ ์ฌ์ฉํ๋ค. channel์ capacity(๋ฒํผ๋ง์ด ๊ฐ๋ฅํ element์ ์ต๋ ๊ฐ์)๋ก ๊ตฌ์ฑ๋๋ค. callbackFlow
์์ ์์ฑ๋ ์ฑ๋์ ๊ธฐ๋ณธ capacity๋ element 64๊ฐ์ด๋ค. ์ ์ฒด ์ฑ๋์ ์๋ก์ด element๋ฅผ ์ถ๊ฐํ๋ ๊ฒฝ์ฐ (1) send
๋ ์๋ก์ด element๋ฅผ ์ํ ๊ณต๊ฐ์ด ์๊ธธ ๋๊น์ง producer๋ฅผ ์ ์งํ๋ค. ๋ฐ๋ฉด์ (2) offer
๋ ์ฑ๋์ element๋ฅผ ์ถ๊ฐํ์ง ์๊ณ ์ฆ์ false
๋ฅผ ๋ฐํํด๋ฒ๋ฆฐ๋ค.
[์ฐธ๊ณ ์ฌ์ดํธ]
์๋๋ก์ด๋ ๊ณต์ ๋ฌธ์ - Flow