# coroutines
d
Given a flow of ids (Int) that are mapped to create a data class with the record's info (pulled from the cache or db) using a suspend function, is there a way to do the mapping with a concurrency of say 5 without doing flowOf(suspendFunResult) and flatMapMerge... why would I need those extra allocations for this?
o
so what you're describing is:
val input: Flow<Int>
val result: Data = computation(input)
and you want to do computation in parallel?
d
Something like
flowOf(1,2,3).map { getUserInfo(it) }
and getUserInfo is a suspend function that returns a single user, not a Flow<User>. I want the map to be run in parallel without having to box each user into a flow to use flatMapMerge...
o
Yea, I don't think there's an easy way besides flatMapMerge -- Flow is intended to be more of a sequential API at the moment, since parallel is still under design: https://github.com/Kotlin/kotlinx.coroutines/issues/1147
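For reference, a minimal sketch of the flatMapMerge route being discussed, with a concurrency of 5. The User class and the body of getUserInfo are hypothetical stand-ins for the real cache/db lookup, and loadUsersUnordered is just an illustrative name. Each id is wrapped in a one-element flow, which is the extra allocation the question is about, and results come back in completion order rather than input order.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical record type and lookup, standing in for the cache/db call.
data class User(val id: Int, val name: String)

suspend fun getUserInfo(id: Int): User {
    delay(100) // simulate I/O latency
    return User(id, "user-$id")
}

// Runs up to `concurrency` lookups at once. Results are emitted as they
// complete, so the input order is NOT guaranteed to be preserved.
// The opt-in covers both markers flatMapMerge has carried across versions.
@OptIn(FlowPreview::class, ExperimentalCoroutinesApi::class)
fun Flow<Int>.loadUsersUnordered(concurrency: Int = 5): Flow<User> =
    flatMapMerge(concurrency) { id -> flow { emit(getUserInfo(id)) } }

fun main(): Unit = runBlocking {
    val users = flowOf(1, 2, 3).loadUsersUnordered().toList()
    // Sorted here because arrival order depends on which lookup finishes first.
    println(users.map { it.id }.sorted()) // prints [1, 2, 3]
}
```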
d
Thanks for the reference! I guess I'll have to work around this for now then...
I ended up with:
findUserIds()
  .map { async(Dispatchers.IO) { getUserInfo(it) } }
  .buffer(10)
  .map { it.await() }
Which seems to do the trick pretty nicely. Any better suggestions?
o
you should drop suspend and make the async launch using GlobalScope.async(Dispatchers.IO + Job(coroutineContext[Job])) -- the methods called here (map, buffer) aren't actually suspend functions, and the async launch should be tied to the parent of whatever starts the flow
if you want to limit the concurrency properly, you should remove buffer in favor of adding a Semaphore around the block (see the internal ChannelFlowMerge#mergeImpl implementation)
d
Thanks! Do I need the ChannelFlow part? Or maybe since I'm using async it would take care of that...?
o
you will still need it, since you don't know if the IO dispatcher has exactly concurrency threads -- I think that buffer might work, but it might go one over the limit, since it can't tell the buffer is full until after it starts the task
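One way to check how far the map/async/buffer/await pipeline actually goes is to count the tasks running at once. The sketch below is an assumption-laden probe, not a benchmark: peakInFlight is a hypothetical helper, the 100 ms delay stands in for the real lookup, and the async calls are tied to a surrounding coroutineScope rather than GlobalScope. It prints the observed peak instead of asserting a specific number, since that depends on the buffer size and on how the producer and collector interleave.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import java.util.concurrent.atomic.AtomicInteger

// Returns the highest number of simulated lookups seen running at the same time
// when concurrency is limited only by buffer(bufferSize).
suspend fun peakInFlight(bufferSize: Int, items: Int): Int = coroutineScope {
    val running = AtomicInteger(0)
    val peak = AtomicInteger(0)
    (1..items).asFlow()
        .map { id ->
            // The task is started here, before the Deferred is offered to the buffer.
            async {
                val now = running.incrementAndGet()
                peak.updateAndGet { maxOf(it, now) }
                delay(100) // simulated lookup
                running.decrementAndGet()
                id
            }
        }
        .buffer(bufferSize)
        .map { it.await() } // awaiting in emission order keeps the input order
        .collect()
    peak.get()
}

fun main(): Unit = runBlocking {
    println("peak in flight with buffer(10): ${peakInFlight(bufferSize = 10, items = 50)}")
}
```

If the printed peak is above the buffer size, that matches the point made here: the task has already started by the time the pipeline finds out the buffer is full.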
d
I would have just used flatMapMerge, but I really need to keep the original order... I'd suppose that my current implementation does that... anyways the db connections are limited, so they'll be the ones to limit concurrency here... I would have felt better with a proper implementation, but this might just be enough for now...
m
I think using GlobalScope is a bad idea because it would make the flow uncancelable
o
Not quite -- note that I included the inherited job from whatever invokes the flow originally
m
Ok, I see the point
It looked weird at first glance 🙂
o
Yea, I'm trying to think up a better option, because I've wanted this type of operator too
This is what I came up with: https://pl.kotl.in/q_G6Vq0Y5 It uses flow { coroutineScope { to establish a new context to run the async tasks in, limits concurrency by acquiring a semaphore before submitting a task, then buffers all of the Deferred objects (to ensure they are submitted, since flows are cold by default) up to UNLIMITED, but in practice never more than concurrency. Finally, it awaits them in order to produce the result. In my testing it seemed to be about 200ms slower than it should be, but I chalk that up to setting up the threads + other coroutine machinery. It's a constant startup cost, and in the long run it is amortized to zero.
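The code behind the playground link isn't reproduced in this thread, so the following is only a sketch of the approach as described, not the original. The operator name mapConcurrentOrdered and the demo in main are hypothetical. It acquires a semaphore permit before starting each async task, buffers the Deferred values with an UNLIMITED channel so the upstream keeps submitting work, and awaits them in emission order so the output keeps the input order.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.sync.Semaphore
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical operator: maps with at most `concurrency` transforms running
// at once, emitting results in the same order as the upstream values.
fun <T, R> Flow<T>.mapConcurrentOrdered(
    concurrency: Int,
    transform: suspend (T) -> R
): Flow<R> = flow {
    coroutineScope { // async tasks become children of the collecting coroutine
        val permits = Semaphore(concurrency)
        this@mapConcurrentOrdered
            .map { value ->
                permits.acquire() // wait for a free slot before starting the task
                async {
                    try { transform(value) } finally { permits.release() }
                }
            }
            .buffer(Channel.UNLIMITED) // keep submitting while earlier tasks are awaited
            .collect { emit(it.await()) } // await in emission order
    }
}

fun main(): Unit = runBlocking {
    val running = AtomicInteger(0)
    val peak = AtomicInteger(0)
    val results = (1..20).asFlow()
        .mapConcurrentOrdered(concurrency = 5) { id ->
            val now = running.incrementAndGet()
            peak.updateAndGet { maxOf(it, now) }
            delay(50L - id) // later ids finish sooner, to exercise ordering
            running.decrementAndGet()
            id * 2
        }
        .toList()
    check(results == (1..20).map { it * 2 }) // input order preserved
    check(peak.get() <= 5)                   // never more than 5 at once
    println("results=$results peak=${peak.get()}")
}
```

Because a permit is released as soon as a transform finishes, completed results can still pile up in the buffer if the downstream collector is slow; the semaphore bounds running tasks, not buffered results.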
d
When I tried running it instead of my initial code, it seemed to be 3X slower... is that because of the semaphore, or because the concurrency is really being limited...?
o
probably because it's really being limited
d
But you're right that on the second run it's the same (but then the requests are already being served by the cache... and the results are still similar to the second run of my original code). It's just a bit funny, because my db connection pool is at 10 and my concurrency setting is also at 10... 🤔 I guess there are other factors involved here... But thanks for the code 🙂!