tjohnn
12/16/2019, 3:04 PMkotlinx.coroutines.channels.ClosedSendChannelException: Channel was closed
@ExperimentalCoroutinesApi
fun <T> firebaseDataAsFlow(reference: DatabaseReference) = callbackFlow {
reference.addValueEventListener(object : ValueEventListener {
override fun onDataChange(dataSnapshot: DataSnapshot) {
val type = object : GenericTypeIndicator<T>() {}
offer(dataSnapshot.getValue(type))
}
override fun onCancelled(error: DatabaseError) {
cancel(CancellationException(error.details, error.toException()))
}
})
}
The exception gets thrown whenever offer()
is calledtseisel
12/16/2019, 3:12 PMawaitClose { /* dispose the listener here */ }
at the end of the block, otherwise the channel will close itself.
This is easy to forget it !Vsevolod Tolstopyatov [JB]
12/16/2019, 3:37 PMawaitClose
is forgottentjohnn
12/16/2019, 4:02 PMAdam Powell
12/17/2019, 2:12 AMcallbackFlow
and not for channelFlow
?Adam Powell
12/17/2019, 2:12 AMchannelFlow
to do that, but useful for the callback bridging case)tjohnn
12/17/2019, 6:58 AMAdam Powell
12/17/2019, 2:04 PMawaitClose {}
isn't necessarily an error or problem for a channelFlow
though, since if you're using it to serialize emits from multiple child coroutines instead of bridging to callbacks, the parent scope won't exit until the child jobs join, and the channel won't close until that happens. The error above comes from bridging to callbacks that don't participate in structured concurrency, so awaitClose {}
is used to hold the door open.tjohnn
12/17/2019, 7:01 PM