dave08  08/25/2019, 1:42 AM

octylFractal  08/25/2019, 1:46 AM
val input: Flow<Int>
val result: Data = computation(input)
and you want to do the computation in parallel?

dave08  08/25/2019, 2:15 AM
flowOf(1, 2, 3).map { getUserInfo(it) }, where getUserInfo is a suspend function that returns a single user, not a Flow<User>. I want the map to run in parallel without having to box each user into a flow just to use flatMapMerge...

octylFractal  08/25/2019, 2:19 AM
flatMapMerge -- Flow is intended to be more of a sequential API at the moment, since parallel support is still under design: https://github.com/Kotlin/kotlinx.coroutines/issues/1147
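
(For reference, a minimal sketch of the flatMapMerge route mentioned above, assuming a suspend getUserInfo(id: Int): User as in the example; the User stub and the users() name are placeholders. Each element is boxed into a one-element flow so flatMapMerge can run up to `concurrency` of them at once; note it does not preserve the original order.)

import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.flow.*

data class User(val id: Int)                      // placeholder type for the sketch
suspend fun getUserInfo(id: Int): User = User(id) // placeholder for the real lookup

// flatMapMerge is marked @FlowPreview, so an opt-in is needed.
@OptIn(FlowPreview::class)
fun users(concurrency: Int = 4): Flow<User> =
    flowOf(1, 2, 3)
        .flatMapMerge(concurrency) { id ->
            // box the single suspend result into a one-element Flow
            flow { emit(getUserInfo(id)) }
        }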

dave08  08/25/2019, 5:36 AM

dave08  08/25/2019, 7:24 AM
08/25/2019, 7:24 AMfindUserIds()
.map { async(<http://Dispatchers.IO|Dispatchers.IO>) { getUserInfo(it) } }
.buffer(10)
.map { it.await() }
Which seems to do the trick pretty nicely. Any better suggestions?

octylFractal  08/25/2019, 7:40 AM
I'd remove the suspend and make the async launch using GlobalScope.async(Dispatchers.IO + Job(coroutineContext[Job])) -- the methods called here (map, buffer) aren't actually suspend functions, and the async launch should be tied to the parent of whatever starts the flow
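
(A sketch of what that correction could look like if taken literally; UserInfo, findUserIds() and getUserInfo() are placeholders standing in for dave08's real code, and the actual fix may differ. The declaring function is a plain fun, since map/buffer only build the pipeline, and each async is parented to the collector's Job so cancellation propagates.)

import kotlin.coroutines.coroutineContext
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

data class UserInfo(val id: Int)                          // placeholder type for the sketch
suspend fun getUserInfo(id: Int): UserInfo = UserInfo(id) // placeholder for the real lookup
fun findUserIds(): Flow<Int> = flowOf(1, 2, 3)            // placeholder id source

// Plain fun: map/buffer only build the pipeline, nothing suspends at declaration time.
// GlobalScope is used here only because that is what the suggestion above spells out.
@OptIn(DelicateCoroutinesApi::class)
fun userInfos(concurrency: Int = 10): Flow<UserInfo> =
    findUserIds()
        .map { id ->
            // The transform lambda is suspending, so coroutineContext here belongs to
            // whoever collects the flow; parenting the Job ties each async to it.
            GlobalScope.async(Dispatchers.IO + Job(coroutineContext[Job])) {
                getUserInfo(id)
            }
        }
        .buffer(concurrency)
        .map { it.await() }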

octylFractal  08/25/2019, 7:42 AM
I'd also drop the buffer in favor of adding a Semaphore around the block (see the ChannelFlowMerge#mergeImpl internal implementation)
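
(A sketch of the acquire-before-launch shape that suggestion points at, not the actual ChannelFlowMerge code; runLimited and doWork are made-up names. Each permit is released only when its task finishes, so at most `concurrency` tasks run at once, and awaitAll keeps the results in the original order.)

import kotlinx.coroutines.*
import kotlinx.coroutines.sync.Semaphore

// Run doWork over the items with at most `concurrency` in flight at once,
// returning the results in the original order.
suspend fun <T, R> runLimited(
    items: List<T>,
    concurrency: Int,
    doWork: suspend (T) -> R
): List<R> = coroutineScope {
    val semaphore = Semaphore(concurrency)
    items.map { item ->
        semaphore.acquire()              // wait for a free slot before launching
        async(Dispatchers.IO) {
            try {
                doWork(item)
            } finally {
                semaphore.release()      // free the slot when this task finishes
            }
        }
    }.awaitAll()
}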

dave08  08/25/2019, 7:45 AM
I thought async would take care of that...?

octylFractal  08/25/2019, 7:48 AM
concurrency threads -- I think that buffer might work, but it might go one over the limit, since it can't tell the buffer is full until after it starts the task

dave08  08/25/2019, 7:50 AM
There's flatMapMerge, but I really need to keep the original order... I suppose my current implementation does that... anyway, the db connections are limited, so they'll be the ones to limit concurrency here...
I would have felt better with a proper implementation, but this might just be enough for now...

Marko Mitic  08/25/2019, 8:16 AM

octylFractal  08/25/2019, 8:17 AM

Marko Mitic  08/25/2019, 8:20 AM

Marko Mitic  08/25/2019, 8:21 AM

octylFractal  08/25/2019, 8:21 AM

octylFractal  08/25/2019, 8:58 AM
It uses flow { coroutineScope { ... } } to establish a new context to run the async tasks in, limits concurrency by using a semaphore before submitting a task, then buffers all of the Deferred objects (to ensure they keep being submitted, since flows are cold by default) up to UNLIMITED, though in practice the buffer never grows larger than concurrency. Then it finally joins them in order to produce the result. In my testing it seemed to be about 200ms slower than it should be, but I chalk that up to setting up the threads + other coroutine machinery. It's a constant startup cost, and in the long run it is amortized to zero.
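
(The gist itself isn't quoted in this excerpt, so here is a sketch reconstructing that description; UserInfo, findUserIds() and getUserInfo() are placeholders and the real code may differ. A coroutineScope inside the flow builder owns the workers, a Semaphore caps in-flight tasks, the Deferreds are buffered so the upstream keeps submitting, and the collector awaits them in the original order.)

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.sync.Semaphore

data class UserInfo(val id: Int)                          // placeholder type for the sketch
suspend fun getUserInfo(id: Int): UserInfo = UserInfo(id) // placeholder for the real lookup
fun findUserIds(): Flow<Int> = flowOf(1, 2, 3)            // placeholder id source

fun userInfos(concurrency: Int = 10): Flow<UserInfo> = flow {
    coroutineScope {                       // new context that owns the async workers
        val semaphore = Semaphore(concurrency)
        findUserIds()
            .map { id ->
                semaphore.acquire()        // cap concurrency before submitting the task
                async(Dispatchers.IO) {
                    try {
                        getUserInfo(id)
                    } finally {
                        semaphore.release()
                    }
                }
            }
            // keep the upstream submitting Deferreds (flows are cold); the semaphore
            // means at most `concurrency` of them are actually pending at any time
            .buffer(Channel.UNLIMITED)
            .collect { deferred -> emit(deferred.await()) }   // join in original order
    }
}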

dave08  08/25/2019, 9:42 AM

octylFractal  08/25/2019, 9:43 AM

dave08  08/25/2019, 9:46 AM

Dominaezzz  08/25/2019, 10:09 AM