dave08

08/25/2019, 1:42 AM
Given a flow of ids (Int), each mapped by a suspend function to a data class with that record's info (pulled from the cache or db), is there a way to do the mapping with a concurrency of, say, 5 without wrapping each result in flowOf(suspendFunResult) and using flatMapMerge? Why would I need those extra allocations for this?

octylFractal

08/25/2019, 1:46 AM
so what you're describing is:
val input: Flow<Int>
val result: Data = computation(input)
and you want to do computation in parallel?

dave08

08/25/2019, 2:15 AM
Something like
flowOf(1,2,3).map { getUserInfo(it) }
and getUserInfo is a suspend function that returns a single user, not a Flow<User>. I want the map to run in parallel without having to box each user into a flow to use flatMapMerge...
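For comparison, the flatMapMerge workaround being avoided might look like the sketch below (assuming kotlinx.coroutines on the classpath). `User`, the delays inside `getUserInfo`, and `usersViaFlatMapMerge` are hypothetical stand-ins. Each id gets its own one-element `flow { emit(...) }`, and results come out in completion order rather than input order.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flatMapMerge
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical stand-ins for the record type and lookup in the question.
data class User(val id: Int, val name: String)

suspend fun getUserInfo(id: Int): User {
    delay(100L * (4 - id)) // simulated lookup: higher ids finish sooner
    return User(id, "user-$id")
}

// Each id is boxed into a one-element flow so flatMapMerge can run up to
// `concurrency` lookups at once. Emission order is completion order.
// (The opt-in markers cover versions where flatMapMerge is preview/experimental.)
@OptIn(FlowPreview::class, ExperimentalCoroutinesApi::class)
fun Flow<Int>.usersViaFlatMapMerge(concurrency: Int): Flow<User> =
    flatMapMerge(concurrency) { id -> flow { emit(getUserInfo(id)) } }
```

With these delays, `flowOf(1, 2, 3).usersViaFlatMapMerge(5)` should emit the users as 3, 2, 1, i.e. the original order is lost.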

octylFractal

08/25/2019, 2:19 AM
Yea, I don't think there's an easy way besides flatMapMerge. Flow is intended to be more of a sequential API at the moment, since parallel is still under design: https://github.com/Kotlin/kotlinx.coroutines/issues/1147

dave08

08/25/2019, 5:36 AM
Thanks for the reference! I guess I'll have to work around this for now then...
I ended up with:
findUserIds()
  .map { async(Dispatchers.IO) { getUserInfo(it) } }
  .buffer(10)
  .map { it.await() }
Which seems to do the trick pretty nicely. Any better suggestions?
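A self-contained version of that snippet might look like this. `async` needs a `CoroutineScope`, which the snippet leaves implicit, so this sketch assumes the pipeline runs inside `coroutineScope` and is collected to a list before that scope returns. `findUserIds`, `getUserInfo`, `User`, and `loadUsers` are hypothetical stand-ins.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.buffer
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical stand-ins for the functions used in the thread.
data class User(val id: Int, val name: String)

fun findUserIds(): Flow<Int> = flowOf(1, 2, 3, 4, 5)

suspend fun getUserInfo(id: Int): User {
    delay(50L * (6 - id)) // simulated lookup: later ids finish first
    return User(id, "user-$id")
}

// The async tasks are children of this coroutineScope, and the flow is
// fully collected before the scope returns.
suspend fun loadUsers(): List<User> = coroutineScope {
    findUserIds()
        .map { id -> async(Dispatchers.IO) { getUserInfo(id) } } // start a lookup per id
        .buffer(10)         // lets upstream run ahead and start more lookups
        .map { it.await() } // awaits in emission order, so input order is kept
        .toList()
}
```

Because the last `map` awaits each `Deferred` in the order it was emitted, `loadUsers()` should return ids 1 to 5 in order even though the later ids finish first.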
I don't think this looks right... is there a better way to make this reusable?
suspend fun <T> Flow<T>.mapParallel(concurrency: Int, transform: suspend CoroutineScope.() -> T): Flow<T> = coroutineScope {
	map { async(Dispatchers.IO, block = transform) }
		.buffer(concurrency)
		.map { it.await() }
}
?

octylFractal

08/25/2019, 7:40 AM
you should drop suspend and make the async launch using GlobalScope.async(Dispatchers.IO + Job(coroutineContext[Job])). The methods called here (map, buffer) aren't actually suspend functions, and the async launch should be tied to the parent of whatever starts the flow
if you want to limit the concurrency properly, you should remove buffer in favor of adding a Semaphore around the block (see the internal implementation of ChannelFlowMerge#mergeImpl)
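A minimal sketch of the "Semaphore around the block" idea, using `kotlinx.coroutines.sync.Semaphore` and `withPermit`. To keep it short it works on a plain list rather than a flow; `boundedMap` and the `probe` helper are hypothetical names, and the probe only records the peak number of blocks running at once.

```kotlin
import java.util.concurrent.atomic.AtomicInteger
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.sync.Semaphore
import kotlinx.coroutines.sync.withPermit

// Runs `block` for every item, with at most `concurrency` bodies inside the
// semaphore at once, independent of the dispatcher's thread count.
suspend fun <T, R> boundedMap(items: List<T>, concurrency: Int, block: suspend (T) -> R): List<R> =
    coroutineScope {
        val semaphore = Semaphore(concurrency)
        items.map { item ->
            async(Dispatchers.IO) { semaphore.withPermit { block(item) } }
        }.awaitAll() // results in input order
    }

// Hypothetical probe: tracks how many calls are running and the peak seen.
val inFlight = AtomicInteger(0)
val peak = AtomicInteger(0)

suspend fun probe(x: Int): Int {
    val now = inFlight.incrementAndGet()
    peak.accumulateAndGet(now) { a, b -> maxOf(a, b) }
    delay(20) // simulated work
    inFlight.decrementAndGet()
    return x * 2
}
```

This version still creates one coroutine per item up front; only the bodies wait for a permit, so the cap applies to the actual work (e.g. db calls).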

dave08

08/25/2019, 7:45 AM
Thanks! Do I need the ChannelFlow part? Or maybe, since I'm using async, it would take care of that...?
o

octylFractal

08/25/2019, 7:48 AM
you will still need it, since you don't know if the IO dispatcher has exactly concurrency threads. I think that buffer might work, but it might go one over the limit, since it can't tell the buffer is full until after it starts the task
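One way to check the buffer concern is to count how many tasks the `async` + `buffer(n)` pattern has in flight at once. This is a rough measurement sketch; `peakInFlightWithBuffer` is a hypothetical helper, and the 100 ms delay stands in for a real lookup.

```kotlin
import java.util.concurrent.atomic.AtomicInteger
import kotlinx.coroutines.async
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.buffer
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Returns the peak number of simultaneously running tasks observed while
// mapping `items` values through async + buffer(bufferSize) + await.
suspend fun peakInFlightWithBuffer(items: Int, bufferSize: Int): Int = coroutineScope {
    val inFlight = AtomicInteger(0)
    val peak = AtomicInteger(0)
    (1..items).asFlow()
        .map { x ->
            async {
                val now = inFlight.incrementAndGet()
                peak.accumulateAndGet(now) { a, b -> maxOf(a, b) }
                delay(100) // simulated lookup
                inFlight.decrementAndGet()
                x
            }
        }
        .buffer(bufferSize) // producer suspends only once the buffer is full
        .map { it.await() }
        .toList()
    peak.get()
}
```

Run under `runBlocking`, `peakInFlightWithBuffer(20, 5)` should report more than 5: the producer has already started a task by the time it finds the buffer full, and the consumer may also be holding a `Deferred` it is awaiting. The exact overshoot depends on scheduling, so treat the number as indicative.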

dave08

08/25/2019, 7:50 AM
I would have just used flatMapMerge, but I really need to keep the original order... I'd suppose my current implementation does that... anyway, the db connections are limited, so they'll be the ones limiting concurrency here... I would have felt better with a proper implementation, but this might be enough for now...

Marko Mitic

08/25/2019, 8:16 AM
I think using GlobalScope is a bad idea because it would make the flow uncancelable

octylFractal

08/25/2019, 8:17 AM
Not quite -- note that I included the inherited job from whatever invokes the flow originally
m

Marko Mitic

08/25/2019, 8:20 AM
Ok, I see the point
looks weird at first glance 🙂

octylFractal

08/25/2019, 8:21 AM
Yea, I'm trying to think up a better option, because I've wanted this type of operator too
This is what I came up with: https://pl.kotl.in/q_G6Vq0Y5 It uses flow { coroutineScope { to establish a new context to run the async tasks in, limits concurrency by acquiring a semaphore before submitting each task, then buffers all of the Deferred objects (to ensure they are submitted, since flows are cold by default) with a capacity of UNLIMITED, though in practice the buffer never grows larger than concurrency. Finally, it joins them in order to produce the results. In my testing it seemed to be about 200ms slower than it should be, but I chalk that up to setting up the threads + other coroutine machinery. It's a constant startup cost, and in the long run it's amortized to zero.
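A sketch reconstructed from that description (the playground code itself isn't reproduced here): `flow { coroutineScope { ... } }` as the scope, a `Semaphore` acquired before each task is submitted, the `Deferred`s buffered with `Channel.UNLIMITED`, then awaited in order. The name `mapParallel`, the `suspend (T) -> R` transform signature, and the use of `Dispatchers.IO` are assumptions.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.buffer
import kotlinx.coroutines.flow.emitAll
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.sync.Semaphore

// Order-preserving parallel map: at most `concurrency` transforms run at once.
fun <T, R> Flow<T>.mapParallel(concurrency: Int, transform: suspend (T) -> R): Flow<R> = flow {
    // The async tasks are children of this scope, which lives inside the
    // collector's coroutine, so cancelling collection cancels them too.
    coroutineScope {
        val semaphore = Semaphore(concurrency)
        val deferreds = this@mapParallel
            .map { value ->
                semaphore.acquire() // wait for a free slot before submitting
                async(Dispatchers.IO) {
                    try {
                        transform(value)
                    } finally {
                        semaphore.release() // free the slot even if transform throws
                    }
                }
            }
            // Keep pulling upstream (and submitting tasks) while downstream awaits;
            // the semaphore, not the buffer, bounds the work in progress.
            .buffer(Channel.UNLIMITED)
        // Await in upstream order so results keep the input order.
        emitAll(deferreds.map { it.await() })
    }
}
```

For example, `(1..20).asFlow().mapParallel(4) { fetch(it) }` would run at most four calls of a hypothetical `fetch` at once and still emit results in input order. If a transform throws, its permit is released in `finally`, and the failure cancels the `coroutineScope`, so collecting the flow fails with that exception.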

dave08

08/25/2019, 9:42 AM
When I tried running it instead of my initial code, it seemed to be 3X slower... is that because of the semaphore, or because the concurrency is really being limited...?
o

octylFractal

08/25/2019, 9:43 AM
probably because it's really being limited

dave08

08/25/2019, 9:46 AM
But you're right that on the second run it's the same (though by then the requests are already being served by the cache... and the results are still similar to the second run of my original code). It's just a bit funny, because my db connection pool is at 10 and my concurrency setting is also at 10... 🤔 I guess there are other factors involved here... But thanks for the code 🙂!
d

Dominaezzz

08/25/2019, 10:09 AM
Here's a cool implementation you can borrow. 🙂 https://github.com/Kotlin/kotlinx.coroutines/issues/1147#issuecomment-489153085