Czar
03/07/2020, 9:40 AMlouiscad
03/07/2020, 11:10 AMDeferred
that could break structured concurrency.Czar
03/07/2020, 12:15 PMlouiscad
03/07/2020, 12:44 PMCzar
03/07/2020, 12:47 PMlouiscad
03/07/2020, 12:59 PMval turnOnEvents: Flow<Unit> = getTurnOnEvents() // Can be whatever and whatever type you want.
turnOnEvents.collectLatest {
ensureTurnedOn()
delay(10.minutes.toLongMilliseconds())
turnOff()
}
There's also ways to turn it into a Flow<Boolean>
where the Boolean
represents whether the thing should be on and off. Let me know (with a mention to ping me) if you need that or a variation for optimal integration for your use case.Czar
03/07/2020, 1:06 PMlouiscad
03/07/2020, 1:11 PMturnOff
can only be called if delay
completes, which is not happening if its cancelled because a new element came into turnOnEvents
(as per the collectLatest
operator behavior that you can see documented). As for creating a flow, there multiple (infinite) ways to do so. The most basic way is to use the flow { }
builder, do whatever you want and call emit
once you have a value. If your source is always hot, you'll probably want to use a BroadcastChannel
, offer values to it (Unit
if there's nothing special to provide apart from the event itself), and use asFlow
(or copy paste its implementation into your code since its @FlowPreview
API that can change).Czar
03/07/2020, 1:12 PMinterface EventHandler<T : Event> {
suspend fun onEvent(event: T)
}
The handler is a singletonlouiscad
03/07/2020, 1:12 PMlouiscad
03/07/2020, 1:13 PMcallbackFlow
for that (wrap calls to offer
with runCatching
to avoid race conditions on cancellation and unregistering of your callback).louiscad
03/07/2020, 1:14 PMFlow
can help you not just for the use case you're having today: https://kotlinconf.com/2019/talks/video/2019/131903/Czar
03/07/2020, 1:36 PMlouiscad
03/07/2020, 1:38 PMcallbackFlow
louiscad
03/07/2020, 1:38 PMCzar
03/07/2020, 1:43 PMlouiscad
03/07/2020, 1:44 PMcallbackFlow
builder, and call offer
(wrapped in runCatching
) in your onEvent
callback. I mean, just like in the KDoc of callbackFlow
.louiscad
03/07/2020, 1:45 PMBroadcastChannel
instead and use asFlow()
on it as I said before.Czar
03/07/2020, 2:02 PMobject Handler : EventHandler<OnEvent> {
private val cdc: BroadcastChannel<Long> = BroadcastChannel(1)
private val countDownFlow = cdc.asFlow()
override fun handle(event: OnEvent) {
cdc.send(0L)
coundDownFlow.collectLatest {
ensureOn()
delay(10.minutes)
turnOff()
}
}
}
this doesn't work for me by the way. nothing in collectLatest
block gets executed.louiscad
03/07/2020, 2:07 PMlouiscad
03/07/2020, 2:07 PMEventHandler
ever registered and unregistered somewhere?Czar
03/07/2020, 2:08 PMlouiscad
03/07/2020, 2:08 PMCzar
03/07/2020, 2:09 PMlouiscad
03/07/2020, 2:09 PMcallbackFlow
?Czar
03/07/2020, 2:10 PMlouiscad
03/07/2020, 2:10 PMcallbackFlow
Czar
03/07/2020, 2:10 PMlouiscad
03/07/2020, 2:10 PMEventHandler
coming from, first, and foremost, and is it already registered, or do you pass it somewhere?Czar
03/07/2020, 2:12 PMval handlers = List<EventHandler<*>>
handlers.forEach { eventBus.register(it) }
louiscad
03/07/2020, 2:13 PMprivate val broadcastChannel: BroadcastChannel<Unit> = BroadcastChannel(1)
val handler = object Handler : EventHandler<OnEvent> {
override fun handle(event: OnEvent) {
runCatching { broadcastChannel.offer(Unit) } // Workaround for <https://github.com/Kotlin/kotlinx.coroutines/issues/974>
}
}
val coundDownFlow = broadcastChannel.asFlow()
coundDownFlow.collectLatest {
ensureOn()
delay(10.minutes)
turnOff()
}
Czar
03/07/2020, 2:15 PMCzar
03/07/2020, 2:16 PMlouiscad
03/07/2020, 2:16 PMlouiscad
03/07/2020, 2:17 PMcallbackFlow
is designed to transform a single callback registration, callbacks arriving and unregistering into a Flow
that you can collect multiple times.
If you're using multiple callbacks together (I'm not debating if that's a bad idea or not), you cannot use callbackFlow
, but the BroadcastChannel
trick I corrected for you should do.louiscad
03/07/2020, 2:19 PMFlow
extensions, maybe there's already something to replace the EventHandler
you have. In that case, you'd just need to use the collectLatest
operator on it.Czar
03/07/2020, 2:22 PMCzar
03/07/2020, 2:23 PMyou can launch a coroutine for that in the scope you needwhat do you mean? I don't need it anywhere, I just need it to happen 😄 Thanks for all the help by the way, I'm very confused by all of this at the moment
Czar
03/07/2020, 2:26 PMhandle
function, so where would I launch it? Tried to put it in init
, but then realizsed it will block the construction of the handlerCzar
03/07/2020, 2:32 PMasFlow().collectLatest{}
Czar
03/07/2020, 2:47 PM// in the handler object:
init {
thread(start = true) {
runBlocking {
broadcastChannel.asFlow()
.collectLatest { }
}
}
}
It is also unsustainable, as in fact I'll have many such objects and thread per object is exactly something I'm trying to circumvent with coroutines 😞louiscad
03/07/2020, 2:57 PMGlobalScope.launch
. Otherwise, you can reuse an existing scope, and cancel it if/when you need it (in which case it's probably wiser to replace BroadcastChannel
with a proper flow built using callbackFlow
that correctly unregisters the callback in the awaitClose
lambda.Czar
03/07/2020, 3:22 PMCzar
03/07/2020, 3:22 PMCzar
03/07/2020, 3:23 PMCzar
03/07/2020, 3:24 PMlouiscad
03/07/2020, 3:34 PMCzar
03/08/2020, 11:01 AM