#coroutines

nwh

12/28/2019, 3:19 AM
Can I (and should I) emit to a Flow from multiple threads? For example, mapping a list of objects and making an HTTP request using the IO Dispatcher, and emitting the result one by one.
ids.map { id ->
	async(Dispatchers.IO) {
		api.getMember(id) // I want to emit this
	}
}
Or is there a better-suited construct for this, like a channel?

octylFractal

12/28/2019, 3:21 AM
I'm not sure where the Flow factors into this? I do think that FlowCollector.emit isn't thread safe, but you can probably batch up the Deferred results and then collect them in a single thread/coroutine for emission, e.g.
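// async needs a CoroutineScope, e.g. run this inside coroutineScope { ... }
flow.map { async { doSomethingToMakeList(it) } }
  .buffer(8)
  .map { it.await() }
  .flatMapConcat { it.asFlow() } // Flow has no plain flatMap; flatMapConcat keeps order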
although I suppose .flatMapConcat { it.await().asFlow() } is slightly smoother
if getMember returns a Flow itself, this won't work, and you should probably use something like flattenMerge instead (read the docs on that first) + you won't need async
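A minimal sketch of the flattenMerge case mentioned above, assuming a hypothetical memberUpdates function that returns a Flow per id (the Member class and memberUpdates are illustrative names, not part of the original API). The inner flows are collected concurrently, so no async is needed, and the order of results across ids is not guaranteed.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

data class Member(val id: String)

// Hypothetical: a call that returns a Flow instead of a single value.
fun memberUpdates(id: String): Flow<Member> = flow {
    delay(50) // simulate network latency
    emit(Member(id))
}

// Collects at most 8 inner flows at a time and merges their emissions.
// flattenMerge is marked experimental/preview depending on the library version.
@OptIn(ExperimentalCoroutinesApi::class, FlowPreview::class)
fun loadAllMerged(ids: List<String>): Flow<Member> =
    ids.asFlow()
        .map { id -> memberUpdates(id) }
        .flattenMerge(concurrency = 8)

fun main() = runBlocking {
    loadAllMerged(listOf("a", "b", "c")).collect { println(it) }
}
```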

nwh

12/28/2019, 3:28 AM
getMember doesn't return a Flow, it's a suspending function that returns a data object. What's your flow object in your first response?

octylFractal

12/28/2019, 3:30 AM
it's just a Flow<T>, of whatever you want it to be. If you're mapping a list, I would just call asFlow() on it, then you can do the map-async-buffer-map like above
(you don't need the flatMap asFlow if it's not returning multiple values)
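Put together for a list of ids, the map-async-buffer-map approach might look like the sketch below. Member and getMember are hypothetical stand-ins for the real API, and the buffer size of 8 is only the value used earlier in the thread. coroutineScope supplies the scope that async needs, and results come back in the same order as the ids because each Deferred is awaited in sequence.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

data class Member(val id: String)

// Hypothetical stand-in for api.getMember: a suspending call returning one object.
suspend fun getMember(id: String): Member {
    delay(50) // simulate network latency
    return Member(id)
}

suspend fun loadAll(ids: List<String>): List<Member> = coroutineScope {
    ids.asFlow()
        .map { id -> async(Dispatchers.IO) { getMember(id) } } // start a request per id
        .buffer(8)          // lets the upstream run ahead, but only by a bounded amount
        .map { it.await() } // await in a single coroutine, preserving order
        .toList()
}

fun main() = runBlocking {
    println(loadAll(listOf("a", "b", "c")))
}
```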

nwh

12/28/2019, 3:36 AM
I see. I'm still a bit confused about which is "better," because there's also the produce strategy:
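// Extension on CoroutineScope rather than wrapping in coroutineScope { }:
// coroutineScope would not return until the producer finished, and with
// nobody receiving yet, the first send would suspend forever.
private fun CoroutineScope.loadAll0(ids: List<String>) = produce {
	ids.forEach { id ->
		// launch instead of async, since the result is never awaited
		launch(Dispatchers.IO) {
			send(api.getMember(id))
		}
	}
}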

octylFractal

12/28/2019, 3:38 AM
yes, you can do that too. asFlow() just generates a "cold" stream instead, which won't start until a terminal operation like collect is called. If you want a "hot" stream, you can use produce + consumeAsFlow() iirc
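A rough illustration of the cold versus hot distinction, using hypothetical Member and getMember stand-ins. The produce coroutine starts as soon as memberChannel is called, while the asFlow() pipeline does nothing until it is collected. Note that a flow obtained from consumeAsFlow() can be collected only once.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.flow.*

data class Member(val id: String)

// Hypothetical stand-in for api.getMember.
suspend fun getMember(id: String): Member {
    delay(50) // simulate network latency
    return Member(id)
}

// Hot: the producer coroutine is started immediately, before any collector exists.
@OptIn(ExperimentalCoroutinesApi::class)
fun CoroutineScope.memberChannel(ids: List<String>): ReceiveChannel<Member> = produce {
    ids.forEach { id -> send(getMember(id)) }
}

// Cold: building the flow runs nothing; the work starts on each collect.
fun coldMembers(ids: List<String>): Flow<Member> =
    ids.asFlow().map { getMember(it) }

fun main() = runBlocking {
    val hot = memberChannel(listOf("a", "b")).consumeAsFlow() // single-use flow
    val cold = coldMembers(listOf("a", "b"))
    println(hot.toList())
    println(cold.toList())
}
```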

nwh

12/28/2019, 3:39 AM
Yeah, the other main advantage of Flow is the powerful extension methods and such that it offers. It's very tempting to try and use it wherever possible

octylFractal

12/28/2019, 3:39 AM
the benefit of the map-async-buffer-map is that it prevents too many tasks from running at once

nwh

12/28/2019, 3:40 AM
Ah, that is a good point

octylFractal

12/28/2019, 3:41 AM
right now you're launching a task for every id at once, which, if you have more than 64 of them (the default Dispatchers.IO limit is 64 threads or the number of cores, whichever is larger), may fill the IO dispatcher threads. the buffer prevents that by only allowing the flow to have roughly that many + 1 tasks running

bdawg.io

12/28/2019, 12:40 PM
emit should throw an exception if you change contexts. channelFlow might be what you're looking for (being able to emit from multiple jobs across different threads)
If you do have more than 64 jobs filling up the IO threads, the other ones will naturally support back-pressure for free and just suspend until a thread becomes available
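A minimal channelFlow sketch of the original question, assuming hypothetical Member and getMember stand-ins for the real API. Inside channelFlow, send may be called from several coroutines on different threads, and the flow completes once the block and all launched children have finished. Results arrive in completion order, not in the order of the ids.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

data class Member(val id: String)

// Hypothetical stand-in for api.getMember.
suspend fun getMember(id: String): Member {
    delay(50) // simulate network latency
    return Member(id)
}

// One coroutine per id; each sends its result into the flow as soon as it is ready.
fun loadAllConcurrently(ids: List<String>): Flow<Member> = channelFlow {
    ids.forEach { id ->
        launch(Dispatchers.IO) {
            send(getMember(id))
        }
    }
}

fun main() = runBlocking {
    loadAllConcurrently(listOf("a", "b", "c")).collect { println(it) }
}
```

Unlike the buffer-based pipeline, this starts a coroutine for every id up front, so the number of requests in flight is limited only by the dispatcher.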