How can be implemented such use-case with `Flow` <...
# coroutines
u
How can such a use case be implemented with `Flow`? https://pl.kotl.in/nEwZYT3Kl (I have an implementation with recursive `launch`, but I think it could be more elegant with `Flow`; I just don't know how.) I want to launch a limited number of requests and, as requests complete, take the next ones from the queue. For instance, we have a queue/list of 100 URLs, but we want to limit the number of simultaneously executing requests to 5. Not 20 batches of 5 requests, but a first batch of 5 requests, and then the next request as soon as one of them finishes. How can we process the result of each request using `collect`?
d
I've implemented something like this but I want to confirm your use case.
You want to process the queue in batches and the next batch only gets processed after the first one finishes?
Or do you want to process all elements in the queue in order but limit the number of in-flight (concurrent) processes?
u
@Dominaezzz I want to process all elements in the queue in order but limit the number of in-flight (concurrent) processes.
d
Flow doesn't have this yet but here's something I made. https://github.com/Kotlin/kotlinx.coroutines/issues/1147#issuecomment-489153085
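For reference, a minimal sketch of the "at most 5 in flight" pattern. This is not the code from the linked issue comment; it assumes a kotlinx.coroutines version that provides `flatMapMerge` with a `concurrency` parameter. `fetch` and `fetchAll` are hypothetical names, and `fetch` only simulates a request with `delay`. Note that results arrive in completion order, not necessarily in queue order.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical stand-in for a real HTTP request.
suspend fun fetch(url: String): String {
    delay(100) // pretend the request takes 100 ms
    return "response for $url"
}

// Runs at most `limit` fetches at a time; the next URL is taken
// from the list as soon as any running fetch completes.
@OptIn(FlowPreview::class, ExperimentalCoroutinesApi::class)
fun fetchAll(urls: List<String>, limit: Int = 5): Flow<String> =
    urls.asFlow().flatMapMerge(concurrency = limit) { url ->
        flow { emit(fetch(url)) }
    }

fun main() = runBlocking<Unit> {
    val urls = List(100) { "https://example.com/$it" }
    // Each result is handled in collect as soon as its request finishes.
    fetchAll(urls).collect { println(it) }
}
```

If queue order matters for the results, this sketch would need an extra step (for example, emitting the index together with the response and sorting afterwards).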
u
I expected that `buffer` with the `capacity` parameter would do the job, but it doesn't work as expected (also without the `capacity` parameter). Just try the example from the official guide, but with other delays (I expected it to finish in about 110 ms, but it takes more than 1000 ms):
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.system.*

fun foo(): Flow<Int> = flow {
    for (i in 1..10) {
        delay(100) // pretend we are asynchronously waiting 100 ms
        emit(i) // emit next value
    }
}

fun main() = runBlocking<Unit> { 
//sampleStart
    val time = measureTimeMillis {
        foo()
            .buffer() // buffer emissions, don't wait
            .collect { value -> 
                delay(1) // pretend we are processing it for 1 ms
                println(value) 
            } 
    }   
    println("Collected in $time ms")
//sampleEnd
}
https://pl.kotl.in/qm9dV4CRv
d
That's because you `delay(100)` 10 times.
Flows are sequential.
u
@Dominaezzz check the guide: https://github.com/Kotlin/kotlinx.coroutines/blob/1.3.0/docs/flow.md#buffering
> It produces the same numbers faster, as we have effectively created a processing pipeline, only having to wait 100 ms for the first number and then spending only 300 ms to process each number. This way it takes around 1000 ms to run
> That's because you `delay(100)` 10 times.
With `buffer()` these 10 delays should be done concurrently.
d
`buffer` allows the operators before it to run concurrently with the operators after it.
It doesn't make the flow itself concurrent.
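A small sketch of the difference, under the assumption that `channelFlow` is available in the kotlinx.coroutines version in use. `sequentialFoo` mirrors the emitter from the example above; `concurrentFoo` is a hypothetical variant that produces each value in its own coroutine, which is what would be needed for the 10 delays to overlap.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.system.*

// Sequential emitter: each delay finishes before the next one starts,
// so producing 10 values takes about 10 * 100 ms regardless of buffer().
fun sequentialFoo(): Flow<Int> = flow {
    for (i in 1..10) {
        delay(100)
        emit(i)
    }
}

// Hypothetical concurrent variant: every value is produced in its own
// coroutine, so the 10 delays run at the same time.
fun concurrentFoo(): Flow<Int> = channelFlow {
    for (i in 1..10) {
        launch {
            delay(100)
            send(i)
        }
    }
}

fun main() = runBlocking<Unit> {
    val sequential = measureTimeMillis {
        sequentialFoo().buffer().collect { delay(1) }
    }
    val concurrent = measureTimeMillis {
        concurrentFoo().collect { delay(1) }
    }
    println("buffer() over a sequential emitter: $sequential ms")
    println("channelFlow with launch: $concurrent ms")
}
```

With the concurrent variant the values are no longer guaranteed to arrive in the order 1..10.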
u
@Dominaezzz but why does the following code (from the official guide) take around 1000 ms?
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.system.*

fun foo(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(100) // pretend we are asynchronously waiting 100 ms
        emit(i) // emit next value
    }
}

fun main() = runBlocking<Unit> { 
//sampleStart
    val time = measureTimeMillis {
        foo()
            .buffer() // buffer emissions, don't wait
            .collect { value -> 
                delay(300) // pretend we are processing it for 300 ms
                println(value) 
            } 
    }   
    println("Collected in $time ms")
//sampleEnd
}
d
The first flow (the emitter) executes in 3 × 100 ms = 300 ms. The second (the collector) does so in 3 × 300 ms = 900 ms, plus the 100 ms it waits for the first value. They overlap because of the `buffer`. I'm not good at explaining this sorta thing.
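To see the overlap concretely, here is a small timing sketch that runs the guide's three-element flow with and without `buffer()`. `timeCollect` is a helper name made up for this example. Without `buffer()` the emitter and collector take turns, so the run should take roughly 3 × (100 + 300) ms; with `buffer()` the emitter runs ahead while the collector is busy, so it should take roughly 100 + 3 × 300 ms.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.system.*

// Same emitter as in the guide: three values, 100 ms apart.
fun foo(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(100)
        emit(i)
    }
}

// Hypothetical helper: collects the flow with a 300 ms "processing"
// step per value and returns the elapsed time in milliseconds.
suspend fun timeCollect(flow: Flow<Int>): Long = measureTimeMillis {
    flow.collect { delay(300) }
}

fun main() = runBlocking<Unit> {
    // Emitter and collector alternate: about 3 * (100 + 300) ms.
    println("without buffer: ${timeCollect(foo())} ms")
    // Emitter runs ahead of the collector: about 100 + 3 * 300 ms.
    println("with buffer:    ${timeCollect(foo().buffer())} ms")
}
```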
u
@Dominaezzz Ah, ok, now I understand. Thank you.