dave08
08/25/2019, 1:42 AM
octylFractal
08/25/2019, 1:46 AM
val input: Flow<Int>
val result: Data = computation(input)
and you want to do `computation` in parallel?
dave08
08/25/2019, 2:15 AM
flowOf(1,2,3).map { getUserInfo(it) }
and `getUserInfo` is a suspend function that returns a single user, not a `Flow<User>`. I want the `map` to run in parallel without having to box each user into a flow to use `flatMapMerge`...
octylFractal
08/25/2019, 2:19 AM
`flatMapMerge` -- Flow is intended to be more of a sequential API at the moment, since parallel is still under design: https://github.com/Kotlin/kotlinx.coroutines/issues/1147
dave08
08/25/2019, 5:36 AM
findUserIds()
.map { async(Dispatchers.IO) { getUserInfo(it) } }
.buffer(10)
.map { it.await() }
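The pipeline above only compiles where a `CoroutineScope` is in lexical scope, because `async` is an extension on `CoroutineScope`. A minimal sketch, assuming kotlinx.coroutines on the JVM: `findUserIds` and `getUserInfo` are hypothetical stand-ins (the real ones aren't in the chat), and `loadUsers`/`demoLoadUsers` are illustrative names. Building and collecting the flow inside `coroutineScope { }` makes every `Deferred` a child of that scope, and awaiting them in the order they leave `buffer` keeps the input order even when the first lookup is the slowest.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.buffer
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in: the real source of ids isn't shown in the chat.
fun findUserIds(): Flow<Int> = flowOf(1, 2, 3, 4, 5)

// Hypothetical stand-in for the suspending lookup; id 1 is made the slowest.
suspend fun getUserInfo(id: Int): String {
    delay(if (id == 1) 300L else 50L)
    return "user-$id"
}

// `async` needs a CoroutineScope receiver, so the pipeline is built and
// collected inside coroutineScope { }; every Deferred is a child of that scope.
suspend fun loadUsers(): List<String> = coroutineScope {
    findUserIds()
        .map { id -> async(Dispatchers.IO) { getUserInfo(id) } } // start the lookup, pass on the Deferred
        .buffer(10)                                              // up to 10 Deferreds may queue ahead of the awaiting side
        .map { it.await() }                                      // await in input order
        .toList()
}

// Hypothetical entry point for trying the sketch out.
fun demoLoadUsers(): List<String> = runBlocking { loadUsers() }
```

`demoLoadUsers()` returns the users in id order, even though `user-1` is the last lookup to finish.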
Which seems to do the trick pretty nicely. Any better suggestions?
suspend fun <T> Flow<T>.mapParallel(concurrency: Int, transform: suspend CoroutineScope.() -> T): Flow<T> = coroutineScope {
map { async(Dispatchers.IO, block = transform) }
.buffer(concurrency)
.map { it.await() }
}
?
octylFractal
08/25/2019, 7:40 AM
`suspend` and make `async` launch using `GlobalScope.async(Dispatchers.IO + Job(coroutineContext[Job]))` -- the methods called here (`map`, `buffer`) aren't actually suspend functions, and the `async` launch should be tied to the parent of whatever starts the flow
`buffer` in favor of adding a `Semaphore` around the block (see the internal `ChannelFlowMerge#mergeImpl` implementation)
dave08
08/25/2019, 7:45 AM
`async` it would take care of that...?
octylFractal
08/25/2019, 7:48 AM
`concurrency` threads -- I think that `buffer` might work, but it might go one over the limit, since it can't tell the buffer is full until after it starts the task
dave08
08/25/2019, 7:50 AM
`flatMapMerge`, but I really need to keep the original order... I suppose my current implementation does that... anyway, the DB connections are limited, so they'll be what limits concurrency here...
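For comparison, a minimal sketch of the `flatMapMerge` route, assuming kotlinx.coroutines on the JVM; `getUserInfo` is a hypothetical stand-in whose delays only exist to make the effect visible, and `userInfosUnordered`/`demoFlatMapMerge` are illustrative names. Each id has to be boxed into a one-element `flow { emit(...) }`, and because up to `concurrency` inner flows are collected at once, results come out in completion order rather than input order. Both opt-in markers are listed as a precaution, since the annotation on `flatMapMerge` is not the same in every release.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flatMapMerge
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for the lookup; id 1 is deliberately the slowest.
suspend fun getUserInfo(id: Int): String {
    delay(if (id == 1) 300L else 50L)
    return "user-$id"
}

// Each id is boxed into a one-element flow so flatMapMerge can collect up to
// `concurrency` inner flows at the same time. Values are emitted as the inner
// flows finish, so the input order is not preserved.
@OptIn(FlowPreview::class, ExperimentalCoroutinesApi::class)
fun userInfosUnordered(ids: Flow<Int>, concurrency: Int): Flow<String> =
    ids.flatMapMerge(concurrency) { id -> flow { emit(getUserInfo(id)) } }

// Hypothetical entry point for trying the sketch out.
fun demoFlatMapMerge(): List<String> =
    runBlocking { userInfosUnordered(flowOf(1, 2, 3), concurrency = 3).toList() }
```

`demoFlatMapMerge()` returns all three users, but with these delays `user-1` typically comes last.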
I would have felt better with a proper implementation, but this might just be enough for now...
Marko Mitic
08/25/2019, 8:16 AM
octylFractal
08/25/2019, 8:17 AM
Marko Mitic
08/25/2019, 8:20 AM
octylFractal
08/25/2019, 8:21 AM
`flow { coroutineScope {` to establish a new context to run the async tasks in, limits concurrency by acquiring a semaphore before submitting each task, then buffers all of the `Deferred` objects (to ensure they are submitted, since flows are cold by default) up to `UNLIMITED`, but in practice never larger than `concurrency`. Then it finally joins them in order to produce the result. In my testing it seemed to be about 200ms slower than it should be, but I chalk that up to setting up the threads + other coroutine machinery. It's a constant startup cost, and in the long run it's amortized to zero.
dave08
08/25/2019, 9:42 AM
octylFractal
08/25/2019, 9:43 AM
dave08
08/25/2019, 9:46 AM
Dominaezzz
08/25/2019, 10:09 AM
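The approach octylFractal describes at 8:21 AM can be sketched as follows. This is one possible reading, not octylFractal's actual code (which isn't in this log), and it assumes kotlinx.coroutines on the JVM; `mapParallel` and `demoMapParallel` are illustrative names, and `Dispatchers.IO` is kept only because the chat uses it. Unlike the earlier `suspend fun ... = coroutineScope { ... }` draft, which returns the cold flow after its scope has already finished (so the later `async` calls have no live parent), the scope here is opened inside `flow { }` on every collection, and `transform` receives the element. A permit is acquired before each `async` and released when its transform ends, the `Deferred`s are buffered with `Channel.UNLIMITED`, and they are awaited in input order. The semaphore bounds how many transforms run at once; completed results can still queue in the buffer while an earlier, slower element is being awaited.

```kotlin
import java.util.concurrent.atomic.AtomicInteger
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.buffer
import kotlinx.coroutines.flow.emitAll
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.sync.Semaphore

// Order-preserving parallel map: at most `concurrency` transforms run at once.
fun <T, R> Flow<T>.mapParallel(concurrency: Int, transform: suspend (T) -> R): Flow<R> = flow {
    val semaphore = Semaphore(concurrency)
    // A fresh scope per collection, so the async tasks are children of the collector.
    coroutineScope {
        val results = this@mapParallel
            .map { value ->
                semaphore.acquire() // wait for a free slot before submitting the task
                async(Dispatchers.IO) {
                    try {
                        transform(value)
                    } finally {
                        semaphore.release() // free the slot as soon as this task ends
                    }
                }
            }
            .buffer(Channel.UNLIMITED) // keep pulling upstream; the semaphore is the only brake
            .map { it.await() }        // await in input order
        emitAll(results)
    }
}

// Hypothetical check: squares 1..10 and records the peak number of running transforms.
fun demoMapParallel(concurrency: Int): Pair<List<Int>, Int> {
    val running = AtomicInteger(0)
    val peak = AtomicInteger(0)
    val squares = runBlocking {
        (1..10).asFlow().mapParallel(concurrency) { n ->
            peak.accumulateAndGet(running.incrementAndGet()) { a, b -> maxOf(a, b) }
            delay(if (n == 1) 200L else 20L) // element 1 is the slowest
            running.decrementAndGet()
            n * n
        }.toList()
    }
    return squares to peak.get()
}
```

`demoMapParallel(3)` returns the squares of 1..10 in order, and the recorded peak is never above 3, because each transform only starts after its permit has been acquired.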