# flow
m
I'm experimenting with something like this, but now I'm getting warnings about using internal coroutine APIs, which doesn't seem very good...
```kotlin
suspend fun <T> collectFromHere(flow: SharedFlow<T>): Flow<T> {
    val channel = Channel<T>()
    flow.collect { channel.send(it) }
    return object : Flow<T> {
        @InternalCoroutinesApi
        override suspend fun collect(collector: FlowCollector<T>) {
            for (v in channel) {
                collector.emit(v)
            }
        }
    }
}
```
m
Don’t subclass `Flow`. Instead use the `flow { … }` builder.
Or even better: `return channel.consumeAsFlow()`
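A minimal sketch of that suggestion (the `CoroutineScope` receiver and the background `launch` are assumptions, not part of the original snippet; the original blocked inside `collect` before it could return):

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.SharedFlow
import kotlinx.coroutines.flow.consumeAsFlow
import kotlinx.coroutines.launch

// Sketch: collect the SharedFlow into a channel from a separate coroutine so
// this function can return immediately, then expose the channel as a Flow
// without subclassing Flow by hand.
fun <T> CoroutineScope.collectFromHere(flow: SharedFlow<T>): Flow<T> {
    val channel = Channel<T>()
    launch { flow.collect { channel.send(it) } } // runs until the scope is cancelled
    return channel.consumeAsFlow()
}
```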
m
And then I use it like so. Will the async function be cancelled and stop putting things on the channel after producing the first item?
m
Using `async` launches a new coroutine concurrently, so the Flow isn't guaranteed to start collecting immediately.
m
My debugger says otherwise though?
it breaks at the call to collect() before collectFromHere returns
m
There is no guarantee in what order the coroutines are executed unless you set up a very specific environment. It may or may not execute before your function returns. Also, `collect` never returns until the Flow is complete, so your channel's Flow is never returned.
m
So do you have any other suggestions? The latter part isn't important; the important thing is that the collect() is initiated so it allocates a read position in the SharedFlow.
m
Sure the latter is important, otherwise you write to a channel you cannot read from 🤔
I don't have a solution for that yet, just the same problem. It's not an easy problem to solve.
m
I rewrote it to do `collectFromHereAsync(changedIdsFlow.take(1))` so the flow will be returned as soon as there is an event available. I suppose there's really no point in returning a flow at all though, I should just use first() and return the event right away.
m
I'm confused what you're trying to achieve. If there's only one element you care about then why do you need to return a Flow at all?
m
That's what I'm saying, I don't need to return a flow. 🙂 What I do need to do is wait on the flow: if there are no stored events available, I create a publisher for the next event obtained from the flow. If there are stored events, I just return those.
m
My understanding is that you need to start collecting (= more than one element) AND know the state from which you're collecting (the first element of that Flow). If the initial state is emitted as part of the Flow you're collecting then good. Just put that into a CompletableDeferred
What are "stored events"? How does your code know what a stored event is?
m
Not sure if you know of Apache Kafka but it's similar to that. There's a log of events, persisted to database. The client makes a request saying "give me everything that's happened since event X". If there are persisted events after event X then those are returned. If there are no more events then the request blocks until a new event comes in, and that is returned.
m
So you know X at the time you start your Flow and you can compare if an event is > or < X?
m
Yes
Let's say I do it like this: 1. Read the log to see if there are new events, return those. 2. If not, wait for the event flow to produce an event. Then, between steps 1 and 2, an event may be triggered. Because the collection starts after that, I will miss the event.
So in order to not miss any events that occur during this process, the SharedFlow subscription needs to initiate before step 1.
m
Can't you use `.dropWhile { it < X }.onEach { println(it) }.launchIn(scope)`?
Or you don't have X before you start collecting?
You could also simply use a SharedFlow that starts eagerly and has some replay set.
And filter out old events later when you collect.
Means using `.shareIn`
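A sketch of that eager `.shareIn` idea (all names here — `eventsAfter`, `upstreamEvents`, `lastSeenId`, the replay size — are hypothetical stand-ins, not from the actual code under discussion):

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.SharingStarted
import kotlinx.coroutines.flow.dropWhile
import kotlinx.coroutines.flow.shareIn

// Sketch: share the upstream eagerly so collection of it starts right away,
// keep a replay buffer, and drop events at or before the known position X
// when a late subscriber collects. If replay is too small, events can still
// be lost between sharing and subscribing (the fragility noted below).
fun eventsAfter(
    upstreamEvents: Flow<Long>, // hypothetical upstream of event ids
    scope: CoroutineScope,
    lastSeenId: Long,
): Flow<Long> =
    upstreamEvents
        .shareIn(scope, SharingStarted.Eagerly, replay = 16)
        .dropWhile { it <= lastSeenId }
```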
m
I do know X before I start collecting. But I'm not sure how .dropWhile would help if no event is ever received in the first place. I mean collect() or dropWhile() becomes the same problem - I need to call dropWhile() before checking the database, but I'm not interested in waiting on it unless I found that there's nothing in the database that I can return. So... maybe you're onto something with launchIn? Is flow.launchIn(GlobalScope) different in terms of scheduling from async(GlobalScope) { flow.collect(...) }?
I could use a replay set but that seems kind of fragile - I would have to decide how much backlog I need to expect to happen. If my replay value is too low I can still miss events, and if it's too high I will spend needless time dropping events every time.
m
Avoid GlobalScope. Not good except in very few cases :)
`launchIn` is only useful with `onEach`
Do you need events you've already collected after X to be collectible again? E.g. by a new Flow collector
m
no that's why I have the database. Events that have already happened will be in the database (or cache). The flow is only used for signalling new events.
so what can I use instead of GlobalScope? I don't understand how the scope is supposed to come in when I'm in a non-coroutine context.
m
I don't really envision any actual concurrency happening here. It's only about allocating a read position. The rest of the execution happens later in whatever scope Apache Coyote has for waiting on the reactive publisher, in the case that I don't have any data from the database to return.
m
What happens to `changedIdsFlow` when the collector is too slow and there’s backpressure?
Doesn’t `changedIdsFlow` do the needed buffering?
m
That's a good point. Yes I've set an extraBufferCapacity for the MutableSharedFlow, but if that runs out it drops the oldest event. Which could theoretically cause the client to miss events... and dealing with that would probably require exactly the same logic as dealing with race conditions.
hm... with a significant difference though, in that my original problem might result in an empty flow, hence the client would wait indefinitely even though something already happened. In the case of dropped events there's always new events available, so you wouldn't get hung waiting for them. But even a replay of 1 could probably solve that. You'd get an event, discover that it is "too new", then go to the database again to backtrack and find the lost events.
m
How many events per second are we talking about anyway?
m
I don't know exactly as this is all new stuff, I hope not more than 100 events per second. But it's kind of not relevant as I still want to understand Kotlin coroutines/flows well enough to handle this in future situations when it might be much more.
m
So like before maybe
```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.flow.*

fun <T> Flow<T>.bufferIn(scope: CoroutineScope, capacity: Int = Channel.BUFFERED): Flow<T> =
    Channel<T>(capacity)
        .also { channel -> onEach(channel::send).launchIn(scope) }
        .consumeAsFlow()

suspend fun main() {
    val events = flow {
        repeat(10) {
            println("emit $it")
            emit(it)
            delay(1000)
        }
    }

    coroutineScope {
        val flow = events
            .bufferIn(this, capacity = Channel.UNLIMITED)
            .dropWhile { it <= 1 }

        println("wait")
        delay(5000)

        println("collect")
        flow.collect { println(it) }
    }
}
```
m
I'm not in a coroutine, this is a callback from graphql-java and I'm expected to return a reactive streams publisher. From where would I get the coroutine scope?
m
that uses GlobalScope though 🙂
m
Maybe, but it makes sure that errors and cancellations are properly propagated to their reactive counterparts.
m
what is the function of coroutineScope here? What would be different if it wasn't there?
m
The function won’t return until all coroutines in that scope have completed. And any errors in coroutines within that scope become errors of the entire scope instead of getting lost in an unhandled exception handler of `GlobalScope`. But that’s only useful if you actually use that scope. If you create a reactive producer scope and use that one instead for `bufferIn(scope, …)` then you don’t need the additional scope.
m
so `bufferIn` launches a new coroutine to collect the flow (and as a side effect calls `channel::send`). But what, if anything, guarantees that `collect()` is scheduled before the delay occurs?
m
What delay?
m
The delay(5000) that you have there. That's the point where I would query the database to see if there are persisted events that I should return
I need to guarantee that the coroutine started by launchIn() has invoked collect() before that happens
m
Then let’s add that 🙂
```kotlin
suspend fun <T> Flow<T>.bufferIn(scope: CoroutineScope, capacity: Int = Channel.BUFFERED): Flow<T> {
    val startedFlow = CompletableDeferred<Flow<T>>()

    Channel<T>(capacity).also { channel ->
        onStart { startedFlow.complete(channel.consumeAsFlow()) }
            .onEach(channel::send)
            .launchIn(scope)
    }

    return startedFlow.await()
}
```
What about this?
hmm no
that suspends and you’re in non-coroutine code
I guess you can’t avoid `runBlocking` in your code for that
Because you want to wait with your DB query
So
```kotlin
val flow = runBlocking {
    upstream.bufferIn(…)
}
```
Just be careful that `runBlocking` introduces a new scope. So you wouldn’t want to use `bufferIn(this, …)`
m
The db query will always run. If I don't get any events there, that's when I wait for the flow to produce an event instead. The only problem is that an event may be emitted to the flow during the db query and then my wait won't see that event.
m
Just make sure that `bufferIn` is run before you set up your DB query
m
Hmm
```
The [action] is called before the upstream flow is started, so if it is used with a [SharedFlow]
there is **no guarantee** that emissions from the upstream flow that happen inside or immediately
after this `onStart` action will be collected
(see [onSubscription] for an alternative operator on shared flows).
```
m
oh, good catch
I guess in that case there’s no way around modifying the upstream Flow to emit an initial “ready” element.
m
Well I can use onSubscription, I guess? Here's what I have now in my actual code.
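A sketch of that `onSubscription` variant (the receiver type and the `CompletableDeferred` handshake are assumptions carried over from the earlier `bufferIn`, not the actual code referred to here). Unlike `onStart`, the `onSubscription` action on a SharedFlow runs only after the subscriber is registered, so once the deferred completes, a read position really has been allocated:

```kotlin
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.*

// Sketch: like bufferIn above, but onSubscription fires only once the
// subscriber is actually counted by the SharedFlow, so any emission after
// this function returns is guaranteed to reach the channel.
suspend fun <T> SharedFlow<T>.bufferIn(
    scope: CoroutineScope,
    capacity: Int = Channel.BUFFERED,
): Flow<T> {
    val started = CompletableDeferred<Flow<T>>()
    val channel = Channel<T>(capacity)
    onSubscription { started.complete(channel.consumeAsFlow()) }
        .onEach(channel::send)
        .launchIn(scope)
    return started.await()
}
```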
m
I don’t know enough about Reactive for that
m
the key point is that `startedFlow.await()` will wait for `changedIdsFlow.first()` to begin collecting. I'm just unsure what guarantees I have that the `async` code will ever run. Seems that it does, though.
m
`.first()` won’t just begin collecting. It will begin and then end again after the first collected element 🤔
m
yes but I only need one event
the flow never completes anyway
m
I guess I don’t know enough about your architecture anyway 🙂
m
I would guess SharedFlows typically never really complete, I mean it just signals whenever a change happens. They won't stop happening for as long as the system is running. My task is to notify the GraphQL subscription as soon as a change happens (and if the database already has recorded changes since earlier, it will notify about those immediately)
Here's how I ended up at the end. Seems to work and catches the event even if it happens during the 10-second delay.