Websockets states with Kotlin StateFlow
Following our journey with Kotlin Flow on Android @Geev, we tried to share the Websockets states all over the app.
We wished to save user’s message in local when disconnection happened, to change icons on screen or to notify the user with an in-app message... At the same time, in different classes of course.
A regular flow was not useful here, we needed a special flow: a StateFlow.
Prerequisite: Websockets states🚦
With SocketIO, there are three reserved events to get connection’s information: EVENT_CONNECT
, EVENT_CONNECT_ERROR
and EVENT_DISCONNECT
.
In our app, we created an Enum
for each state including an initial one WAITING
:
A cold shower 🥶
As we implemented our flow, we firstly tried to call the same function.
We used a callbackFlow
which allowed us to emit from callbacks API based. We listened to the sockets events and deployed our SocketState
with trySend
. Finally, we kept this flow alive with awaitClose
.
This was not a good idea.
A flow is cold: each call to a terminal operator (as collect
, launchIn
, single
, etc.) triggers a new call of the flow’s block. Then each time, we created new listeners.
“A flow is cold because it is not active before the call to terminal operation, not active after and releasing all resources before returning from the call.” ¹
Beside, once connected, the socket does not emit a new connected state unless it disconnects. Therefore, new collectors didn’t get the actual state. Plus, the awaitClose
kept the old flows alive.
So, how did we manage this? By changing our cold flow to a hot flow.
Changing to a hot Flow 🔥
A hot flow is active independently of the presence of collectors.
There are two types of hot flows: SharedFlow
and StateFlow
. The former is the parent of the latter. SharedFlow
has more customization properties: number of events to replay for new subscribers, buffer overflow behavior, etc.
A StateFlow
is a lightweight SharedFlow
: it contains only the most recent value, has no replay cache for new subscribers except the current value and it must have an initial value.
We consumed our cold flow into a hot one thanks to the stateIn
operator:
val
is used to declare this new flow — This is important because it needs to retain the same instance and sharing it to multiple subscribers.StateFlow
is now the type of flow.listenState
is our previous cold flow.- Optional: add operator like
debounce
. stateIn
operator is finally called.
StateIn operator 💪
This operator is really powerful. You can take an existing regular flow and convert it into a StateFlow
.
“It is useful in situations when there is a cold flow that provides updates to the value of some state and is expensive to create and/or to maintain, but there are multiple subscribers that need to collect the most recent state value.” ²
Our final operator was defined as follows:
- A scope: the coroutine scope on IO context where the
StateFlow
will be alive and the upstream flow will be executed, - A sharing strategy policy: the
SharingStarted.WhileSubscribed
strategy allows to cancel the upstream flow when there are no subscribers left — this prevents wasting resources when no one is listening, - An initial value: our
SocketState.WAITING
when we wait for a connection information from sockets events.
Listen to the states👂
We subscribed to our hot flow by observing val state
like this example:
We used launchIn
operator to launch
the coroutine into a lifecycleScope
directly to reduce boilerplate. It is a terminal operator and it’s an extension to call launch
and collect
. Then we applied onEach
to get the sockets states.
With this, we were able to do anything based on connection changes in different classes.
One last note 📓
Using WhileSubscribed
strategy, the upstream is restarted again when the last subscriber leaves and a new subscriber comes after. So we ended up to get the initial value whereas the socket is still connected. And, as I said before, the socket does not emit a new connected state unless something changes.
So we had to check the connection and update the StateFlow
before listening to the sockets events:
Therefore if the flow was restarted, we still had the actual connected state.
States everywhere 🎉
We succeed to create a one-to-many flow and to retrieve the actual state thanks to the StateFlow
. We used a regular flow and consumed it with stateIn
operator. With this, we were capable to listen to Websockets connection state in Activities, Fragments and so on.
If you found this post helpful, feel free to clap! 👏 Thanks for reading.
- ^ Cold flows, hot Channels by Roman Elizarov
- ^ StateIn operator official documentation