# arrow
s
Hello 👋 I am trying to use fibers to concurrently load data chunks, but I do not fully understand how it works with IO when I fork/join/bind a Fiber. My IO continues on another thread (from the worker pool) even though it initially started on 'http-io'. I want to fire some IO-heavy operations early, then fire a few more and start awaiting. I have an IO.fx block where I create different IOs and then fork them. I want to run them concurrently on dispatchers().io(), and so far they are running OK. But I noticed through console-log debugging that after the first fiber is joined, the IO is running on another thread. Are there any options, or perhaps another class, for this use case?
the code is something like this:
fun foo() = IO.fx {
  val fiberOne = IO { }.fork(dispatchers().io()).bind()
  val fiberTwo = IO { }.fork(dispatchers().io()).bind()
  val res1 = fiberOne.join().bind()
  val res2 = fiberTwo.join().bind()
  res1 + res2
}
s
So `res1 + res2` doesn't run on the `IO` pool, and you'd want it to run there?
s
After the first join it is moved from the http thread to some worker thread.
Yeah, I would like the result to be returned on the thread that started the IO (http in my case).
The join docs say it creates an IO that will await the fiber
But I do not understand it fully tbh
s
Okay, so long story short: `fork` launches the `IO` you call it on. It does so inside `IORunLoop` given a certain `CoroutineContext`. The result is memoized, and that is done using an internal `UnsafePromise`. Once it completes, the `UnsafePromise` will cache it. Awaiting the `UnsafePromise` is no different than awaiting a callback with `IO.async`. That brings us to the threading: `join` happens on the caller thread. So if `IO.fx` is running on a worker pool (`IO.dispatchers().default()`) and your two fibers are running on `dispatchers().io()`, then the two fibers will run on the IO pool and the joined results will be on the worker pool.
I hope that clears it up a bit
s
I created a small test case for this. The output:
Thread: Test worker
Thread: Test worker
Thread: Test worker
Thread: io-arrow-kt-worker-0
Thread: io-arrow-kt-worker-0
So whenever I join the fiber, the IO continues on the fiber's thread?
s
Oh, so I was mistaken. I confused myself with the new Arrow Fx Coroutines impl, which offers a new `IO` API in `suspend` style and also works the way you expect it to work here. Sorry about that.
You can use `continueOn` to switch to any desired context in an `IO.fx` block.
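A minimal sketch of that suggestion, reusing the fork/join shape from the snippet earlier in the thread. It assumes Arrow Fx 0.10.x (a version that has `dispatchers().io()`) plus kotlinx-coroutines on the classpath. `httpExecutor` and `httpContext` are hypothetical stand-ins for the 'http-io' thread, which a real server may not expose as a `CoroutineContext`.

```kotlin
import arrow.fx.IO
import arrow.fx.extensions.fx
import kotlinx.coroutines.asCoroutineDispatcher
import java.util.concurrent.Executors

// Hypothetical: a single-threaded executor standing in for the caller's "http-io" thread.
val httpExecutor = Executors.newSingleThreadExecutor { r ->
    Thread(r, "http-io").apply { isDaemon = true }
}
val httpContext = httpExecutor.asCoroutineDispatcher()

fun threadName(): String = Thread.currentThread().name

val program: IO<Int> = IO.fx {
    continueOn(httpContext)                      // start on the stand-in caller thread
    println("start: ${threadName()}")

    // Fork both fibers on the IO pool, as in the original snippet.
    val fiberOne = IO { 1 }.fork(dispatchers().io()).bind()
    val fiberTwo = IO { 2 }.fork(dispatchers().io()).bind()
    val res1 = fiberOne.join().bind()
    val res2 = fiberTwo.join().bind()
    println("after join: ${threadName()}")

    continueOn(httpContext)                      // hop back explicitly before returning
    println("after continueOn: ${threadName()}")
    res1 + res2
}

fun main() {
    println(program.unsafeRunSync())
    httpExecutor.shutdown()
}
```

Because the stand-in context is backed by a single thread, hopping back with `continueOn` lands on that same thread. With a multi-threaded pool it would only guarantee the pool, not a particular thread.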
Currently Arrow Fx Coroutines is being polished for release, and Arrow will promote it over `IO`. This'll happen over the summer 🙂
s
Thanks, I will check it! Yeah, continueOn would be awesome, but I need to come back to the exact thread I started from, and that's not possible (afaik). Actually, it's because of an APM library dependency that uses ThreadLocal 😢
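To illustrate why the exact thread matters with ThreadLocal, here is a small sketch using only the JDK. `traceId` is a hypothetical stand-in for whatever per-thread state the APM library keeps. It is not taken from any real APM API.

```kotlin
import java.util.concurrent.Callable
import java.util.concurrent.Executors

// Hypothetical stand-in for the per-thread state an APM agent might keep.
val traceId = ThreadLocal<String?>()

// Reads the ThreadLocal from a thread other than the caller's.
fun readOnOtherThread(): String? {
    val worker = Executors.newSingleThreadExecutor()
    try {
        return worker.submit(Callable<String?> { traceId.get() }).get()
    } finally {
        worker.shutdown()
    }
}

fun main() {
    traceId.set("request-42")
    println("caller thread sees: ${traceId.get()}")        // request-42
    println("worker thread sees: ${readOnOtherThread()}")  // null
}
```

A value set on the starting thread is not visible once execution continues on a different thread, which is why switching to another thread of the same pool does not help here.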