Websockets states with Kotlin StateFlow
We wanted to save the user’s message locally when a disconnection happened, change icons on screen, or notify the user with an in-app message. All at the same time, and in different classes of course.
A regular flow was not enough here; we needed a special flow: a StateFlow.
Prerequisite: Websockets states 🚦
With Socket.IO, there are three reserved events that report the connection’s status. In our app, we created an enum with one entry for each state, plus an initial one.
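As a rough sketch, such an enum could look like the one below. Only WAITING appears later in this post; the other entry names, and the Socket.IO event names in the comments, are assumptions made for illustration.

```kotlin
// Hypothetical entries, except WAITING, which is the initial state used later in the post.
enum class SocketState {
    WAITING,       // initial state: no connection information received yet
    CONNECTED,     // assumed to map to the "connect" event
    DISCONNECTED,  // assumed to map to the "disconnect" event
    ERROR          // assumed to map to the "connect_error" event
}
```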
A cold shower 🥶
When we implemented our flow, we first tried calling the same function from each class.
We used a callbackFlow, which let us emit values from a callback-based API. We listened to the socket events and sent our states with trySend. Finally, we kept this flow alive with awaitClose.
This was not a good idea.
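A minimal sketch of that first attempt is shown below. EventSocket is a hypothetical stand-in for the Socket.IO client, the enum entries other than WAITING and the event names are assumptions, and removing the listeners in awaitClose is an addition of this sketch rather than something the original code is known to have done.

```kotlin
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.callbackFlow

enum class SocketState { WAITING, CONNECTED, DISCONNECTED, ERROR }

// Hypothetical stand-in for the Socket.IO client, reduced to what the sketch needs.
interface EventSocket {
    fun on(event: String, listener: () -> Unit)
    fun off(event: String, listener: () -> Unit)
}

// Cold flow: this block runs again for every collector and registers new listeners each time.
fun listenState(socket: EventSocket): Flow<SocketState> = callbackFlow {
    val onConnect: () -> Unit = { trySend(SocketState.CONNECTED) }
    val onDisconnect: () -> Unit = { trySend(SocketState.DISCONNECTED) }
    val onError: () -> Unit = { trySend(SocketState.ERROR) }

    socket.on("connect", onConnect)
    socket.on("disconnect", onDisconnect)
    socket.on("connect_error", onError)

    // Suspends until the collector is cancelled, then removes this collector's listeners.
    awaitClose {
        socket.off("connect", onConnect)
        socket.off("disconnect", onDisconnect)
        socket.off("connect_error", onError)
    }
}
```

Calling listenState(socket) alone registers nothing; the listeners are only attached once a terminal operator starts collecting.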
A flow is cold: each call to a terminal operator (such as collect, single, etc.) triggers a new execution of the flow’s block. So each time, we created new listeners.
“A flow is cold because it is not active before the call to terminal operation, not active after and releasing all resources before returning from the call.” ¹
Besides, once connected, the socket does not emit a new connected state unless it disconnects first. Therefore, new collectors didn’t get the actual state. Plus, awaitClose kept the old flows alive.
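The cold behaviour can be reproduced with a plain flow, without any socket. In this small sketch, the counter and the function name are illustrative; the counter stands in for "register socket listeners".

```kotlin
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking

// Returns how many times the flow's block ran after two terminal operator calls.
fun countBlockRuns(): Int = runBlocking {
    var blockRuns = 0
    val coldStates = flow<String> {
        blockRuns++          // stands in for registering socket listeners
        emit("CONNECTED")
    }
    coldStates.first()       // first terminal operator: the block runs
    coldStates.first()       // second terminal operator: the block runs again
    blockRuns
}
```

Here countBlockRuns() returns 2: one execution of the block per terminal operator call.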
So, how did we manage this? By changing our cold flow to a hot flow.
Changing to a hot Flow 🔥
A hot flow is active independently of the presence of collectors.
There are two types of hot flows: SharedFlow and StateFlow. The former is the parent of the latter.
SharedFlow has more customization properties: number of events to replay for new subscribers, buffer overflow behavior, etc.
StateFlow is a lightweight SharedFlow: it holds only the most recent value, replays nothing to new subscribers except that current value, and must have an initial value.
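The difference can be illustrated with the mutable variants of both types. This is only a sketch: the replay size of 2 and the string values are arbitrary choices, and the function names are made up for the example.

```kotlin
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.runBlocking

// SharedFlow: the number of replayed values is configurable (2 here).
fun replayedToNewSubscriber(): List<String> {
    val shared = MutableSharedFlow<String>(replay = 2)
    shared.tryEmit("WAITING")
    shared.tryEmit("CONNECTED")
    shared.tryEmit("DISCONNECTED")
    return shared.replayCache   // the last two values, kept for new subscribers
}

// StateFlow: an initial value is mandatory and only the current value is kept.
fun currentValueForNewSubscriber(): String = runBlocking {
    val state = MutableStateFlow("WAITING")
    state.value = "CONNECTED"
    state.value = "DISCONNECTED"
    state.first()               // a new subscriber receives the current value only
}
```

With these inputs, the SharedFlow replays CONNECTED and DISCONNECTED, while the StateFlow hands over DISCONNECTED alone.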
We converted our cold flow into a hot one thanks to the stateIn operator:
- val is used to declare this new flow. This is important because it needs to retain the same instance and share it with multiple subscribers.
- StateFlow is now the type of the flow.
- listenState is our previous cold flow.
- Optional: intermediate operators can be added before sharing.
- The stateIn operator is finally called.
StateIn operator 💪
This operator is really powerful. You can take an existing regular flow and convert it into a StateFlow.
“It is useful in situations when there is a cold flow that provides updates to the value of some state and is expensive to create and/or to maintain, but there are multiple subscribers that need to collect the most recent state value.” ²
Our final operator was defined as follows:
- A scope: the coroutine scope, on the IO context, where the StateFlow is kept alive and the upstream flow is executed,
- A sharing strategy: SharingStarted.WhileSubscribed cancels the upstream flow when there are no subscribers left, which avoids wasting resources when no one is listening,
- An initial value: our SocketState.WAITING, used while we wait for connection information from the socket events.
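Put together, the declaration and its three parameters might look like the sketch below. The SocketStateHolder class, the way the cold flow is passed in, and the enum entries other than WAITING are assumptions for illustration; stateIn, SharingStarted.WhileSubscribed and Dispatchers.IO are the standard kotlinx.coroutines APIs.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.SharingStarted
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.stateIn

enum class SocketState { WAITING, CONNECTED, DISCONNECTED, ERROR }

// Hypothetical holder class; listenState is the cold flow described earlier.
class SocketStateHolder(
    listenState: Flow<SocketState>,
    scope: CoroutineScope = CoroutineScope(SupervisorJob() + Dispatchers.IO)
) {
    // Declared as a val so that every subscriber shares the same StateFlow instance.
    val state: StateFlow<SocketState> = listenState.stateIn(
        scope = scope,                               // where the upstream flow is collected
        started = SharingStarted.WhileSubscribed(),  // stop the upstream when nobody listens
        initialValue = SocketState.WAITING           // value exposed before any socket event
    )
}
```

Until a first subscriber arrives, state.value simply stays at SocketState.WAITING and the upstream flow is not started.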
Listen to the states 👂
We subscribed to our hot flow by observing val state.
We used the launchIn operator to launch the coroutine in a lifecycleScope directly, which reduces boilerplate. It is a terminal operator, a shorthand for calling collect inside launch. Before it, we applied onEach to receive the socket states.
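A subscription along these lines can be sketched as follows. The observeState helper and its parameters are hypothetical; on Android, the scope argument would typically be the lifecycleScope of the Activity or Fragment.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach

// Hypothetical helper: scope stands in for an Android lifecycleScope.
fun <T> observeState(state: StateFlow<T>, scope: CoroutineScope, onState: (T) -> Unit): Job =
    state
        .onEach { onState(it) }  // react to each new socket state
        .launchIn(scope)         // equivalent to scope.launch { collect() }
```

The returned Job can be cancelled to stop listening; with a lifecycleScope, cancellation happens automatically when the lifecycle is destroyed.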
With this, we were able to do anything based on connection changes in different classes.
One last note 📓
With the WhileSubscribed strategy, the upstream is restarted when the last subscriber leaves and a new subscriber arrives afterwards. So we ended up getting the initial value even though the socket was still connected. And, as I said before, the socket does not emit a new connected state unless something changes.
So we had to check the connection and update the StateFlow before listening to the socket events.
Therefore, if the flow was restarted, we still had the actual connected state.
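One way to express this check is to send the current status at the very start of the cold flow, so that it reaches the StateFlow through stateIn. This is a sketch under assumptions: EventSocket and its isConnected property are hypothetical stand-ins for the Socket.IO client, the event names and enum entries other than WAITING are illustrative, and the original code may have updated the StateFlow differently.

```kotlin
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.callbackFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.runBlocking

enum class SocketState { WAITING, CONNECTED, DISCONNECTED, ERROR }

// Hypothetical stand-in for the Socket.IO client.
interface EventSocket {
    val isConnected: Boolean
    fun on(event: String, listener: () -> Unit)
    fun off(event: String, listener: () -> Unit)
}

fun listenState(socket: EventSocket): Flow<SocketState> = callbackFlow {
    // Publish the current status first, so a restarted upstream does not stay on WAITING.
    if (socket.isConnected) trySend(SocketState.CONNECTED)

    val onConnect: () -> Unit = { trySend(SocketState.CONNECTED) }
    val onDisconnect: () -> Unit = { trySend(SocketState.DISCONNECTED) }
    socket.on("connect", onConnect)
    socket.on("disconnect", onDisconnect)

    // Remove this collector's listeners once it is cancelled.
    awaitClose {
        socket.off("connect", onConnect)
        socket.off("disconnect", onDisconnect)
    }
}

// Usage: the first value a collector receives for a given socket.
fun firstStateOf(socket: EventSocket): SocketState =
    runBlocking { listenState(socket).first() }
```

For a socket that is already connected, firstStateOf returns SocketState.CONNECTED without waiting for a new event.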
States everywhere 🎉
We succeeded in creating a one-to-many flow and in retrieving the actual state thanks to StateFlow. We used a regular flow and converted it with the stateIn operator. With this, we were able to listen to the Websockets connection state in Activities, Fragments and so on.