# flow
a
Hey everyone! I am getting started with multiprocessing and wanted to know if there is something similar to Python's Pool.map() in Kotlin. Or is there an example showing how to make a process pool and then run a callback over an iterable in it?
b
flowOf{... Your publisher goes here ...}.map{}
c
@Big Chungus that is sequential, I don't think that's what they asked for. @Ansh Tyagi in Kotlin we tend not to use subprocesses much because they're very expensive. Instead, we use threads. Because of its global interpreter lock, standard CPython cannot run CPU-bound code on several threads in parallel, which is why Python leans on process pools. If you want parallelism with threads, you can either go the Java 8 approach with Stream, or the kotlinx.coroutines approach with Channel. Flow is sequential.
If you really want to use subprocesses, it's possible, but it's far from being as clean as these solutions.
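For reference, a minimal sketch of what a thread-based stand-in for Pool.map() could look like on the JVM, using only java.util.concurrent. The helper name `poolMap` and its parameters are made up for illustration, not part of any library.

```kotlin
import java.util.concurrent.Callable
import java.util.concurrent.Executors

// Hypothetical helper: applies `transform` to every element on a fixed pool of
// threads and returns the results in input order, roughly like Pool.map().
fun <T, R> poolMap(items: List<T>, threads: Int, transform: (T) -> R): List<R> {
    val pool = Executors.newFixedThreadPool(threads)
    try {
        val tasks = items.map { item -> Callable { transform(item) } }
        // invokeAll blocks until every task has finished and returns the
        // futures in the same order as the submitted tasks.
        return pool.invokeAll(tasks).map { it.get() }
    } finally {
        pool.shutdown()
    }
}

fun main() {
    println(poolMap(listOf(1, 2, 3, 4), threads = 4) { it * it }) // [1, 4, 9, 16]
}
```

Creating a pool per call keeps the sketch short; real code would probably reuse one executor.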
b
You can make use of intermediate flow operators like buffer or shareIn
Scratch that, the only way you can make flow consumption parallel is to launch a bunch of coroutines in collect {}
r
@Big Chungus wouldn't something like this work? May be a horrible abuse of the system though, lol.
myFlow.map {
    async { methodToRun(it) }
}
.buffer()
.collect {
    it.await()
}
b
You need to convert it to broadcast channel and then launch multiple coroutines that read from it. I think there's flow.broadcastIn() or similar utility for that
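A rough sketch of the "several coroutines reading from one channel" idea, assuming a recent kotlinx.coroutines (1.6 or later). It swaps in a plain Channel rather than a broadcast one, since the goal here is for each element to be handled by exactly one worker. The helper name `fanOut` is hypothetical, and results come back in completion order, not input order.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import java.util.concurrent.ConcurrentLinkedQueue

// Hypothetical helper: `workers` coroutines pull items from a single channel,
// so each element of the flow is processed by exactly one of them.
suspend fun <T, R : Any> Flow<T>.fanOut(workers: Int, transform: suspend (T) -> R): List<R> {
    val results = ConcurrentLinkedQueue<R>()
    coroutineScope {
        val channel = Channel<T>()
        // Producer: forwards the flow into the channel, then closes it.
        launch {
            try {
                collect { channel.send(it) }
            } finally {
                channel.close()
            }
        }
        // Consumers: each loop ends once the channel is closed and drained.
        repeat(workers) {
            launch(Dispatchers.Default) {
                for (item in channel) results.add(transform(item))
            }
        }
    }
    return results.toList()
}

fun main() = runBlocking {
    val out = listOf(1, 2, 3, 4, 5).asFlow().fanOut(workers = 3) { it * 10 }
    println(out.sorted()) // [10, 20, 30, 40, 50]
}
```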
c
@Richard Gomez Nope. Flow is a suspending iterator, it will only execute the map lambda for a value after the previous value has finished the pipeline. You could write this:
someFlow
  .toList()
  .map { scope.async { ... } }
  .asFlow()
  .map { it.await() }
That would truly be parallel (depending on the scope of course), however the toList() call forces all previous results to be known before the parallel operation starts. With Java Streams (not multiplatform), you can write:
someList
  .parallelStream()
  .map { ... }
That will execute in parallel, however it is not coroutine-aware, so it might not be convenient depending on your use case. There used to be a map function on Channel (which would be the equivalent to Stream), but I recall it was removed, I don't remember why.
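A complete version of the Stream snippet, as a small sketch for the JVM only. The function name `squareAll` is just an illustration. Collecting into a list keeps the encounter order of the source list even though the mapping runs in parallel.

```kotlin
import java.util.stream.Collectors

// Hypothetical example function: squares every element using a parallel stream.
fun squareAll(input: List<Int>): List<Int> =
    input
        .parallelStream()             // work is split across the common ForkJoinPool
        .map { it * it }
        .collect(Collectors.toList()) // result order matches the input order

fun main() {
    println(squareAll(listOf(1, 2, 3, 4))) // [1, 4, 9, 16]
}
```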
d
I have used this code for a year in production with no problem: https://gist.github.com/serpro69/3eccd8f38718d05949da047225e08ece
A simple real example:
suspend fun flowUserReports(userId: UserId) =
    flowUserReportRoots(userId)
        .parallelMap(100, 16) {
            safe {
                logTime(log, "readSingle<QuickSightReportRequest.Data>") {
                    UserReportInfoItem(it.dataFile, it.dataFile.parent, getReportRequest(it.dataFile))
                }
            }
        }
        .filterNotNull()
        .flowOn(FlowUtil.IO)
c
Note that that implementation has a lot of overhead, so only use it if what you're doing in the loop is actually slow enough that it's worth it (measure it!)
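When the input is already a list, the toList()/async pattern from earlier in the thread can be written without Flow at all. A minimal sketch, assuming kotlinx.coroutines is on the classpath. `mapAsync` is a hypothetical name and is not the parallelMap from the gist. It starts one coroutine per element with no concurrency limit, so it only suits small inputs.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.runBlocking

// Hypothetical helper: starts one coroutine per element on Dispatchers.Default
// and waits for all of them. Results keep the order of the input list.
suspend fun <T, R> List<T>.mapAsync(transform: suspend (T) -> R): List<R> =
    coroutineScope {
        map { item -> async(Dispatchers.Default) { transform(item) } }.awaitAll()
    }

fun main() = runBlocking {
    println(listOf(1, 2, 3).mapAsync { it * 2 }) // [2, 4, 6]
}
```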