https://kotlinlang.org logo
#coroutines
Title
# coroutines
c

Czar

03/07/2020, 9:40 AM
Hi. I'm just starting to use coroutines. I need to create a refreshable countdown thingie. Basically it has to do something 10 minutes after some event, if the event happens during those 10 minutes again, countdown should be refreshed. Here's my naive implementation, any suggestions? Am I shooting myself in the foot here? Should I add synchronization/locking?
l

louiscad

03/07/2020, 11:10 AM
Hi! You can make something with flow and plain loops without having to (dangerously) manipulate variable
Deferred
that could break structured concurrency.
c

Czar

03/07/2020, 12:15 PM
how would that look?
l

louiscad

03/07/2020, 12:44 PM
@Czar When you say "the countdown should be refreshed", it's not completely clear. Do you mean reset to its initial 10 minutes? I think rewording what you want to do exactly as clear as possible can help me find the most simple solution for you. Also, about the action you want to run after the 10 minutes? Or event? Is it a suspending action that you would want to cancel in some cases? Should it delay the countdown? You say a countdown, do you just want to get the value for display, or is it a special logic?
c

Czar

03/07/2020, 12:47 PM
So, there is an object with an on/off state. When TURN_ON event is coming in, I'm turning it on if it's off and start counting down. After countdown finishes the object needs to be turned off. meanwhile if while I'm counting another TURN_ON came in, I need to reset the countdown. So. say it's 00:00 and we I received TURN_ON I turn the object on and wait till 00:10 to turn it off But if at 00:05 another TURN_ON came in, I need to ensure that the object will be turned off at 00:15, not 00:10
l

louiscad

03/07/2020, 12:59 PM
@Czar Alright, so you want to toggle off something 10 minutes after the last request to turn it on has been made. Here's one way to do it.
Copy code
val turnOnEvents: Flow<Unit> = getTurnOnEvents() // Can be whatever and whatever type you want.
turnOnEvents.collectLatest {
    ensureTurnedOn()
    delay(10.minutes.toLongMilliseconds())
    turnOff()
}
There's also ways to turn it into a
Flow<Boolean>
where the
Boolean
represents whether the thing should be on and off. Let me know (with a mention to ping me) if you need that or a variation for optimal integration for your use case.
c

Czar

03/07/2020, 1:06 PM
I don't see how this will work, first of all, how do I supply new incoming events into the flow, second, how do I cancel pending turnOff?
l

louiscad

03/07/2020, 1:11 PM
turnOff
can only be called if
delay
completes, which is not happening if its cancelled because a new element came into
turnOnEvents
(as per the
collectLatest
operator behavior that you can see documented). As for creating a flow, there multiple (infinite) ways to do so. The most basic way is to use the
flow { }
builder, do whatever you want and call
emit
once you have a value. If your source is always hot, you'll probably want to use a
BroadcastChannel
, offer values to it (
Unit
if there's nothing special to provide apart from the event itself), and use
asFlow
(or copy paste its implementation into your code since its
@FlowPreview
API that can change).
c

Czar

03/07/2020, 1:12 PM
I'm basically trying to implement interface:
Copy code
interface EventHandler<T : Event> {
  suspend fun onEvent(event: T)
}
The handler is a singleton
l

louiscad

03/07/2020, 1:12 PM
I recommend you to read the documentation of Flow https://kotlinlang.org/docs/reference/coroutines/flow.html
You can use
callbackFlow
for that (wrap calls to
offer
with
runCatching
to avoid race conditions on cancellation and unregistering of your callback).
There's also this talk if you want to get further and see how
Flow
can help you not just for the use case you're having today: https://kotlinconf.com/2019/talks/video/2019/131903/
c

Czar

03/07/2020, 1:36 PM
Still don't see it, but will watch/read further. Seems callbackFlow won't work here, as I don't control the calling code, only thing I have is that interface
l

louiscad

03/07/2020, 1:38 PM
You don't need to control the API you're using to use
callbackFlow
I'm encouraging you to try in a simple project with way shorter timeouts (like seconds, or less)
c

Czar

03/07/2020, 1:43 PM
callbackFlow is a builder, right? so, on execution of onEvent I can call the builder, but I'll have to assign the result somewhere, right? and we get the same situation with reassigning the var in my original code, no?
l

louiscad

03/07/2020, 1:44 PM
You'll want to do the registration of your callback right inside the
callbackFlow
builder, and call
offer
(wrapped in
runCatching
) in your
onEvent
callback. I mean, just like in the KDoc of
callbackFlow
.
If your callback if application wide, you can use a
BroadcastChannel
instead and use
asFlow()
on it as I said before.
c

Czar

03/07/2020, 2:02 PM
you mean something like:
Copy code
object Handler : EventHandler<OnEvent> {
  private val cdc: BroadcastChannel<Long> = BroadcastChannel(1)
  private val countDownFlow = cdc.asFlow()
  override fun handle(event: OnEvent) {
    cdc.send(0L)
    coundDownFlow.collectLatest {
      ensureOn()
      delay(10.minutes)
      turnOff()
    }
  }
}
this doesn't work for me by the way. nothing in
collectLatest
block gets executed.
l

louiscad

03/07/2020, 2:07 PM
No, that's not what I mean. You cannot collect a flow from a callback, it's not designed for it and it doesn't really make sense. You need to create a Flow from the callback, and then, collect it.
@Czar Is your
EventHandler
ever registered and unregistered somewhere?
c

Czar

03/07/2020, 2:08 PM
yes it is picked up by Spring Boot
l

louiscad

03/07/2020, 2:08 PM
I'm not familiar with Spring Boot, but do you, manually call a register and unregister function?
c

Czar

03/07/2020, 2:09 PM
I guess I could
l

louiscad

03/07/2020, 2:09 PM
I have a question: did you read the doc of
callbackFlow
?
c

Czar

03/07/2020, 2:10 PM
Yes, and it's kinda hard to understand how to apply it to what I have
l

louiscad

03/07/2020, 2:10 PM
Because the last snippet doesn't look at all like the example of the doc from
callbackFlow
c

Czar

03/07/2020, 2:10 PM
yes, the last example is based on your suggestion to use BroadcastChannel, not on callbackFlow
l

louiscad

03/07/2020, 2:10 PM
Where is your
EventHandler
coming from, first, and foremost, and is it already registered, or do you pass it somewhere?
c

Czar

03/07/2020, 2:12 PM
simplified it looks like this:
Copy code
val handlers = List<EventHandler<*>>
handlers.forEach { eventBus.register(it) }
l

louiscad

03/07/2020, 2:13 PM
@Czar Correction of your snippet then:
Copy code
private val broadcastChannel: BroadcastChannel<Unit> = BroadcastChannel(1)
val handler = object Handler : EventHandler<OnEvent> {
    override fun handle(event: OnEvent) {
        runCatching { broadcastChannel.offer(Unit) } // Workaround for <https://github.com/Kotlin/kotlinx.coroutines/issues/974>
    }
}
val coundDownFlow = broadcastChannel.asFlow()
coundDownFlow.collectLatest {
    ensureOn()
    delay(10.minutes)
    turnOff()
}
c

Czar

03/07/2020, 2:15 PM
that asFlow().collectLatest should be in a suspending function somewhere
and it should be called by something...
l

louiscad

03/07/2020, 2:16 PM
Yes, nothing new, you can launch a coroutine for that in the scope you need
callbackFlow
is designed to transform a single callback registration, callbacks arriving and unregistering into a
Flow
that you can collect multiple times. If you're using multiple callbacks together (I'm not debating if that's a bad idea or not), you cannot use
callbackFlow
, but the
BroadcastChannel
trick I corrected for you should do.
BTW, Spring has a bunch of
Flow
extensions, maybe there's already something to replace the
EventHandler
you have. In that case, you'd just need to use the
collectLatest
operator on it.
c

Czar

03/07/2020, 2:22 PM
I'm using Spring Boot to tie things together, but the eventBus is not from Spring, it's an external thing
you can launch a coroutine for that in the scope you need
what do you mean? I don't need it anywhere, I just need it to happen 😄 Thanks for all the help by the way, I'm very confused by all of this at the moment
The only suspending thing I've got here is the
handle
function, so where would I launch it? Tried to put it in
init
, but then realizsed it will block the construction of the handler
So the broadcast channel trick is not really working either, as I don't have any suitable place to do the
asFlow().collectLatest{}
Found a way and it does work, but it seems to me ugly as hell:
Copy code
// in the handler object:
init {
  thread(start = true) {
    runBlocking {
      broadcastChannel.asFlow()
        .collectLatest { }
    }
  }
}
It is also unsustainable, as in fact I'll have many such objects and thread per object is exactly something I'm trying to circumvent with coroutines 😞
l

louiscad

03/07/2020, 2:57 PM
I'm really confused, you are looking for a coroutines solution, but you seem to not really use it, at least, no structured concurrency apparently. Anyway, you don't have to create a thread to run a coroutine, you can just use an appropriate scope. If you need it to be global to your application, you can use
GlobalScope.launch
. Otherwise, you can reuse an existing scope, and cancel it if/when you need it (in which case it's probably wiser to replace
BroadcastChannel
with a proper flow built using
callbackFlow
that correctly unregisters the callback in the
awaitClose
lambda.
c

Czar

03/07/2020, 3:22 PM
Aboslutely forgot about GlobalScope...
Probably not such a good idea to skip sleep 😄
Thanks, now everything works
It's just too much theory around coroutines, and I've just started with them, so I'm kinda trying to make stuff work figuring the theory out along the way.
l

louiscad

03/07/2020, 3:34 PM
Oh yeah, sleep 8-9 hours for a while (or better, most of the days in your life) before working on dangerous things like software, it's best!
1
c

Czar

03/08/2020, 11:01 AM
Sleep helped 🙂 Reimplemented the whole thing using actors 🙂 Now it's a beaut
3 Views