u

540grunkspin

08/12/2019, 1:58 PM
Hi oh lovely community! Is there something similar to a condition variable on coroutines?
d

Dominaezzz

08/12/2019, 2:14 PM
Hmm,
CompletableDeferred
might be what you're looking for.
If you outline more of your use case, I might be able to recommend something better.
u

540grunkspin

08/12/2019, 2:18 PM
This is all proprietary stuff so sorry in advance for the vagueness. I have a “controller” object that might come and go. I have an interface that acquires an instance of the controller object and has methods that call methods on the controller.
So what I do today is far from optimal but I use a cond and a threadpool. I post a task on the thread pool and wait (with a timeout) for a controller instance to be ready for use and then I use that one.
I more or less want the uncertainty of whether or not a controller is currently available to be hidden from the user of the API
d

Dominaezzz

08/12/2019, 2:21 PM
So am I correct in saying there's a single controller that multiple threads have to share? You use conditions to synchronise access?
u

540grunkspin

08/12/2019, 2:23 PM
I use conditions to ping the waiting threads about the fact that a controller is ready for use: 1. Connect the controller. 2. Do something with the still connecting controller. 3. Controller comes online and wakes up the waiting threads.
d

Dominaezzz

08/12/2019, 2:24 PM
Oh, I see.
u

540grunkspin

08/12/2019, 2:24 PM
Now what I actually want coroutines to provide me with is to make the methods suspend so that the user can get notified when a method completes on a controller without having to use a callback
d

Dominaezzz

08/12/2019, 2:28 PM
Something like this or...
// This has internal threadpool
val api = TODO("Get super secret api. :) ")

// 1) This suspends
api.submitTask { controller ->
    // Do stuff with connected controller.
}

// 2) This suspends
val controller = api.getController()
// Do stuff with connected controller.
1 or 2?
u

540grunkspin

08/12/2019, 2:30 PM
More like api.sendCommand() where api.sendCommand suspends
d

Dominaezzz

08/12/2019, 2:31 PM
Oh and
sendCommand
will use the controller internally. The user won't touch the controller directly.
u

540grunkspin

08/12/2019, 2:31 PM
Exactly!
The user will have no idea that the controller exists. The controller is provided via a callback that more or less has a
onConnected
and an
onDisconnected
method
The callback is also handled internally
d

Dominaezzz

08/12/2019, 2:33 PM
I'm guessing inside
onConnected
you signal the internal threadpool to stop waiting and use the controller.
What do you do in
onDisconnected
?
Cancel tasks?
Or just pause future tasks until
onConnected
is called again?
u

540grunkspin

08/12/2019, 2:36 PM
Lock and set to null more or less.
fun sendCommand() {
  lock.lock()
  try {
    while(!controller.isConnected) {
      cond.await()
    }
    controller.runCommand()
  } finally {
    lock.unlock()
  }
}
That is more or less the code
With some timeouts for the await
d

Dominaezzz

08/12/2019, 2:38 PM
From what I've seen you could use a
ConflatedBroadcastChannel
.
u

540grunkspin

08/12/2019, 2:38 PM
Now I could make the function suspend and wrap it in
suspendCoroutine
so that way the user can use coroutines but internally it is threads
d

Dominaezzz

08/12/2019, 2:40 PM
suspend fun sendCommand() { // receive() suspends, so the function must be suspend
  val input = channel.openSubscription()
  try {
    input.receive()
    controller.runCommand()
  } finally {
    input.cancel()
  }
}
u

540grunkspin

08/12/2019, 2:40 PM
Will it remember the last used element?
Or will it only trigger on new events?
d

Dominaezzz

08/12/2019, 2:41 PM
ConflatedBroadcastChannel
is designed to remember the last sent element.
u

540grunkspin

08/12/2019, 2:41 PM
That is awesome!
So if i send null then it will not receive any new messages?
d

Dominaezzz

08/12/2019, 2:43 PM
You can't send null.
fun onConnected() {
  channel.offer(Unit)
}
u

540grunkspin

08/12/2019, 2:43 PM
Is there a way for me to clear it? I was thinking that I could send the actual controller through the channel and then clear it if it goes away
d

Dominaezzz

08/12/2019, 2:44 PM
Oh
u

540grunkspin

08/12/2019, 2:44 PM
Because otherwise I will probably have to use some sort of mutex anyhow to ensure threadsafety on the controller.
d

Dominaezzz

08/12/2019, 2:44 PM
You can make the channel
ConflatedBroadcastChannel<Controller?>
then yes.
u

540grunkspin

08/12/2019, 2:45 PM
In that case this is exactly what I was looking for!
d

Dominaezzz

08/12/2019, 2:45 PM
Oh, so you need synchronised access to the controller.
u

540grunkspin

08/12/2019, 2:45 PM
Yeah that too hehe
d

Dominaezzz

08/12/2019, 2:45 PM
Hold on, this might change things.
(I forgot how conditions work).
u

540grunkspin

08/12/2019, 2:46 PM
I want synchronized access to an object that might be in an unusable state. If so I want all tasks to wait for it to get back to a usable state
And wake up once it reached the usable state
d

Dominaezzz

08/12/2019, 2:47 PM
Okay, I have a better idea now but I have one last question.
u

540grunkspin

08/12/2019, 2:47 PM
Shoot!
d

Dominaezzz

08/12/2019, 2:47 PM
Why do you use a threadpool if you need synchronisation?
u

540grunkspin

08/12/2019, 2:49 PM
Because I don’t want the methods in the interface to be blocking and they might have to block until the controller is usable. I also don’t want the risk of spawning an unlimited amount of threads
d

Dominaezzz

08/12/2019, 2:51 PM
Okay, you don't need the
ConflatedBroadcastChannel
in that case.
In
sendCommand
you can pass some data into a channel.
Then have a coroutine run through the channel.
I'll try and find an example of it.
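A minimal sketch of that idea, assuming a hypothetical `CommandApi` wrapper and a `runCommand` lambda standing in for the real controller call (neither name comes from the discussion). Each command carries a `CompletableDeferred` so the suspended caller can get its result back from the single worker coroutine:

```kotlin
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical request type: a payload plus a handle for reporting the result.
class Command(val payload: String) {
    val result = CompletableDeferred<String>()
}

// Hypothetical API wrapper; `runCommand` stands in for the real controller call.
class CommandApi(scope: CoroutineScope, private val runCommand: (String) -> String) {
    private val tasks = Channel<Command>(Channel.UNLIMITED)

    init {
        // A single worker drains the channel, so commands run one at a time.
        scope.launch {
            for (command in tasks) {
                try {
                    command.result.complete(runCommand(command.payload))
                } catch (e: Exception) {
                    command.result.completeExceptionally(e)
                }
            }
        }
    }

    // Suspends the caller until the worker has processed the command.
    suspend fun sendCommand(payload: String): String {
        val command = Command(payload)
        tasks.send(command)
        return command.result.await()
    }

    // Closing the channel lets the worker loop finish.
    fun close() {
        tasks.close()
    }
}

fun demoCommandApi(): String = runBlocking {
    val api = CommandApi(this) { "ack:$it" }
    val reply = api.sendCommand("ping")
    api.close()
    reply
}
```

Because only the worker touches the controller, no extra lock is needed around `runCommand` in this sketch.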
u

540grunkspin

08/12/2019, 2:53 PM
But how do I make the worker coroutine wait if the controller is in an unusable state?
d
For that you can use an
AtomicBoolean
to keep track.
You can use a regular channel with conflation.
u

540grunkspin

08/12/2019, 2:55 PM
So one channel for tasks and one for conflation?
d

Dominaezzz

08/12/2019, 2:56 PM
Yes
This way you only have one running thread.
u

540grunkspin

08/12/2019, 2:56 PM
That is pretty cool actually.
I will give it a shot!
d

Dominaezzz

08/12/2019, 2:56 PM
😎
u

540grunkspin

08/12/2019, 2:57 PM
Thank you so much for the nice discussion. By the way, I have had a slight break from Android and Kotlin, but are channels stable yet or are they still experimental?
d

Dominaezzz

08/12/2019, 2:58 PM
It's mostly out of experimental.
u

540grunkspin

08/12/2019, 2:59 PM
So there are some things missing but most of it is available?
d

Dominaezzz

08/12/2019, 2:59 PM
Some classes like
ConflatedBroadcastChannel
are still experimental but luckily you don't need it anymore. 🙂
Basically everything you need for this is not experimental.
u

540grunkspin

08/12/2019, 3:00 PM
But didn’t you say that i should use a regular channel with conflation?
d

Dominaezzz

08/12/2019, 3:01 PM
Yes, you can use
Channel(capacity = CONFLATED)
.
u

540grunkspin

08/12/2019, 3:01 PM
Aaah
Cool. I will for sure give this a go. I like it a lot actually.
Though I think I would like a suspending cond more, but that is probably because of brain damage caused by C
d

Dominaezzz

08/12/2019, 3:04 PM
Haha, you'll recover with Kotlin. 😂
Forgot to mention, you can handle timeouts with
withTimeout(1000) { api.sendCommand(...) }
.
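A small sketch of how the timeout behaves, assuming a hypothetical `sendCommand` that just delays instead of talking to a controller. `withTimeout` throws `TimeoutCancellationException` when the limit is hit, while `withTimeoutOrNull` returns `null` instead:

```kotlin
import kotlinx.coroutines.TimeoutCancellationException
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeout
import kotlinx.coroutines.withTimeoutOrNull

// Hypothetical stand-in for api.sendCommand(...): pretends the controller needs `millis` ms.
suspend fun sendCommand(millis: Long): String {
    delay(millis)
    return "done"
}

fun demoTimeout(): List<String?> = runBlocking {
    // Finishes well inside the limit, so the result is returned normally.
    val fast = withTimeout(1000) { sendCommand(10) }

    // Too slow: withTimeoutOrNull cancels the block and yields null.
    val slow = withTimeoutOrNull(50) { sendCommand(5_000) }

    // Too slow: withTimeout cancels the block and throws instead.
    val thrown = try {
        withTimeout(50) { sendCommand(5_000) }
    } catch (e: TimeoutCancellationException) {
        "timed out"
    }

    listOf(fast, slow, thrown)
}
```

Which variant fits depends on whether a timeout should count as an error for the caller or as an ordinary "no result".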
u

540grunkspin

08/12/2019, 3:17 PM
I might be stupid here but how can I clear the channel if my controller disconnects?
d

Dominaezzz

08/12/2019, 3:18 PM
Which channel?
Task channel or controller channel?
u

540grunkspin

08/12/2019, 3:18 PM
The second channel that indicates if the controller is in a ready state
Or should that just be a channel of booleans?
d

Dominaezzz

08/12/2019, 3:19 PM
Ha, you beat me to it.
Yeah
Actually maybe not, because you still need some way to wait for the channel to become true/non-null.
u

540grunkspin

08/12/2019, 3:20 PM
Yeah, exactly what I thought
d

Dominaezzz

08/12/2019, 3:21 PM
This is tricky.
You can send `CompletableDeferred`s down the channel.
u

540grunkspin

08/12/2019, 3:23 PM
Not sure. How would that help me?
d

Dominaezzz

08/12/2019, 3:24 PM
Then in
onConnected
call
complete
on the current
CompletableDeferred
and in
onDisconnected
send a new
CompletableDeferred
.
u

540grunkspin

08/12/2019, 3:25 PM
Maybe I could actually go for the takeIf and use an AtomicBoolean?
d

Dominaezzz

08/12/2019, 3:25 PM
Then in the processing coroutine you can call await.
Or don't use conflation and keep track of the state in the processing coroutine.
AtomicBoolean
won't let you wait.
So in the processing coroutine, you'll call
poll()
to see if there's a new state change, if the new value is
true
then you can continue processing, if the new value is
false
you then call
receive()
to wait.
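The `CompletableDeferred` suggestion above might look roughly like this. `ConnectionGate` is a hypothetical name, and the sketch assumes `onConnected` and `onDisconnected` are never called concurrently with each other; it keeps the current deferred in a field rather than sending it down a channel, which is a simplification:

```kotlin
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.yield

// Hypothetical gate: the current deferred is completed while the controller is connected.
class ConnectionGate {
    @Volatile
    private var current = CompletableDeferred<Unit>()

    // Completing the deferred resumes every coroutine awaiting it.
    fun onConnected() {
        current.complete(Unit)
    }

    // Swap in a fresh, incomplete deferred so later callers wait again.
    fun onDisconnected() {
        if (current.isCompleted) current = CompletableDeferred()
    }

    // Returns immediately if connected, otherwise suspends until onConnected().
    suspend fun awaitConnected() {
        current.await()
    }
}

fun demoGate(): List<String> = runBlocking {
    val gate = ConnectionGate()
    val log = mutableListOf<String>()
    val worker = launch {
        gate.awaitConnected()
        log += "command ran"
    }
    yield() // let the worker start and suspend on the gate
    log += "connecting"
    gate.onConnected()
    worker.join()
    log
}
```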
u

540grunkspin

08/12/2019, 3:29 PM
I was thinking that I do something like
val isConnected = AtomicBoolean(false)

eventChannel.takeIf { isConnected.get() }.onReceive {event ->
  try {
    controller.handleEvent()
  } finally {
    if (!isConnected.get()) {
      eventChannel.send(event)
    }
  }
}
So if the execution of the event fails due to disconnection, I will just re-add the event to the channel.
d

Dominaezzz

08/12/2019, 3:32 PM
It still doesn't solve the problem of having to wait for connection.
u

540grunkspin

08/12/2019, 3:33 PM
Aaaah that is very true!
if (isConnectedChannel.poll() != true) {
  while (!isConnectedChannel.receive()) { }
}
Something like that?
Or could i not just do this
while (!isConnectedChannel.receive()) {}
Oh no I can’t because the channel is Conflated
d

Dominaezzz

08/12/2019, 3:37 PM
Yeah
u

540grunkspin

08/12/2019, 3:38 PM
Hopefully this:
if (isConnectedChannel.poll() != true) {
  while (!isConnectedChannel.receive()) {}
}
Should do the trick right?
d

Dominaezzz

08/12/2019, 3:40 PM
Yes but only for a single event.
You have to remember that the controller is connected for future events.
u

540grunkspin

08/12/2019, 3:42 PM
if (isConnectedChannel.poll() != true) {
  while (!isConnectedChannel.receive()) { }
  isConnectedChannel.send(true)
}
Hehe
d

Dominaezzz

08/12/2019, 3:43 PM
Haha, wow.
u

540grunkspin

08/12/2019, 3:44 PM
Should work I guess
d

Dominaezzz

08/12/2019, 3:44 PM
I don't know where but I see a race condition.
That will race with the actual
onConnected
and
onDisconnected
.
u

540grunkspin

08/12/2019, 3:45 PM
Aaah yeah that is true…
I’m starting to feel like it’s time to give up, I think. My current solution is sort of ugly but it works
d

Dominaezzz

08/12/2019, 3:47 PM
Give me 5 minutes 😁. I'm close to something.
u

540grunkspin

08/12/2019, 3:47 PM
Haha
Love the enthusiasm. I guess I will go and make some coffee. Sounds like it might be a long night!
d

Dominaezzz

08/12/2019, 3:48 PM
😂
u

540grunkspin

08/12/2019, 3:52 PM
What I actually want sort of is live data. An observable data holder
That suspends
d

Dominaezzz

08/12/2019, 3:53 PM
If you don't mind experimental,
Flow
is an option.
u

540grunkspin

08/12/2019, 3:53 PM
How would it work with Flow?
Haven’t really looked into it all that much
d

Dominaezzz

08/12/2019, 3:56 PM
I haven't needed to use it much but it has some operators that might help express our intentions better. Like the conflation. Although it would require more info from your end on how to abstract it well.
u

540grunkspin

08/12/2019, 3:57 PM
Can’t it be done with a
combine
and a
takeWhile
d

Dominaezzz

08/12/2019, 3:57 PM
Like how many other callbacks does your controller expose, apart from
onConnected
,
onDisconnected
, etc.
Yeah, pretty much.
u

540grunkspin

08/12/2019, 3:58 PM
The controller had connect and disconnect + a sendCommand where the command is a bytebuffer 🎉
Awfully low level stuff that I am now trying to provide a workable API for
d

Dominaezzz

08/12/2019, 4:01 PM
Oh okay, that should simplify things.
u

540grunkspin

08/12/2019, 4:07 PM
Do I have to launch a new coroutine just to emit a value?
d

Dominaezzz

08/12/2019, 4:08 PM
callbackFlow<Boolean> {
        val controller = TODO("")

        // The block's receiver is a ProducerScope, so offer() is called on it directly.
        controller.onConnected {
            offer(true)
        }
        controller.onDisconnected {
            offer(false)
        }

        awaitClose { controller.close() }
    }
u

540grunkspin

08/12/2019, 4:09 PM
Aaah
Great!
d

Dominaezzz

08/12/2019, 4:28 PM
I can't figure out how to solve this with
Flow
and the
Channel
solution is getting out of hand. The original
ConflatedBroadcastChannel
will do the trick.
// Start with an initial value so that reading .value before the first send does not throw.
val connectionChannel = ConflatedBroadcastChannel(false)

GlobalScope.launch {
    while (isActive) {
        if (!connectionChannel.value) {
            val channel = connectionChannel.openSubscription()
            while (!channel.receive());
            channel.cancel()
        }

        try {
            // Handle event
        } finally {
            // Requeue if necessary.
        }
    }
}
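For readers on later kotlinx.coroutines releases, the same "wait until connected" step could also be sketched with `MutableStateFlow`, which did not exist when this thread was written. `ControllerApi` and `runCommand` are hypothetical names, and the `Mutex` is an assumption added to serialise access to the controller:

```kotlin
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock

// Hypothetical wrapper; `runCommand` stands in for the real controller call.
class ControllerApi(private val runCommand: (String) -> String) {
    private val connected = MutableStateFlow(false)
    private val mutex = Mutex()

    fun onConnected() {
        connected.value = true
    }

    fun onDisconnected() {
        connected.value = false
    }

    // One caller at a time; each waits (suspending) until the state is `true`.
    suspend fun sendCommand(payload: String): String = mutex.withLock {
        connected.first { it } // returns at once if already connected
        runCommand(payload)
    }
}

fun demoStateFlow(): String = runBlocking {
    val api = ControllerApi { "ack:$it" }
    launch { api.onConnected() } // the connection arrives after sendCommand starts waiting
    api.sendCommand("ping")
}
```

As with the lock-and-condition version, the controller can still disconnect between the check and the call, so the real command would need its own failure handling.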
u

540grunkspin

08/12/2019, 5:17 PM
Cool! I’ll have a look tomorrow. Had to go home and feed the cats.
Thank you so much for taking the time to help me ❤️
d

Dominaezzz

08/12/2019, 5:30 PM
No problem!
u

540grunkspin

08/13/2019, 7:44 AM
Implemented it now and it works wonders! Thank you for showing me these awesome things that you can do with coroutines!
d

Dominaezzz

08/13/2019, 9:24 AM
No problem.