Mattias Flodin
01/13/2021, 3:53 PM
suspend fun <T> collectFromHere(flow: SharedFlow<T>): Flow<T> {
    val channel = Channel<T>()
    flow.collect { channel.send(it) }
    return object : Flow<T> {
        @InternalCoroutinesApi
        override suspend fun collect(collector: FlowCollector<T>) {
            for (v in channel) {
                collector.emit(v)
            }
        }
    }
}
Marc Knaup
01/13/2021, 5:01 PM
Flow. Instead use the flow { … } builder.
return channel.consumeAsFlow()
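As a rough illustration of the two alternatives mentioned here, the sketch below wraps a channel once with the flow { … } builder and once with consumeAsFlow(). The names asFlowViaBuilder and drain are hypothetical helpers invented for this example, and the channel is pre-filled and closed only so that the demo terminates.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.flow.*

// Option 1: the flow { } builder. The loop runs only once a collector starts.
fun <T> ReceiveChannel<T>.asFlowViaBuilder(): Flow<T> = flow {
    for (value in this@asFlowViaBuilder) emit(value)
}

// Hypothetical demo helper: fills a channel, closes it, then reads it back through `wrap`.
fun drain(wrap: (ReceiveChannel<Int>) -> Flow<Int>): List<Int> = runBlocking {
    val channel = Channel<Int>(Channel.UNLIMITED)
    repeat(3) { channel.send(it) }
    channel.close()
    wrap(channel).toList()
}

fun main() {
    println(drain { it.asFlowViaBuilder() }) // [0, 1, 2]
    // Option 2: the built-in adapter, which can be collected only once.
    println(drain { it.consumeAsFlow() })    // [0, 1, 2]
}
```

Either way there is no need to implement the Flow interface by hand, so the @InternalCoroutinesApi opt-in goes away.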
Mattias Flodin
01/14/2021, 7:30 AM
Marc Knaup
01/14/2021, 10:08 AM
Mattias Flodin
01/14/2021, 10:47 AM
Marc Knaup
01/14/2021, 10:50 AM
collect never returns until the Flow is complete, so your channel's Flow is never returned.
Mattias Flodin
01/14/2021, 10:51 AM
Marc Knaup
01/14/2021, 10:52 AM
Mattias Flodin
01/14/2021, 10:57 AM
collectFromHereAsync(changedIdsFlow.take(1)) so the flow will be returned as soon as there is an event available. I suppose there's really no point in returning a flow at all though, I should just use first() and return the event right away.
Marc Knaup
01/14/2021, 10:58 AM
Mattias Flodin
01/14/2021, 11:00 AM
Marc Knaup
01/14/2021, 11:00 AM
Mattias Flodin
01/14/2021, 11:05 AM
Marc Knaup
01/14/2021, 11:06 AM
Mattias Flodin
01/14/2021, 11:06 AM
Marc Knaup
01/14/2021, 11:08 AM
.dropWhile { it < X }.onEach { println(it) }.launchIn(scope)?
.shareIn
Mattias Flodin
01/14/2021, 11:15 AM
Marc Knaup
01/14/2021, 11:17 AM
launchIn is only useful with onEach
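A small sketch of why that is: launchIn(scope) is shorthand for scope.launch { collect() }, so it discards every value unless an operator such as onEach does something with it first. The function name launchInDemo, the finite flowOf source and the threshold value standing in for X are assumptions made for this example.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun launchInDemo(): List<Int> = runBlocking {
    val seen = mutableListOf<Int>()
    val threshold = 3 // hypothetical stand-in for X
    val job = flowOf(1, 2, 3, 4, 5)
        .dropWhile { it < threshold }  // skip everything before the first value >= threshold
        .onEach { seen += it }         // the only place where values are actually handled
        .launchIn(this)                // starts collecting in this scope and returns a Job
    job.join()
    seen
}

fun main() {
    println(launchInDemo()) // [3, 4, 5]
}
```

With a SharedFlow as the source the returned Job never completes on its own, so it has to be cancelled (or its scope has to be) once the values are no longer needed.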
Mattias Flodin
01/14/2021, 11:19 AM
Marc Knaup
01/14/2021, 11:20 AM
Mattias Flodin
01/14/2021, 11:23 AM
Marc Knaup
01/14/2021, 11:26 AM
changedIdsFlow when the collector is too slow and there’s backpressure?
changedIdsFlow do the needed buffering?
Mattias Flodin
01/14/2021, 11:28 AM
Marc Knaup
01/14/2021, 11:33 AM
Mattias Flodin
01/14/2021, 11:35 AM
Marc Knaup
01/14/2021, 11:45 AM
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.flow.*

// Starts collecting the upstream flow in `scope` right away and buffers its values in a channel.
fun <T> Flow<T>.bufferIn(scope: CoroutineScope, capacity: Int = Channel.BUFFERED): Flow<T> =
    Channel<T>(capacity)
        .also { channel ->
            onEach(channel::send)
                // Close the channel when upstream ends; otherwise collectors of the returned flow never complete.
                .onCompletion { cause -> channel.close(cause) }
                .launchIn(scope)
        }
        .consumeAsFlow()

suspend fun main() {
    val events = flow {
        repeat(10) {
            println("emit $it")
            emit(it)
            delay(1000)
        }
    }
    coroutineScope {
        val flow = events
            .bufferIn(this, capacity = Channel.UNLIMITED)
            .dropWhile { it <= 1 }
        println("wait")
        delay(5000)
        println("collect")
        flow.collect { println(it) }
    }
}
Mattias Flodin
01/14/2021, 11:52 AM
Marc Knaup
01/14/2021, 11:54 AM
Mattias Flodin
01/14/2021, 11:55 AM
Marc Knaup
01/14/2021, 11:58 AM
Mattias Flodin
01/14/2021, 12:39 PM
Marc Knaup
01/14/2021, 12:41 PM
GlobalScope.
But that’s only useful if you actually use that scope. If you create a reactive producer scope and use that one instead for bufferIn(scope, …) then you don’t need the additional scope.
Mattias Flodin
01/14/2021, 3:09 PM
bufferIn launches a new coroutine to collect the flow (and as a side effect calls channel::send). But what, if anything, guarantees that that collect() is scheduled before the delay occurs?
Marc Knaup
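01/14/2021, 3:17 PM
Mattias Flodin
01/14/2021, 3:17 PM
Marc Knaup
01/14/2021, 3:18 PM
suspend fun <T> Flow<T>.bufferIn(scope: CoroutineScope, capacity: Int = Channel.BUFFERED): Flow<T> {
    val startedFlow = CompletableDeferred<Flow<T>>()
    Channel<T>(capacity).also { channel ->
        // Hand out the buffered flow only once collection of the upstream flow is starting.
        onStart { startedFlow.complete(channel.consumeAsFlow()) }
            .onEach(channel::send)
            // Close the channel when upstream ends so that collectors of the returned flow can complete.
            .onCompletion { cause -> channel.close(cause) }
            .launchIn(scope)
    }
    return startedFlow.await()
}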
What about this?
runBlocking in your code for that
val flow = runBlocking {
    upstream.bufferIn(…)
}
runBlocking introduces a new scope. So you wouldn’t want to use bufferIn(this, …)
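To make the scope remark concrete, here is a minimal sketch under a few assumptions: collectorScope is a hypothetical long-lived scope, upstream is a small finite flow used only for the demo, and bufferIn repeats the suspending version from this thread. The sketch closes the channel when upstream completes so that the finite demo flow terminates.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.flow.*

suspend fun <T> Flow<T>.bufferIn(scope: CoroutineScope, capacity: Int = Channel.BUFFERED): Flow<T> {
    val startedFlow = CompletableDeferred<Flow<T>>()
    val channel = Channel<T>(capacity)
    onStart { startedFlow.complete(channel.consumeAsFlow()) }
        .onEach(channel::send)
        .onCompletion { cause -> channel.close(cause) } // lets the finite demo finish
        .launchIn(scope)
    return startedFlow.await()
}

fun bufferedDemo(): List<Int> {
    // Hypothetical long-lived scope that owns the collecting coroutine.
    val collectorScope = CoroutineScope(Dispatchers.Default)
    val upstream = flowOf(1, 2, 3)
    // runBlocking is only a bridge into suspending code. Passing `this` instead of
    // collectorScope would make the collector a child of runBlocking, which would then
    // not return until upstream completes (never, for a SharedFlow).
    val flow = runBlocking { upstream.bufferIn(collectorScope, Channel.UNLIMITED) }
    val result = runBlocking { flow.toList() }
    collectorScope.cancel()
    return result
}

fun main() {
    println(bufferedDemo()) // [1, 2, 3]
}
```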
Mattias Flodin
01/14/2021, 3:25 PM
Marc Knaup
01/14/2021, 3:26 PM
bufferIn is run before you set up your DB query
Mattias Flodin
01/14/2021, 3:29 PM
The [action] is called before the upstream flow is started, so if it is used with a [SharedFlow]
there is **no guarantee** that emissions from the upstream flow that happen inside or immediately
after this `onStart` action will be collected
(see [onSubscription] for an alternative operator on shared flows).
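The quoted documentation points at onSubscription, whose action runs after the subscriber has been registered with the shared flow. One possible way to use it, as a sketch only: bufferFromNowIn, subscriptionDemo and collectorScope are hypothetical names invented here, not something proposed in the thread.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.flow.*

// Hypothetical variant of bufferIn for shared flows: it returns only after the
// collecting coroutine has been registered as a subscriber.
suspend fun <T> SharedFlow<T>.bufferFromNowIn(
    scope: CoroutineScope,
    capacity: Int = Channel.BUFFERED
): Flow<T> {
    val subscribed = CompletableDeferred<Unit>()
    val channel = Channel<T>(capacity)
    onSubscription { subscribed.complete(Unit) } // runs after the subscription is registered
        .onEach(channel::send)
        .launchIn(scope)
    subscribed.await()
    return channel.consumeAsFlow()
}

fun subscriptionDemo(): List<Int> = runBlocking {
    val events = MutableSharedFlow<Int>()
    val collectorScope = CoroutineScope(Dispatchers.Default)
    val buffered = events.bufferFromNowIn(collectorScope, Channel.UNLIMITED)
    // Everything emitted from here on ends up in the channel, even before anyone collects `buffered`.
    (1..3).forEach { events.emit(it) }
    val received = buffered.take(3).toList()
    collectorScope.cancel() // a shared flow never completes, so stop the collector explicitly
    received
}

fun main() {
    println(subscriptionDemo()) // [1, 2, 3]
}
```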
Marc Knaup
01/14/2021, 3:30 PM
Mattias Flodin
01/14/2021, 3:42 PM
Marc Knaup
01/14/2021, 3:44 PM
Mattias Flodin
01/14/2021, 3:49 PM
startedFlow.await() will wait for changedIdsFlow.first() to begin collecting. I'm just unsure what guarantees I have that the async code will ever run. Seems that it does, though.
Marc Knaup
01/14/2021, 3:50 PM
.first() won’t just begin collecting. It will begin and then end again after the first collected element 🤔
Mattias Flodin
01/14/2021, 3:50 PM
Marc Knaup
01/14/2021, 3:51 PM
Mattias Flodin
01/14/2021, 3:55 PM