Domain Model’s StateFlow Sharing

Many applications keep app-wide state in their domain model. Making that state correctly reactive is not trivial. Let’s explore.

Apps hold state all over the source code. ViewModels hold their screen’s state, storage holds the app’s persistent state, and sometimes we need to keep app-wide state in the domain model layer. Such data is valid across many screens or “navigation graphs”; it is a kind of runtime storage. You may keep it in separate, specifically named model classes.

The difficult part is how to model the derived state, how to make it “live” and reactive, and at the same time avoid computing it when the app-wide state is not needed, especially when the app goes to the background. Consider screens A -> B -> C with a natural transition from A to B to C. We need to access the state in screens A and C, and maybe elsewhere. Here is a class exposing the derived state we need.

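import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.flow.SharingStarted
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.stateIn

class NewsRepository(
    private val usersRepository: UsersRepository,
) {
    // Derived state: the news is fetched again whenever the user changes.
    val news: StateFlow<List<Article>> = usersRepository.user
        .map { user ->
            fetchNews(user.id)
        }
        .stateIn(GlobalScope, SharingStarted.Eagerly, emptyList())
}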

The example is fairly self-explanatory and the behavior should be clear.

  • GlobalScope is used so that the data stays valid for the whole lifetime of the app,
  • SharingStarted.Eagerly is used so that the state holds the correct value as soon as possible, ideally by the first access.

But this also means that the flow keeps running when the app is in the background. In the end, the user repository keeps observing the Room storage even while the app is in the background and we do not need those observations.
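The cost of Eagerly can be observed in isolation. The following is a minimal sketch, assuming only kotlinx.coroutines on the classpath; the helper upstreamStartedWithoutSubscribers and the fake upstream are hypothetical names made up for this illustration and merely stand in for the Room-backed flow.

```kotlin
import java.util.concurrent.atomic.AtomicBoolean
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.awaitCancellation
import kotlinx.coroutines.cancel
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.SharingStarted
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.stateIn
import kotlinx.coroutines.runBlocking

// Hypothetical helper: reports whether the upstream was collected
// although nobody ever subscribed to the shared StateFlow.
suspend fun upstreamStartedWithoutSubscribers(started: SharingStarted): Boolean {
    val scope = CoroutineScope(Dispatchers.Default)
    val upstreamStarted = AtomicBoolean(false)
    // Stand-in for an expensive observation such as a database query.
    val upstream = flow {
        upstreamStarted.set(true)
        emit(listOf("article"))
        awaitCancellation() // stay active until sharing is cancelled
    }
    // Share the flow, but never collect the resulting StateFlow.
    upstream.stateIn(scope, started, emptyList())
    delay(200) // give an eager start enough time to happen
    scope.cancel()
    return upstreamStarted.get()
}

fun main() = runBlocking {
    // Eagerly collects the upstream right away: prints true
    println(upstreamStartedWithoutSubscribers(SharingStarted.Eagerly))
    // WhileSubscribed waits for the first subscriber: prints false
    println(upstreamStartedWithoutSubscribers(SharingStarted.WhileSubscribed()))
}
```

With Eagerly the upstream is collected regardless of whether anyone reads the value, which is exactly the work we would like to avoid in the background.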

We may change the SharingStarted implementation to SharingStarted.WhileSubscribed. Before I go into details about what to do with this, let’s explore what WhileSubscribed can do. It takes two timeouts: a sharing timeout (stopTimeoutMillis) and a value storage timeout (replayExpirationMillis). Once the last subscriber is gone and the sharing timeout elapses, it stops “sharing” (i.e. observing) the upstream. Then the value storage timeout starts: if still no observer subscribes to the flow before it elapses, the flow is reset to the initial value and the latest value is forgotten.

This way the upstream flow is observed only while the “news” flow has an observer. That’s nice, but it also means that we may lose the latest value (either a new one, or one already observed) if the way from screen A to screen C takes too long.
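The loss of the value can be reproduced with a small experiment. This is only a sketch under stated assumptions: the function observeWhileSubscribedReset, the string values and the short timeouts (100 ms and 200 ms) are chosen for the illustration and are not taken from a real app.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.awaitCancellation
import kotlinx.coroutines.cancel
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.SharingStarted
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.stateIn
import kotlinx.coroutines.runBlocking

// Hypothetical demo: returns the value seen before subscribing,
// while subscribed, and after both timeouts have elapsed.
suspend fun observeWhileSubscribedReset(): List<String> {
    val scope = CoroutineScope(Dispatchers.Default)
    val upstream = flow {
        emit("latest")
        awaitCancellation() // keep the upstream active until sharing stops
    }
    val state = upstream.stateIn(
        scope,
        SharingStarted.WhileSubscribed(
            stopTimeoutMillis = 100,      // sharing timeout
            replayExpirationMillis = 200, // value storage timeout
        ),
        "initial",
    )
    val beforeSubscribing = state.value
    // Subscribing starts the upstream; first() unsubscribes once it returns.
    val whileSubscribed = state.first { it != "initial" }
    // Wait well past both timeouts without any subscriber.
    delay(1_000)
    val afterTimeouts = state.value
    scope.cancel()
    return listOf(beforeSubscribing, whileSubscribed, afterTimeouts)
}

fun main() = runBlocking {
    println(observeWhileSubscribedReset()) // [initial, latest, initial]
}
```

The last element shows the problem: once both timeouts pass without a subscriber, the StateFlow falls back to its initial value.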

To solve this, we want to keep the value and the observation while the app is in the foreground, and “enable” our timeouts only when it goes to the background.

First, we need a flow of the app state, i.e. events of entering the foreground and the background. This can be done nicely with callbackFlow and ProcessLifecycleOwner.

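import androidx.lifecycle.Lifecycle
import androidx.lifecycle.LifecycleEventObserver
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.callbackFlow

// Collect on the main thread: the lifecycle expects observers to be added and removed there.
internal fun Lifecycle.asFlow(): Flow<Lifecycle.State> = callbackFlow {
    val observer = LifecycleEventObserver { _, event ->
        trySend(event.targetState)
    }
    addObserver(observer)
    awaitClose { removeObserver(observer) }
}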

val appLifecycle = ProcessLifecycleOwner.get().lifecycle.asFlow()

The SharingStarted interface requires implementing a command method that returns a flow of commands driving the sharing. Instead of depending on changes in the subscription count, we will send commands based on the appLifecycle.

We will keep the value forever. Once the app goes to the background and the sharing timeout elapses, we stop the observation.

class AppAliveSharing(
    private val stopTimeout: Duration,
) : SharingStarted {
    override fun command(subscriptionCount: StateFlow<Int>): Flow<SharingCommand> {
        // ...
    }
}

We will use transformLatest on the flow.

  • transform: turn the app state into the wanted command (state), emitted from inside the lambda;
  • latest: a new app state cancels the pending emission that is still delayed by the timeout.

Last but not least, prevent repeated command emission with distinctUntilChanged().

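import androidx.lifecycle.Lifecycle
import androidx.lifecycle.ProcessLifecycleOwner
import kotlin.time.Duration
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*

@OptIn(ExperimentalCoroutinesApi::class) // transformLatest is an experimental API
class AppAliveSharing(
    private val stopTimeout: Duration,
) : SharingStarted {
    private val appLifecycle = ProcessLifecycleOwner.get().lifecycle.asFlow()

    override fun command(subscriptionCount: StateFlow<Int>): Flow<SharingCommand> {
        return appLifecycle
            // Observe the lifecycle on the main thread.
            .flowOn(Dispatchers.Main)
            .transformLatest { state ->
                if (state.isAtLeast(Lifecycle.State.STARTED)) {
                    // Foreground: (re)start sharing immediately.
                    emit(SharingCommand.START)
                } else {
                    // Background: stop only after the timeout; a newer state cancels this delay.
                    delay(stopTimeout)
                    emit(SharingCommand.STOP)
                }
            }
            .distinctUntilChanged()
    }
}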

This way you can simply keep the flow shared while the app is in the foreground:

import kotlin.time.Duration.Companion.minutes

class NewsRepository(
    private val usersRepository: UsersRepository,
) {
    val news: StateFlow<List<Article>> = usersRepository.user
        .map { user ->
            fetchNews(user.id)
        }
        // Share while the app is in the foreground; stop one minute after it leaves.
        .stateIn(GlobalScope, AppAliveSharing(1.minutes), emptyList())
}
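The same mechanism can be tried on a plain JVM, without ProcessLifecycleOwner. The sketch below is not the class from this article: ForegroundSharing, demoForegroundSharing and the Boolean foreground flow are hypothetical stand-ins, with a MutableStateFlow<Boolean> playing the role of the app lifecycle and a 100 ms stop timeout assumed for the demo.

```kotlin
import kotlin.time.Duration
import kotlin.time.Duration.Companion.milliseconds
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.cancel
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.SharingCommand
import kotlinx.coroutines.flow.SharingStarted
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.distinctUntilChanged
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.stateIn
import kotlinx.coroutines.flow.transformLatest
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for AppAliveSharing: `foreground` replaces the lifecycle flow.
@OptIn(ExperimentalCoroutinesApi::class)
class ForegroundSharing(
    private val foreground: Flow<Boolean>,
    private val stopTimeout: Duration,
) : SharingStarted {
    override fun command(subscriptionCount: StateFlow<Int>): Flow<SharingCommand> =
        foreground
            .transformLatest { isForeground ->
                if (isForeground) {
                    emit(SharingCommand.START)
                } else {
                    delay(stopTimeout) // cancelled if the app returns in time
                    emit(SharingCommand.STOP)
                }
            }
            .distinctUntilChanged()
}

suspend fun demoForegroundSharing(): List<Int> {
    val scope = CoroutineScope(Dispatchers.Default)
    val foreground = MutableStateFlow(true)
    val source = MutableStateFlow(1) // stand-in for the observed storage
    val shared = source.stateIn(scope, ForegroundSharing(foreground, 100.milliseconds), 0)

    source.value = 2
    val inForeground = shared.first { it == 2 } // upstream is being observed

    foreground.value = false
    delay(500)       // longer than the stop timeout, so STOP has been sent
    source.value = 3 // changes while stopped are not observed
    delay(200)
    val inBackground = shared.value // the last value is kept

    foreground.value = true
    val afterReturn = shared.first { it == 3 } // sharing restarts and catches up
    scope.cancel()
    return listOf(inForeground, inBackground, afterReturn)
}

fun main() = runBlocking {
    println(demoForegroundSharing()) // [2, 2, 3]
}
```

The middle value is the point of the design: the observation stops in the background, yet the StateFlow still serves the last known value until the app returns.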

Conclusion

SharingStarted is an extremely powerful interface and you may drive your sharing by various conditions. Don’t be afraid to experiment. Keeping data “alive” while users use your app is a common requirement. However, when users leave the app for a longer time, you are free to stop observing and release resources. Optionally, you may also forget the value with the SharingCommand.STOP_AND_RESET_REPLAY_CACHE command.