Hey folks I am trying to parallelize the processi...
# coroutines
j
Hey folks I am trying to parallelize the processing of a flow. I am doing something like:
Copy code
private suspend fun process() = coroutineScope {    
    getFlow()
        .map { async(<http://Dispatchers.IO|Dispatchers.IO>) { ioBlockingFunction(it) } }
        .buffer(20)
        .map { it.await() }
        .collect()
}
my expectations is that 20 blocking operations will be trigger at time, then they will be awaited and then next 20 blocking operations will be trigger. What I am noticing is that somehow flow stop producing values and my app got block at flow element 25 of 30, or 100 (at a random point), even when flow function has more records to produce
s
It seems like your code does do what you want it to, it handles them 20 at a time in parallel https://pl.kotl.in/O1k_orbWI The only explanation I have for what you’re experiencing is that your
ioBlockingFunction()
somehow is never finishing? And it makes all those 20 parallel actions wait forever?
j
that was exactly the case @Stylianos Gakis thanks for your help
some random records where never finishing to be process due errors and processing pipeline got stuck
I am investigating those and adding withTimeout to avoid same situation in the future