Hi oh lovely community! Is there something similar...
# coroutines
u
Hi oh lovely community! Is there something similar to a condition variable on coroutines?
d
Hmm,
CompletableDeferred
might be what you're looking for.
If you outline more of your use case, I might be able to recommend something better.
u
This is all proprietary stuff so sorry in advance for the vagueness. I have a “controller” object that might come and go. I have an interface that aquires an instance of the controller object and has methods that call methods on the controller.
So what I do today is far from optimal but I use a cond and a threadpool. I post a task on the thread pool and wait (with a timeout) for a controller instance to be ready for use and then I use that one.
I more or less want the uncertainty of wether or not a controller is currently available to be hidden from the user of the API
d
So am I correct in saying there's a single controller that multiple threads have to share? You use conditions to synchronise access?
u
I use conditions to ping the waiting threads about the fact that a controller is ready for use: 1. Connect the controller 2. Do something with the still connecting controller. 3. Controller comes online an wakes up the waiting threads.
d
Oh, I see.
u
Now what I actually want coroutines to provide me with is to make the methods suspend so that the user can get notified when a method completes on a controller without having to use a callback
d
Something like this or...
Copy code
// This has internal threadpool
val api = TODO("Get super secret api. :) ")

// 1) This suspends
api.submitTask { controller ->
    // Do stuff with connected controller.
}

// 2) This suspends
val controller = api.getController()
// Do stuff with connected controller.
1 or 2?
u
More like api.sendCommand() where api.sendCommand suspends
d
Oh and
sendCommand
will use the controller internally. The user won't touch the controller directly.
u
Exactly!
The user will have no idea that the controller exists. The controller is provided via a callback that more or less has a
onConnected
and an
onDisconnected
method
The callback is also handled internally
d
I'm guessing inside
onConnected
you signal the internal threadpool to stop waiting and use the controller.
What do you do in
onDisconnected
?
Cancel tasks?
Or just pause future tasks until
onConnected
is called again?
u
Lock and set to null more or less.
Copy code
fun sendCommand() {
  lock.lock()
  try {
    while(!controller.isConnected) {
      cond.await()
    }
    controller.runCommand()
  } finally {
    lock.unlock()
  }
}
That is more or less the code
With some timeouts for the await
d
From what I've seen you could use a
ConflatedBroadcastChannel
.
u
Now I could make the function suspend and wrap it in
suspendCoroutine
so that way the user can use coroutines but internally it is threads
d
Copy code
fun sendCommand() {
  val input = channel.openSubscription()
  try {
    input.receive()
    controller.runCommand()
  } finally {
    input.cancel()
  }
}
u
Will is remember the last used element?
Or will it only trigger on new events?
d
ConflatedBroadcastChannel
is designed to remember the last sent element.
u
That is awesome!
So if i send null then it will not receive any new messages?
d
You can't send null.
Copy code
fun onConnected() {
  channel.offer(Unit)
}
u
Is there a way for me to clear it? I was thinking that I could send the actual controller through the channel and then clear it if it goes away
d
Oh
u
Because otherwise I will probably have to use some sort of mutex anyhow to ensure threadsafety on the controller.
d
You can make the channel
ConflatedBroadcastChannel<Controller?>
then yes.
u
In that case this is exactly what I was looking for!
d
Oh, so you need synchronised access to the controller.
u
Yeah that too hehe
d
Hold on, this might change things.
(I forgot how conditions work).
u
I want synchronized access to an object that might be in an unusable state. If so I want all tasks to wait for it to get back to a usable state
And wake up once it reached the usable state
d
Okay, I have a better idea now but I have one last question.
u
Shoot!
d
Why do you use a threadpool if you need synchronisation?
u
Because I don’t want the methods in the interface to be blocking and they might have to block until the controller is usable. I also don’t want the risk of spawning an unlimited amount of threads
d
Okay, you don't need the
ConflatedBroadcastChannel
in that case.
In
sendCommand
you can pass a some data into a channel.
Then have a coroutine run through the channel.
I'll try and find an example of it.
u
But how do I make the worker coroutine wait if the controller is in an unusable state?
d
For that you can use an
AtomicBoolean
to keep track.
You can use a regular channel with conflation.
u
So one channel for tasks and one for conflation?
d
Yes
This way you only have one running thread.
u
That is pretty cool actually.
I will give it a shot!
d
😎
u
Thank you so much for the nice discussion. By the way i have had a slight break from android and kotlin but are channels in stable yet or is it still experimental?
d
It's mostly out of experimental.
u
So there are some things missing but most of it is available?
d
Some classes like
ConflatedBroadcastChannel
are still experimental but luckily you don't need it anymore. 🙂
Basically everything you need for this is not experimental.
u
But didn’t you say that i should use a regular channel with conflation?
d
Yes, you can use
Channel(capacity = CONFLATED)
.
u
Aaah
Cool. I will for sure give this a go. I like it a lot actually.
Though I think I would like a suspending cond more but that is probably because of braindamage caused by C
d
Haha, you'll recover with Kotlin. 😂
Forget to mention, you can handle timeouts with
withTimeout(1000) { api.sendCommand(...) }
.
u
I might be stupid here but how can I clear the channel if my controller disconnects?
d
Which channel?
Task channel or controller channel?
u
The second channel that indicates if the controller is in a ready state
Or should that just be a channel of booleans?
d
Ha, you beat me to it.
Yeah
Actually maybe not, because you still need some way to wait for the channel to become true/non-null.
u
Yeah exactly what i thougt
d
This is tricky.
You can send `CompletableDeferred`s down the channel.
u
Not sure. How would that help me?
d
Then in
onConnected
call
complete
on the current
CompletableDeferred
and in
onDisconnected
send a new
CompletableDeferred
.
u
Maybe I could actually go for the takeIf and use an AtomicBoolean?
d
Then in the processing coroutine you can call await.
Or don't use conflation and keep track of the state in the processing coroutine.
AtomicBoolean
won't let you wait.
So in the processing coroutine, you'll call
poll()
to see if there's a new state change, if the new value is
true
then you can continue processing, if the new value is
false
you then call
receive()
to wait.
u
I was thinking that I do something like
Copy code
val isConnected = AtomicBoolean(false)

eventChannel.takeIf { isConnected.get() }.onReceive {event ->
  try {
    controller.handleEvent()
  } finally {
    if (!isConnected.get()) {
      eventChannel.send(event)
    }
  }
}
So if the executing of the eventt fails due to disconnection i will just re-add the event to the channel.
d
It still doesn't solve the problem of having to wait for connection.
u
Aaaah that is very true!
Copy code
if (isConnectedChannel.poll() != true) {
  while (!isConnectedChannel.receive()) { }
}
Something like that?
Or could i not just do this
Copy code
while (!isConnectedChannel.receive()) {}
Oh no I can’t because the channel is Conflated
d
Yeah
u
Hopefully this:
Copy code
if (isConnectedChannel.poll() != true) {
  while (!isConnectedChannel.receive()) {}
}
Should do the trick right?
d
Yes but only for a single event.
You have to remember that the controller is connected for future events.
u
Copy code
if (isConnectedChannel.poll() != true) {
  while (!isConnectedChannel.receive()) { }
  isConnectedChannel.send(true)
}
Hehe
d
Haha, wow.
u
Should work I guess
d
I don't know where but I see a race condition.
That will race with the actual
onConnected
and
onDisconnected
.
u
Aaah yeah that is true…
Im starting to feel like it’s giving up time I think. My current solution is sort of ugly but it works
d
Give me 5 minutes 😁. I'm close to something.
u
Haha
Love the enthusiasm. I guess I will go and make some coffee. Sounds like it might be a long night!
d
😂
u
What I actually want sort of is live data. An observable data holder
That suspends
d
If you don't mind experimental,
Flow
is an option.
u
How would it work with Flow?
Haven’t really looked in to it all the much
d
I haven't needed to use it much but it has some operators that might help express our intentions better. Like the conflation. Although it would require more info from you end on how to abstract it well.
u
Can’t it be done with a
combine
and a
takeWhile
d
Like how many other callbacks does your controller expose, apart from
onConnected
,
onDisconnected
, etc.
Yeah, pretty much.
u
The controller had connect and disconnect + a sendCommand where the command is a bytebuffer 🎉
Awfully low level stuff that I am now trying to provide a workable API for
d
Oh okay, that should simplify things.
u
Do I have to launch a new coroutine just to emit a value?
d
Copy code
callbackFlow<Boolean> { channel -> 
        val controller = TODO("")
            
        controller.onConnected {
            channel.offer(true)
        }
        controller.onDisconnected {
            channel.offer(false)
         }
            
        awaitClose { controller.close() }
    }
u
Aaah
Great!
d
I can't figure out how to solve this with
Flow
and the
Channel
solution is getting out of hand. The original
ConflatedBroadcastChannel
will do the trick.
Copy code
val connectionChannel = ConflatedBroadcastChannel<Boolean>()

GlobalScope.launch {
    while (isActive) {
        if (!connectionChannel.value) {
            val channel = connectionChannel.openSubscription()
            while (!channel.receive());
            channel.cancel()
        }

        try {
            // Handle event
        } finally {
            // Requeue if necessary.
        }
    }
}
u
Cool! I have a look tomorrow. Had to go home and feed the cats.
Thank you so much for taking the time to help me ❤️
d
No problem!
u
Implemented it now and it works wonders! Thank you for showing me these awesome things that you can do with coroutines!
d
No problem.