# coroutines
j
Hi. I’m starting to learn about coroutines and flows, and I’m wondering how one might do the equivalent of Reactor’s `Flux.flatMapSequential()`, i.e. a `flattenMerge()` that emits the values in the original order of the merged flows. Any idea? I find such an operator quite useful, for example when you want to send HTTP requests in parallel with limited concurrency, but still want to process the responses as if the requests had been sent sequentially.
o
basically, I have a Channel to hold Channels made from the result Flows (this allows us to start the flows and hold on to their elements) and then I take the Channels from that and emit every element in order. the channels close when their respective flow does, allowing the next channel to start sending its elements
as it exists now, it uses `Channel.UNLIMITED` capacity, meaning that you could potentially have a lot of memory build up. you could change it to take a capacity parameter or two, depending on whether you want to cap the number of active flows or the number of items held in the backlog
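For readers without the original snippet, here is a minimal sketch of the channel-of-channels idea described above. It is not the code that was shared in the thread: the operator name `flatMapSequential` and its signature are assumptions borrowed from Reactor, and only stable `kotlinx.coroutines` calls are used.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical operator (not part of kotlinx.coroutines): collects all inner
// flows concurrently but emits their values in upstream order.
fun <T, R> Flow<T>.flatMapSequential(transform: suspend (T) -> Flow<R>): Flow<R> {
    val upstream = this
    return flow {
        coroutineScope {
            // Outer channel: one inner channel per upstream value, in upstream order.
            val channels = Channel<Channel<R>>(Channel.UNLIMITED)
            launch {
                upstream.collect { value ->
                    // UNLIMITED capacity: the inner flow never waits for the consumer,
                    // so the backlog can grow without bound.
                    val inner = Channel<R>(Channel.UNLIMITED)
                    channels.send(inner)
                    launch {
                        try {
                            transform(value).collect { inner.send(it) }
                            inner.close() // lets the consumer move on to the next channel
                        } catch (e: Throwable) {
                            inner.close(e)
                            throw e
                        }
                    }
                }
                channels.close()
            }
            // Drain the inner channels one after another, preserving order.
            for (inner in channels) {
                for (item in inner) emit(item)
            }
        }
    }
}

fun main() = runBlocking {
    val result = flowOf(3, 1, 2)
        .flatMapSequential { n -> flow { delay(n * 50L); emit(n); emit(n * 10) } }
        .toList()
    println(result) // prints [3, 30, 1, 10, 2, 20]
}
```

The slowest inner flow comes first here, so the faster ones finish early and their values wait in the backlog until it is their turn.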
j
Thanks a lot. I’m not able to process everything yet, but I think I get the idea. And it works as expected. What is missing, though, is a concurrency level that specifies how many of the inner flows can be collected concurrently. That would be important in my hypothetical use case, to make sure I’m not spamming the server with too many concurrent requests.
Is there a chance such an operator becomes standard?
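One way the concurrency level asked about above could be added is with a `Semaphore` that is acquired before each inner flow starts and released when it finishes. This is a sketch under assumptions, not an existing operator: the name `flatMapSequential` and the `concurrency` parameter are hypothetical, and the backlog of buffered items is still unbounded.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.sync.Semaphore

// Hypothetical operator: like the unbounded variant, but at most `concurrency`
// inner flows are collected at the same time.
fun <T, R> Flow<T>.flatMapSequential(
    concurrency: Int,
    transform: suspend (T) -> Flow<R>
): Flow<R> {
    require(concurrency > 0) { "concurrency must be positive, was $concurrency" }
    val upstream = this
    return flow {
        coroutineScope {
            val permits = Semaphore(concurrency)
            val channels = Channel<Channel<R>>(Channel.UNLIMITED)
            launch {
                upstream.collect { value ->
                    permits.acquire() // suspends while `concurrency` inner flows are running
                    val inner = Channel<R>(Channel.UNLIMITED)
                    channels.send(inner)
                    launch {
                        try {
                            transform(value).collect { inner.send(it) }
                            inner.close()
                        } catch (e: Throwable) {
                            inner.close(e)
                            throw e
                        } finally {
                            permits.release() // frees a slot for the next inner flow
                        }
                    }
                }
                channels.close()
            }
            for (inner in channels) {
                for (item in inner) emit(item)
            }
        }
    }
}

fun main() = runBlocking {
    var active = 0     // safe without locking: runBlocking uses a single thread here
    var maxActive = 0
    val result = (1..6).asFlow()
        .flatMapSequential(concurrency = 2) { n ->
            flow {
                active++
                maxActive = maxOf(maxActive, active)
                delay(50) // stands in for an HTTP request
                emit(n)
                active--
            }
        }
        .toList()
    println("$result, max concurrent = $maxActive")
}
```

Permits are acquired in upstream order, so the inner flow currently being drained always holds one and can make progress. Capping the backlog as well would need bounded channel capacities on top of this.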
b
Have you tried `flow.flatMapConcat { it }`?
o
the issue with `flatMapConcat` in this context is that it collects the inner flows one at a time, so if the `Flow` is cold, it won't run things in parallel
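A small trace can illustrate the point about `flatMapConcat`: each cold inner flow only starts once the previous one has completed. The helper name `traceFlatMapConcat` is made up for this sketch, and both opt-in markers are listed because the annotation on `flatMapConcat` has differed between library versions.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical helper: records when each inner flow starts and ends.
@OptIn(ExperimentalCoroutinesApi::class, FlowPreview::class)
fun traceFlatMapConcat(): List<String> = runBlocking {
    val events = mutableListOf<String>()
    flowOf(1, 2, 3)
        .flatMapConcat { n ->
            flow {
                events += "start $n"
                delay(100) // stands in for a slow request
                emit(n)
                events += "end $n"
            }
        }
        .toList()
    events
}

fun main() {
    println(traceFlatMapConcat())
    // prints [start 1, end 1, start 2, end 2, start 3, end 3]
}
```

No two inner flows overlap, so the total time is the sum of the individual delays rather than the longest one.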
re: operator becoming standard, I can't really speak to what the Kotlin Coroutines team thinks is best, but I know they've previously talked about keeping the core API slim