# coroutines
m
I have a block of code whose job it is to send a BLE message in a loop until it succeeds, or until the device disconnects. Here is the code:
coroutineScope {
    select {
        launch {
            item.gatt.connectionState.first { it == GattConnectionState.STATE_DISCONNECTED }
        }.onJoin { }

        launch @SuppressLint("MissingPermission") {
            while (isActive) {
                try {
                    item.characteristic.splitWrite(item.data, writeType = BleWriteType.NO_RESPONSE)
                    break
                } catch (e: Exception) {
                    delay(200.milliseconds)
                }
            }
        }.onJoin { }
    }

    cancel()
}
My problem is that after one of the two select branches completes, the `cancel()` call does nothing and the coroutineScope never completes, as it seems like the `first()` call cannot be cancelled and the `splitWrite()` call is also implemented using `suspendCoroutine` and therefore also cannot be cancelled. How do I get around this issue?
z
Use `suspendCancellableCoroutine` for the latter. For the former, is `connectionState` a flow?
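To illustrate the difference z is pointing at, here is a minimal sketch that wraps the same callback API both ways. `FakeWriter`, `writeNonCancellable`, `writeCancellable` and `cancellationDemo` are hypothetical names made up for this example, not part of any BLE library; the fake writer simply never calls back on its own.

```kotlin
import kotlinx.coroutines.*
import kotlin.coroutines.resume
import kotlin.coroutines.suspendCoroutine

// Hypothetical callback-style writer: it records the callback and never fires it by itself.
class FakeWriter {
    var pending: ((Boolean) -> Unit)? = null
    fun write(callback: (Boolean) -> Unit) { pending = callback }
    fun abort() { pending = null }
}

// suspendCoroutine: a cancelled caller stays suspended until the callback fires.
suspend fun FakeWriter.writeNonCancellable(): Boolean =
    suspendCoroutine { cont -> write { ok -> cont.resume(ok) } }

// suspendCancellableCoroutine: cancellation resumes the caller right away.
suspend fun FakeWriter.writeCancellable(): Boolean =
    suspendCancellableCoroutine { cont ->
        write { ok -> cont.resume(ok) }
        cont.invokeOnCancellation { abort() } // clean up the pending operation
    }

// Returns (stuckDone, cleanDone): whether each job finished shortly after cancel().
fun cancellationDemo(): Pair<Boolean, Boolean> = runBlocking {
    val stuckWriter = FakeWriter()
    val stuck = launch { stuckWriter.writeNonCancellable() }
    val clean = launch { FakeWriter().writeCancellable() }
    delay(50) // let both coroutines reach their suspension point
    stuck.cancel()
    clean.cancel()
    val stuckDone = withTimeoutOrNull(300) { stuck.join() } != null
    val cleanDone = withTimeoutOrNull(300) { clean.join() } != null
    stuckWriter.pending?.invoke(true) // fire the callback so runBlocking can return
    stuckDone to cleanDone
}

fun main() {
    println(cancellationDemo())
}
```

In this sketch only the job built on `suspendCancellableCoroutine` completes promptly after `cancel()`; the other one stays in the cancelling state until its callback is invoked.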
m
The `splitWrite` implementation comes from a library, so I cannot really change it to use `suspendCancellableCoroutine` sadly. Is there a way to work around this? And yes, `connectionState` is a flow.
z
Not if you want to cleanly cancel it i don’t think, since `suspendCoroutine` does not provide any way for the coroutine to be notified of cancellation. You could launch the coroutine outside of structured concurrency, with a different parent job, and let it leak and hope it finishes on its own, but that’s pretty dangerous. I would submit a bug report to the library.
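A rough sketch of the "different parent job" workaround described above, with the same caveat that the detached work leaks if it never finishes. `detachedScope`, `detached`, `neverResumingWrite` and `detachDemo` are hypothetical names for illustration; `neverResumingWrite` stands in for a library call built on `suspendCoroutine`.

```kotlin
import kotlinx.coroutines.*
import kotlin.coroutines.suspendCoroutine

// Stand-in for a non-cancellable library call whose callback never fires.
suspend fun neverResumingWrite(): Unit = suspendCoroutine<Unit> { _ -> }

// A scope with its own Job, unrelated to whoever calls detached().
val detachedScope = CoroutineScope(SupervisorJob() + Dispatchers.Default)

// Runs block under detachedScope. await() is cancellable, so a cancelled caller
// stops waiting immediately, while the detached coroutine keeps running (and may leak).
suspend fun <T> detached(block: suspend () -> T): T =
    detachedScope.async { block() }.await()

// Returns true if the caller finished shortly after being cancelled.
fun detachDemo(): Boolean = runBlocking {
    val caller = launch { detached { neverResumingWrite() } }
    delay(50) // let the caller start waiting
    caller.cancel()
    withTimeoutOrNull(500) { caller.join() } != null
}

fun main() {
    println(detachDemo())
}
```

The caller becomes cancellable again, but nothing ever cleans up the detached coroutine if the write never resumes, which is why this is best treated as a stopgap.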
Idk why `first` wouldn’t cancel though. Is the flow suspending in a non-cancellable way somewhere upstream?
Does the cancellation issue repro if you don’t have `select`? It shouldn’t matter but trying to eliminate things
m
It looks like the `connectionState` property just exposes an internal `MutableStateFlow` as a flow. I have had problems before with `first` not being cancellable, and I found a website claiming that cancellation is only checked each time the flow emits an item, so if it is stuck and never emits anything new, there is no way to cancel it. Is that true?
> Does the cancellation issue repro if you don’t have `select`? It shouldn’t matter but trying to eliminate things
So you mean I should try only awaiting one of the two branches?
z
just for repro, launch one coroutine in a way that you know it will suspend, then cancel it.
i just looked at the impl of `first` and it just does a normal `collect` call, so it should cancel fine as long as the upstream doesn’t block it
and i would be surprised if `MutableStateFlow` didn’t allow cancellation, it would definitely be a bug if it didn’t
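A small repro along the lines z suggests: suspend in `first` on a `MutableStateFlow` that never emits a matching value, then cancel. The flow of `Int` and the name `firstIsCancellable` are stand-ins invented for this sketch, not the library's `connectionState`.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.first

// Returns true if a coroutine suspended in first { } finishes shortly after cancel(),
// even though the flow never emits a new value.
fun firstIsCancellable(): Boolean = runBlocking {
    val state = MutableStateFlow(0) // never changes, so the predicate never matches
    val waiter = launch { state.first { it == -1 } }
    delay(50) // let the waiter subscribe and suspend
    waiter.cancel()
    withTimeoutOrNull(500) { waiter.join() } != null
}

fun main() {
    println(firstIsCancellable())
}
```

If this returns `true` in your environment as well, the flow side is behaving and the stuck scope is more likely caused by the non-cancellable write.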
m
It definitely behaves weirdly. I replaced the block of code with
coroutineScope {
    launch {
        item.gatt.connectionState.first { it == GattConnectionState.STATE_DISCONNECTED }
    }.cancel()
}
and that does not get stuck. So cancelling outside of the select seems to work fine.
I assume this is an issue with how I am using select? This is the first time I have used it. Earlier I got stuck for an hour because I used `.onJoin` instead of `.onJoin { }`. I still don't know what the difference is, but the second one works and the first one just gets stuck forever...
z
It didn’t work because `onJoin` on its own doesn’t do anything, it just gives you a select clause object that the `select`’s scope defines an `invoke` function on. You were probably stuck because when the coroutine completed, you hadn’t actually ended up joining on it.
I haven’t used select to join on jobs much, so there might be some subtlety i’m not aware of, but nothing about how you’re using it looks super wrong. I would probably launch both coroutines first, before entering the select block, just to make the code a bit clearer/organized, but i don’t think that should affect the behavior in this way
m
So do I need to manually call `join` on the jobs after I launch them for the select to do something?
z
no, `onJoin {}` will join on them. But that’s actually `onJoin.invoke { }`. Just reading the `onJoin` property won’t do anything
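The point about `onJoin { }` versus bare `onJoin` can be sketched like this, with both jobs launched before entering `select` as suggested earlier. The jobs and the `raceJobs` function are made up for illustration; the delays stand in for the write and the disconnect wait.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.selects.select

// Races two jobs and returns the label of the one that completes first.
fun raceJobs(): String = runBlocking {
    // Launch both jobs up front so the select block only registers clauses.
    val fast = launch { delay(50) }
    val slow = launch { delay(5_000) }

    val winner = select<String> {
        fast.onJoin { "fast" }        // operator call, registers the clause
        slow.onJoin.invoke { "slow" } // the same registration spelled out
        // Writing just `slow.onJoin` would read the property and register nothing.
    }

    // Cancel only the jobs, not the enclosing scope.
    fast.cancel()
    slow.cancel()
    winner
}

fun main() {
    println(raceJobs())
}
```

Cancelling the individual jobs (or calling `coroutineContext.cancelChildren()`) leaves the enclosing scope itself active, which may be preferable to calling `cancel()` on the scope as in the original snippet.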
m
I see. I ended up using this code:
var callbackCalled = false

val sendDataJob = launch @SuppressLint("MissingPermission") {
    item.characteristic.splitWrite(item.data, writeType = BleWriteType.NO_RESPONSE)

    callbackCalled = true
    item.callback()
}

while (item.gatt.isConnected && !sendDataJob.isCompleted) {
    delay(20.milliseconds)
}

if (item.gatt.isConnected) {
    if (!callbackCalled) {
        item.callback()
    }

    Napier.d { "Sent message from BLE message queue: ${item.data.value.toList()}" }
} else {
    Napier.w { "Failed to send message from BLE message queue, device disconnected." }
}
I don't think that this is how you are supposed to do it, but at least it works for now. It kind of does what you suggested earlier with launching the job outside of structured concurrency.
z
yea i’m not sure what’s going on with your select, i suspect there’s some other code somewhere that’s doing something weird
i’m wary of the quality of this library already if it doesn’t support cancellable suspension
m
It seems like people have complained about it before and the authors said that it was intentionally designed this way: https://github.com/NordicSemiconductor/Kotlin-BLE-Library/issues/21
z
I wouldn’t be shocked if Android’s Bluetooth APIs didn’t allow cancellation. So if you have to wait for the callback anyway, can you just join on that job as well instead?
m
The problem is that that callback is not always called and sometimes the logic just gets stuck waiting for it. In this case I want to give the queue a way to recover by at least progressing when the device is disconnected.