android, coroutine,

StateFlow & SharedFlow

Ella Ella Follow Dec 29, 2021 ยท 5 mins read
StateFlow & SharedFlow
Share this

๐Ÿค–

๊ฐœ๋… 1๏ธโƒฃ1๏ธโƒฃ StateFlow & SharedFlow

StateFlow์™€ SharedFlow๋Š” flow์—์„œ state update๋ฅผ emitํ•˜๊ณ  ์—ฌ๋Ÿฌ consumer์—๊ฒŒ ๊ฐ’์„ emitํ•  ์ˆ˜ ์žˆ๋Š” Flow API์ด๋‹ค.

1๏ธโƒฃ StateFlow

StateFlow๋Š” observableํ•œ state holder flow๋กœ์„œ, ํ˜„์žฌ์˜ state์™€ ์ƒˆ๋กœ์šด state ์—…๋ฐ์ดํŠธ๋ฅผ collector์— emitํ•œ๋‹ค. StateFlow์˜ value ํ”„๋กœํผํ‹ฐ๋ฅผ ํ†ตํ•ด์„œ ํ˜„์žฌ state ๊ฐ’์„ ์ฝ์„ ์ˆ˜๋„ ์žˆ๋‹ค. state๋ฅผ ์—…๋ฐ์ดํŠธํ•˜์—ฌ flow์— ์ „์†กํ•˜๋ ค๋ฉด, MutableStateFlow ํด๋ž˜์Šค์˜ value์— ์ƒˆ๋กœ์šด ๊ฐ’์„ ํ• ๋‹นํ•˜๋ฉด ๋œ๋‹ค.
Android์—์„œ StateFlow๋Š” observableํ•œ mutable state๋ฅผ ๊ฐ€์ง€๊ณ  ์žˆ์–ด์•ผํ•˜๋Š” ํด๋ž˜์Šค์— ๋งค์šฐ ์ ํ•ฉํ•˜๋‹ค. ์•„๋ž˜์˜ ์˜ˆ์—์„œ๋Š” View๊ฐ€ UI state์˜ update๋ฅผ ์ˆ˜์‹  ๋Œ€๊ธฐํ•˜๊ณ  configuration ๋ณ€๊ฒฝ์—๋„ ๊ธฐ๋ณธ์ ์œผ๋กœ UI์˜ state๊ฐ€ ์ง€์†๋˜๋„๋ก LatestNewsViewModel์—์„œ StateFlow๋ฅผ ๋…ธ์ถœ์‹œํ‚ค๊ณ  ์žˆ๋‹ค.

class LatestNewsViewModel(
    private val newsRepository: NewsRepository
) : ViewModel() {

    // ๋‹ค๋ฅธ ํด๋ž˜์Šค๊ฐ€ state๋ฅผ ์—…๋ฐ์ดํŠธํ•˜์ง€ ๋ชปํ•˜๊ฒŒ ํ•˜๊ธฐ ์œ„ํ•œ ๋ฐฑ์—… ํ”„๋กœํผํ‹ฐ
    private val _uiState = MutableStateFlow(LatestNewsUiState.Success(emptyList()))
    // ์‹ค์ œ UI์—์„œ๋Š” state๋ฅผ ์—…๋ฐ์ดํŠธํ•˜๊ธฐ ์œ„ํ•ด์„œ uiState์—์„œ collectํ•จ
    val uiState: StateFlow<LatestNewsUiState> = _uiState

    init {
        viewModelScope.launch {
            newsRepository.favoriteLatestNews
                // latest favorite news๋กœ ๋ทฐ๋ฅผ ์—…๋ฐ์ดํŠธํ•ด์คŒ 
                // MutableStateFlow์ธ _uiState์˜ value์— ๊ฐ’์„ ์„ค์ •ํ•จ
                // flow์— ์ƒˆ๋กœ์šด element๋ฅผ ์ถ”๊ฐ€ํ•ด์ฃผ๊ณ ,collector๋“ค์—๊ฒŒ ์—…๋ฐ์ดํŠธ! 
                .collect { favoriteNews ->
                    _uiState.value = LatestNewsUiState.Success(favoriteNews)
                }
        }
    }
}

// LatestNews ํ™”๋ฉด์˜ state๋ฅผ ๋‚˜ํƒ€๋ƒ„ 
sealed class LatestNewsUiState {
    data class Success(news: List<ArticleHeadline>): LatestNewsUiState()
    data class Error(exception: Throwable): LatestNewsUiState()
}

Producer๋Š” MutableStateFlow ์—…๋ฐ์ดํŠธ๋ฅผ ๋‹ด๋‹นํ•˜๋Š” ํด๋ž˜์Šค์ด๊ณ , consumer๋Š” StateFlow์—์„œ collect๋˜๋Š” ํด๋ž˜์Šค์ด๋‹ค. flow ๋นŒ๋”๋ฅผ ์‚ฌ์šฉํ•˜์—ฌ ๋นŒ๋“œ๋˜๋Š” cold stream๊ณผ ๋‹ฌ๋ฆฌ, StateFlow๋Š” hot stream์ด๋‹ค. ์ฆ‰ flow์—์„œ collectํ•ด๋„ producer ์ฝ”๋“œ๊ฐ€ trigger๋˜์ง€ ์•Š๋Š”๋‹ค. StateFlow๋Š” ํ•ญ์ƒ active ์ƒํƒœ์ด๊ณ  ๋ฉ”๋ชจ๋ฆฌ ๋‚ด์— ์žˆ๋‹ค. ๊ทธ๋ฆฌ๊ณ  ๊ฐ€๋น„์ง€ ์ปฌ๋ ‰์…˜์˜ ๋ฃจํŠธ์—์„œ reference๊ฐ€ ์—†๋Š” ๊ฒฝ์šฐ์—๋งŒ ๊ฐ€๋น„์ง€ ์ปฌ๋ ‰์…˜์—์„œ ๊ฐ€์ ธ๊ฐˆ ์ˆ˜ ์žˆ๋‹ค.
์ƒˆ๋กœ์šด consumer๊ฐ€ flow์—์„œ collectํ•˜๊ธฐ ์‹œ์ž‘ํ•˜๋ฉด, ์ŠคํŠธ๋ฆผ์˜ ๋งˆ์ง€๋ง‰ state์™€ ๋‹ค์Œ state๊ฐ€ ์ˆ˜์‹ ๋œ๋‹ค. ์•„๋ž˜์˜ ์ฝ”๋“œ์—์„œ View๋Š” ๋‹ค๋ฅธ flow๋“ค๊ณผ ๋งˆ์ฐฌ๊ฐ€์ง€๋กœ StateFlow๋ฅผ ์ˆ˜์‹ ๋Œ€๊ธฐ(listen) ํ•œ๋‹ค.

lass LatestNewsActivity : AppCompatActivity() {
    private val latestNewsViewModel = // getViewModel()

    override fun onCreate(savedInstanceState: Bundle?) {
        ...
        // lifecycle scope์—์„œ ์ฝ”๋ฃจํ‹ด์„ launchํ•จ
        lifecycleScope.launch {
            // lifecycle์ด STARTED state ๋˜๋Š” ๊ทธ ์•ž์˜ state๋“ค์ด๊ฑฐ๋‚˜, STOPPED์ด๋ผ์„œ cancelํ•  ๋•Œ
            // repeatOnLifecycle ์ด ๋ธ”๋ก์„ launchํ•จ
            repeatOnLifecycle(Lifecycle.State.STARTED) {
                // flow๋ฅผ ํŠธ๋ฆฌ๊ฑฐํ•˜์—ฌ value๋“ค์„ listenํ•˜๊ธฐ ์‹œ์ž‘ํ•จ
                // lifecycle์ด STARTED์ผ๋•Œ ํŠธ๋ฆฌ๊ฑฐ๋˜๊ณ , lifecycle์ด STOPPED์ผ ๋•Œ collect๋ฅผ ๋ฉˆ์ถค
                latestNewsViewModel.uiState.collect { uiState ->
                    // ์ƒˆ๋กœ์šด ๊ฐ’์ด ์ˆ˜์‹ ๋˜์—ˆ์„ ๋•Œ!!
                    when (uiState) {
                        is LatestNewsUiState.Success -> showFavoriteNews(uiState.news)
                        is LatestNewsUiState.Error -> showError(uiState.exception)
                    }
                }
            }
        }
    }
}

โš ๏ธ ์ฃผ์˜ํ•ด์•ผ ํ•  ์ 

UI๋ฅผ ์—…๋ฐ์ดํŠธํ•ด์•ผํ•  ๋•Œ, UI์—์„œ์˜ launch๋‚˜ launchIn์—์„œ repeatOnLifecycle ์—†์ด ๋‹ค์ด๋ ‰ํŠธ๋กœ flow๋ฅผ collectํ•˜๋ฉด ์•ˆ๋œ๋‹ค. launch์™€ launchIn๋Š” ๋ทฐ๊ฐ€ ํ‘œ์‹œ๋˜์ง€ ์•Š๋Š” ๊ฒฝ์šฐ์—๋„ ์ด๋ฒคํŠธ๋ฅผ ์ฒ˜๋ฆฌํ•˜๊ธฐ ๋•Œ๋ฌธ์ด๋‹ค. ์ด๋กœ์ธํ•ด ์•ฑ์ด ๋‹ค์šด๋  ์ˆ˜๋„ ์žˆ์œผ๋ฏ€๋กœ repeatOnLifecycle API๋ฅผ ์‚ฌ์šฉํ•ด์„œ ์•ฑ์ด ๋‹ค์šด๋˜๋Š” ๊ฒƒ์„ ๋ฐฉ์ง€ํ•ด์ค˜์•ผ ํ•œ๋‹ค.!!

๐ŸŒŸ flow๋ฅผ StateFlow๋กœ ๋ณ€ํ™˜ํ•˜๋ ค๋ฉด?

stateIn์ด๋ผ๋Š” intermediate ์—ฐ์‚ฐ์ž๋ฅผ ์‚ฌ์šฉํ•˜๋ฉด ๋œ๋‹ค.

๐Ÿคผโ€โ™‚๏ธ StateFlow vs LiveData

โค๏ธ ๊ณตํ†ต์ 

  • observable ๋ฐ์ดํ„ฐ ํ™€๋” ํด๋ž˜์Šค์ž„
  • ์•ฑ ์•„ํ‚คํ…์ฒ˜์—์„œ ๋น„์Šทํ•œ ํŒจํ„ด์œผ๋กœ ์“ฐ์ž„

๐Ÿ’” ์ฐจ์ด์ 

  • StateFlow
    • ์ดˆ๊ธฐ ์ƒํƒœ๋ฅผ ์ƒ์„ฑ์ž์— ์ „๋‹ฌํ•ด์•ผ ํ•จ
    • ๋ทฐ๊ฐ€ STOPPED ์ƒํƒœ๊ฐ€ ๋˜์—ˆ์„ ๋•Œ ์ž๋™์œผ๋กœ collect๋ฅผ ์ค‘์ง€ํ•˜์ง€ ์•Š์Œ. ์ž๋™์œผ๋กœ ์ค‘์ง€๋˜๊ฒŒ ํ•˜๋ ค๋ฉด Lifecycle.repeatOnLifecycle ๋ธ”๋ก์—์„œ flow๋ฅผ collectํ•ด์•ผ ํ•จ
  • LiveData
    • ์ดˆ๊ธฐ ์ƒํƒœ๋ฅผ ์ƒ์„ฑ์ž์— ์ „๋‹ฌํ•˜์ง€ ์•Š์Œ
    • ๋ทฐ๊ฐ€ STOPPED ์ƒํƒœ๊ฐ€ ๋˜์—ˆ์„ ๋•Œ cosumer๋ฅผ ์ž๋™์œผ๋กœ unregisterํ•จ

cold stream โžก๏ธ hot stream ๋ณ€ํ™˜? ๐ŸŒŸshareIn๐ŸŒŸ

StateFlow๋Š” hot stream์œผ๋กœ flow๊ฐ€ collect๋˜๋Š” ๋™์•ˆ, ๊ทธ๋ฆฌ๊ณ  ๊ฐ€๋น„์ง€ ์ปฌ๋ ‰์…˜ ๋ฃจํŠธ์—์„œ ๋‹ค๋ฅธ ๋ ˆํผ๋Ÿฐ์Šค๊ฐ€ ์žˆ๋Š” ๊ฒฝ์šฐ์— ๋ฉ”๋ชจ๋ฆฌ์— ๋‚จ์•„์žˆ๋‹ค. shareIn ์—ฐ์‚ฐ์ž๋ฅผ ํ•˜์šฉํ•˜์—ฌ cold stream์„ hot stream์œผ๋กœ ์ „ํ™˜์‹œํ‚ฌ ์ˆ˜ ์žˆ๋‹ค.
๊ฐ collector์—์„œ ์ƒˆ๋กœ์šด flow๋ฅผ ๋งŒ๋“ค ํ•„์š” ์—†์ด flow์—์„œ ์ƒ์„ฑํ•œ callbackFlow๋ฅผ ์“ฐ๋ฉด Firestore์—์„œ ๊ฐ€์ ธ์˜จ ๋ฐ์ดํ„ฐ๋ฅผ shareIn์„ ํ†ตํ•ด collector๋“ค๋ผ๋ฆฌ ๊ณต์œ ํ•  ์ˆ˜ ์žˆ๋‹ค. ์ด๋ฅผ ์œ„ํ•ด์„œ๋Š” ๋‹ค์Œ๊ณผ ๊ฐ™์€ ๋‚ด์šฉ์„ ์ „๋‹ฌํ•ด์ค˜์•ผ ํ•œ๋‹ค.

  • flow๋ฅผ ๊ณต์œ ํ•˜๋Š”๋ฐ ์‚ฌ์šฉ๋˜๋Š” CoroutineScope. ๊ณต์œ  Flow๋ฅผ ํ•„์š”ํ•œ ๋งŒํผ ์œ ์ง€ํ•˜๊ธฐ ์œ„ํ•ด์„œ, ์ด scope๋Š” consumer๋ณด๋‹ค ์˜ค๋ž˜ ์ง€์†๋˜์–ด์•ผ ํ•จ
  • ๊ฐ๊ฐ์˜ ์ƒˆ๋กœ์šด collector๋กœ replayํ•  item์˜ ๊ฐœ์ˆ˜
  • start behavior ์ •์ฑ…
    class NewsRemoteDataSource(...,
      private val externalScope: CoroutineScope,
    ) {
      val latestNews: Flow<List<ArticleHeadline>> = flow {
          ...
      }.shareIn(
          externalScope,
          replay = 1,
          started = SharingStarted.WhileSubscribed()
      )
    }  
    

    ์œ„์˜ ์ฝ”๋“œ์—์„œ lastNews flow๋Š” ๋งˆ์ง€๋ง‰์œผ๋กœ emitํ•œ item์„ ์ƒˆ๋กœ์šด collector๋กœ replayํ•˜๊ณ , externalScope๊ฐ€ active ์ƒํƒœ์ด๊ณ  activeํ•œ collector๊ฐ€ ์žˆ๋Š” ํ•œ, active ์ƒํƒœ๋กœ ์œ ์ง€๋œ๋‹ค. SharingStarted.WhileSubscribed() ์ •์ฑ…์€ activeํ•œ subscriber๊ฐ€ ์žˆ๋Š” ๋™์•ˆ์—๋Š” upstream producer๋ฅผ active ์ƒํƒœ๋กœ ์œ ์ง€ํ•œ๋‹ค. ์—ฌ๊ธฐ์— ๋‹ค๋ฅธ ์‹œ์ž‘ ์ •์ฑ„๋„ ์‚ฌ์šฉํ•  ์ˆ˜ ์žˆ๋‹ค. ์˜ˆ๋ฅผ ๋“ค์–ด์„œ SharingStarted.Eagerly๋ฅผ ์‚ฌ์šฉํ•˜์—ฌ producer๋ฅผ ์ฆ‰์‹œ ์‹œ์ž‘ํ•  ์ˆ˜๋„ ์žˆ๋‹ค. ์•„๋‹ˆ๋ฉด SharingStarted.Lazily๋ฅผ ์‚ฌ์šฉํ•ด์„œ ์ฒซ ๋ฒˆ์งธ subscriber๊ฐ€ ํ‘œ์‹œ๋œ ํ›„์—์•ผ ๊ณต์œ ๋ฅผ ์‹œ์ž‘ํ•˜๊ณ , Flow๋ฅผ ์˜๊ตฌ์ ์œผ๋กœ active ์ƒํƒœ๋กœ ์œ ์ง€ํ•  ์ˆ˜ ์žˆ๋‹ค.

2๏ธโƒฃ SharedFlow

shareIn ํ•จ์ˆ˜๋Š” SharedFlow๋ฅผ ๋ฐ˜ํ™˜ํ•œ๋‹ค. SharedFlow๋Š” flow๋ฅผ collectํ•˜๋Š” ๋ชจ๋“  consumer์—๊ฒŒ ๊ฐ’์„ emitํ•˜๋Š” hot stream์ด๋‹ค. SharedFlow๋Š” StateFlow๊ฐ€ highly-configurableํ•˜๊ฒŒ ์ผ๋ฐ˜ํ™”๋œ Flow์ด๋‹ค. shareIn์„ ์“ฐ์ง€ ์•Š๊ณ ๋„ SharedFlow๋ฅผ ๋งŒ๋“ค ์ˆ˜๋„ ์žˆ๋‹ค. ์˜ˆ๋ฅผ ๋“ค์–ด ๋ชจ๋“  ์ฝ˜ํ…์ธ ๊ฐ€ ์ฃผ๊ธฐ์ +๋™์‹œ์— ์ƒˆ๋กœ๊ณ ์นจ๋˜๋„๋ก ์•ฑ์— ํ‹ฑ์„ ์ „์†กํ•˜๋Š” SharedFlow๋ฅผ ์‚ฌ์šฉํ•  ์ˆ˜ ์žˆ๋‹ค. ์•„๋ž˜์˜ ์ฝ”๋“œ์—์„œ TickHandler๋Š” SharedFlow๋ฅผ exposeํ•ด์„œ ๋‹ค๋ฅธ ํด๋ž˜์Šค๊ฐ€ ์ƒˆ๋กœ๊ณ ์นจํ•˜๋Š” ํƒ€์ด๋ฐ์„ ์•Œ ์ˆ˜ ์žˆ๊ฒŒ ํ•ด์ค€๋‹ค. StateFlow์ฒ˜๋Ÿผ ํด๋ž˜์Šค์—์„œ MutableSharedFlow ํƒ€์ž…์„ ์‚ฌ์šฉํ•˜์—ฌ item์„ flow๋กœ emitํ•œ๋‹ค.

// ์•ฑ์˜ ์ฝ˜ํ…์ธ ๊ฐ€ ๋ฆฌํ”„๋ ˆ์‰ฌ๋˜์–ด์•ผํ•  ๋•Œ centeralize์‹œ์ผœ์ฃผ๋Š” TickHandler ํด๋ž˜์Šค
class TickHandler(
    private val externalScope: CoroutineScope,
    private val tickIntervalMs: Long = 5000
) {
    // Backing property to avoid flow emissions from other classes
    private val _tickFlow = MutableSharedFlow<Unit>(replay = 0)
    val tickFlow: SharedFlow<Event<String>> = _tickFlow

    init {
        externalScope.launch {
            while(true) {
                _tickFlow.emit(Unit)
                delay(tickIntervalMs)
            }
        }
    }
}

class NewsRepository(
    ...,
    private val tickHandler: TickHandler,
    private val externalScope: CoroutineScope
) {
    init {
        externalScope.launch {
            // tick update๋ฅผ listen ์ค‘...
            tickHandler.tickFlow.collect {
                refreshLatestNews()
            }
        }
    }

    suspend fun refreshLatestNews() { ... }
    ...
}

๐ŸŒŸ SharedFlow ๋™์ž‘์„ ์ปค์Šคํ…€ํ•˜๋Š” ๋‘ ๊ฐ€์ง€ ๋ฐฉ๋ฒ•

  • replay : ์ด์ „์— emitํ•œ ์—ฌ๋Ÿฌ ๊ฐ’๋“ค์„ ์ƒˆ๋กœ์šด subscriber์—๊ฒŒ ๋‹ค์‹œ ๋ณด๋‚ผ ์ˆ˜ ์žˆ์Œ
  • onBufferOverflow : ๋ฒ„ํผ์— ์ „์†กํ•  item์œผ๋กœ ๊ฐ€๋“ ์ฐผ์„ ๋•Œ ์–ด๋–ป๊ฒŒ ํ• ๊ฑด์ง€์— ๋Œ€ํ•ด ์ •์ฑ…์„ ์ง€์ •ํ•  ์ˆ˜ ์žˆ์Œ. ๊ธฐ๋ณธ๊ฐ’์€ ํ˜ธ์ถœ์ž๋ฅผ ์ •์ง€์‹œํ‚ค๋Š” BufferOverflow.SUSPEND์ž„. ๋‹ค๋ฅธ ์˜ต์…˜์œผ๋กœ๋Š” DROP_LATEST, DROP_OLDEST๊ฐ€ ์žˆ์Œ ๊ทธ๋ฆฌ๊ณ  MutableSharedFlow์˜ ํ”„๋กœํผํ‹ฐ ์ค‘์— subscriptionCount๊ฐ€ ์žˆ๋Š”๋ฐ, subscriptionCount๋ฅผ ํ†ตํ•ด activeํ•œ colletor์˜ ๊ฐœ์ˆ˜๋ฅผ ์•Œ ์ˆ˜ ์žˆ๊ธฐ ๋•Œ๋ฌธ์— business logic์„ ์ตœ์ ํ™”์‹œํ‚ฌ ์ˆ˜ ์žˆ๋‹ค. MutableSharedFlow์—๋Š” ๋˜ํ•œ resetReplayCache๋ผ๋Š” ํ•จ์ˆ˜๋„ ์žˆ๋Š”๋ฐ, resetReplayCache๋Š” flow์— ์ „์†ก๋œ ์ตœ์‹  ์ •๋ณด๋ฅผ replayํ•˜์ง€ ์•Š๊ณ  ์‹ถ์„ ๋•Œ ์“ธ ์ˆ˜ ์žˆ๋‹ค.

[์ฐธ๊ณ  ์‚ฌ์ดํŠธ]
์•ˆ๋“œ๋กœ์ด๋“œ ๊ณต์‹ ๋ฌธ์„œ - StateFlow and SharedFlow

Join Newsletter
Get the latest news right in your inbox. We never spam!
Ella
Written by Ella Follow
Android Developer, love to explore new ideas and write on my morning coffee!