Czar
03/07/2020, 9:40 AM
louiscad
03/07/2020, 11:10 AM
Deferred that could break structured concurrency.
Czar
03/07/2020, 12:15 PM
louiscad
03/07/2020, 12:44 PM
Czar
03/07/2020, 12:47 PM
louiscad
03/07/2020, 12:59 PM
val turnOnEvents: Flow<Unit> = getTurnOnEvents() // Can be any source and any element type you want.
turnOnEvents.collectLatest {
    ensureTurnedOn()
    delay(10.minutes.toLongMilliseconds())
    turnOff()
}
There are also ways to turn it into a Flow<Boolean> where the Boolean represents whether the thing should be on or off. Let me know (with a mention to ping me) if you need that or a variation for optimal integration with your use case.
Czar
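One possible shape for the Flow<Boolean> variant mentioned above, as a minimal sketch and not necessarily what was meant in the message: transformLatest (an @ExperimentalCoroutinesApi operator) emits true for each event and false once the timeout elapses without a newer event. The names asOnOffStates and demoStates and the short timings are hypothetical, chosen only to keep the demo quick.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.distinctUntilChanged
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.flow.transformLatest
import kotlinx.coroutines.runBlocking

// Hypothetical helper: true means "should be on", false means "should be off".
@OptIn(ExperimentalCoroutinesApi::class)
fun Flow<Unit>.asOnOffStates(timeoutMillis: Long): Flow<Boolean> =
    transformLatest {
        emit(true)
        delay(timeoutMillis) // cancelled and restarted if a newer event arrives
        emit(false)
    }.distinctUntilChanged() // drop repeated "true" values from back-to-back events

fun demoStates(): List<Boolean> = runBlocking {
    // Two events 100 ms apart; the timeout is 300 ms (stand-in for 10 minutes).
    val events = flow {
        emit(Unit)
        delay(100)
        emit(Unit)
    }
    events.asOnOffStates(timeoutMillis = 300).toList()
}

fun main() {
    println(demoStates()) // [true, false]
}
```

A collector of such a flow would call ensureTurnedOn() on true and turnOff() on false.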
03/07/2020, 1:06 PM
louiscad
03/07/2020, 1:11 PM
turnOff can only be called if delay completes, which does not happen if it's cancelled because a new element came into turnOnEvents (as per the documented behavior of the collectLatest operator). As for creating a flow, there are multiple (infinitely many) ways to do so. The most basic way is to use the flow { } builder, do whatever you want and call emit once you have a value. If your source is always hot, you'll probably want to use a BroadcastChannel, offer values to it (Unit if there's nothing special to provide apart from the event itself), and use asFlow (or copy-paste its implementation into your code, since it's a @FlowPreview API that can change).
Czar
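The cancellation behavior described above can be observed with a small, self-contained sketch. It assumes kotlinx.coroutines is on the classpath; the event source is a hypothetical flow { } builder that emits three events 100 ms apart, and a 300 ms delay stands in for the 10-minute timeout.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.collectLatest
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking

fun runCollectLatestDemo(): List<String> = runBlocking {
    val log = mutableListOf<String>()

    // Hypothetical event source built with the flow { } builder.
    val turnOnEvents = flow {
        repeat(3) {
            emit(Unit)
            delay(100)
        }
    }

    turnOnEvents.collectLatest {
        log += "on"   // stands in for ensureTurnedOn()
        delay(300)    // cancelled whenever a newer event arrives
        log += "off"  // stands in for turnOff(); reached only after the last event
    }
    log
}

fun main() {
    println(runCollectLatestDemo()) // [on, on, on, off]
}
```

Only the block started by the last event gets past its delay, so "off" is logged once.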
03/07/2020, 1:12 PM
interface EventHandler<T : Event> {
    suspend fun onEvent(event: T)
}
The handler is a singleton
louiscad
03/07/2020, 1:12 PM
louiscad
03/07/2020, 1:13 PM
callbackFlow for that (wrap calls to offer with runCatching to avoid race conditions on cancellation and unregistering of your callback).
louiscad
03/07/2020, 1:14 PM
Flow can help you not just for the use case you're having today: https://kotlinconf.com/2019/talks/video/2019/131903/
Czar
03/07/2020, 1:36 PM
louiscad
03/07/2020, 1:38 PM
callbackFlow
louiscad
03/07/2020, 1:38 PM
Czar
03/07/2020, 1:43 PM
louiscad
03/07/2020, 1:44 PM
callbackFlow builder, and call offer (wrapped in runCatching) in your onEvent callback. I mean, just like in the KDoc of callbackFlow.
louiscad
03/07/2020, 1:45 PM
BroadcastChannel instead and use asFlow() on it as I said before.
Czar
03/07/2020, 2:02 PM
object Handler : EventHandler<OnEvent> {
    private val cdc: BroadcastChannel<Long> = BroadcastChannel(1)
    private val countDownFlow = cdc.asFlow()
    override fun handle(event: OnEvent) {
        cdc.send(0L)
        countDownFlow.collectLatest {
            ensureOn()
            delay(10.minutes)
            turnOff()
        }
    }
}
This doesn't work for me, by the way. Nothing in the collectLatest block gets executed.
louiscad
03/07/2020, 2:07 PM
louiscad
03/07/2020, 2:07 PM
EventHandler ever registered and unregistered somewhere?
Czar
03/07/2020, 2:08 PM
louiscad
03/07/2020, 2:08 PM
Czar
03/07/2020, 2:09 PM
louiscad
03/07/2020, 2:09 PM
callbackFlow?
Czar
03/07/2020, 2:10 PM
louiscad
03/07/2020, 2:10 PM
callbackFlow
Czar
03/07/2020, 2:10 PM
louiscad
03/07/2020, 2:10 PM
EventHandler coming from, first and foremost, and is it already registered, or do you pass it somewhere?
Czar
03/07/2020, 2:12 PM
val handlers = List<EventHandler<*>>
handlers.forEach { eventBus.register(it) }
louiscad
03/07/2020, 2:13 PM
private val broadcastChannel: BroadcastChannel<Unit> = BroadcastChannel(1)
val handler = object : EventHandler<OnEvent> {
    override fun handle(event: OnEvent) {
        runCatching { broadcastChannel.offer(Unit) } // Workaround for https://github.com/Kotlin/kotlinx.coroutines/issues/974
    }
}
val countDownFlow = broadcastChannel.asFlow()
// collectLatest suspends, so this call has to run inside a coroutine.
countDownFlow.collectLatest {
    ensureOn()
    delay(10.minutes)
    turnOff()
}
Czar
03/07/2020, 2:15 PM
Czar
03/07/2020, 2:16 PM
louiscad
03/07/2020, 2:16 PM
louiscad
03/07/2020, 2:17 PM
callbackFlow is designed to turn a single callback's lifecycle (registration, incoming calls, unregistration) into a Flow that you can collect multiple times.
If you're using multiple callbacks together (I'm not debating whether that's a bad idea or not), you cannot use callbackFlow, but the BroadcastChannel trick I corrected for you should do.
louiscad
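For the single-callback case, a minimal callbackFlow sketch could look like the following. The EventBus and Listener types here are hypothetical stand-ins, not the event bus from this thread. The sketch also uses trySend, the current replacement for offer in kotlinx.coroutines; trySend does not throw when the channel is already closed, so the runCatching wrapper is not needed with it.

```kotlin
import java.util.concurrent.CopyOnWriteArrayList
import kotlinx.coroutines.async
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.callbackFlow
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.yield

// Hypothetical callback API, for illustration only.
fun interface Listener {
    fun onEvent(event: String)
}

class EventBus {
    private val listeners = CopyOnWriteArrayList<Listener>()
    val listenerCount: Int get() = listeners.size
    fun register(listener: Listener) { listeners += listener }
    fun unregister(listener: Listener) { listeners -= listener }
    fun post(event: String) = listeners.forEach { it.onEvent(event) }
}

// Each collection registers its own listener and unregisters it when collection ends.
fun EventBus.events(): Flow<String> = callbackFlow {
    val listener = Listener { event -> trySend(event) }
    register(listener)
    awaitClose { unregister(listener) }
}

fun runCallbackFlowDemo(): List<String> = runBlocking {
    val bus = EventBus()
    val collected = async { bus.events().take(2).toList() }
    while (bus.listenerCount == 0) yield() // wait until the flow has registered its listener
    bus.post("a")
    bus.post("b")
    collected.await()
}

fun main() {
    println(runCallbackFlowDemo()) // [a, b]
}
```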
03/07/2020, 2:19 PM
Flow extensions, maybe there's already something to replace the EventHandler you have. In that case, you'd just need to use the collectLatest operator on it.
Czar
03/07/2020, 2:22 PM
Czar
03/07/2020, 2:23 PM
"you can launch a coroutine for that in the scope you need"
What do you mean? I don't need it anywhere, I just need it to happen 😄 Thanks for all the help, by the way. I'm very confused by all of this at the moment.
Czar
03/07/2020, 2:26 PM
handle function, so where would I launch it? I tried to put it in init, but then realized it would block the construction of the handler.
Czar
03/07/2020, 2:32 PM
asFlow().collectLatest{}
Czar
03/07/2020, 2:47 PM
// in the handler object:
init {
    thread(start = true) {
        runBlocking {
            broadcastChannel.asFlow()
                .collectLatest { }
        }
    }
}
It is also unsustainable: I'll in fact have many such objects, and a thread per object is exactly what I'm trying to avoid with coroutines 😞
louiscad
03/07/2020, 2:57 PM
GlobalScope.launch. Otherwise, you can reuse an existing scope, and cancel it if/when you need to (in which case it's probably wiser to replace BroadcastChannel with a proper flow built using callbackFlow that correctly unregisters the callback in the awaitClose lambda).
Czar
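A rough sketch of the "reuse an existing scope" option: the collector is launched in a scope passed to the handler, so no thread is blocked per handler and cancelling the scope stops the collector. This swaps BroadcastChannel for MutableSharedFlow, its current counterpart in kotlinx.coroutines; AutoOffHandler, its log and the short timings are hypothetical and exist only to make the behavior visible.

```kotlin
import java.util.concurrent.CopyOnWriteArrayList
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.cancel
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.collectLatest
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical handler: records "on"/"off" instead of driving a real device.
class AutoOffHandler(scope: CoroutineScope, private val timeoutMillis: Long) {
    val log = CopyOnWriteArrayList<String>()

    // replay = 1 keeps the latest event for a collector that has not subscribed yet.
    private val events = MutableSharedFlow<Unit>(
        replay = 1,
        onBufferOverflow = BufferOverflow.DROP_OLDEST
    )

    init {
        // launch returns immediately, so construction is not blocked.
        scope.launch {
            events.collectLatest {
                log += "on"
                delay(timeoutMillis) // restarted by every new event
                log += "off"
            }
        }
    }

    // Non-suspending entry point, callable from a plain callback.
    fun handle() {
        events.tryEmit(Unit)
    }
}

fun runHandlerDemo(): List<String> = runBlocking {
    val scope = CoroutineScope(Dispatchers.Default)
    val handler = AutoOffHandler(scope, timeoutMillis = 400)
    handler.handle()
    delay(200)
    handler.handle() // arrives before the timeout, so the first "off" never happens
    delay(800)
    scope.cancel()   // stops the collector
    handler.log.toList()
}

fun main() {
    println(runHandlerDemo()) // [on, on, off]
}
```

Many handlers can share one scope this way, since each collector is a coroutine and not a dedicated thread.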
03/07/2020, 3:22 PM
Czar
03/07/2020, 3:22 PM
Czar
03/07/2020, 3:23 PM
Czar
03/07/2020, 3:24 PM
louiscad
03/07/2020, 3:34 PM
Czar
03/08/2020, 11:01 AM