# coroutines
n
Can I (and should I) emit to a Flow from multiple threads? For example: mapping a list of objects, making an HTTP request for each one on the IO dispatcher, and emitting the results one by one.
ids.map { id ->
	async(Dispatchers.IO) {
		api.getMember(id) // I want to emit this
	}
}
Or is there a better-suited construct for this, like a channel?
o
I'm not sure where the Flow factors into this. I do think that `FlowCollector.emit` isn't thread-safe, but you can probably batch up the `Deferred` results and then collect them in a single thread/coroutine for emission, e.g.
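// `scope` is a hypothetical CoroutineScope: async needs one to launch into
flow.map { scope.async { doSomethingToMakeList(it) } }
  .buffer(8)
  .map { it.await() }
  .flatMapConcat { it.asFlow() } // Flow has no flatMap; flatMapConcat keeps the order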
although I suppose `.flatMapConcat { it.await().asFlow() }` is slightly smoother
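A runnable sketch of the map-async-buffer-map pipeline, assuming kotlinx.coroutines is on the classpath. `fetchBatch` is a hypothetical stand-in for `doSomethingToMakeList`, and wrapping the pipeline in `flow { coroutineScope { ... } }` is one way (an assumption, not the only option) to give `async` the scope the one-liner above leaves implicit. The opt-in covers `flatMapConcat`, which is marked experimental or preview depending on the library version.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical stand-in for doSomethingToMakeList: pretends to call a server
// and returns several values per input.
suspend fun fetchBatch(n: Int): List<String> {
    delay(10)
    return listOf("$n-a", "$n-b")
}

// Starts one async per upstream element, lets up to `bufferSize` Deferreds queue
// ahead of the consumer, then awaits them in upstream order and flattens each list.
@OptIn(ExperimentalCoroutinesApi::class, FlowPreview::class)
fun batches(inputs: Flow<Int>, bufferSize: Int = 8): Flow<String> = flow {
    coroutineScope {
        // async resolves against this coroutineScope, so every request is
        // cancelled if the collector stops early
        val results = inputs
            .map { n -> async { fetchBatch(n) } }
            .buffer(bufferSize)
            .flatMapConcat { deferred -> deferred.await().asFlow() }
        emitAll(results)
    }
}

fun main() = runBlocking {
    println(batches((1..3).asFlow()).toList())
}
```

Because the Deferreds are awaited in the order they were created, the output keeps the input order even though the requests overlap.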
If `getMember` returns a Flow itself, this won't work; you should probably use something like `flattenMerge` instead (read the docs on that first), and you won't need `async`.
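For the case where the per-item call returns a Flow, a sketch using `flatMapMerge`, which is equivalent to `map(transform).flattenMerge(concurrency)`: up to `concurrency` inner flows are collected at once and their values merged, so no `async` is needed. `memberUpdates` is a hypothetical Flow-returning function; the opt-in is there because these operators carry an experimental or preview marker depending on the kotlinx.coroutines version.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical Flow-returning variant of getMember: emits a couple of values per id.
fun memberUpdates(id: String): Flow<String> = flow {
    delay(10)
    emit("$id:1")
    emit("$id:2")
}

// Collects up to `concurrency` inner flows concurrently and merges their values.
@OptIn(ExperimentalCoroutinesApi::class, FlowPreview::class)
fun allUpdates(ids: List<String>, concurrency: Int = 4): Flow<String> =
    ids.asFlow().flatMapMerge(concurrency) { id -> memberUpdates(id) }

fun main() = runBlocking {
    // Values from different ids can interleave, so sort before printing.
    println(allUpdates(listOf("a", "b", "c")).toList().sorted())
}
```

Unlike the `buffer` pipeline, merging gives up the input order in exchange for emitting each value as soon as it is ready.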
n
`getMember` doesn't return a Flow; it's a suspending function that returns a data object. What's your `flow` object in your first response?
o
It's just a `Flow<T>` of whatever you want it to be. If you're mapping a list, I would just call `asFlow()` on it; then you can do the map-async-buffer-map like above.
(you don't need the `flatMapConcat`/`asFlow` step if it's not returning multiple values)
n
I see. I'm still a bit confused about what is "better," because there's also the `produce` strategy:
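// `api.getMember` and the `Member` type are hypothetical here (from surrounding code).
// As a CoroutineScope extension, the channel is handed back to the caller immediately.
// Inside coroutineScope { } the call would instead wait for the producer to finish,
// which never happens while nobody can receive from the rendezvous channel.
// launch replaces async because the Deferred result was never used.
private fun CoroutineScope.loadAll0(ids: List<String>): ReceiveChannel<Member> =
	produce {
		ids.forEach { id ->
			launch(Dispatchers.IO) {
				send(api.getMember(id))
			}
		}
	}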
o
yes, you can do that too. `asFlow()` just generates a "cold" stream instead, which won't start until a terminal operation like `collect` is called. If you want a "hot" stream, you can use `produce` + `consumeAsFlow()` iirc (the resulting flow can only be collected once).
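A small sketch contrasting the two, assuming kotlinx.coroutines; `getMember` and the `calls` counter are hypothetical instrumentation. The `asFlow()` pipeline makes no calls until it is collected, while `produce` starts working as soon as it is invoked, and `consumeAsFlow()` only wraps the resulting channel.

```kotlin
import java.util.concurrent.atomic.AtomicInteger
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.flow.*

// Hypothetical counter recording how many requests have started.
val calls = AtomicInteger(0)

// Hypothetical stand-in for api.getMember.
suspend fun getMember(id: String): String {
    calls.incrementAndGet()
    delay(10)
    return "member-$id"
}

// Cold: nothing runs until a terminal operator such as collect or toList is called.
fun coldMembers(ids: List<String>): Flow<String> = ids.asFlow().map { getMember(it) }

// Hot: produce launches its coroutine immediately in the receiver scope;
// consumeAsFlow turns the channel into a Flow that may be collected only once.
@OptIn(ExperimentalCoroutinesApi::class)
fun CoroutineScope.hotMembers(ids: List<String>): Flow<String> =
    produce {
        ids.forEach { id -> launch(Dispatchers.IO) { send(getMember(id)) } }
    }.consumeAsFlow()

fun main() = runBlocking {
    val cold = coldMembers(listOf("a", "b"))
    delay(100)
    println("cold, before collecting: ${calls.get()} calls")
    println(cold.toList())

    calls.set(0)
    val hot = hotMembers(listOf("a", "b"))
    delay(100)
    println("hot, before collecting: ${calls.get()} calls")
    println(hot.toList().sorted())
}
```

The hot version also needs a scope to live in, which is why it is an extension on `CoroutineScope`.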
n
Yeah, the other main advantage of Flow is the powerful set of extension functions (operators) it offers. It's very tempting to try and use it wherever possible.
o
the benefit of the map-async-buffer-map is that it limits how many tasks run at once
n
Ah, that is a good point
o
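right now you're launching a task for every id at once, so if you have more than 64 of them (`Dispatchers.IO` defaults to 64 threads, or the number of cores if that is larger), you may fill up the IO dispatcher threads. The `buffer` prevents that by only letting the flow have a bounded number of tasks running: the buffer capacity, plus the one being awaited and the one waiting to be sent into the buffer.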
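To see that bound in practice, a sketch (assuming kotlinx.coroutines; `fakeGetMember` and the `inFlight`/`peak` counters are hypothetical instrumentation) that records how many requests overlap. With `buffer(8)` the peak should stay at or below `bufferSize + 2`: eight Deferreds sitting in the buffer, one being awaited downstream, and one just created upstream and suspended on `send`.

```kotlin
import java.util.concurrent.atomic.AtomicInteger
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

val inFlight = AtomicInteger(0)
val peak = AtomicInteger(0)

// Hypothetical stand-in for api.getMember that records how many calls overlap.
suspend fun fakeGetMember(id: Int): String {
    val now = inFlight.incrementAndGet()
    peak.updateAndGet { maxOf(it, now) }
    try {
        delay(20)
        return "member-$id"
    } finally {
        inFlight.decrementAndGet()
    }
}

// One async per id, but buffer(bufferSize) stops the upstream from racing ahead,
// so only a bounded number of requests are running at any moment.
fun loadAll(ids: List<Int>, bufferSize: Int = 8): Flow<String> = flow {
    coroutineScope {
        emitAll(
            ids.asFlow()
                .map { id -> async(Dispatchers.IO) { fakeGetMember(id) } }
                .buffer(bufferSize)
                .map { it.await() }
        )
    }
}

fun main() = runBlocking {
    val members = loadAll((1..100).toList()).toList()
    println("${members.size} members, peak concurrency ${peak.get()}")
}
```

Without the `buffer`, the same code would await each request before starting the next one; without the flow at all (plain `ids.map { async { ... } }`), all 100 would be started at once.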
b
`emit` should throw an exception if you call it from a different context or coroutine than the one collecting the flow.
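A minimal sketch of that failure, assuming kotlinx.coroutines: calling `emit` from a coroutine launched inside `flow { }` (here on `Dispatchers.IO`) compiles, but is rejected at runtime with an `IllegalStateException` reporting that the flow invariant is violated. `brokenFlow` is a hypothetical name for the anti-pattern.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Anti-pattern: emit runs in a child coroutine on another dispatcher,
// not in the coroutine that is collecting the flow.
fun brokenFlow(): Flow<Int> = flow {
    coroutineScope {
        launch(Dispatchers.IO) {
            emit(1) // the flow's context check rejects this emission
        }
    }
}

fun main() {
    val result = runCatching { runBlocking { brokenFlow().collect { println(it) } } }
    // The collection fails instead of printing 1.
    println(result.exceptionOrNull())
}
```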
`channelFlow` might be what you're looking for (it lets you `send` from multiple coroutines across different threads). If you do have more than 64 jobs filling up the IO threads, the rest naturally get back-pressure for free: they just wait until a thread becomes available.
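A sketch of the `channelFlow` approach, assuming kotlinx.coroutines and a hypothetical `getMember`: inside `channelFlow { }` each child coroutine calls `send` (not `emit`), which is safe to call concurrently from any thread, and the flow completes once every launched child has finished. Results arrive in completion order, not in `ids` order.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical stand-in for api.getMember.
suspend fun getMember(id: String): String {
    delay(10)
    return "member-$id"
}

// Each id gets its own coroutine on Dispatchers.IO; all of them send into
// the same channel, and the collector receives the values one at a time.
fun loadAll(ids: List<String>): Flow<String> = channelFlow {
    for (id in ids) {
        launch(Dispatchers.IO) {
            send(getMember(id))
        }
    }
}

fun main() = runBlocking {
    loadAll(listOf("a", "b", "c")).collect { println(it) }
}
```

As written this launches one coroutine per id, so it has the same "everything at once" behaviour as the original `async` version; the dispatcher's thread limit is what queues the excess.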