Jonathan
01/21/2025, 4:08 PMThread
and an ArrayList
(access was synchronized for thread safety) and I would iterate over the list (peek) and remove the first (priority) data and send it,
Since I’m rewriting this functionality of Kotlin I would like to take advantage of the coroutine library. Looking around, it seems like Channels are what I want to achieve similar functionality. My only issue is that I cannot figure out how to peek at the content of a Channel (seems like this is by design). Is there something I’m missing? Can Channels be made peekable? Is my assumption of Channels being the correct strategy incorrect?
Pseudocode in thread =>Jonathan
01/21/2025, 4:09 PMprivate fun sourceIsBusy(): Boolean {
return false
}
private suspend fun getNextPriorityData(): ByteArray? {
TODO()
}
private suspend fun performBTWrite(data: ByteArray): Boolean {
TODO()
}
private suspend fun getNextData(): ByteArray? {
TODO()
}
private fun CoroutineScope.writeQueuedData() {
val writeJob = CoroutineScope(<http://Dispatchers.IO|Dispatchers.IO>).launch {
while(isActive) {
while(sourceIsBusy() && isActive) {
// Get priority packet..
val data = getNextPriorityData()
if(data != null) {
if(performBTWrite(data)) {
// Log success
} else {
// Log failure
}
}
}
// Safe to write next available data
val data = getNextData()
if(data != null) {
if(performBTWrite(data)) {
// Log success
} else {
// Log failure
}
}
}
}
}
Sam
01/21/2025, 4:11 PMkevin.cianfarini
01/21/2025, 4:13 PMkevin.cianfarini
01/21/2025, 4:14 PMkevin.cianfarini
01/21/2025, 4:16 PMn
channels and select
across them by declaring them in priority order. That will (I think?) bias data races towards the higher priority clause.Jonathan
01/21/2025, 4:17 PMkevin.cianfarini
01/21/2025, 4:18 PMJonathan
01/21/2025, 4:18 PMkevin.cianfarini
01/21/2025, 4:19 PMselect
across both of them to receive an element, but place the priority channel first in the select clause. Perform that select in a while loop to make it infinite.Jonathan
01/21/2025, 4:21 PMkevin.cianfarini
01/21/2025, 4:26 PMval rendezvous = Channel<Element>(Channel.RENDEZVOUS)
val buffered = Channel<Element>(Channel.BUFFERED) // or Channel.UNLIMITED
A rendezvous channel has a buffer of 0. This means that the receiver and the sender of the channel have to meet to pass along the data. The rendezvous channel is the special sauce that will ensure unprioritized elements don’t get sent while the consumer is busy. It will force senders to suspend until the consumer is ready to process it.
A buffered channel will allow senders to synchronously send data so long as the buffer isn’t full. For example, a channel with a buffer of size 64 will allow 64 Channel.send
invocations without calling Channel.recieve
to occur before it forces senders to suspend. Channel.UNLIMITED
will always allow sender to enqueue another element without suspending, but at the expense of an ever growing memory consumption.
What will the additional while around the select do?
select
is a single shot function. Essentially, it’d be like calling Channel.recieve
one time. Wrapping it in a while loop allows you to iterate on both of these channels forever.kevin.cianfarini
01/21/2025, 4:26 PMJonathan
01/21/2025, 4:32 PMJonathan
01/21/2025, 4:34 PMWon’t the outer loop take care of this use case? I was imagining refactoring theis a single shot function. Essentially, it’d be like callingselect
one time. Wrapping it in a while loop allows you to iterate on both of these channels forever.Channel.recieve
select {}
login to a function that just blocks until the select returns a value. Would that not work?kevin.cianfarini
01/21/2025, 4:34 PMkevin.cianfarini
01/21/2025, 4:34 PMJonathan
01/21/2025, 4:35 PMJonathan
01/21/2025, 4:37 PMselect {}
. How do I guarantee I don’t return a non priority datum while in the busy condition?kevin.cianfarini
01/21/2025, 4:39 PMkevin.cianfarini
01/21/2025, 4:39 PMkevin.cianfarini
01/21/2025, 4:47 PM…pulls data from a queue…
Jonathan
01/21/2025, 4:51 PMJonathan
01/21/2025, 4:51 PMkevin.cianfarini
01/21/2025, 4:54 PMJonathan
01/21/2025, 4:56 PMkevin.cianfarini
01/21/2025, 4:56 PMkevin.cianfarini
01/21/2025, 5:01 PMfun main() = runBlocking {
// The priority channel will allow an unlimited amount of entries in it to be enqueued.
val priorityChannel = Channel<ByteArray>(Channel.UNLIMITED)
// The unprioritized channel will force senders to suspend until the consumer is ready to process
// those elements, thus stopping sending that data to the bluetooth device while it's busy.
val unprioritizedChannel = Channel<ByteArray>(Channel.RENDEZVOUS)
launch { writeQueueData(priorityChannel, unprioritizedChannel) }
// ...Do other stuff like send data to the channels...
}
private suspend fun writeQueueData(
priorityChannel: Channel<ByteArray>,
unprioritizedChannel: Channel<ByteArray>,
): Nothing {
withContext(<http://Dispatchers.IO|Dispatchers.IO>) {
while (true) {
select {
// The select function is biased towards the first clause, so in the event that both
// a prioritized and unprioritized element are recieved at the same time it will
// prefer the priorty one.
priorityChannel.onReceive { priorityData ->
sendToBluetooth(priorityData)
}
unprioritizedChannel.onReceive { unprioritizedData ->
sendToBluetooth(unprioritizedData)
}
}
}
}
}
/**
* Send [data] to the bluetooth device and suspend until it's stopped processing that data.
*/
private suspend fun sendToBluetooth(data: ByteArray) {
TODO("Implement")
}
kevin.cianfarini
01/21/2025, 5:02 PMclass PriorityData(val data: ByteArray, val isPrioritized: Boolean)
class PriorityDataManager {
private val priorityChannel = ...
private val unprioritizedChannel = ...
public suspend fun send(data: PriorityData) {
val channel = if (data.isPrioritized) priorityChannel else unprioritizedChannel
channel.send(data.data)
}
public suspend fun receive(): ByteArray {
return select {
priorityChannel.onReceive { priorityData -> priorityData }
unprioritizedChannel.onReceive { unprioritizedData -> unprioritizedData }
}
}
}
Jonathan
01/21/2025, 5:08 PMkevin.cianfarini
01/21/2025, 5:09 PMkevin.cianfarini
01/21/2025, 5:09 PMkevin.cianfarini
01/21/2025, 5:11 PMWont’ both channels receive data but the unpriorized channel will get thrown away and lost for ever.. I’d like the unpriorized data to remain until the BT device is no longer busy.Which line of code makes you think data would get lost? Perhaps I could explain
Jonathan
01/21/2025, 5:13 PMselect {}
it’s unclear to me what happens when both channels have data to receive. The doc. say that it’s biased towards the first clause. What happens to the result of the second clause?Zach Klippenstein (he/him) [MOD]
01/21/2025, 5:18 PMJonathan
01/21/2025, 5:19 PMselect {}
it’s guaranteed the first will always be received from and preserving data?Zach Klippenstein (he/him) [MOD]
01/21/2025, 5:20 PMJonathan
01/21/2025, 5:21 PMJonathan
01/21/2025, 5:29 PMchannel.send(…)
call must be balanced with a channel.receive(…)
before it returns?kevin.cianfarini
01/21/2025, 5:31 PMselect
clause) will iterate through that data as quickly as possible.kevin.cianfarini
01/21/2025, 5:32 PMJonathan
01/21/2025, 6:00 PMkevin.cianfarini
01/21/2025, 6:13 PMI queue 2 priority datums and 3 non priority datums, the subsequent queue calls are blocked until the first two priority datums have been removed from the rendezvous channel. Correct?No, that’s not correct.
kevin.cianfarini
01/21/2025, 6:15 PMclass PriorityDataManager {
private val priorityChannel = ...
private val unprioritizedChannel = ...
public suspend fun send(data: PriorityData) {
val channel = if (data.isPrioritized) priorityChannel else unprioritizedChannel
channel.send(data.data)
}
}
send
here will only suspend when PriorityData.isPrioritized
is false. A priority data will always synchronously enqueue the element to priorityChannel
.Jonathan
01/22/2025, 2:12 AMkevin.cianfarini
01/22/2025, 4:03 AMChannel.send
without affecting each other if that's your questionkevin.cianfarini
01/22/2025, 4:03 AMChannel.send
does not prevent other calls to Channel.send
from also suspending. Once the channel is ready it will resume those suspended callers and process the datakevin.cianfarini
01/22/2025, 4:04 AMtrySend
with a buffer of 1Jonathan
01/22/2025, 5:25 AMJonathan
01/22/2025, 5:52 AMselect {}
will return the data received non priority channel (unblock any potentially blocked senders) even though it's not valid to write the data (device may still be busy). I'll admit that I'm new to Channels so I may be overlooking this functionality, if so please enlighten me.kevin.cianfarini
01/22/2025, 11:22 AMselect {}
will return the data received non priority channel (unblock any potentially blocked senders) even though it’s not valid to write the data (device may still be busy).
In my above example sendToBluetooth
is a suspending function.
/**
* Send [data] to the bluetooth device and suspend until it's stopped processing that data.
*/
private suspend fun sendToBluetooth(data: ByteArray) {
TODO("Implement")
}
This function is called from the lambda within the select
function, and will suspend until the data is written to the BT device. Since this suspends, the code won’t attempt another iteration of the select
function to pluck off another piece of data until sendToBluetooth
resumes (which also means it’s ready to process another piece of data).Jonathan
01/22/2025, 1:55 PM