U75957
08/23/2019, 6:53 PMFlow
https://pl.kotl.in/nEwZYT3Kl (I have implementation with recursive launch
, but I think it could be more elegant with Flow
, but don't know how)
I want launch limited number of requests and as some request are completed, get the following from the queue.
For instance, we have queue/list of 100 URLs. But we want limit number of simultaneously executed requests by 5. Not 20 butches of 5 requests, but first butch with 5 request and next, when some request is ready. How can we process result of each request using collect
?Dominaezzz
08/23/2019, 8:12 PMU75957
08/24/2019, 5:20 AMDominaezzz
08/24/2019, 10:25 AMU75957
08/24/2019, 12:17 PMbuffer
with capacity
parameter should do the job, but it don't work as expected (also w/o capacity
parameter).
Just try example from official guide, but with other delays (I expect it should be done in about 110 seconds, but it takes >1000s):
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.system.*
fun foo(): Flow<Int> = flow {
for (i in 1..10) {
delay(100) // pretend we are asynchronously waiting 100 ms
emit(i) // emit next value
}
}
fun main() = runBlocking<Unit> {
//sampleStart
val time = measureTimeMillis {
foo()
.buffer() // buffer emissions, don't wait
.collect { value ->
delay(1) // pretend we are processing it for 1 ms
println(value)
}
}
println("Collected in $time ms")
//sampleEnd
}
https://pl.kotl.in/qm9dV4CRvDominaezzz
08/24/2019, 1:03 PMdelay(100)
10 times.U75957
08/24/2019, 1:28 PMIt produces the same numbers faster, as we have effectively created a processing pipeline, only having to wait 100 ms for the first number and then spending only 300 ms to process each number. This way it takes around 1000 ms to run
That's because youWith10 times.delay(100)
buffer()
these 10 delays should be done concurrently.Dominaezzz
08/24/2019, 1:34 PMbuffer
allows the previous operators to run concurrent to the next operators.U75957
08/24/2019, 1:38 PMimport kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.system.*
fun foo(): Flow<Int> = flow {
for (i in 1..3) {
delay(100) // pretend we are asynchronously waiting 100 ms
emit(i) // emit next value
}
}
fun main() = runBlocking<Unit> {
//sampleStart
val time = measureTimeMillis {
foo()
.buffer() // buffer emissions, don't wait
.collect { value ->
delay(300) // pretend we are processing it for 300 ms
println(value)
}
}
println("Collected in $time ms")
//sampleEnd
}
Dominaezzz
08/24/2019, 5:15 PMbuffer
. I'm not good at explaining this sorta thing.U75957
08/25/2019, 6:00 AM