# coroutines
j
I’m trying to create a suspending function that runs indefinitely, pulls data from a queue, and attempts to write it via a Bluetooth-backed output stream. If the destination is busy processing the previously sent data, we should hold off sending more data. Certain data (priority) is exempt from this rule and doesn’t care if the destination is busy. Previously, when I wrote this code in Java, I achieved what I wanted via a dedicated writer `Thread` and an `ArrayList` (access was synchronized for thread safety): I would iterate over the list (peek), remove the first priority data, and send it. Since I’m rewriting this functionality in Kotlin, I would like to take advantage of the coroutine library. Looking around, it seems like Channels are what I want to achieve similar functionality. My only issue is that I cannot figure out how to peek at the contents of a Channel (seems like this is by design). Is there something I’m missing? Can Channels be made peekable? Is my assumption that Channels are the correct strategy wrong? Pseudocode in thread =>
private fun sourceIsBusy(): Boolean {
    return false
}

private suspend fun getNextPriorityData(): ByteArray? {
    TODO()
}

private suspend fun performBTWrite(data: ByteArray): Boolean {
    TODO()
}

private suspend fun getNextData(): ByteArray? {
    TODO()
}

private fun CoroutineScope.writeQueuedData() {
    val writeJob = CoroutineScope(Dispatchers.IO).launch {
        while(isActive) {
            while(sourceIsBusy() && isActive) {
                // Get priority packet..
                val data = getNextPriorityData()
                if(data != null) {
                    if(performBTWrite(data)) {
                        // Log success
                    } else {
                        // Log failure
                    }
                }
            }
            // Safe to write next available data
            val data = getNextData()
            if(data != null) {
                if(performBTWrite(data)) {
                    // Log success
                } else {
                    // Log failure
                }
            }
        }
    }
}
s
Could you use two channels? One for high priority items and one for lower priority ones.
k
I don’t think that would help. It seems like this person’s needs are: 1. If a piece of data doesn’t have a priority associated with it, it should be subject to backpressure, e.g. the data source doesn’t send new elements until its consumer is available to process an event again. 2. If a piece of data does have a priority associated with it, it should be buffered, and iteration over that buffer should be ordered by priority. Is that correct?
The unprioritized use case can be easily handled with a rendezvous channel, but the prioritized channel will be difficult. You’ll essentially be trying to implement a coroutine-aware max heap.
If you are aware of the different levels of priority and there’s a finite number of them, you could use `n` channels and `select` across them by declaring them in priority order. That will (I think?) bias data races towards the higher priority clause.
j
Currently, priority is binary. Can send while BT source is busy and cannot send while BT source is busy.
k
Oh well that’s easy then
j
The priority data will be uncommon (98% of the packets will be non-priority).
k
You’ll want two channels. A rendezvous channel for the unprioritized and a buffered channel for the prioritized. `select` across both of them to receive an element, but place the priority channel first in the select clause. Perform that select in a while loop to make it infinite.
j
What is the rendezvous channel for vs a regular channel? What will the additional while around the select do? I figured it would block until there was data received from the channels. Is my assumption wrong here?
k
val rendezvous = Channel<Element>(Channel.RENDEZVOUS)
val buffered = Channel<Element>(Channel.BUFFERED) // or Channel.UNLIMITED
A rendezvous channel has a buffer of 0. This means that the receiver and the sender of the channel have to meet to pass along the data. The rendezvous channel is the special sauce that will ensure unprioritized elements don’t get sent while the consumer is busy. It will force senders to suspend until the consumer is ready to process the element. A buffered channel will allow senders to send data without suspending so long as the buffer isn’t full. For example, a channel with a buffer of size 64 will allow 64 `Channel.send` invocations to occur without a call to `Channel.receive` before it forces senders to suspend. `Channel.UNLIMITED` will always allow senders to enqueue another element without suspending, but at the expense of ever-growing memory consumption.
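To make the difference between the three capacities concrete, here is a minimal sketch that uses the non-suspending `trySend` to probe each kind of channel while nothing is receiving. The function name `trySendResults` and the capacity of 2 are only for illustration and are not part of the code discussed in this thread.

```kotlin
import kotlinx.coroutines.channels.Channel

// Hypothetical helper for illustration: tries three non-suspending sends on
// each kind of channel while no receiver is present, and records which succeed.
fun trySendResults(): Map<String, List<Boolean>> {
    fun attempt(channel: Channel<Int>): List<Boolean> =
        (1..3).map { channel.trySend(it).isSuccess }

    return mapOf(
        // No buffer: without a waiting receiver, nothing can be handed over.
        "rendezvous" to attempt(Channel(Channel.RENDEZVOUS)),
        // Fixed buffer of 2: the third element does not fit.
        "capacity 2" to attempt(Channel(2)),
        // Unbounded buffer: every element is accepted.
        "unlimited" to attempt(Channel(Channel.UNLIMITED)),
    )
}

fun main() {
    trySendResults().forEach { (kind, results) -> println("$kind: $results") }
}
```

Where `trySend` reports a failure here, a suspending `send` would instead suspend until a receiver makes room, which is the backpressure behaviour described above.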
> What will the additional while around the select do?
`select` is a single shot function. Essentially, it’d be like calling `Channel.receive` one time. Wrapping it in a while loop allows you to iterate on both of these channels forever.
If your downstream channel consumer is currently processing an unprioritized element and a prioritized element becomes available in the Channel, what should happen? Should processing the unprioritized element be canceled?
j
Currently, in the Java code, if the BT source isn’t busy then data is just sent in the order it’s queued. The only time priority comes into play is when the BT source is busy.
> `select` is a single shot function. Essentially, it’d be like calling `Channel.receive` one time. Wrapping it in a while loop allows you to iterate on both of these channels forever.
Won’t the outer loop take care of this use case? I was imagining refactoring the `select {}` logic into a function that just blocks until the select returns a value. Would that not work?
k
Sure, that works too
I didn’t realize you already had a loop in the mix here
j
I posted pseudocode above, to demonstrate roughly what I want the code to do.
With that being said, I’m not sure that structure will work using a `select {}`. How do I guarantee I don’t return a non-priority datum while in the busy condition?
k
I don’t think your pseudocode structure above will work. Essentially, what you want is to race two coroutines with a preference for the priority one.
Let me draft something up really quick
What is the source queue?
> …pulls data from a queue…
j
The queue would be the Channel(s)…
This write function is in a library I’m writing to communicate with a Bluetooth device. Depending on how much data was written to the device, it might get busy processing the data, and it will instruct my library to wait before sending more (non-prioritized) data.
k
Right
j
The contract of the library is that all data is written via Bluetooth in the order it’s queued, with the only exception being when the device is busy. Then only priority data can be sent, since this data may be an instruction to the device to restart, cancel processing, etc…
k
Yup, drafting something up that might help you
Perhaps this will help:
fun main() = runBlocking {
    // The priority channel will allow an unlimited amount of entries in it to be enqueued. 
    val priorityChannel = Channel<ByteArray>(Channel.UNLIMITED)
    // The unprioritized channel will force senders to suspend until the consumer is ready to process 
    // those elements, thus stopping sending that data to the bluetooth device while it's busy. 
    val unprioritizedChannel = Channel<ByteArray>(Channel.RENDEZVOUS)
    launch { writeQueueData(priorityChannel, unprioritizedChannel) }
    // ...Do other stuff like send data to the channels...
}

private suspend fun writeQueueData(
    priorityChannel: Channel<ByteArray>,
    unprioritizedChannel: Channel<ByteArray>,
) {
    // Never returns normally: it loops until the calling coroutine is cancelled.
    withContext(Dispatchers.IO) {
        while (true) {
            select<Unit> {
                // The select function is biased towards the first clause, so if both a
                // prioritized and an unprioritized element are available at the same time,
                // it will prefer the priority one.
                priorityChannel.onReceive { priorityData ->
                    sendToBluetooth(priorityData)
                }
                unprioritizedChannel.onReceive { unprioritizedData -> 
                    sendToBluetooth(unprioritizedData)
                }
            }
        }
    }
}

/**
 * Send [data] to the bluetooth device and suspend until it's stopped processing that data. 
 */
private suspend fun sendToBluetooth(data: ByteArray) {
    TODO("Implement")
}
You could perhaps wrap this up a little bit neater, too, by creating something like:
class PriorityData(val data: ByteArray, val isPrioritized: Boolean)

class PriorityDataManager {

  private val priorityChannel = ...
  private val unprioritizedChannel = ...

  public suspend fun send(data: PriorityData) {
    val channel = if (data.isPrioritized) priorityChannel else unprioritizedChannel 
    channel.send(data.data)
  }

  public suspend fun receive(): ByteArray {
    return select {
      priorityChannel.onReceive { priorityData -> priorityData }
      unprioritizedChannel.onReceive { unprioritizedData -> unprioritizedData }
    }
  }
}
j
This code looks like it would throw away data. Example scenario: 1. BT device is busy. 2. Non-prioritized data is in the channel. 3. Then priority data is sent to the channel. Won’t both channels receive data, but the unprioritized channel’s data will get thrown away and lost forever? I’d like the unprioritized data to remain until the BT device is no longer busy.
k
No, that’s not right
Data shouldn’t be lost here
> Won’t both channels receive data, but the unprioritized channel’s data will get thrown away and lost forever? I’d like the unprioritized data to remain until the BT device is no longer busy.
Which line of code makes you think data would get lost? Perhaps I could explain
j
Looking at the documentation of `select {}`, it’s unclear to me what happens when both channels have data to receive. The docs say that it’s biased towards the first clause. What happens to the result of the second clause?
z
Nothing until you loop around and hit the select again (or something else reads from the channel)
j
If data is available in both channels when `select {}` runs, is it guaranteed that the first will always be received from, and that the data in the other is preserved?
z
Yes
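A small sketch of that guarantee: when both channels already hold data, each `select` call takes exactly one element from the first ready clause, and whatever it did not take stays in its channel for a later call. For the sake of a single-coroutine demo both channels are `Channel.UNLIMITED` here (the thread’s design uses a rendezvous channel for unprioritized data), and the function name `drainInSelectOrder` is made up for this example.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.selects.select

// Hypothetical demo: queue one unprioritized and two priority elements up
// front, then call select three times and record what each call received.
fun drainInSelectOrder(): List<String> = runBlocking {
    val priorityChannel = Channel<String>(Channel.UNLIMITED)
    val unprioritizedChannel = Channel<String>(Channel.UNLIMITED)

    // The unprioritized element is queued first on purpose.
    unprioritizedChannel.trySend("normal-1")
    priorityChannel.trySend("priority-1")
    priorityChannel.trySend("priority-2")

    val received = mutableListOf<String>()
    repeat(3) {
        // Each select receives a single element; the clause listed first wins
        // when both channels are ready. Nothing is removed from the other channel.
        received += select<String> {
            priorityChannel.onReceive { it }
            unprioritizedChannel.onReceive { it }
        }
    }
    received
}

fun main() {
    println(drainInSelectOrder()) // prints [priority-1, priority-2, normal-1]
}
```

The unprioritized element is delivered last but is not lost: it simply waits in its channel until no priority element is ready.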
j
@Zach Klippenstein (he/him) [MOD] Thank you for clearing that up for me. @kevin.cianfarini Thank you for taking the time to help me and providing me with example code.
When using the rendezvous channel, when my BT device is not busy, won’t this slow down queuing multiple priority datums? Since each `channel.send(…)` call must be balanced with a `channel.receive(…)` before it returns?
k
The channels are operating differently.
• Unprioritized data is rendezvous and will force suspension for every element.
• Priority data is infinitely buffered and you can append as much as you’d like. The consumer of that channel (the `select` clause) will iterate through that data as quickly as possible.
Those two channels, other than the select clause, don’t know anything about each other. Sending a value to one has no impact on the other.
j
There is only one function exposed to the rest of my library, to queue data. If, for example, I queue 2 priority datums and 3 non-priority datums, the subsequent queue calls are blocked until the first two priority datums have been removed from the rendezvous channel. Correct? I’m just trying to figure out if this will have a harmful effect on my library for consumers.
k
> I queue 2 priority datums and 3 non priority datums, the subsequent queue calls are blocked until the first two priority datums have been removed from the rendezvous channel. Correct?
No, that’s not correct.
Take for example this bit of code:
class PriorityDataManager {

  private val priorityChannel = ...
  private val unprioritizedChannel = ...

  public suspend fun send(data: PriorityData) {
    val channel = if (data.isPrioritized) priorityChannel else unprioritizedChannel 
    channel.send(data.data)
  }
}
`send` here will only suspend when `PriorityData.isPrioritized` is false. Priority data will always be enqueued to `priorityChannel` without suspending.
j
@kevin.cianfarini Reading over your example above. I’m still a little unclear on how the blocking will work when my BT device is busy and there is no data in the priority channel but there is data in the non priority channel.
k
Many senders can suspend on a call to `Channel.send` without affecting each other, if that's your question.
Said differently, a single suspended call to `Channel.send` does not prevent other calls to `Channel.send` from also suspending. Once the channel is ready it will resume those suspended callers and process the data.
If you would prefer to drop the data instead of suspending, you could `trySend` with a buffer of 1.
j
I'm trying to fully understand the example you provided. Looking it over, I don't see what will suspend the writer coroutine while my Bluetooth device is busy and there is no priority data in the priority channel. I want to preserve the blocking write functionality (for non-priority data) I had with my Java code, but I'm not able to discern where that equivalent functionality is in this Channel/coroutine version.
If the priority channel has no data to receive, the `select {}` will return the data received from the non-priority channel (unblocking any potentially blocked senders) even though it's not valid to write the data (device may still be busy). I'll admit that I'm new to Channels so I may be overlooking this functionality; if so, please enlighten me.
k
> If the priority channel has no data to receive, the `select {}` will return the data received non priority channel (unblock any potentially blocked senders) even though it’s not valid to write the data (device may still be busy).
In my above example `sendToBluetooth` is a suspending function.
/**
 * Send [data] to the bluetooth device and suspend until it's stopped processing that data. 
 */
private suspend fun sendToBluetooth(data: ByteArray) {
    TODO("Implement")
}
This function is called from the lambda within the `select` function, and will suspend until the data is written to the BT device. Since this suspends, the code won’t attempt another iteration of the `select` function to pluck off another piece of data until `sendToBluetooth` resumes (which also means it’s ready to process another piece of data).
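The effect described here can be observed with a rough sketch in which a `delay` stands in for a slow `sendToBluetooth`. The consumer does not return to the channel while the simulated write is in progress, so a sender on the rendezvous channel stays suspended until that write has finished. The name `backpressureEvents`, the 50 ms delay, and the event strings are assumptions made for this example only.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical demo: records when each send returns and when each simulated
// write starts and finishes, all on runBlocking's single thread.
fun backpressureEvents(): List<String> = runBlocking {
    val events = mutableListOf<String>()
    val unprioritizedChannel = Channel<Int>(Channel.RENDEZVOUS)

    // Producer: each send must meet a receive before it can return.
    launch {
        for (packet in 1..2) {
            unprioritizedChannel.send(packet)
            events += "send($packet) returned"
        }
        unprioritizedChannel.close()
    }

    // Consumer: does not receive again until the current write has completed.
    for (packet in unprioritizedChannel) {
        events += "write($packet) started"
        delay(50) // stands in for a suspending Bluetooth write
        events += "write($packet) finished"
    }
    events
}

fun main() {
    backpressureEvents().forEach(::println)
}
```

In the recorded events, `send(2) returned` can only appear after `write(1) finished`, because the second `send` has no receiver to meet while the first write is still suspended.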
j
My actual sendToBluetooth() doesn't have logic in it to wait based on the data type. I opted to keep it simple and make my writer coroutine contain the waiting/pacing logic. Otherwise, I would need to have priority checking logic in two places instead of just my writer coroutine.