There is so many concurrent options out there. I w...
# kotlin-native
j
There is so many concurrent options out there. I wonder, whats the most efficient and elegant way of doing a queue (like ConcurrentLinkedQueue) thats thread safe and able to add new items while consuming queue but only run one operation at the time (Protect another place not thread safe)? Feels like Mutex potentially but not sure.
p
Your use case sounds like
channel
j
Do you know any nice sample combine this? The combo doing it elegant, effective and right is hard find as so many stuff available.
Each operation being basically runBlocking inside queue fetch. I guess could be done with collect from channel or?
k
Can you be a bit more specific about what outcomes you’d like for sample code?
This piece feels really vague and nondescript to me:
thats thread safe and able to add new items while consuming queue but only run one operation at the time (Protect another place not thread safe)? Feels like Mutex potentially but not sure.
j
Like usage API of Channel, Queue, Coroutine APIs being thread safe. Also synchronized deprecated?
A queue of operations, and consume one operation at the time until each complete. When complete, consume next event if any. If new operations added, dont impact order or thread issues. And if queue empty temporary "pause" until new operations added :)
k
Copy code
suspend fun main() = coroutineScope {
  val channel = Channel(Channel.BUFFERED)
  launch { channel.consumeValues() }
  launch { channel.produceValues() }
}

suspend fun Channel<Int>.consumeValues() {
  for (item in this) { // suspends until items available in channel
    println(item)
  }
}

suspend fun Channel<Int>.produceValues(): Nothing {
  var i = 0
  while (true) {
    delay(100)
    send(i++) // suspends until there's space to submit into the channel
  }
}
Your use case is the specific use case for which channels were created. An async queue of elements.
You could potentially use Flow as well, but channel should be fine.
j
Wow wasnt aware that easy 😀 Did thought in that direction but didnt know Channel like that possible. Can produce and consume being launched in same scope be thread issue?
Also produce values would be a method I guess adding one at the time and no forever while loop in reality :)
k
Channels are concurrent (and thread) safe
Channels are purpose built to communicate between coroutines, so concurrency and thread safety are built in
j
I meant the launch thing itself produce and consume in same scope. To be sure called one at the time always :)
Got so used with Shared and State Flows forgot about Channels 😀 Also buffered, can it get buffer Overflow?
k
On buffer overflow channels will suspend until the buffer is ready to accept new elements
You can even set the buffer to 0 which creates a special type of channel, a rendezvous channel, which only delivers elements when both the sending and receiving coroutine can meet.
The channels shown so far had no buffer. Unbuffered channels transfer elements when sender and receiver meet each other (aka rendezvous). If send is invoked first, then it is suspended until receive is invoked, if receive is invoked first, it is suspended until send is invoked.
j
Nice, perfect! Yeah feels like perfect use case for this. Also ideal using Flow as Turbine test library. I now have some kind of queue and recursive loop thing with Mutex feeling messy. Hence wanted to check 🙂
k
Yeah definitely lean on channel and flow for this. It’s what they’re built for! 🙂
j
Yeah no more synchronized and such. Stuck in old Java world for thread stuff like this. Feeling awkward working with Kotlin and Coroutines since 2016 and forgetting Channel. Love flowable and reactive thinking. Easier think in ui world but in this case being more hardware close, need block while work going on because hardware limitation kind a.
Declaratibe coroutines mixing async and sync work is amazingly well. Thanks @kevin.cianfarini, really appreciate it! Tomorrow will be fun implement rest!
k
No problem!
j
I think moving consume values to init block in a class and produce into a new add value.
p
I had some class that kind of encapsulates a channel and a scope. Check it out https://github.com/pablichjenkov/Android-Actor/blob/master/app/src/main/java/com/hamperapp/actor/Actor.kt By that time I named it Actor but the name is a bit controversial. Should probably be named EventProcessingPipe or sort of
k
If you're using callbackflow, there's two things to note: 1. You probably don't need a separate channel. Just pump elements into the channel inside the
channelFlow
builder and consume them as a flow. 2. You're bridging the non-suspend world to the suspend world. This has consequences; namely, you can't apply backpressure to a producer from a synchronous callback without blocking the thread.
Oh wait, I think I may have misunderstood your question.
If I did, let me know. I'm not sure if I misunderstood or not.
p
If you want to suspend some code which execution was originated from the jni, I would say, just suspend. When the coroutine resumes at that suspension point then you would call back the jni. I don’t think is anything relevant with the fact that the call comes from and go to the jni, but I might be wrong
j
@Pablichjenkov I solved it by first using suspendCoroutine() and inside assign lambda block calling continuation.resume. Because JNI first has callback start something (not blocking) and later on do callback my Kotlin code method later on. Not sure if I can wrap the entire thing with suspend only called from Channel forEach? As it also need to block upcoming things calling JNI. Because JNI callback not thread safe.
Very ugly as using nullable (ContinuationData) -> Unit lambda type and if exists call it. Bad part need to make sure that I dont accidentally write over pending lambda. But solved by suspend inside the channel forEach.
@kevin.cianfarini Yeah my intention was maybe using callback flow instead but seems same issue as with suspendCoroutine, somewhere inside block need to have my JNI callback which it isnt as JNI code callbacks into another method. Ideally solved by refactor the entire JNI code ofc, but not possible right now.
p
Sounds like tricky yeah