# coroutines
s
Hi, I want to wrap a legacy listener that fires multiple times into a
Channel
. However I'm a bit lost here. Here's the simplified pseudo code:
suspend fun Query.onSnapshotAsync(scope: CoroutineScope) =
    scope.produce {
        addSnapshotListener { snapshot ->
            send(snapshot.documents)
        }
    }
however at
send()
it gives me the error
suspension functions can be called only within coroutine body
. Usually I wrap legacy callbacks with
suspendCoroutine
but in this case this is not possible since the listener fires multiple times but a Continuation can only be resumed once. What is the best practice here?
Do I have to wrap
send()
in
scope.launch { send() }
or is there a better solution?
d
You can try using
offer
, which returns true if the call was successful.
Would a conflated channel be appropriate here? Where each snapshot is an update to the previous, so only the last sent element is needed
s
Yes, conflated is perfectly fine
d
Then just use
offer
with a conflated channel, it will always succeed unless the channel is closed or failed
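A minimal sketch of that behaviour, assuming kotlinx.coroutines 1.5 or newer, where `offer` was deprecated in favour of `trySend`. The channel and values here are illustrative only and not part of the original question.

```kotlin
import kotlinx.coroutines.channels.Channel

fun main() {
    val channel = Channel<Int>(Channel.CONFLATED)

    // trySend never suspends, so a plain (non-suspending) callback can call it.
    val accepted = (1..3).map { channel.trySend(it).isSuccess }
    println(accepted)                           // [true, true, true]

    // A conflated channel keeps only the most recent element.
    println(channel.tryReceive().getOrNull())   // 3

    channel.close()
    println(channel.trySend(4).isSuccess)       // false: the channel is closed
}
```

Unlike `offer`, `trySend` reports a closed channel through its result instead of throwing.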
s
Okay, thanks! 👍
Now I have another problem. Since
addSnapshotListener
adds an asynchronous listener, the
produce {}
block is left immediately. I think this is the reason why I receive a
Channel was closed
exception once
offer()
is called.
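The effect can be reproduced without Firestore. The sketch below is only an illustration: the `listener` variable is a hypothetical stand-in for the callback that the legacy API would invoke later, and `trySend` (kotlinx.coroutines 1.5+) is used in place of `offer`, so the failure shows up as a result rather than an exception.

```kotlin
import kotlinx.coroutines.channels.produce
import kotlinx.coroutines.runBlocking

fun main() = runBlocking<Unit> {
    // Stand-in for the listener that the legacy API would invoke later.
    var listener: ((Int) -> Boolean)? = null

    val channel = produce<Int> {
        // Registering the callback does not suspend, so the block returns at once.
        listener = { value -> trySend(value).isSuccess }
    }

    // The channel is closed as soon as the block has returned.
    println(channel.receiveCatching().isClosed)  // true

    // A callback that fires afterwards can no longer deliver anything.
    println(listener?.invoke(1))                 // false
}
```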
b
What do you want to happen if an exception occurs when you’re trying to send to the channel? Do you want your
addSnapshotListener
lambda to throw an exception? Do you want your coroutine scope to handle the exception? I would probably recommend using your coroutine scope to handle the exception and not blocking your snapshot listener with offer. Your
onSnapshotAsync
method shouldn’t need to be suspending either.
fun Query.onSnapshotAsync(scope: CoroutineScope) = scope.produce(capacity = Channel.CONFLATED) {
    addSnapshotListener { snapshot ->
        // utilizes your coroutine scope's exception handler
        // does not block your snapshot listener thread (potential bug if you move from a conflated channel)
        launch { send(snapshot.documents) }
    }
}
s
@bdawg.io don't I have the same issue as above with an immediately closed channel as
addSnapshotListener
is non-blocking?
b
Yes, sorry, I meant to update that.
= Channel<List<Snapshot.Document>>(capacity = Channel.CONFLATED).apply {
    // attach channel to the lifecycle of the scope (which is based on the Job, if one is attached)
    scope.coroutineContext[Job]?.invokeOnCompletion { 
        close()
    }
    addSnapshotListener { ... }
}
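The lifecycle part of that fragment can be tried on its own. This is a rough sketch, not the author's final code: `conflatedChannelBoundToJob` is a hypothetical helper name, and the listener registration is left out so that only the `invokeOnCompletion` wiring is shown.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.cancel
import kotlinx.coroutines.channels.Channel

// Hypothetical helper: a conflated channel that is closed when the scope's Job completes.
// If the scope has no Job, nothing closes the channel automatically.
fun <T> CoroutineScope.conflatedChannelBoundToJob(): Channel<T> {
    val channel = Channel<T>(Channel.CONFLATED)
    coroutineContext[Job]?.invokeOnCompletion { channel.close() }
    return channel
}

fun main() {
    val scope = CoroutineScope(Job())
    val channel = scope.conflatedChannelBoundToJob<String>()

    println(channel.trySend("first").isSuccess)   // true: the scope is still active

    scope.cancel()                                // the Job completes, the handler closes the channel
    println(channel.trySend("second").isSuccess)  // false
}
```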
d
Don't forget to unregister the listener when the channel is closed
s
The API doesn't provide an unregister
Cloud Firestore API on Android that is
d
It must do, right?
@bdawg.io Do I really have to explicitly close the channel? Doesn't this happen automatically when the associated scope is terminated/cancelled?
d
Returns A registration object that can be used to remove the listener.
s
Oops I missed that 😣
b
Nope, because it’s not attached to a scope. Channels themselves don’t have CoroutineScopes. Not attaching it means the consumer is responsible for closing the channel or else it will be leaked
d
There's no associated coroutine context when the channel is created like this
I think you can just use
produce
and join on the scope's job at the end
s
How do I invoke something when the channel is closed or do I have to attach this functionality to the Job's invokeOnCompletion?
d
Though it might cause a...
channel.invokeOnClose
Deadlock is what I was gonna say. I swear there was a conversation somewhere about how to do this cleanly.
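One way `invokeOnClose` could be used to remove the listener is sketched below. `FakeQuery`, `Registration`, `emit`, `listenerCount` and `snapshots` are hypothetical stand-ins so the example runs without Firestore; the real registration object has its own type. `invokeOnClose` is marked experimental in kotlinx.coroutines, and `trySend` assumes version 1.5 or newer.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ReceiveChannel

// Hypothetical stand-ins for the listener API; not the real Firestore types.
fun interface Registration { fun remove() }

class FakeQuery {
    private val listeners = mutableListOf<(List<String>) -> Unit>()
    val listenerCount: Int get() = listeners.size

    fun addSnapshotListener(listener: (List<String>) -> Unit): Registration {
        listeners += listener
        return Registration { listeners.remove(listener) }
    }

    fun emit(documents: List<String>) = listeners.toList().forEach { it(documents) }
}

// The consumer owns the returned channel and must close or cancel it when done.
fun FakeQuery.snapshots(): ReceiveChannel<List<String>> {
    val channel = Channel<List<String>>(Channel.CONFLATED)
    val registration = addSnapshotListener { documents -> channel.trySend(documents) }
    // Runs when the channel is closed or the receiving side cancels it.
    channel.invokeOnClose { registration.remove() }
    return channel
}

fun main() {
    val query = FakeQuery()
    val channel = query.snapshots()

    query.emit(listOf("a"))
    query.emit(listOf("a", "b"))
    println(channel.tryReceive().getOrNull())  // [a, b]: only the latest snapshot is kept
    println(query.listenerCount)               // 1

    channel.cancel()                           // consumer is done
    println(query.listenerCount)               // 0: the listener was removed
}
```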
b
You can try to join on the context’s job, but it will immediately close the channel if there’s no job attached to the scope. So you have to weigh your options and document what will happen when the scope doesn’t have a Job attached: a) If you use
produce
and try joining, your channel will immediately close (or you could throw an IllegalArgumentException if the scope doesn’t have a
Job
attached). b) If you use
Channel
, your channel may leak if the consumer doesn’t close it when it is done (which IMO is a bug in itself). Choose your poison haha.
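A rough sketch of option a), under some assumptions that go beyond the thread: instead of joining the scope's job, the block is kept alive with `awaitClose`, an experimental helper available in later kotlinx.coroutines releases, and a missing `Job` is rejected with an `IllegalArgumentException`. `FakeQuery`, `Registration`, `emit`, `listenerCount` and `snapshotsIn` are hypothetical names, and `Dispatchers.Unconfined` is a demo choice only.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.channels.produce
import kotlinx.coroutines.runBlocking

// Hypothetical stand-ins for the listener API; not the real Firestore types.
fun interface Registration { fun remove() }

class FakeQuery {
    private val listeners = mutableListOf<(List<String>) -> Unit>()
    val listenerCount: Int get() = listeners.size

    fun addSnapshotListener(listener: (List<String>) -> Unit): Registration {
        listeners += listener
        return Registration { listeners.remove(listener) }
    }

    fun emit(documents: List<String>) = listeners.toList().forEach { it(documents) }
}

fun FakeQuery.snapshotsIn(scope: CoroutineScope): ReceiveChannel<List<String>> {
    // Require a lifecycle; requireNotNull throws IllegalArgumentException otherwise.
    requireNotNull(scope.coroutineContext[Job]) { "scope must have a Job attached" }
    return scope.produce(capacity = Channel.CONFLATED) {
        val registration = addSnapshotListener { documents -> trySend(documents) }
        // Suspends until the channel is closed or the scope is cancelled, then cleans up.
        awaitClose { registration.remove() }
    }
}

fun main() = runBlocking<Unit> {
    val query = FakeQuery()
    val job = Job()
    // Unconfined is a demo choice so the listener is registered before emit() runs.
    val channel = query.snapshotsIn(CoroutineScope(job + Dispatchers.Unconfined))

    query.emit(listOf("a"))
    println(channel.receive())      // [a]

    job.cancelAndJoin()             // ends the scope's lifecycle
    println(query.listenerCount)    // 0: the listener was removed on cancellation
}
```

With this shape the consumer does not have to close anything; cancelling the scope closes the channel and removes the listener.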
d
I would go with the first option and require that a lifecycle is present
s
I think I'll go with the second option because if a consumer requests a
Channel
and stores the reference in a property, it should also be responsible for closing the channel, right?
b
I think they’re both very reasonable. The first one is a little more in line with structured concurrency. If the intent is to have only one worker consuming a given channel, the worker should close the channel when it is no longer accepting items and clean up after itself IMO