reactormonk
08/18/2023, 5:45 PM
join() on a Continuation? I'm trying to block sending more requests until the previous one has been completed. Currently trying the hard way with subclassing CancellableContinuation, but that brings its own issues.

reactormonk
08/18/2023, 5:46 PM
CoroutineDispatcher.resumeUndispatchedWithException(exception: Throwable) to the coroutine implementation.
class SubscribableContinuation<T>(val coroutine: CancellableContinuation<T>) : CancellableContinuation<T> {
    override val context: CoroutineContext
        get() = coroutine.context
    override val isActive: Boolean
        get() = coroutine.isActive
    override val isCancelled: Boolean
        get() = coroutine.isCancelled
    override val isCompleted: Boolean
        get() = coroutine.isCompleted

    override fun cancel(cause: Throwable?): Boolean = coroutine.cancel(cause)

    @InternalCoroutinesApi
    override fun completeResume(token: Any) = coroutine.completeResume(token)

    @InternalCoroutinesApi
    override fun initCancellability() = coroutine.initCancellability()

    override fun invokeOnCancellation(handler: CompletionHandler) = coroutine.invokeOnCancellation(handler)

    @InternalCoroutinesApi
    override fun tryResumeWithException(exception: Throwable): Any? = coroutine.tryResumeWithException(exception)

    @OptIn(ExperimentalStdlibApi::class)
    @ExperimentalCoroutinesApi
    override fun CoroutineDispatcher.resumeUndispatchedWithException(exception: Throwable) {
    }

    @OptIn(ExperimentalStdlibApi::class)
    @ExperimentalCoroutinesApi
    override fun CoroutineDispatcher.resumeUndispatched(value: T) {
        val dispatcher = context[CoroutineDispatcher]
    }

    @InternalCoroutinesApi
    override fun tryResume(
        value: T,
        idempotent: Any?,
        onCancellation: ((cause: Throwable) -> Unit)?
    ): Any? {
        TODO("Not yet implemented")
    }

    @InternalCoroutinesApi
    override fun tryResume(value: T, idempotent: Any?): Any? {
        TODO("Not yet implemented")
    }

    @ExperimentalCoroutinesApi
    override fun resume(value: T, onCancellation: ((cause: Throwable) -> Unit)?) {
        TODO("Not yet implemented")
    }

    override fun resumeWith(result: Result<T>) {
        TODO("Not yet implemented")
    }
}
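For the original question (blocking further requests until the previous one completes), one route that avoids implementing CancellableContinuation by hand is to attach a CompletableDeferred to each command: await() gives the join()-like behaviour, and the response handler calls complete(). A minimal sketch under that assumption; PendingCommand, processSequentially and onResponse are made-up names, not from this thread:

import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.channels.ReceiveChannel

// Hypothetical command type: the deferred plays the role of the callback continuation.
class PendingCommand(
    val bytes: ByteArray,
    val response: CompletableDeferred<ByteArray> = CompletableDeferred(),
)

// Drain the command channel one request at a time: the next command is only written
// after the previous command's response has been completed.
suspend fun processSequentially(
    commands: ReceiveChannel<PendingCommand>,
    writeToReader: suspend (ByteArray) -> Unit,
) {
    for (command in commands) {
        writeToReader(command.bytes)
        command.response.await() // the "join()" on the outstanding request
    }
}

// Whoever parses incoming data resumes the waiting loop (and any caller awaiting the result).
fun onResponse(pending: PendingCommand, payload: ByteArray) {
    pending.response.complete(payload)
}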
kevin.cianfarini
08/18/2023, 5:48 PM
reactormonk
08/18/2023, 5:48 PM
reactormonk
08/18/2023, 5:49 PM
kevin.cianfarini
08/18/2023, 5:49 PM

reactormonk
08/18/2023, 5:52 PM
private val incomingData = Channel<ByteArray>(100, BufferOverflow.DROP_OLDEST, onUndeliveredElement = {
    Log.e(TAG, "Failed to process incoming data $it")
})
private val requests = Channel<Command>(100, BufferOverflow.DROP_OLDEST, onUndeliveredElement = {
    it.callback.cancel(CancellationException("Too many requests have been stacked."))
})

init {
    scope.launch {
        incomingData.consumeAsFlow().runningFold(ParsingState.NoData as ParsingState) { state, bytes ->
            parse(state, bytes)
        }.map {
            when (it) {
                // deleted a few
                is ParsingState.ParsingSuccessful -> onMessage(it)
            }
        }.collect()
    }
    scope.launch {
        requests.consumeAsFlow().collect {
            writeToReader(it.toBytes(comByteManager))
            it.callback.join() // TODO
        }
    }
}

sealed interface Command {
    val callback: CancellableContinuation<ParsingState.ParsingSuccessful>
    fun toBytes(comByteManager: ComByteManager): ByteArray

    data class AutoCardSearch(
        override val callback: CancellableContinuation<ParsingState.ParsingSuccessful>,
        val enabled: Boolean,
        val delayMs: Byte,
        val cardType: Byte,
    ) : Command {
        override fun toBytes(comByteManager: ComByteManager): ByteArray {
            return comByteManager.autoSearchCardCmdBytes(enabled, delayMs, cardType)
        }
    }
}
Where the onMessage() should continue the continuation if possible.

reactormonk
08/18/2023, 5:53 PM
Once a Command is issued, I can figure out if the request has been answered by a specific response. They come back tagged with which request type caused this response type.
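Given that tagging, onMessage() just needs a way to find the command currently in flight and resume its continuation when the tags line up. A rough sketch of that shape, assuming a commandByte property on Command and an answers() helper (hypothetical names here; the working code later in the thread does essentially this with (cmd - 1).toByte()):

import kotlin.coroutines.resume

// Hypothetical: the command whose response we are currently waiting for.
private var currentRequest: Command? = null

// Hypothetical predicate: does this parsed message answer the pending command?
private fun answers(message: ParsingState.ParsingSuccessful, command: Command): Boolean =
    (message.cmd - 1).toByte() == command.commandByte

private fun onMessage(message: ParsingState.ParsingSuccessful) {
    val req = currentRequest ?: return
    if (answers(message, req)) {
        currentRequest = null
        req.callback.resume(message) // continue whoever suspended on this Command
    }
    // Otherwise it's an unsolicited message (e.g. a card event) and is handled elsewhere.
}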
reactormonk
08/18/2023, 5:54 PM
kevin.cianfarini
08/18/2023, 5:57 PM
kevin.cianfarini
08/18/2023, 5:57 PM
kevin.cianfarini
08/18/2023, 5:57 PM
reactormonk
08/18/2023, 5:57 PM
kevin.cianfarini
08/18/2023, 5:58 PM
reactormonk
08/18/2023, 5:58 PM
reactormonk
08/18/2023, 6:00 PM
reactormonk
08/18/2023, 6:00 PM
kevin.cianfarini
08/18/2023, 6:01 PM
kevin.cianfarini
08/18/2023, 6:01 PM
reactormonk
08/18/2023, 6:02 PM
reactormonk
08/18/2023, 6:02 PM

reactormonk
08/18/2023, 6:03 PM
Channel, basically)

kevin.cianfarini
08/18/2023, 6:27 PM
class BluetoothClient {
    private val scope = CoroutineScope(Job())
    private val requests = Channel<BluetoothRequest>(Channel.UNLIMITED) // Pretend that BluetoothRequest exists
    private val requestTrigger = Channel<Unit>(1).apply { trySend(Unit) } // Used to signal that the next request can begin.
    private val responses = Channel<BluetoothResponse>(Channel.UNLIMITED) // Again, pretend this class exists.
    private val sharedResponses = responses.consumeAsFlow().shareIn(scope, SharingStarted.Eagerly)

    init {
        scope.launch { processRequests() }
    }

    fun request(request: BluetoothRequest): Flow<BluetoothResponse> {
        requests.trySend(request).getOrThrow()
        return flow {
            val firstResponse = sharedResponses.first { it.matches(request) }
            requestTrigger.trySend(Unit) // Allow the next request to begin.
            emit(firstResponse)
            emitAll(sharedResponses.filter { it.matches(request) })
        }
    }

    private suspend fun processRequests() {
        for (request in requests) {
            requestTrigger.receive() // Wait until the previous request's first response has arrived.
            bluetoothPlatform.queueRequest(request) // Pretend bluetoothPlatform exists, too.
        }
    }
}
My only concern is that with this protocol, since you never know if a request is going to generate one or many responses, and they're not really de-dupable, I think every request would have to be active…forever?
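On that "active forever" concern: whether a request stays live depends on the caller. A caller that only needs one response can bound the flow itself (first() or take(n) ends that collection), and only genuinely open-ended subscriptions need to keep collecting. A small illustration against the request() sketch above; readOnce, collectUntilDone and isTerminal are hypothetical:

import kotlinx.coroutines.flow.*

// One-shot: completes after the first matching response; collection of this flow stops there.
suspend fun readOnce(client: BluetoothClient, request: BluetoothRequest): BluetoothResponse =
    client.request(request).first()

// Open-ended subscription, bounded by a protocol-level stop condition.
suspend fun collectUntilDone(client: BluetoothClient, request: BluetoothRequest) {
    client.request(request)
        .takeWhile { !it.isTerminal } // isTerminal is a hypothetical marker on BluetoothResponse
        .collect { response -> println(response) }
}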
reactormonk
08/18/2023, 6:39 PM
reactormonk
08/18/2023, 6:40 PM
kevin.cianfarini
08/18/2023, 6:41 PM
reactormonk
08/18/2023, 6:43 PM

reactormonk
08/21/2023, 4:16 PM
fun sendCommand(command: Command) {
    requests.trySend(command).onFailure {
        Log.e(TAG, it?.stackTraceToString() ?: "Failed to send BT Command")
    }
}

val cardChannel = Channel<Ntag21x?>(10, BufferOverflow.DROP_OLDEST)

private val incomingData = Channel<ByteArray>(100, BufferOverflow.DROP_OLDEST, onUndeliveredElement = {
    Log.e(TAG, "Failed to process incoming data $it")
})
private val requests = Channel<Command>(100, BufferOverflow.DROP_OLDEST, onUndeliveredElement = {
    it.callback.cancel(CancellationException("Too many requests have been stacked."))
})
private var currentRequest: Command? = null

val scope = CoroutineScope(Dispatchers.IO + job)

init {
    ble.writeReaderDataTo(incomingData)
    scope.launch {
        incomingData.consumeAsFlow().runningFold(ParsingState.NoData as ParsingState) { state, bytes ->
            parse(state, bytes)
        }.map {
            when (it) {
                ParsingState.NoData -> Log.d(TAG, "No Data")
                is ParsingState.ParsingSuccessful -> {
                    val responseToCommand = (it.cmd - 1).toByte()
                    val req = currentRequest
                    Log.d(TAG, "Processing response: ${readableCommand(responseToCommand) ?: "Unknown command"}, expected ${
                        req?.commandByte?.let { it1 -> readableCommand(it1) }} with message: ${it.error.dump()} and data: ${it.data.dump()}")
                    if (req != null && responseToCommand == req.commandByte) {
                        req.channel.send(it)
                        req.channel.close()
                    } else {
                        onMessage(it)
                        if (responseToCommand == BtConstants.ACTIVATE_PICC_COM) {
                            Log.d(TAG, "Found a card, not a response.")
                        } else {
                            Log.e(TAG, "Got a message not awaited: $it")
                        }
                    }
                }
            }
        }.collect()
    }

    val requestTrigger = Channel<Unit>(1).apply { trySend(Unit) } // Used to signal that the next request can begin.
    scope.launch {
        requests.consumeEach { btCommand ->
            requestTrigger.receive()
            Log.d(TAG, "Processing BT Command: $btCommand")
            currentRequest = btCommand
            ble.writeToReader(btCommand.toBytes(ble.comByteManager))
            val retryTicker = ticker(500, 500)
            btCommand.callback.invokeOnCancellation {
                Log.i(TAG, "Callback of BT command got canceled, advancing...")
                scope.launch {
                    requestTrigger.send(Unit)
                    btCommand.channel.close()
                    retryTicker.cancel()
                }
            }
            @Suppress("ControlFlowWithEmptyBody")
            while (select {
                    btCommand.channel.onReceive {
                        retryTicker.cancel()
                        requestTrigger.send(Unit)
                        try {
                            btCommand.callback.resume(it)
                            false
                        } catch (e: IllegalStateException) {
                            Log.e(TAG, e.stackTraceToString())
                            false
                        }
                    }
                    retryTicker.onReceiveCatching {
                        if (!it.isClosed) {
                            Log.w(TAG, "Didn't get a response for $btCommand, sending request again.")
                            ble.writeToReader(btCommand.toBytes(ble.comByteManager))
                            true
                            // TODO limit retries
                        } else {
                            Log.d(TAG, "Retry ticker got closed, probably success.")
                            false
                        }
                    }
                }) {
            }
        }
    }
}
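For completeness, the continuation a Command carries would normally come from suspendCancellableCoroutine at the call site, so the caller suspends until the request loop above calls callback.resume(...). A rough usage sketch, assuming the AutoCardSearch constructor from the earlier message (the final Command also carries a channel and commandByte, so the real signature is likely wider):

import kotlinx.coroutines.suspendCancellableCoroutine

// Hypothetical suspend wrapper, written as a member of the reader class that owns sendCommand().
suspend fun autoCardSearch(enabled: Boolean, delayMs: Byte, cardType: Byte): ParsingState.ParsingSuccessful =
    suspendCancellableCoroutine { continuation ->
        sendCommand(Command.AutoCardSearch(continuation, enabled, delayMs, cardType))
    }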