https://kotlinlang.org logo
#coroutines
Title
# coroutines
s

svenjacobs

02/11/2019, 11:48 AM
Hi, I want to wrap a legacy listener that fires multiple times into a
Channel
. However I'm a bit lost here. Here's the simplified pseudo code:
Copy code
suspend fun Query.onSnapshotAsync(scope: CoroutineScope) =
    scope.produce {
        addSnapshotListener { snapshot ->
            send(snapshot.documents)
        }
    }
however at
send()
it gives me the error
suspension functions can be called only within coroutine body
. Usually I wrap legacy callbacks with
suspendCoroutine
but in this case this is not possible since the listener fires multiple times but a Continuation can only be resumed once. What is the best practice here?
Do I have to wrap
send()
in
scope.launch { send() }
or is there a better solution?
d

Dico

02/11/2019, 12:16 PM
You can try using
offer
, which returns true if the call was successful.
Would a conflated channel be appropriate here? Where each snapshot is an update to the previous, so only the last sent element is needed
s

svenjacobs

02/11/2019, 12:17 PM
Yes, conflated is perfectly fine
d

Dico

02/11/2019, 12:18 PM
Then just use
offer
with a conflated channel, it will always succeed unless the channel is closed or failed
s

svenjacobs

02/11/2019, 12:19 PM
Okay, thanks! 👍
Now I have another problem. Since
addSnapshotListener
adds an asynchronous listener, the
produce {}
block ist left immediatly. I think this is the reason why I receive a
Channel was closed
exception once
offer()
is called.
b

bdawg.io

02/11/2019, 3:25 PM
What do you want to happen if an exception occurs when you’re trying to send to the channel? Do you want your
addSnapshotListener
lambda to throw an exception? Do you want your coroutine scope to handle the exception? I would probably recommend to utilize your coroutine scope to handle the exception and to also not block your snapshot listener with offer. Your
onSnapshotAsync
method shouldn’t need to be suspending either.
Copy code
fun Query.onSnapshotAsync(scope: CoroutineScope) = scope.produce(capacity = Channel.CONFLATED) {
    addSnapshotListener { snapshot ->
        // utilizes your coroutine scope's exception handler
        // does not block your snapshot listener thread (potential bug if you move from a conflated channel)
        launch { send(snapshot.documents) }
    }
}
s

svenjacobs

02/11/2019, 3:27 PM
@bdawg.io don't I have the same issue as above with an immediately closed channel as
addSnapshotListener
is non-blocking?
b

bdawg.io

02/11/2019, 3:32 PM
Yes, sorry i meant to update that.
Copy code
= Channel<List<Snapshot.Document>>(capacity = Channel.CONFLATED).apply {
    // attach channel to the lifecycle of the scope (which is based on the Job, if one is attached)
    scope.coroutineContext[Job]?.invokeOnCompletion { 
        close()
    }
    addSnapshotListener { ... }
}
d

Dico

02/11/2019, 3:34 PM
Dont forget to unregister the listener when the channel is closed
s

svenjacobs

02/11/2019, 3:35 PM
The API doesn't provide an unregister
Cloud Firestore API on Android that is
d

Dico

02/11/2019, 3:36 PM
It must do right?
@bdawg.io Do I really have to explicitly close the channel? Doesn't this happen automatically when the associated scope is terminated/cancelled?
d

Dico

02/11/2019, 3:42 PM
Returns A registration object that can be used to remove the listener.
s

svenjacobs

02/11/2019, 3:42 PM
Oops I missed that 😣
b

bdawg.io

02/11/2019, 3:42 PM
Nope, because it’s not attached to a scope. Channels themselves don’t have CoroutineScopes. Not attaching it means the consumer is responsible for closing the channel or else it will be leaked
👍 1
d

Dico

02/11/2019, 3:43 PM
Theres no associated coroutine context when the channel is created like this
I think you can just use
produce
and join on the scope's job at the end
s

svenjacobs

02/11/2019, 3:44 PM
How do I invoke something when the channel is closed or do I have to attach this functionality to the Job's invokeOnCompletion?
d

Dico

02/11/2019, 3:44 PM
Though it might cause a...
channel.invokeOnClose
Deadlock is what I was gonna say. I swear there was a conversation somewhere about how to do this cleanly.
b

bdawg.io

02/11/2019, 3:50 PM
You can try to join on the context’s job, but it will immediately close the channel if there’s no job attached to the scope. So you have to weigh your options and document what will happen when the scope doesn’t have a Job attached: a) If you use
produce
and try joining, your channel will immediately close (or you could throw an IllegalArgumentException if the scope doesn’t have a
Job
attached). b) If you use
Channel
, your channel may leak if the consumer doesn’t close it when it is done (IMO is a bug itself). Choose your poison haha.
d

Dico

02/11/2019, 3:57 PM
I would go with the first option and require that a lifecycle is present
s

svenjacobs

02/12/2019, 6:14 AM
I think I go with the second option because if a consumer requests a
Channel
and stores the reference in a property, it should also be responsible for closing the channel, right?
b

bdawg.io

02/12/2019, 2:58 PM
I think they’re both very reasonable. The first one is a little more in line with structured concurrency. If the intent is have only 1 worker consuming a given channel, the worker should close the channel when it is no longer accepting items and cleanup after itself IMO
5 Views