Handle exceptions in callbackFlow with Kotlin

Florent Blot
5 min read · Mar 3, 2022

In our Android app @Geev, we tried Kotlin Flow to handle WebSocket calls for our new instant messaging feature. We mostly used the callbackFlow builder to emit and receive data. This builder is easy to use and can be kept alive while listening to socket events.

But we struggled to catch unexpected exceptions inside this builder. Even a try/catch clause around the flow’s collector was useless. We needed to properly handle unexpected exceptions in this flow builder.

Catching exceptions 👨‍🚒

Assume we have a simple flow, numbers(), that is meant to emit the integers from 0 to 2. It emits 0 and 1, but then something happens and it throws an unexpected exception, simulated by the check function below:

fun numbers(): Flow<Int> = callbackFlow {
    trySend(0)
    trySend(1)
    check(0 == 1) // throws IllegalStateException
}

When collecting it, we apply the catch operator to handle exceptions. This operator catches only upstream exceptions (exceptions from the operators above it, not from those below it).

numbers()
    .onEach { println("Getting $it") }
    .catch { println("Caught $it") }
    .launchIn(lifecycleScope)

That’s why we use onEach with launchIn instead of collect alone: the processing code then sits above catch, so we can be sure that catch will catch every exception occurring in the upstream flow.

When collecting, it gives this output:

Getting 0
Getting 1
Caught java.lang.IllegalStateException: Check failed.

The numbers are collected as expected, and the thrown exception is handled.
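
The remark about catch only seeing upstream exceptions can be checked with a small sketch. It assumes kotlinx.coroutines is available and uses runBlocking in place of lifecycleScope so that it runs outside Android. The function names and the "boom" message are made up for the illustration.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// catch sits BELOW onEach: the failing check is upstream of catch, so catch sees it.
fun catchBelowOnEach(): List<String> = runBlocking {
    val log = mutableListOf<String>()
    flowOf(1, 2)
        .onEach { check(it < 2) { "boom" }; log += "Getting $it" }
        .catch { log += "Caught ${it.message}" }
        .collect {}
    log
}

// catch sits ABOVE onEach: the failing check is downstream of catch, so the
// exception bypasses the operator and reaches the surrounding try/catch instead.
fun catchAboveOnEach(): List<String> = runBlocking {
    val log = mutableListOf<String>()
    try {
        flowOf(1, 2)
            .catch { log += "Caught ${it.message}" }
            .onEach { check(it < 2) { "boom" }; log += "Getting $it" }
            .collect {}
    } catch (e: IllegalStateException) {
        log += "Escaped: ${e.message}"
    }
    log
}
```

The first function should return "Getting 1" followed by "Caught boom", while the second should return "Getting 1" followed by "Escaped: boom". Only the position of catch in the chain differs.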

Callback’s case 💥

What will happen if the exception is not thrown in the builder but inside a callback? Let’s find out.

We need to create a simple callback which will give us new values to emit:

var callback: NumberCallback? = null

interface NumberCallback {
    fun onNextNumber(i: Int)
}

We redesign numbers() to consume this new callback:

fun numbers() = callbackFlow {
    callback = object : NumberCallback {
        override fun onNextNumber(i: Int) {
            check(i < 2)
            trySend(i)
        }
    }
    awaitClose { callback = null }
}

Let’s walk through the snippet above:

  • We still use our callbackFlow builder, returning a Flow<Int>.
  • The var callback holds our new listener: the callback.
  • We override onNextNumber to receive the new integer i from the aforementioned callback.
  • The check function verifies that received integers are strictly lower than 2. If not, it throws an IllegalStateException (as in our first snippet, it simulates an unexpected exception).
  • If the check passes, we emit the new integer into the flow with trySend.
  • And awaitClose keeps the flow running. It is mandatory: it keeps the stream open and cleans up the resources after completion, preventing memory leaks.
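
As a side note on the last bullet, the cleanup role of awaitClose can be observed with a minimal sketch. It assumes kotlinx.coroutines is available and replaces the global callback variable with a hypothetical NumberSource holder, so that the test can inspect it. The exception check is left out here because only the cleanup is of interest.

```kotlin
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

interface NumberCallback {
    fun onNextNumber(i: Int)
}

// Hypothetical holder standing in for the global `callback` variable.
class NumberSource {
    var callback: NumberCallback? = null
}

fun NumberSource.numbers(): Flow<Int> = callbackFlow {
    callback = object : NumberCallback {
        override fun onNextNumber(i: Int) {
            trySend(i)
        }
    }
    // Suspends until the flow is closed or cancelled, then unregisters the listener.
    awaitClose { callback = null }
}

fun cleanupDemo(): Pair<List<Int>, Boolean> = runBlocking {
    val source = NumberSource()
    val feeder = launch {
        repeat(5) {
            delay(50)
            source.callback?.onNextNumber(it)
        }
    }
    // take(2) cancels the upstream flow after two values.
    val received = source.numbers().take(2).toList()
    feeder.cancelAndJoin()
    // Pair of (received values, whether the listener was unregistered).
    received to (source.callback == null)
}
```

Once the collector stops after two values, the block passed to awaitClose should have run, so the second element of the pair is expected to be true.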

Finally, thanks to the repeat function, we send 0 to 2 (our index it below) through our callback:

repeat(3) {
    delay(200)
    callback?.onNextNumber(it)
}

Now, using the same collector as before, we get this output:

Getting 0
Getting 1
AndroidRuntime: FATAL EXCEPTION: main
Process: app, PID: 8141
java.lang.IllegalStateException: Check failed.
at MainActivity$numbers$1$1.onNextNumber(MainActivity.kt:40)
at MainActivity$onCreate$3.invokeSuspend(MainActivity.kt:32)
at BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)

Wait... 😱 The app crashes unexpectedly even though we applied the catch operator. We also end up in the same situation with a try/catch clause around the flow. The exception inside the callback is not handled.

Different scopes 🚧

Our first flow succeeded in catching the exception whereas the second didn’t. Why?

The block passed to the flow builder is a suspending lambda whose receiver is a ProducerScope, which extends CoroutineScope. This interface defines a scope for new coroutines and, in case of an exception, this scope handles it. So, in our first snippet, the exception is thrown in the flow builder’s scope and is handled by our catch operator.

But in our second snippet, the exception is thrown inside the callback listener, so it surfaces on the call stack of whoever invoked the callback. That code is no longer running in the coroutine’s scope; therefore, the flow cannot handle it. It is the callback’s responsibility to deal with such exceptions.

Let’s surround the repeat function with a try/catch clause to verify:

try {
    repeat(3) {
        delay(200)
        callback?.onNextNumber(it)
    }
} catch (e: Exception) {
    println("Callback caught: $e")
}

Collecting the flow now confirms it, as the log from the try/catch clause above is printed:

Getting 0
Getting 1
Callback caught: java.lang.IllegalStateException: Check failed.

Propagate the exception 🚨

How can we throw the exception into the right scope and trigger our catch operator then?

The ProducerScope also extends SendChannel. So under the hood, numbers() uses a Channel. In order to propagate the exception, we have two choices: cancelling its coroutine’s scope or closing its channel.

Cancelling the scope can be done with cancel. It is an extension function on CoroutineScope that cancels the scope’s job and all of its children. The cancellation is carried by a CancellationException, and we can either pass that exception directly or provide a message together with a specific exception as its cause.

fun CoroutineScope.cancel(cause: CancellationException? = null)
fun CoroutineScope.cancel(message: String, cause: Throwable? = null)

On the other hand, we have close. It sends a special “close token” over the channel with an optional cause. We could pass a specific exception as a non-null cause. This will close the channel and therefore trigger the flow’s completion.

abstract fun close(cause: Throwable? = null): Boolean

Both behave the same way in that the flow completes exceptionally; therefore, catch and onCompletion will be called. However, cancelling the scope with a cause is mainly used to provide a reason for debugging purposes. If you prefer to deal with the specific type of exception, you should use close.
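
To compare what the collector actually receives in each case, here is a minimal sketch, assuming kotlinx.coroutines is available. Both functions are hypothetical helpers written for this comparison: they fail the flow straight from the builder block, one with close and one with cancel, and return whatever the catch operator was given.

```kotlin
import kotlinx.coroutines.cancel
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Fails the flow by closing the channel with the original exception as cause.
fun caughtAfterClose(): Throwable? = runBlocking {
    var caught: Throwable? = null
    callbackFlow<Int> {
        close(IllegalStateException("Check failed."))
        awaitClose()
    }.catch { caught = it }.collect {}
    caught
}

// Fails the flow by cancelling the producer scope, passing the original
// exception as the cause of the cancellation.
fun caughtAfterCancel(): Throwable? = runBlocking {
    var caught: Throwable? = null
    callbackFlow<Int> {
        cancel("Callback failed", IllegalStateException("Check failed."))
        awaitClose()
    }.catch { caught = it }.collect {}
    caught
}
```

With close, catch should receive the IllegalStateException itself, so a check such as `it is IllegalStateException` works directly. With cancel, we would expect catch to receive a CancellationException instead, with the original exception only reachable through its cause chain.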

Catch the unexpected 🦸‍♂️

Then, with close (or cancel), we have the ability to propagate the exception into the right scope.

We just have to surround our callback block with a try/catch clause and close the channel with a non-null cause:

fun numbers() = callbackFlow {
    callback = object : NumberCallback {
        override fun onNextNumber(i: Int) {
            try {
                check(i < 2)
                trySend(i)
            } catch (e: Exception) {
                close(e)
            }
        }
    }
    awaitClose { … }
}

Collecting our flow now gives us:

Getting 0
Getting 1
Caught java.lang.IllegalStateException: Check failed.

By closing the channel with the exception, we can now trigger our catch operator and handle any unexpected error.
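
For readers who want to try this outside an Android project, the pieces can be assembled into one runnable sketch. It assumes kotlinx.coroutines is available, uses runBlocking in place of lifecycleScope, and reuses the awaitClose cleanup from the earlier snippet. The collectNumbers function is a hypothetical wrapper that records the log lines in a list instead of printing them.

```kotlin
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

interface NumberCallback {
    fun onNextNumber(i: Int)
}

var callback: NumberCallback? = null

fun numbers(): Flow<Int> = callbackFlow {
    callback = object : NumberCallback {
        override fun onNextNumber(i: Int) {
            try {
                check(i < 2)
                trySend(i)
            } catch (e: Exception) {
                // Hand the failure over to the flow instead of the caller.
                close(e)
            }
        }
    }
    awaitClose { callback = null }
}

// Hypothetical wrapper: collects the flow and feeds the callback like the article does.
fun collectNumbers(): List<String> = runBlocking {
    val log = mutableListOf<String>()
    val collector = numbers()
        .onEach { log += "Getting $it" }
        .catch { log += "Caught $it" }
        .launchIn(this)
    repeat(3) {
        delay(200)
        callback?.onNextNumber(it)
    }
    collector.join() // wait until the flow has completed
    log
}
```

Calling collectNumbers() should yield the same three lines as the output above, and nothing escapes to the code that invokes the callback.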

Everything is caught 💪

By calling scope or channel functions, we are now able to trigger our catch operator from a different scope. Every exception, from the callback to the builder and even from within the upstream flow operators, will be handled by catch.

If you found this post helpful, feel free to clap! 👏 Thanks for reading.



Florent Blot

Maker, Mobile developer, Kotlin & Dart enthusiast, IoT & DIY addict. @Geev: https://www.geev.com