simon.vergauwen
03/21/2020, 12:04 PMQueue works just like any other non-blocking Queue you might be familiar with.
You can split concerns into Enqueue and Dequeue if you want, to split the concern into who is producing events and who is consuming them.
They all work on a 1-offer to 1-taker basis tho, since it removes from the Queue unless you use the peek operators. So if you consume an event with take than it’s also removed from the Queue, so this work similarly to java.util.concurrent.Queue or scala.concurrent.Queue but with a functional non-blocking api in F with full support for cancelation etc.
We’ll be writing on some additional docs on Queue and the other new stuff soon, so if there are any specific use-cases you’re trying to solve you cannot solve with the current feature set in 0.10.5 I’d happy to look into them and perhaps include them in some form of documentation.kartoffelsup
03/21/2020, 12:16 PMpakoito
03/21/2020, 12:57 PMpakoito
03/21/2020, 12:58 PMMap<Class<*>, List<Queue>>kartoffelsup
03/21/2020, 3:21 PMkartoffelsup
03/24/2020, 7:30 PMkartoffelsup
03/24/2020, 8:01 PMpakoito
03/24/2020, 8:28 PMpakoito
03/24/2020, 8:28 PMpakoito
03/24/2020, 8:30 PMtake() to get resultspakoito
03/24/2020, 8:31 PMpakoito
03/24/2020, 8:31 PMpakoito
03/24/2020, 8:32 PMfun bla() = IO.fx {
val result = !eventBus.listen(Request::class)
!effect { somethingWith(result) }
if (!eventBus.globalStop()) {
!bla()
}
}kartoffelsup
03/24/2020, 9:54 PMpakoito
03/24/2020, 9:57 PMpakoito
03/24/2020, 9:57 PMpakoito
03/24/2020, 9:58 PMpakoito
03/24/2020, 9:59 PM.attempt().unsafeRunSync() that’s blocking. We know how to make it non-blocking for our own internal stuff but not when mixed with kotlinx.coroutinespakoito
03/24/2020, 9:59 PMattempt().suspended() insteadpakoito
03/24/2020, 9:59 PMThread.sleep(15000L) doesn’t work for either Arrow or kx.coroutines, as it’s also blockingpakoito
03/24/2020, 10:01 PMIO(IO.dispatchers().io()) { }.unsafeRunSync()pakoito
03/24/2020, 10:02 PMpakoito
03/24/2020, 10:02 PMpakoito
03/24/2020, 10:02 PMpakoito
03/24/2020, 10:02 PMkartoffelsup
03/24/2020, 10:54 PMkartoffelsup
03/25/2020, 6:39 AMTest
Checking for events...
foobar
take
Offer: Event@2cfe9c1a
Event sent.kartoffelsup
03/25/2020, 6:40 AMkartoffelsup
03/25/2020, 6:52 AMpakoito
03/25/2020, 11:30 AMpakoito
03/25/2020, 11:30 AMpakoito
03/25/2020, 11:30 AMsimon.vergauwen
03/25/2020, 11:35 AMsimon.vergauwen
03/26/2020, 2:05 PMKind<F, Queue<F, Event>>, but that’s the factory of the Queue. So you want to store Queue<F, Event> instead which is the actual Queue with its state.
Now you were creating a new Queue before you called take, so you were offering and taking from different `Queue`s.simon.vergauwen
03/26/2020, 2:06 PMsimon.vergauwen
03/26/2020, 2:08 PMprivate val listeners: MutableMap<KClass<*>, MutableList<Queue<F, Event>>> =
Collections.synchronizedMap(mutableMapOf())
and replace it with a Ref. Why are you storing a list of `Queue`s for a given type? Wouldn’t Ref<Map<KClass<*>, Queue<F, Event>> be more desired?kartoffelsup
03/26/2020, 5:14 PMsimon.vergauwen
03/26/2020, 5:17 PMAtomicReference except it has some more powerful operators like update and modify which require looping over compareAndSet if you do it directly over AtomicReference. It has a MonadDefer<F> constraint.simon.vergauwen
03/26/2020, 5:18 PMRef after dinner 😉kartoffelsup
03/26/2020, 5:18 PMsimon.vergauwen
03/26/2020, 5:18 PMsimon.vergauwen
03/26/2020, 6:24 PMIO but a quite nice EventBus with Ref. I was wondering, wouldn’t you want an Unbounded Queue instead of a List<Queue<F, Event>>? That way you would have a single Queue per type, and you could install as many takes as you want.kartoffelsup
03/26/2020, 7:49 PMkartoffelsup
03/26/2020, 8:12 PMkartoffelsup
03/26/2020, 8:14 PMkartoffelsup
03/26/2020, 9:16 PMpakoito
03/26/2020, 10:30 PMfork necessary there?simon.vergauwen
03/27/2020, 9:13 AMYes, an unbounded Queue sounds even better, though I only see a bounded companion function on Queue? How do I get to unbounded?Dropping, sliding and unbounded will be released in 0.10.5 in the following days. I’ll take a look later to see if we can refactor it to use
Unbounded instead 😉simon.vergauwen
03/27/2020, 9:20 AMremind me again, is theThe fork there is sonecessary there?fork
Enqueue can happen in parallel from Dequeue.pakoito
03/27/2020, 10:24 AMsimon.vergauwen
03/27/2020, 10:26 AMflatMap. So there are async jumps but no parallelism.
I should check that piece of code again but raceN could also be interessing there to launch parallel process and wire the cancellation.pakoito
03/27/2020, 10:27 AMsimon.vergauwen
03/27/2020, 10:28 AMflatMap and deliverEvents loops forever, so otherwise it will never reach the effect that sets up Ktor.pakoito
03/27/2020, 10:29 AMsimon.vergauwen
03/27/2020, 10:30 AMdeliveryEvents into a fiber running on the io pool. It’s should be fine to launch on default as well tho, there is no blocking IO going on there.kartoffelsup
03/31/2020, 8:21 PM