Is there a way to `join()` on a `Continuation`? I'...
# coroutines
r
Is there a way to
join()
on a
Continuation
? I'm trying to block sending more requests until the previous one has been completed. Currently trying the hard way with subclassing
CancellableContinuation
, but that brings it s own issues.
Code, I'm not sure how to forward
CoroutineDispatcher.resumeUndispatchedWithException(exception: Throwable)
to the
coroutine
implementation.
Copy code
kotlin
class SubscribableContinuation<T>(val coroutine: CancellableContinuation<T>): CancellableContinuation<T> {
    override val context: CoroutineContext
        get() = coroutine.context
    override val isActive: Boolean
        get() = coroutine.isActive
    override val isCancelled: Boolean
        get() = coroutine.isCancelled
    override val isCompleted: Boolean
        get() = coroutine.isCompleted

    override fun cancel(cause: Throwable?): Boolean = coroutine.cancel(cause)

    @InternalCoroutinesApi
    override fun completeResume(token: Any) = coroutine.completeResume(token)

    @InternalCoroutinesApi
    override fun initCancellability() = coroutine.initCancellability()

    override fun invokeOnCancellation(handler: CompletionHandler) = coroutine.invokeOnCancellation(handler)

    @InternalCoroutinesApi
    override fun tryResumeWithException(exception: Throwable): Any? = coroutine.tryResumeWithException(exception)

    @OptIn(ExperimentalStdlibApi::class)
    @ExperimentalCoroutinesApi
    override fun CoroutineDispatcher.resumeUndispatchedWithException(exception: Throwable) {
    }

    @OptIn(ExperimentalStdlibApi::class)
    @ExperimentalCoroutinesApi
    override fun CoroutineDispatcher.resumeUndispatched(value: T) {
        val dispatcher = context[CoroutineDispatcher]
    }

    @InternalCoroutinesApi
    override fun tryResume(
        value: T,
        idempotent: Any?,
        onCancellation: ((cause: Throwable) -> Unit)?
    ): Any? {
        TODO("Not yet implemented")
    }

    @InternalCoroutinesApi
    override fun tryResume(value: T, idempotent: Any?): Any? {
        TODO("Not yet implemented")
    }

    @ExperimentalCoroutinesApi
    override fun resume(value: T, onCancellation: ((cause: Throwable) -> Unit)?) {
        TODO("Not yet implemented")
    }

    override fun resumeWith(result: Result<T>) {
        TODO("Not yet implemented")
    }

}
k
For clarity, do you mean http requests?
r
Nope, It's bluetooth requests to an external device, which get answered on a different channel
So the protocol doesn't haven an inherent way to tie request/response together, gotta do that manually.
k
Can you give me a small gist on how you’re tying them together?
r
Roughly
Copy code
private val incomingData = Channel<ByteArray>(100, BufferOverflow.DROP_OLDEST, onUndeliveredElement = {
        Log.e(TAG, "Failed to process incoming data $it")
    })
    private val requests = Channel<Command>(100, BufferOverflow.DROP_OLDEST, onUndeliveredElement = { it.callback.cancel(
        CancellationException("Too many requests have been stacked.")
    )})

    init {
        scope.launch {
            incomingData.consumeAsFlow().runningFold(ParsingState.NoData as ParsingState) { state, bytes ->
                parse(state, bytes)
            }.map {
                when(it) {
                    // deleted a few
                    is ParsingState.ParsingSuccessful -> onMessage(it)
                }
            }.collect()
        }
        scope.launch {
            requests.consumeAsFlow().collect {
                writeToReader(it.toBytes(comByteManager))
                it.callback.join() // TODO
            }
        }
    }

sealed interface Command {
    val callback: CancellableContinuation<ParsingState.ParsingSuccessful>
    fun toBytes(comByteManager: ComByteManager): ByteArray
    data class AutoCardSearch(override val callback: CancellableContinuation<ParsingState.ParsingSuccessful>, val enabled: Boolean, val delayMs: Byte, val cardType: Byte): Command {
        override fun toBytes(comByteManager: ComByteManager): ByteArray {
            return comByteManager.autoSearchCardCmdBytes(enabled, delayMs, cardType)
        }
    }
}
Where the
onMessage()
should continue the continutation if possible.
Depending on which
Command
is issued, I can figure out if the request has been answered by a specific response. They come back tagged which request type caused this response type.
And I don't wanna send a new command until the previous one has been processed. The hardware on the other side doesn't have a buffer 😭
k
Okay so one last thought. If you’re only ever sending the commands in serial then I don’t think you need to worry about matching requests and responses?
Any response you get back will correspond to the request that was issued?
Just trying to clarify before I recommend something
r
It's not that easy, it can also send responses on its own (when I tell it to watch out for a specific event)
k
I see. So one request can generate many responses?
r
Albeit I could filter these out specifically 🤔
But it's the same response tag as another command I can send. Basically I can tell it to sweep for a card (it's a card reader) once, or I can send a command (which gets answered as usual), so it'll send me a response as soon as you put a card on the reader, tagged the same as the sweep command.
Otherwise it's pretty much one to one. Unless you overwhelm the reader, then it's one-to-zero.
k
Is the buffer of the hardware considered to be full if it’s sending back many responses to a single request?
eg. Request 1 generates resopnse 1, 2, and 3. You have to wait for all responses before you can submit another request?
r
So after requesting card search, the buffer is considered empty as soon as it told me it started it (by sending a response back to my request starting the card search)
No, I only have to wait for a response tagged for that request.
But it can send more responses, which I'll have to process separately (by stuffing them into their own
Channel
, basically)
k
Yeah this protocol definitely seems messy. I have a few thought… I think you shouldn’t be trying to force these requests to operate in serial at the continuation level. I think your bluetooth client should be responsible for flattening the concurrency here. The general premise is probably: 1. Execute your requests in serial by submitting them to a channel and having a single coroutine process requests from that channel 2. Have a trigger which doesn’t allow requests to be processed until the prior request has experienced at least one response. 3. smoosh the requests and responses together. Maybe something like this? Devils are in the details of course, which this doesn’t cover.
Copy code
class BluetoothClient {

  private val scope = CoroutineScope(Job())
  private val requests = Channel<BluetoothRequest>(Channel.Unlimited) // Pretend that BluetoothRequest exists
  private val requestTrigger = Channel<Unit>(1).apply { trySend(Unit) } // Used to signal that the next request can begin. 
  private val responses = Channel<BluetoothResponse>(Channel.Unlimited) // Again, pretend this class exists. 
  private val sharedResponses = responses.consumeAsFlow().shareIn(scope)

  init {
    scope.launch { processRequests() }
  }

  fun request(request: BluetoothRequest): Flow<BluetoothResponse> {
    requests.trySend(request).getOrThrow() 
    return flow {
        val firstReponse = sharedResponses.first { it.matches(request) }
        requestTrigger.trySend(Unit) // Allow the next request to begin. 
        emit(firstResponse)
        emitAll(sharedResponses.filter { it.matches(request) }
    }
  }

  private suspend fun processRequests() {
    for (request in requests) {
      for (_ in requestTrigger) {
        bluetoothPlatform.queueRequest(request)
      }
    }
  }
}
My only concern is that with this protocol, since you never know if a request is going to generate one or many responses, and they’re not really de-dupable, I think every request would have to be active…forever?
r
No, I don't care if one creates more than one response, the secondary responses are different.
So waiting for one only should be plenty.
k
Yup this will wait for one response before triggering the next request to begin.
r
Perfect, thank you very much!
Took some liberties, final result:
Copy code
fun sendCommand(command: Command) {
        requests.trySend(command).onFailure {
            Log.e(TAG, it?.stackTraceToString() ?: "Failed to send BT Command")
        }
    }

    val cardChannel = Channel<Ntag21x?>(10, BufferOverflow.DROP_OLDEST)
    private val incomingData = Channel<ByteArray>(100, BufferOverflow.DROP_OLDEST, onUndeliveredElement = {
        Log.e(TAG, "Failed to process incoming data $it")
    })
    private val requests = Channel<Command>(100, BufferOverflow.DROP_OLDEST, onUndeliveredElement = { it.callback.cancel(
        CancellationException("Too many requests have been stacked.")
    )})

    private var currentRequest: Command? = null
    val scope = CoroutineScope(<http://Dispatchers.IO|Dispatchers.IO> + job)

    init {
        ble.writeReaderDataTo(incomingData)

        scope.launch {
            incomingData.consumeAsFlow().runningFold(ParsingState.NoData as ParsingState) { state, bytes ->
                parse(state, bytes)
            }.map {
                when(it) {
                    ParsingState.NoData -> Log.d(TAG, "No Data")
                    is ParsingState.ParsingSuccessful -> {
                        val responseToCommand = (it.cmd - 1).toByte()
                        val req = currentRequest
                        Log.d(TAG, "Processing response: ${readableCommand(responseToCommand) ?: "Unknown command"}, expected ${
                            req?.commandByte?.let { it1 -> readableCommand(it1) }} with message: ${it.error.dump()} and data: ${it.data.dump()}")
                        if (req != null && responseToCommand == req.commandByte) {
                            req.channel.send(it)
                            req.channel.close()
                        } else {
                            onMessage(it)
                            if (responseToCommand == BtConstants.ACTIVATE_PICC_COM) {
                                Log.d(TAG, "Found a card, not a response.")
                            } else {
                                Log.e(TAG, "Got a message not awaited: $it")
                            }
                        }
                    }
                }
            }.collect()
        }

        val requestTrigger = Channel<Unit>(1).apply { trySend(Unit) } // Used to signal that the next request can begin.
        scope.launch {
            requests.consumeEach { btCommand ->
                requestTrigger.receive()
                Log.d(TAG, "Processing BT Command: $btCommand")
                currentRequest = btCommand
                ble.writeToReader(btCommand.toBytes(ble.comByteManager))
                val retryTicker = ticker(500, 500)
                btCommand.callback.invokeOnCancellation {
                    Log.i(TAG, "Callback of BT command got canceled, advancing...")
                    scope.launch {
                        requestTrigger.send(Unit)
                        btCommand.channel.close()
                        retryTicker.cancel()
                    }
                }
                @Suppress("ControlFlowWithEmptyBody")
                while(select {
                        btCommand.channel.onReceive {
                            retryTicker.cancel()
                            requestTrigger.send(Unit)
                            try {
                                btCommand.callback.resume(it)
                                false
                            } catch (e: IllegalStateException) {
                                Log.e(TAG, e.stackTraceToString())
                                false
                            }
                        }
                        retryTicker.onReceiveCatching {
                            if (!it.isClosed) {
                                Log.w(TAG, "Didn't get a response for $btCommand, sending request again.")
                                ble.writeToReader(btCommand.toBytes(ble.comByteManager))
                                true
                                // TODO limit retries
                            } else {
                                Log.d(TAG, "Retry ticker got closed, probably success.")
                                false
                            }
                        }
                    }) {
                }
            }
        }
    }