# arrow
s
Hey @kartoffelsup, any specific use-case you’re looking for? `Queue` works just like any other non-blocking `Queue` you might be familiar with. You can split it into `Enqueue` and `Dequeue` if you want, to separate who is producing events from who is consuming them. They all work on a 1-offer to 1-taker basis tho, since taking removes the element from the `Queue` unless you use the `peek` operators. So if you consume an event with `take` then it’s also removed from the `Queue`, so this works similarly to `java.util.concurrent.Queue` or `scala.concurrent.Queue` but with a functional non-blocking API in `F` with full support for cancellation etc. We’ll be writing some additional docs on `Queue` and the other new stuff soon, so if there are any specific use-cases you’re trying to solve that you cannot solve with the current feature set in 0.10.5, I’d be happy to look into them and perhaps include them in some form of documentation.
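A minimal sketch of the take-versus-peek behaviour described above. It is an analogy only: it uses the JDK's `ConcurrentLinkedQueue` rather than Arrow Fx's `Queue`, so `poll()` stands in for `take` and returns `null` on an empty queue instead of waiting for an element. The function name `takeVersusPeek` is made up for illustration.

```kotlin
import java.util.concurrent.ConcurrentLinkedQueue

// Analogy only: a JDK queue, not Arrow Fx's Queue.
// poll() plays the role of `take` (it removes), peek() only looks.
fun takeVersusPeek(): List<String?> {
    val queue = ConcurrentLinkedQueue<String>()
    queue.offer("event-1")

    val peeked = queue.peek()     // the element stays in the queue
    val taken = queue.poll()      // the element is removed: one offer serves one taker
    val afterTake = queue.poll()  // nothing is left for a second taker

    return listOf(peeked, taken, afterTake)
}
```

The returned list shows that a peek leaves the element in place, while a second taker finds nothing once the first one has consumed it.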
k
Thank you for the explanation :) So if I want an event bus for multiple event types I would need one queue per event type. Mmh, maybe I need to stick to Guava's Event bus then :(
p
If you have a global event bus you can use sealed classes and then dispatch to the correct child bus for example.
or even without sealed classes with a `Map<Class<*>, List<Queue>>`
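A rough sketch of that dispatch idea, assuming a map from event class to a list of per-listener queues. Everything here is hypothetical (`AppEvent`, `UserCreated`, `UserDeleted`, `SimpleBus`), it uses plain JDK queues instead of Arrow Fx's `Queue`, and it is not thread-safe as written.

```kotlin
import java.util.concurrent.ConcurrentLinkedQueue
import kotlin.reflect.KClass

// Hypothetical event hierarchy, for illustration only.
sealed class AppEvent
data class UserCreated(val name: String) : AppEvent()
data class UserDeleted(val name: String) : AppEvent()

// Hypothetical bus: one list of listener queues per event class.
class SimpleBus {
    private val queues =
        mutableMapOf<KClass<out AppEvent>, MutableList<ConcurrentLinkedQueue<AppEvent>>>()

    // Register a new listener queue for one event type and return it.
    fun <E : AppEvent> subscribe(type: KClass<E>): ConcurrentLinkedQueue<AppEvent> {
        val queue = ConcurrentLinkedQueue<AppEvent>()
        queues.getOrPut(type) { mutableListOf() }.add(queue)
        return queue
    }

    // Dispatch: every queue registered for the event's runtime class receives it.
    fun publish(event: AppEvent) {
        queues[event::class].orEmpty().forEach { it.offer(event) }
    }
}
```

Keeping a list per type means each listener owns its queue, so several listeners can each see the same event.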
k
Thanks Paco, I'll try that 🙂
Is .take() blocking from fx.Queue? Should it be returning the first event when found?
p
listen will only ever get one result, methinks
you need something that’s traversable
or you need to write a function using IO.tailrecM that calls `take()` to get results
IO is supposed to be stack safe too, so it can be a recursive function too
so something like
```kotlin
// A recursive function with an expression body needs an explicit return type.
fun bla(): IO<Unit> = IO.fx {
  val result = !eventBus.listen(Request::class)
  !effect { somethingWith(result) }
  if (!eventBus.globalStop()) {
    !bla()
  }
}
```
k
Thanks, the issue though is that it never gets past that listen call (it gets stuck there), and my deliverEvents is already recursive. :(
p
oh I missed that
lemme double check
right
`.attempt().unsafeRunSync()`
that’s blocking. We know how to make it non-blocking for our own internal stuff but not when mixed with kotlinx.coroutines
use `attempt().suspended()` instead
`Thread.sleep(15000L)` doesn’t work for either Arrow or kx.coroutines, as it’s also blocking
if you need the main not to finish immediately, don’t use kx.coroutines, use `IO(IO.dispatchers().io()) { }.unsafeRunSync()`
that way you’re blocking reentrantly
and you’re still on a background thread
with those tips, try to go back to it, and tell me what’s the last text message you see
I believe we can debug it that way 😄
k
Great thanks, will try this tomorrow :)
Hi Paco, thanks again for helping 🙂 I've updated the gist: https://gist.github.com/kartoffelsup/c3e3b509d2645b41bae4abab0740b42b unfortunately it doesn't get past 'take':
```
Test
Checking for events...
foobar
take
Offer: Event@2cfe9c1a
Event sent.
```
Maybe my EventBus#listen implementation is the issue?
IntelliJ is struggling hard with this 😄
p
increase the bounds a bit
the code looks correct
@simon.vergauwen any idea?
s
Set a reminder to look into tomorrow.
❤️ 1
Hey @kartoffelsup, I took a look, and there was only a small mistake hidden in there 🙂 It works fine now. You are storing `Kind<F, Queue<F, Event>>`, but that’s the factory of the `Queue`. So you want to store `Queue<F, Event>` instead, which is the actual `Queue` with its state. As it was, you were creating a new `Queue` before you called `take`, so you were `offering` and `taking` from different `Queue`s.
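The factory-versus-instance mix-up can be illustrated without Arrow. This is a sketch under assumptions: `FactoryHolder` and `InstanceHolder` are hypothetical names, a plain `() -> ConcurrentLinkedQueue<String>` lambda stands in for `Kind<F, Queue<F, Event>>`, and `poll()` returns `null` where a real `take` would keep waiting.

```kotlin
import java.util.concurrent.ConcurrentLinkedQueue

// Buggy shape: the field holds a recipe for a queue, so every access builds a fresh one.
class FactoryHolder {
    private val makeQueue: () -> ConcurrentLinkedQueue<String> =
        { ConcurrentLinkedQueue<String>() }

    fun offer(event: String) { makeQueue().offer(event) }  // goes into a throwaway queue
    fun take(): String? = makeQueue().poll()               // reads from another new, empty queue
}

// Fixed shape: build the queue once and keep the instance together with its state.
class InstanceHolder {
    private val queue = ConcurrentLinkedQueue<String>()

    fun offer(event: String) { queue.offer(event) }
    fun take(): String? = queue.poll()
}
```

With the buggy shape an offered event is never seen by the taker, which matches the log above where the program printed `take` and then appeared to hang.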
My apologies, I also removed the Guava reference and I’m using Arrow Fx’s Fibers instead 😉 I also removed the KotlinX delay calls and replaced them with IO.sleep 😁
I think we could get rid of
```kotlin
private val listeners: MutableMap<KClass<*>, MutableList<Queue<F, Event>>> =
  Collections.synchronizedMap(mutableMapOf())
```
and replace it with a `Ref`. Why are you storing a list of `Queue`s for a given type? Wouldn’t `Ref<F, Map<KClass<*>, Queue<F, Event>>>` be more desirable?
k
Hi @simon.vergauwen, amazing! Thank you so much 🙂 I'm using a List in case I need more than one Listener for the same Event. Frankly, I do not know what Ref does, what is its advantage?
s
Ref is like `AtomicReference`, except it has some more powerful operators like `update` and `modify`, which require looping over `compareAndSet` if you do it directly over `AtomicReference`. It has a `MonadDefer<F>` constraint.
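To make the `compareAndSet` looping concrete, here is roughly what the hand-written version looks like on a plain `AtomicReference`. The helper names `updateLoop` and `modifyLoop` are hypothetical, chosen only to mirror the `update` and `modify` operators mentioned above. They are not Arrow's API and they run eagerly rather than inside `F`.

```kotlin
import java.util.concurrent.atomic.AtomicReference

// Hypothetical helper: retry until no other thread changed the value in between.
fun <A> AtomicReference<A>.updateLoop(f: (A) -> A) {
    while (true) {
        val current = get()
        if (compareAndSet(current, f(current))) return
    }
}

// Hypothetical helper: like updateLoop, but also hands a result back to the caller.
fun <A, B> AtomicReference<A>.modifyLoop(f: (A) -> Pair<A, B>): B {
    while (true) {
        val current = get()
        val (next, result) = f(current)
        if (compareAndSet(current, next)) return result
    }
}
```

Because `f` may run more than once when the loop retries, it should be free of side effects. An immutable `Map` as the stored value fits that well.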
I’ll try to whip up an example with `Ref` after dinner 😉
k
Awesome, thank you 🙂 And enjoy your dinner!
s
Thanks!
Here is a version with a bit more `IO` but a quite nice `EventBus` with `Ref`. I was wondering, wouldn’t you want an `Unbounded Queue` instead of a `List<Queue<F, Event>>`? That way you would have a single `Queue` per type, and you could install as many `take`s as you want.
k
Brilliant, thank you! That is exactly what I need :)
🙌 1
Nvm, I'm blind :)
Yes, an unbounded Queue sounds even better, though I only see a bounded companion function on Queue? How do I get to unbounded?
Here is where I use it, looks a bit more hacky in my app now because Producer/Consumer aren't all in the main but I'm very glad it's not guava's EventBus 🙂 https://github.com/kartoffelsup/nuntius/blob/master/application/src/main/kotlin/io/github/kartoffelsup/nuntius/Application.kt#L59-L72
😍 2
p
remind me again, is the `fork` necessary there?
s
Dropping, sliding and unbounded will be released in 0.10.5 in the following days. I’ll take a look later to see if we can refactor it to use `Unbounded` instead 😉
💯 1
The fork there is so `Enqueue` can happen in parallel with `Dequeue`.
p
if they’re on the same Dispatcher, right? It’s not necessary if they’re on different ones
s
No, since it’s all sequential with `flatMap`. So there are async jumps but no parallelism. I should check that piece of code again, but `raceN` could also be interesting there to launch parallel processes and wire the cancellation.
p
No as in, it’s still necessary even on different dispatchers?
s
Yes, it’s sequential with `flatMap` and `deliverEvents` loops forever, so otherwise it will never reach the `effect` that sets up `Ktor`.
p
okok
s
It just launches `deliverEvents` into a fiber running on the `io` pool. It should be fine to launch on `default` as well tho, there is no blocking IO going on there.
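The reason for the fork can be sketched with plain threads instead of fibers. This is an analogy under assumptions: `startThenContinue` is a hypothetical function, a daemon thread stands in for the forked fiber, and blocking JDK queues stand in for Arrow Fx's `Queue`.

```kotlin
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.TimeUnit
import kotlin.concurrent.thread

// Analogy with plain threads, not Arrow Fx fibers.
fun startThenContinue(): String? {
    val events = LinkedBlockingQueue<String>()
    val delivered = LinkedBlockingQueue<String>()

    // "fork": the endless delivery loop runs on its own daemon thread.
    thread(isDaemon = true) {
        while (true) delivered.put("delivered " + events.take())
    }

    // Only reachable because the loop was forked. Run inline, the loop above
    // would never return and this code (the Ktor setup in the chat) would never start.
    events.put("ping")
    return delivered.poll(2, TimeUnit.SECONDS)
}
```

The same reasoning applies regardless of which pool the loop runs on: a step sequenced after a never-ending loop is only reached if that loop is started concurrently.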
k
@simon.vergauwen is what I'm doing an anti pattern? Mixing coroutines with arrow fx?