Handle exceptions in callbackFlow with Kotlin
In our Android app @Geev, we tried Kotlin Flow to handle WebSocket calls for our new instant messaging. We mostly used the callbackFlow builder to emit and receive data. This builder is easy to use and can be kept alive to listen to socket events.
But we struggled to catch unexpected exceptions inside this builder. Even a try/catch clause around the flow’s collector was useless. We needed to properly handle unexpected exceptions in this flow builder.
Catching exceptions 👨‍🚒
Assume we have a simple flow, numbers(), that emits the integers from 0 to 2, but at the end something goes wrong and it throws an unexpected exception, simulated by the check function below:
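The original snippet is not reproduced in this text. A minimal sketch of what such a flow could look like, assuming it is built with callbackFlow and that check(it < 2) is the failing condition (both assumptions are taken from the description later in the article), is:

```kotlin
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.callbackFlow

// Sketch only: the exact body of the article's numbers() is assumed.
fun numbers(): Flow<Int> = callbackFlow {
    repeat(3) {
        // check() throws IllegalStateException when its argument is false,
        // which happens here on the last index (it == 2).
        check(it < 2)
        trySend(it)
    }
    // Suspends until the channel is closed or the collector is cancelled.
    awaitClose()
}
```

Here the exception is thrown directly in the builder's block, not in a callback, which is the case the rest of this section relies on.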
When collecting it, we apply the catch operator to handle exceptions. This operator catches only upstream exceptions (all exceptions from the operators above it, not below it). That’s why we use onEach with launchIn instead of collect alone, to be sure that catch will catch every exception occurring in the upstream flow.
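As a rough sketch of that collector: the helper name collectNumbers and its parameters are hypothetical, and the log messages are taken from the output shown in this article. Because the logging lives in onEach, placed before catch, an exception thrown there is still upstream of catch. The same code inside a terminal collect { } lambda would be downstream and would escape it.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach

// Hypothetical helper: collects the given flow in the given scope.
fun collectNumbers(numbers: Flow<Int>, scope: CoroutineScope): Job =
    numbers
        .onEach { println("Getting: $it") }   // runs for every emitted value
        .catch { println("Caught: $it") }     // handles exceptions from everything above
        .launchIn(scope)                      // terminal operator with an empty collect
```

In an Android screen the scope would typically be a lifecycle-bound scope; which one the original code used is not stated.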
When collecting, it gives this output:
Getting: 0
Getting: 1
Caught: java.lang.IllegalStateException: Check failed
The numbers are collected as expected, and the thrown exception is handled.
Callback’s case 💥
What will happen if the exception is not thrown in the builder but inside a callback? Let’s find out.
We need to create a simple callback which will give us new values to emit:
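The article only names the callback's onNextNumber method, so the interface name NumberCallback in this sketch is an assumption:

```kotlin
// Hypothetical listener type: it delivers each new integer to whoever implements it.
interface NumberCallback {
    fun onNextNumber(i: Int)
}
```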
We redesign numbers() to consume this new callback:
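A possible shape for this version, reconstructed from the walkthrough that follows. The NumberCallback type and the top-level placement of the callback variable are assumptions made so the sketch is self-contained.

```kotlin
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.callbackFlow

// Hypothetical listener type, repeated here so the sketch compiles on its own.
interface NumberCallback {
    fun onNextNumber(i: Int)
}

// Assumed holder for the registered listener, so outside code can invoke it.
var callback: NumberCallback? = null

fun numbers(): Flow<Int> = callbackFlow {
    callback = object : NumberCallback {
        override fun onNextNumber(i: Int) {
            // Throws IllegalStateException as soon as i reaches 2.
            check(i < 2)
            // Non-suspending send, safe to call from a plain callback.
            trySend(i)
        }
    }
    // Keeps the flow open and unregisters the listener once it completes.
    awaitClose { callback = null }
}
```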
Let’s walk through the snippet above:
- We still use our callbackFlow builder, returning a Flow<Int>.
- The var callback defines our new listener: the callback.
- We override onNextNumber to receive the new integer i from the aforementioned callback.
- The check function verifies that received integers are strictly lower than 2. If not, it throws an IllegalStateException (like our first snippet, it simulates an unexpected exception).
- If the flow goes on, we emit the new integers into it with trySend.
- And awaitClose keeps the flow running. It is mandatory in order to keep the stream open and to clean up the resources after completion, preventing memory leaks.
Finally, thanks to the repeat function, we send 0 to 2 (our index it below) through our callback:
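A minimal sketch of that call, assuming the caller can reach the registered listener. The sendNumbers helper and the NumberCallback type are hypothetical names.

```kotlin
// Hypothetical listener type, repeated here so the sketch is self-contained.
interface NumberCallback {
    fun onNextNumber(i: Int)
}

// Sends the indices 0, 1 and 2 through the given callback, if one is registered.
fun sendNumbers(callback: NumberCallback?) {
    repeat(3) { callback?.onNextNumber(it) }
}
```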
Now, using the same collector as before, we get this output:
Getting: 0
Getting: 1
AndroidRuntime: FATAL EXCEPTION: main
Process: app, PID: 8141
java.lang.IllegalStateException: Check failed.
at MainActivity$numbers$1$1.onNextNumber(MainActivity.kt:40)
at MainActivity$onCreate$3.invokeSuspend(MainActivity.kt:32)
at BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
...
Wait... 😱 The app crashes unexpectedly even though we applied the catch operator. We end up in the same situation with a try/catch clause around the flow. The exception inside the callback is not handled.
Different scopes 🚧
Our first flow succeeded in catching the exception whereas the second didn’t. Why?
The block of the flow builder is a suspending lambda whose receiver is a ProducerScope, which extends CoroutineScope. This interface defines a scope for new coroutines and, in case of an exception, this scope handles it. So, in our first snippet, the exception is thrown in the flow builder’s scope and is handled by our catch operator.
But in our second snippet, the exception is thrown inside the callback listener. It travels up the call stack of whoever invoked the callback, not through the flow. The scope is no longer the coroutine’s scope; therefore, the flow cannot handle it. It is the callback’s responsibility to deal with such exceptions.
Let’s surround the repeat function with a try/catch clause to verify:
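As a sketch, with the same hypothetical sendNumbers helper and NumberCallback type as assumptions, and the log message taken from the output shown in this article:

```kotlin
// Hypothetical listener type, repeated here so the sketch is self-contained.
interface NumberCallback {
    fun onNextNumber(i: Int)
}

// Same sending loop, now guarded on the caller's side.
fun sendNumbers(callback: NumberCallback?) {
    try {
        repeat(3) { callback?.onNextNumber(it) }
    } catch (e: Exception) {
        // The exception surfaces here, on the caller's stack, not in the flow.
        println("Callback caught: $e")
    }
}
```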
Collecting it now confirms it: the callback’s log above is printed.
Getting: 0
Getting: 1
Callback caught: java.lang.IllegalStateException: Check failed.
Propagate the exception 🚨
How can we throw the exception into the right scope and trigger our catch operator then?
ProducerScope also extends SendChannel, so under the hood numbers() uses a Channel. In order to propagate the exception, we have two choices: cancelling its coroutine’s scope or closing its channel.
Cancelling the scope can be done with cancel. It is a CoroutineScope extension which cancels the scope’s job and all its children. The cancellation is carried by a CancellationException, and we can either pass one directly as the cause or provide a message together with a specific exception as the cause.
fun CoroutineScope.cancel(cause: CancellationException? = null)
fun CoroutineScope.cancel(message: String, cause: Throwable? = null)
On the other hand, we have close. It sends a special “close token” over the channel with an optional cause. We could add a specific exception as a non-null cause. This closes the channel and therefore triggers the flow’s completion.
abstract fun close(cause: Throwable? = null): Boolean
They both behave the same way: the flow completes exceptionally, so catch and onCompletion will be called. However, cancelling the scope with a cause is mainly used to provide a reason for debugging purposes. If you prefer to deal with the specific type of exception, you should use close.
Catch the unexpected 🦸‍♂️
Then, with close (or cancel), we can propagate the exception into the right scope.
We just have to surround our callback block with a try/catch clause and close the channel with a non-null cause:
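A sketch of that fix under the same assumptions as before (hypothetical NumberCallback type and top-level callback holder). Catching the broad Exception type is also a choice made here for illustration.

```kotlin
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.callbackFlow

// Hypothetical listener type, repeated here so the sketch compiles on its own.
interface NumberCallback {
    fun onNextNumber(i: Int)
}

// Assumed holder for the registered listener, so outside code can invoke it.
var callback: NumberCallback? = null

fun numbers(): Flow<Int> = callbackFlow {
    callback = object : NumberCallback {
        override fun onNextNumber(i: Int) {
            try {
                check(i < 2)
                trySend(i)
            } catch (e: Exception) {
                // Closes the underlying channel with the exception as its cause,
                // so the collector sees the failure after the buffered values.
                close(e)
            }
        }
    }
    // Resumes once the channel is closed, then unregisters the listener.
    awaitClose { callback = null }
}
```

Values sent before the failure stay in the channel's buffer, so close lets the collector drain them before it receives the cause.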
Collecting our flow will now give us:
Getting: 0
Getting: 1
Caught: java.lang.IllegalStateException: Check failed
By closing the channel with the exception, we can now trigger our catch operator and handle any unexpected error.
Everything is caught 💪
By calling scope or channel functions, we are now able to trigger our catch operator from a different scope. Every exception, from the callback to the builder and even from within the flow operators, will be handled by catch.
If you found this post helpful, feel free to clap! 👏 Thanks for reading.