https://kotlinlang.org logo
#coroutines
Title
# coroutines
d

Dico

09/09/2019, 3:53 PM
I have some code that takes items from a channel and sends them in batches to a HTTP api with ktor. How can I make it detect cancellation and send the remaining accumulated items in a clean way? Pseudo code here:
Copy code
private val processor = scope.launch {
        val batch = ArrayList<String>(MAX_BATCH)
        
        suspend fun sendBatch() {
            val payload = batch.joinToString(separator = "\n")
            batch.clear()
            return sendPayload(payload)
        }
        
        try {
            while (isActive) {
                batch.add(hitChannel.receive())
                if (batch.size == MAX_BATCH) {
                    sendBatch()
                }
            }
        } finally {
            if (batch.isNotEmpty()) {
                sendBatch()
            }
        }
    }
I expect that final
sendBatch
will throw exception because coroutine no longer
isActive
. I can't do
withContext(NonCancellable)
either at that point.
d

Dominaezzz

09/09/2019, 3:59 PM
You want to send remaining items inside the batch? or batches after the batch?
d

Dico

09/09/2019, 4:00 PM
remaining items inside the batch
d

Dominaezzz

09/09/2019, 4:00 PM
Ah, if
hitChannel.receive()
fails, you want to call
sendBatch
?
What's being cancelled here?
d

Dico

09/09/2019, 4:03 PM
The scope is being cancelled, sorry for lack of clarity
that means
receive()
will fail I think
Perhaps I should test it with a modified
sendBatch
d

Dominaezzz

09/09/2019, 4:06 PM
I would suggest a refactor.
d

Dico

09/09/2019, 4:06 PM
you mean start from scratch with the logic?
d

Dominaezzz

09/09/2019, 4:06 PM
The processor and the batching should have different lifecycles.
d

Dico

09/09/2019, 4:07 PM
I see what you mean!
so how would you implement the batching on its own lifecycle?
I suppose I should close the channel and then detect that, that makes sense
d

Dominaezzz

09/09/2019, 4:09 PM
Abstract away the batching from the request sender caller.
Basically pass the request to another coroutine which will be cancelled differently from the processor.
Closing the channel also works. 😅
d

Dico

09/09/2019, 4:13 PM
Thanks for the help
For reference - the code becomes
Copy code
private val processor = scope.launch {
        val batch = ArrayList<String>(MAX_BATCH)

        suspend fun sendBatch() {
            val payload = batch.joinToString(separator = "\n")
            batch.clear()
            return sendPayload(payload)
        }

        for (item in hitChannel) {
            batch.add(hitChannel.receive())
            if (batch.size == MAX_BATCH) {
                sendBatch()
            }
        }

        if (batch.isNotEmpty()) {
            sendBatch()
        }
        
        scope.cancel()
    }

    fun stop() {
        hitChannel.close()
    }
d

Dominaezzz

09/09/2019, 4:16 PM
You removed the try/finally?
d

Dico

09/09/2019, 4:17 PM
I don't expect errors
If there are errors they will be reported by the uncaught exception handler
And the data handled here is just for google analytics so I'm not fussed
In hindsight, it should probably be there
d

Dominaezzz

09/09/2019, 4:18 PM
If
hitChannel
is closed before
MAX_BATCH
is attained, then some items won't be sent, even though they've gone through the channel.
d

Dico

09/09/2019, 4:19 PM
This is the only coroutine receiving from the channel
I'm using a for loop, so receive() will never fail
d

Dominaezzz

09/09/2019, 4:19 PM
Oh. You want
batch.add(item)
d

Dico

09/09/2019, 4:19 PM
What do you mean?
d

Dominaezzz

09/09/2019, 4:20 PM
Instead of
hitChannel.receive()
.
for loop does
receive
for you.
d

Dico

09/09/2019, 4:20 PM
right, I'm an idiot.
thx
d

Dominaezzz

09/09/2019, 4:21 PM
No problem. This was fun!
🔥 1
😁 1
3 Views