Websockets states with Kotlin StateFlow

Following our journey with Kotlin Flow on Android @Geev, we tried to share the Websockets states all over the app.

We wanted to save the user's message locally when a disconnection happened, change icons on screen, or notify the user with an in-app message. All at the same time, and in different classes of course.

A regular flow was not enough here. We needed a special flow: a StateFlow.

Prerequisite: Websockets states🚦

With SocketIO, three reserved events report connection information: EVENT_CONNECT, EVENT_CONNECT_ERROR and EVENT_DISCONNECT.

socketClient.on(Socket.EVENT_CONNECT) { Log.i(TAG, "connected") }
socketClient.on(Socket.EVENT_CONNECT_ERROR) { Log.e(TAG, "connection error: ${it[0]}") }
socketClient.on(Socket.EVENT_DISCONNECT) { Log.w(TAG, "disconnected") }

In our app, we created an Enum for each state including an initial one WAITING:
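
A minimal sketch of such an enum, assuming only the four states that appear in the other snippets of this post (the comments are our reading of how each entry is used, and the real class may carry more):

```kotlin
// Sketch only: entry names are taken from the snippets in this post.
enum class SocketState {
    WAITING,      // initial state, before any socket event has arrived
    CONNECTED,    // sent on Socket.EVENT_CONNECT
    ERROR,        // sent on Socket.EVENT_CONNECT_ERROR
    DISCONNECTED  // sent on Socket.EVENT_DISCONNECT
}

fun main() {
    println(SocketState.values().joinToString())
}
```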


A cold shower 🥶

When we implemented our flow, we first tried calling the same function from each place that needed the state.

We used callbackFlow, which lets us emit from a callback-based API. We listened to the socket events and sent our SocketState with trySend. Finally, we kept this flow alive with awaitClose.

fun listenState(): Flow<SocketState> = callbackFlow {
    socketClient.on(Socket.EVENT_CONNECT) { trySend(SocketState.CONNECTED) }
    socketClient.on(Socket.EVENT_CONNECT_ERROR) { trySend(SocketState.ERROR) }
    socketClient.on(Socket.EVENT_DISCONNECT) { trySend(SocketState.DISCONNECTED) }
    awaitClose { … }
}

This was not a good idea.

A flow is cold: each call to a terminal operator (such as collect, launchIn, single, etc.) runs the flow's block again. So each time, we registered new listeners.

“A flow is cold because it is not active before the call to terminal operation, not active after and releasing all resources before returning from the call.” ¹

Besides, once connected, the socket does not emit a new connected state unless it disconnects. Therefore, new collectors didn't get the current state. Plus, awaitClose kept the old flows alive.
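
To make the cold behavior concrete, here is a minimal sketch rather than our production code. The names coldStates, blockRuns and runsAfterTwoCollectors are hypothetical, and a plain flow builder stands in for the callbackFlow so the example runs without a socket:

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking

// Counts how many times the flow's block has run.
var blockRuns = 0

// Hypothetical stand-in for listenState(): in the real flow,
// this block is where the socket listeners get registered.
fun coldStates(): Flow<String> = flow {
    blockRuns++
    emit("CONNECTED")
}

// Collects the same cold flow twice and reports how often its block ran.
fun runsAfterTwoCollectors(): Int = runBlocking {
    blockRuns = 0
    val states = coldStates()
    states.first() // first terminal operator: the block runs
    states.first() // second terminal operator: the block runs again
    blockRuns
}

fun main() {
    println(runsAfterTwoCollectors()) // prints 2
}
```

Every collector pays for its own run of the block, which is exactly how we ended up with duplicated socket listeners.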

So, how did we manage this? By changing our cold flow to a hot flow.

Changing to a hot Flow 🔥

A hot flow is active independently of the presence of collectors.

There are two types of hot flows: SharedFlow and StateFlow. The former is the parent of the latter. SharedFlow has more customization properties: number of events to replay for new subscribers, buffer overflow behavior, etc.

A StateFlow is a lightweight SharedFlow: it contains only the most recent value, has no replay cache for new subscribers except the current value and it must have an initial value.
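
These properties can be checked with a small sketch. It uses MutableStateFlow directly, which is not what our app does (we rely on stateIn), and the function name is hypothetical:

```kotlin
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.runBlocking

// Returns what a subscriber sees when it arrives after several updates.
fun latestValueForLateSubscriber(): String = runBlocking {
    val state = MutableStateFlow("WAITING") // an initial value is mandatory
    state.value = "CONNECTED"
    state.value = "DISCONNECTED"
    // Older values are gone: only the current one is handed to new subscribers.
    state.first()
}

fun main() {
    println(latestValueForLateSubscriber()) // prints DISCONNECTED
}
```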

We converted our cold flow into a hot one with the stateIn operator:

val state: StateFlow<SocketState> = listenState()
    .debounce(1000)
    …
    .stateIn(…)
  • val is used to declare this new flow. This is important because the property must keep a single instance and share it with multiple subscribers.
  • StateFlow is now the type of flow.
  • listenState is our previous cold flow.
  • Optional: add operators such as debounce.
  • The stateIn operator is called last.

StateIn operator 💪

This operator is really powerful. You can take an existing regular flow and convert it into a StateFlow.

“It is useful in situations when there is a cold flow that provides updates to the value of some state and is expensive to create and/or to maintain, but there are multiple subscribers that need to collect the most recent state value.” ²

Our final operator was defined as follows:

val state = listenState()
    …
    .stateIn(
        scope = CoroutineScope(context = Dispatchers.IO),
        started = SharingStarted.WhileSubscribed(replayExpirationMillis = 0),
        initialValue = SocketState.WAITING
    )
  • A scope: the coroutine scope, on the IO dispatcher, in which the StateFlow stays alive and the upstream flow is collected,
  • A sharing strategy: SharingStarted.WhileSubscribed cancels the upstream flow when no subscribers are left, which avoids wasting resources when no one is listening,
  • An initial value: SocketState.WAITING, used while we wait for connection information from the socket events.
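
The effect of these three parameters can be observed with a self-contained sketch. Everything here is an assumption made for the demo: a plain flow replaces listenState(), the gate only exists to make the run deterministic, and upstreamRunsForTwoSubscribers is a hypothetical name:

```kotlin
import java.util.concurrent.atomic.AtomicInteger
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.awaitCancellation
import kotlinx.coroutines.cancel
import kotlinx.coroutines.flow.SharingStarted
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.stateIn
import kotlinx.coroutines.joinAll
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

enum class SocketState { WAITING, CONNECTED, ERROR, DISCONNECTED }

// Returns how many times the upstream block ran while serving two subscribers.
fun upstreamRunsForTwoSubscribers(): Int = runBlocking {
    val blockRuns = AtomicInteger(0)
    val gate = CompletableDeferred<Unit>() // demo only: holds the emission back

    // Stand-in for listenState(): counts its starts, then stays active.
    val cold = flow<SocketState> {
        blockRuns.incrementAndGet()
        gate.await()
        emit(SocketState.CONNECTED)
        awaitCancellation() // plays the role of awaitClose in callbackFlow
    }

    val scope = CoroutineScope(Dispatchers.IO)
    val state = cold.stateIn(
        scope = scope,
        started = SharingStarted.WhileSubscribed(replayExpirationMillis = 0),
        initialValue = SocketState.WAITING
    )

    // UNDISPATCHED runs each subscriber up to its first suspension,
    // so both are subscribed before the gate opens.
    val first = launch(start = CoroutineStart.UNDISPATCHED) {
        state.first { it == SocketState.CONNECTED }
    }
    val second = launch(start = CoroutineStart.UNDISPATCHED) {
        state.first { it == SocketState.CONNECTED }
    }
    gate.complete(Unit)
    joinAll(first, second)

    scope.cancel()
    blockRuns.get()
}

fun main() {
    println(upstreamRunsForTwoSubscribers()) // prints 1
}
```

Two subscribers, one run of the upstream block: in our case, that means one set of socket listeners.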

Listen to the states👂

We subscribed to our hot flow by observing the state property, as in this example:

state.onEach {
    when (it) {
        SocketState.WAITING -> showLoading()
        SocketState.CONNECTED -> hideLoading()
        SocketState.ERROR -> showErrorState()
        SocketState.DISCONNECTED -> showOfflineState()
    }
}.launchIn(scope = lifecycleScope)

We used the launchIn operator to start the collection directly in lifecycleScope and reduce boilerplate. It is a terminal operator, a shorthand for calling launch and then collect inside it. We applied onEach before it to handle each socket state.
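
As a rough illustration of that equivalence, the sketch below collects the same flow both ways. It uses flowOf and runBlocking instead of our StateFlow and lifecycleScope, and collectBothWays is a hypothetical name:

```kotlin
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.joinAll
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Collects one flow with onEach + launchIn and with launch + collect.
fun collectBothWays(): Pair<List<Int>, List<Int>> = runBlocking {
    val numbers = flowOf(1, 2, 3)
    val viaLaunchIn = mutableListOf<Int>()
    val viaCollect = mutableListOf<Int>()

    // Short form: the handling goes in onEach, launchIn starts the coroutine.
    val shortForm = numbers.onEach { viaLaunchIn += it }.launchIn(this)

    // Long form: launch a coroutine and collect inside it.
    val longForm = launch { numbers.collect { viaCollect += it } }

    joinAll(shortForm, longForm)
    viaLaunchIn to viaCollect
}

fun main() {
    println(collectBothWays()) // prints ([1, 2, 3], [1, 2, 3])
}
```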

With this, we were able to do anything based on connection changes in different classes.

One last note 📓

With the WhileSubscribed strategy, the upstream is stopped when the last subscriber leaves and restarted when a new subscriber arrives. With replayExpirationMillis = 0, the StateFlow value is also reset to the initial value as soon as sharing stops. So we ended up with the initial value even though the socket was still connected. And, as I said before, the socket does not emit a new connected state unless something changes.

So we had to check the connection and update the StateFlow before listening to the socket events:

fun listenState(): Flow<SocketState> = callbackFlow {
    if (socketClient.isConnected()) {
        trySend(SocketState.CONNECTED)
    }
    …
}

Therefore, if the flow was restarted, we still got the current connected state.
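
Here is a runnable sketch of that idea, under loud assumptions: FakeSocket is a hypothetical stand-in for our socket client, the listener registration is left out, and stateSeenByLateSubscriber is a name made up for the demo:

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.cancel
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.SharingStarted
import kotlinx.coroutines.flow.callbackFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.stateIn
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeout

enum class SocketState { WAITING, CONNECTED, ERROR, DISCONNECTED }

// Hypothetical stand-in for the real socket client.
class FakeSocket(val connected: Boolean)

fun listenState(socket: FakeSocket): Flow<SocketState> = callbackFlow {
    // Report the current connection first, before any socket event.
    if (socket.connected) trySend(SocketState.CONNECTED)
    // Real code would register the socket listeners here.
    awaitClose { /* real code would remove the listeners here */ }
}

// Returns the first non-initial state seen by a subscriber that arrives late.
fun stateSeenByLateSubscriber(): SocketState = runBlocking {
    val scope = CoroutineScope(Dispatchers.IO)
    val state = listenState(FakeSocket(connected = true)).stateIn(
        scope = scope,
        started = SharingStarted.WhileSubscribed(replayExpirationMillis = 0),
        initialValue = SocketState.WAITING
    )

    // A first subscriber arrives, sees CONNECTED, then leaves.
    state.first { it == SocketState.CONNECTED }

    // A later subscriber: the upstream may have been stopped and reset meanwhile.
    // If so, it is restarted and the initial check sends CONNECTED again.
    val seen = withTimeout(5_000) { state.first { it != SocketState.WAITING } }

    scope.cancel()
    seen
}

fun main() {
    println(stateSeenByLateSubscriber()) // prints CONNECTED
}
```

Without the initial check, this fake socket would never emit anything after a restart, and the late subscriber would stay on WAITING.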

States everywhere 🎉

We succeeded in creating a one-to-many flow and in retrieving the current state thanks to StateFlow. We took a regular flow and converted it with the stateIn operator. With this, we were able to listen to the Websockets connection state in Activities, Fragments and so on.

If you found this post helpful, feel free to clap! 👏 Thanks for reading.

Florent Blot

Maker, Mobile developer, Kotlin & Dart enthusiast, IoT & DIY addict. @Geev: https://www.geev.com