# coroutines
t
Am I doing something wrong here? I am trying to use flow with firebase but I am getting
kotlinx.coroutines.channels.ClosedSendChannelException: Channel was closed
@ExperimentalCoroutinesApi
fun <T> firebaseDataAsFlow(reference: DatabaseReference) = callbackFlow {
    reference.addValueEventListener(object : ValueEventListener {

        override fun onDataChange(dataSnapshot: DataSnapshot) {
            val type = object : GenericTypeIndicator<T>() {}
            offer(dataSnapshot.getValue(type))
        }

        override fun onCancelled(error: DatabaseError) {
            cancel(CancellationException(error.details, error.toException()))
        }

    })
}
The exception gets thrown whenever offer() is called.
t
You need to call awaitClose { /* dispose the listener here */ } at the end of the block, otherwise the block returns and the channel closes itself. This is easy to forget!
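For illustration, here is a minimal sketch of that fix. It assumes a recent kotlinx.coroutines release (where trySend replaces the now-deprecated offer) and uses a hypothetical FakeReference class as a stand-in for DatabaseReference, so it runs without Firebase. With the real API, the awaitClose block would presumably call reference.removeEventListener(listener) instead.

```kotlin
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.callbackFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.runBlocking

// Hypothetical callback-based source, standing in for DatabaseReference.
class FakeReference {
    private val listeners = mutableListOf<(String) -> Unit>()

    val listenerCount: Int get() = listeners.size

    fun addListener(listener: (String) -> Unit) {
        listeners += listener
        // Deliver a current value on registration so the sketch has something to emit.
        listener("initial")
    }

    fun removeListener(listener: (String) -> Unit) {
        listeners -= listener
    }
}

fun fakeReferenceAsFlow(reference: FakeReference): Flow<String> = callbackFlow {
    val listener: (String) -> Unit = { value ->
        // Non-suspending send; the result is ignored if the channel is full or closed.
        trySend(value)
    }
    reference.addListener(listener)

    // Suspends until the flow is cancelled or closed, keeping the channel open
    // while callbacks arrive, then unregisters the listener.
    awaitClose { reference.removeListener(listener) }
}

// Collects the first value; cancelling the collection triggers the awaitClose block.
fun firstValue(reference: FakeReference): String =
    runBlocking { fakeReferenceAsFlow(reference).first() }
```

Because first() cancels the flow after one value, the cleanup in awaitClose runs and the listener is unregistered before firstValue returns.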
v
I wonder if we can unconditionally throw an exception with a meaningful stacktrace if awaitClose is forgotten.
t
Thanks @tseisel
a
Just for callbackFlow and not for channelFlow? (I kind of hope so, since it seems like it would be prohibitive for channelFlow to do that, but useful for the callback bridging case.)
t
@Adam Powell I had the same exception when I tried channelFlow earlier; a missing awaitClose {} was likely the problem there too.
a
A "missing" awaitClose {} isn't necessarily an error or problem for a channelFlow, though. If you're using it to serialize emits from multiple child coroutines instead of bridging to callbacks, the parent scope won't exit until the child jobs join, and the channel won't close until that happens. The error above comes from bridging to callbacks that don't participate in structured concurrency, so awaitClose {} is used to hold the door open.
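A rough sketch of that channelFlow case, for comparison. The function names and the numbers emitted are made up for the example. The point is that the block launches child coroutines, so the channel stays open until they finish and no awaitClose is needed.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical example: two child coroutines send into the same channel.
fun mergedNumbers(): Flow<Int> = channelFlow {
    launch {
        for (i in 1..3) {
            delay(10)
            send(i)
        }
    }
    launch {
        for (i in 101..103) {
            delay(15)
            send(i)
        }
    }
    // No awaitClose here: the producer scope waits for both children to
    // complete (structured concurrency) before the channel is closed.
}

// Collects everything the flow emits; the interleaving depends on timing.
fun collectMerged(): List<Int> = runBlocking { mergedNumbers().toList() }
```

All six values arrive even though the block itself returns immediately, because the children are still running inside the producer scope. A plain callback registered in the same block gets no such treatment, which is why the callback case needs awaitClose.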
t
Great! Thanks for the enlightenment.