# coroutines
c
Does anyone know the best-practice way to do a flatMapMerge on a Flow that runs every map in parallel? It seems to be sequential by default.
z
It shouldn’t be sequential. flatMapMerge takes a concurrency parameter that defines how many upstream Flows may be collected simultaneously: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/flat-map-merge.html
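To make the concurrency parameter concrete, here is a minimal sketch that counts how many inner flows are being collected at the same moment. The helper name maxActiveInnerFlows, the ten-element range, and the 50 ms delay are assumptions made for illustration; depending on the kotlinx.coroutines version, flatMapMerge may also produce an opt-in warning.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.flatMapMerge
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical helper: collects 10 inner flows and returns the highest
// number of them that were doing their slow work at the same time.
fun maxActiveInnerFlows(concurrency: Int): Int = runBlocking {
    val active = AtomicInteger(0)
    val maxActive = AtomicInteger(0)
    (1..10).asFlow()
        .flatMapMerge(concurrency) { i ->
            flow {
                val now = active.incrementAndGet()
                maxActive.updateAndGet { maxOf(it, now) }
                delay(50) // stand-in for slow work such as a network call
                active.decrementAndGet()
                emit(i)
            }
        }
        .toList()
    maxActive.get()
}
```

Calling maxActiveInnerFlows(3) should never report more than 3 inner flows active at once, while maxActiveInnerFlows(1) degenerates to one at a time.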
c
Then I don’t grasp what they mean by "This operator calls transform sequentially".
My code maps a list of IP addresses to a REST result:
return generateAddressesInSubnet(preferredInterfaceAddress) // returns a list of InetAddresses
    .asFlow()
    .flatMapMerge() {
        val url = "http://${it.hostAddress}/status"
        try {
            logger.debug { "Connecting to $url" }
            val apiResponse = discoveryApiService.fetchStatus(url)
            flowOf(LanDiscoveryResult(it, apiResponse))
        } catch (ex: Exception) {
            emptyFlow<LanDiscoveryResult>()
        }
    }
These REST calls should run in parallel with that code, right?
o
It calls the transform sequentially, but collects the flows concurrently. You should put the work inside the flow that the transform returns.
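A minimal sketch of that advice, assuming stand-ins for the code in the question: LanDiscoveryResult, fetchStatus, discover, and discoverBlocking here are hypothetical, and the simulated failure for 10.0.0.3 exists only to exercise the error path. The transform just builds a cold flow and returns at once; the slow call happens later, when flatMapMerge collects that inner flow.

```kotlin
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.flatMapMerge
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for the result type in the question.
data class LanDiscoveryResult(val host: String, val status: String)

// Hypothetical stand-in for discoveryApiService.fetchStatus.
suspend fun fetchStatus(url: String): String {
    delay(100) // simulates network latency
    if (url.contains("10.0.0.3")) throw IllegalStateException("unreachable")
    return "ok"
}

fun discover(hosts: List<String>, concurrency: Int = 16): Flow<LanDiscoveryResult> =
    hosts.asFlow().flatMapMerge(concurrency) { host ->
        // The transform only constructs the inner flow; nothing slow runs here.
        flow {
            // This body runs when the inner flow is collected, so up to
            // `concurrency` requests can be in flight at the same time.
            val status = try {
                fetchStatus("http://$host/status")
            } catch (ex: CancellationException) {
                throw ex // never swallow cancellation
            } catch (ex: Exception) {
                null // unreachable host: emit nothing
            }
            if (status != null) emit(LanDiscoveryResult(host, status))
        }
    }

// Convenience wrapper for trying the sketch from non-suspending code.
fun discoverBlocking(hosts: List<String>): List<LanDiscoveryResult> =
    runBlocking { discover(hosts).toList() }
```

This assumes the status call is a suspending function. If it were a blocking call instead, the inner flow would presumably also need flowOn(Dispatchers.IO) to get real parallelism.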
☝️ 1
c
Thanks @Zach Klippenstein (he/him) [MOD] & @octylFractal! You guys are absolutely right. It works flawlessly, including the concurrency parameter. My understanding of coroutines and Flow seems to be lacking the fundamentals, so I’ll check out the basics again first. If you have any favourite learning resources, feel free to send a link :)
z
This sort of mistake is pretty common and not specific to Flow/Kotlin. I’ve seen a lot of RxJava code that looks like this:
Observable.just(expensiveFunction())
When the author really wanted this:
Observable.fromCallable { expensiveFunction() }
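The same distinction can be sketched with Flow, where flowOf(x) plays the role of Observable.just and flow { emit(x) } plays the role of Observable.fromCallable. The call counter, the traceCalls helper, and the body of expensiveFunction are assumptions added for illustration.

```kotlin
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.runBlocking
import java.util.concurrent.atomic.AtomicInteger

val calls = AtomicInteger(0)

// Hypothetical expensive function; the counter records when it actually runs.
fun expensiveFunction(): Int {
    calls.incrementAndGet()
    return 42
}

// Returns the number of expensiveFunction() calls observed after each step.
fun traceCalls(): List<Int> = runBlocking {
    calls.set(0)
    val trace = mutableListOf<Int>()

    val eager = flowOf(expensiveFunction())           // argument is evaluated right here
    trace += calls.get()                              // after building flowOf

    val deferred = flow { emit(expensiveFunction()) } // nothing runs yet
    trace += calls.get()                              // after building flow { }

    deferred.first()                                  // collection triggers the call
    trace += calls.get()

    eager.first()                                     // value was already computed
    trace += calls.get()

    trace
}
```

traceCalls() returns [1, 1, 2, 2]: the flowOf argument is computed while the flow is being built, whereas the flow { } body runs only on collection.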