I have some code that takes items from a channel a...
# coroutines
d
I have some code that takes items from a channel and sends them in batches to a HTTP api with ktor. How can I make it detect cancellation and send the remaining accumulated items in a clean way? Pseudo code here:
Copy code
private val processor = scope.launch {
        val batch = ArrayList<String>(MAX_BATCH)
        
        suspend fun sendBatch() {
            val payload = batch.joinToString(separator = "\n")
            batch.clear()
            return sendPayload(payload)
        }
        
        try {
            while (isActive) {
                batch.add(hitChannel.receive())
                if (batch.size == MAX_BATCH) {
                    sendBatch()
                }
            }
        } finally {
            if (batch.isNotEmpty()) {
                sendBatch()
            }
        }
    }
I expect that final
sendBatch
will throw exception because coroutine no longer
isActive
. I can't do
withContext(NonCancellable)
either at that point.
d
You want to send remaining items inside the batch? or batches after the batch?
d
remaining items inside the batch
d
Ah, if
hitChannel.receive()
fails, you want to call
sendBatch
?
What's being cancelled here?
d
The scope is being cancelled, sorry for lack of clarity
that means
receive()
will fail I think
Perhaps I should test it with a modified
sendBatch
d
I would suggest a refactor.
d
you mean start from scratch with the logic?
d
The processor and the batching should have different lifecycles.
d
I see what you mean!
so how would you implement the batching on its own lifecycle?
I suppose I should close the channel and then detect that, that makes sense
d
Abstract away the batching from the request sender caller.
Basically pass the request to another coroutine which will be cancelled differently from the processor.
Closing the channel also works. 😅
d
Thanks for the help
For reference - the code becomes
Copy code
private val processor = scope.launch {
        val batch = ArrayList<String>(MAX_BATCH)

        suspend fun sendBatch() {
            val payload = batch.joinToString(separator = "\n")
            batch.clear()
            return sendPayload(payload)
        }

        for (item in hitChannel) {
            batch.add(hitChannel.receive())
            if (batch.size == MAX_BATCH) {
                sendBatch()
            }
        }

        if (batch.isNotEmpty()) {
            sendBatch()
        }
        
        scope.cancel()
    }

    fun stop() {
        hitChannel.close()
    }
d
You removed the try/finally?
d
I don't expect errors
If there are errors they will be reported by the uncaught exception handler
And the data handled here is just for google analytics so I'm not fussed
In hindsight, it should probably be there
d
If
hitChannel
is closed before
MAX_BATCH
is attained, then some items won't be sent, even though they've gone through the channel.
d
This is the only coroutine receiving from the channel
I'm using a for loop, so receive() will never fail
d
Oh. You want
batch.add(item)
d
What do you mean?
d
Instead of
hitChannel.receive()
.
for loop does
receive
for you.
d
right, I'm an idiot.
thx
d
No problem. This was fun!
🔥 1
😁 1