Hey guys how do run flows concurrently then collec...
# coroutines
l
Hey guys how do run flows concurrently then collect the output
Copy code
@Test
    fun `experimental`() {
        runBlocking {
            val scope = CoroutineScope(<http://Dispatchers.IO|Dispatchers.IO>) 
            val songs = listOf("a", "b", "c", "d", "e")
            songs.asFlow().flatMapMerge {
                requestFlow("${Thread.currentThread().name} $it check")
            }
            .collect {
                println("$it")
            }
        }
    }

    fun requestFlow(i: String): Flow<String> = flow {
        emit("$i: First")
        Thread.sleep(1000)
        emit("$i: Second")
    }
main @coroutine#2 a check: First main @coroutine#2 a check: Second main @coroutine#2 b check: First main @coroutine#2 b check: Second main @coroutine#2 c check: First main @coroutine#2 c check: Second main @coroutine#2 d check: First main @coroutine#2 d check: Second main @coroutine#2 e check: First main @coroutine#2 e check: Second
o
provide a parameter to
flatMapMerge
for a concurrency level other than 1
l
Copy code
songs.asFlow().flatMapMerge(16)  makes no difference it already defaults to 16
Copy code
public val DEFAULT_CONCURRENCY = systemProp(DEFAULT_CONCURRENCY_PROPERTY_NAME,
    16, 1, Int.MAX_VALUE
)
o
oh, I see
I presume the issue is that you're not using that
scope
then
for some reason I recalled
flatMapMerge
as defaulting to 1
l
when I do delay(1000) it seems to do what I want
o
what you probably want is to do
flow { ... }.flowOn(<http://Dispatchers.IO|Dispatchers.IO>)
l
but doesn't work with thread sleep
o
otherwise you're executing in `runBlocking`'s dispatcher
l
thank you that works
j
@Luis Munoz Would you mind posting the final working version of your code?
l
Copy code
@Test
fun `experimental`() {
    runBlocking {
        val songs = listOf("a", "b", "c", "d", "e")
        songs.asFlow().flatMapMerge(16) {
            requestFlow(" $it check").flowOn(<http://Dispatchers.IO|Dispatchers.IO>)
        }.collect {
            println("$it")
        }
    }
}

fun requestFlow(i: String): Flow<String> = flow {
    emit("${Thread.currentThread().name} $i: First")
    Thread.sleep(1000)
    emit("${Thread.currentThread().name} $i: Second")
}
j
Thanks @Luis Munoz!
z
Any reason you're using
sleep
instead of
delay
? Unless you're actually trying to simulate IO that is almost always wrong.
l
exactly wanted to simulate a blocking IO call