#coroutines

Dean Djermanović

07/21/2022, 1:22 PM
Is there a way to collect each item in a separate coroutine in Kotlin Flow? For example, if I have this source:
val source = flow {
    emit(1)
    delay(200)
    emit(2)
    delay(200)
    emit(3)
}

source
    .onEach { delay(500) }
    .collect {
        ...
    }
I want my items to be collected after:
1 - 500ms
2 - 700ms
3 - 900ms

Joseph Hawkes-Cates

07/21/2022, 4:29 PM
I’m not sure if this is what you are looking for, but the following should give the timing that you list in your example:
source
    .onStart { delay(500) }
    .collect {
        ...
    }

Nick Allen

07/21/2022, 4:33 PM
If you read the `Flow` interface docs, it guarantees that items are collected sequentially, so it's not possible in the way you are thinking. However, you can `launch` from `collect`:
coroutineScope {
    source.collect {
        launch {
            delay(500)
            //Move original collect code here
        }
    }
}
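To make the timing concrete, here is a minimal runnable sketch of the launch-from-collect idea, using the same `source` as in the question. The helper name `collectEachInOwnCoroutine` and the elapsed-time bookkeeping are only illustrative assumptions, not part of the original snippet. On an otherwise idle machine the printed times should come out close to the 500 / 700 / 900 ms asked for.

```kotlin
import java.util.concurrent.CopyOnWriteArrayList
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Same source as in the question: 1, 2, 3 with 200 ms between emissions.
val source = flow {
    emit(1)
    delay(200)
    emit(2)
    delay(200)
    emit(3)
}

// Hypothetical helper: returns (item, elapsed ms) pairs in the order
// the per-item work finished.
suspend fun collectEachInOwnCoroutine(): List<Pair<Int, Long>> {
    // Thread-safe list, since the caller may run on a multi-threaded dispatcher.
    val results = CopyOnWriteArrayList<Pair<Int, Long>>()
    val start = System.currentTimeMillis()
    // coroutineScope returns only after every launched child has completed.
    coroutineScope {
        source.collect { item ->
            // collect itself stays sequential; the slow work runs in a child coroutine.
            launch {
                delay(500) // stands in for the original collect code
                results.add(item to (System.currentTimeMillis() - start))
            }
        }
    }
    return results
}

fun main() = runBlocking {
    collectEachInOwnCoroutine().forEach { (item, ms) ->
        println("$item after ~$ms ms")
    }
}
```

Because the work is launched rather than awaited, there is no backpressure: a fast source can start an unbounded number of coroutines at once.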
You could potentially leverage something like `flattenMerge`, which has an optional param to limit concurrency. One `Flow` must emit items sequentially, but if you turn it into many `Flow`s then the restriction doesn't apply.
source
    .map { item ->
        flow {
            delay(500)
            //Move original collect code here
            //Or emit here so the collect below gets items and that part is sequential.
            emit(item)
        }
    }
    .flattenMerge()
    .collect()
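As a rough, self-contained sketch of the many-`Flow`s variant: each item is mapped to its own inner flow, and `flattenMerge(concurrency)` collects up to that many inner flows at once. The helper name `collectMerged` and the timing bookkeeping are assumptions made for illustration. `flattenMerge` is a preview/experimental API, so the opt-in annotation it requires depends on your kotlinx.coroutines version; both are listed here to be safe.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Same source as in the question: 1, 2, 3 with 200 ms between emissions.
val source = flow {
    emit(1)
    delay(200)
    emit(2)
    delay(200)
    emit(3)
}

// Hypothetical helper: returns (item, elapsed ms) pairs as seen by the
// downstream, which is still sequential.
@OptIn(FlowPreview::class, ExperimentalCoroutinesApi::class)
suspend fun collectMerged(concurrency: Int): List<Pair<Int, Long>> {
    val start = System.currentTimeMillis()
    return source
        .map { item ->
            // One inner flow per item; the slow work happens inside it.
            flow {
                delay(500)
                emit(item)
            }
        }
        // At most `concurrency` inner flows are collected at the same time.
        .flattenMerge(concurrency)
        .map { item -> item to (System.currentTimeMillis() - start) }
        .toList()
}

fun main() = runBlocking {
    println(collectMerged(concurrency = 3)) // inner flows overlap
    println(collectMerged(concurrency = 1)) // inner flows run one after another
}
```

With `concurrency = 1` the inner flows are collected one at a time, so the timing falls back to roughly what the original `onEach { delay(500) }` version produces.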

Dean Djermanović

07/25/2022, 6:28 AM
Thanks @Nick Allen