
Mattias Flodin

01/13/2021, 3:53 PM
I'm experimenting with something like this, but now I'm getting warnings about using internal coroutine APIs, which doesn't seem very good...
suspend fun <T> collectFromHere(flow: SharedFlow<T>): Flow<T> {
    val channel = Channel<T>()
    flow.collect { channel.send(it) }
    return object: Flow<T> {
        @InternalCoroutinesApi
        override suspend fun collect(collector: FlowCollector<T>) {
            for (v in channel) {
                collector.emit(v)
            }
        }
    }
}

Marc Knaup

01/13/2021, 5:01 PM
Don’t subclass Flow. Instead use the flow { … } builder.
Or even better: return channel.consumeAsFlow()
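(A minimal sketch of both suggestions; the helper names channelAsFlow / channelAsFlow2 are made up here, and channel stands for the Channel<T> from the snippet above.)
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.flow.*

// Sketch only: channelAsFlow / channelAsFlow2 are illustrative names, not from the thread.

// Option 1: the flow { } builder instead of an anonymous Flow subclass.
fun <T> channelAsFlow(channel: ReceiveChannel<T>): Flow<T> = flow {
    for (v in channel) emit(v)
}

// Option 2: the built-in operator that does the same job.
fun <T> channelAsFlow2(channel: ReceiveChannel<T>): Flow<T> = channel.consumeAsFlow()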

Mattias Flodin

01/14/2021, 7:30 AM
Ok so now I have this function. Does this make sense, or is there a better way of doing it?
And then I use it like so. Will the async function be cancelled and stop putting things on the channel after producing the first item?

Marc Knaup

01/14/2021, 10:08 AM
Using async launches a new coroutine asynchronously, so the Flow doesn't collect immediately.

Mattias Flodin

01/14/2021, 10:47 AM
My debugger says otherwise though?
it breaks at the call to collect() before collectFromHere returns

Marc Knaup

01/14/2021, 10:50 AM
There is no guarantee in what order the coroutines are executed unless you set up a very specific environment. It may or may not execute before your function returns. Also, collect never returns until the Flow is complete, so your channel's Flow is never returned.
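(A toy illustration of that point, not the thread's code: collect on a SharedFlow suspends until cancellation, because a SharedFlow never completes.)
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Toy example, not the thread's code.
fun main() = runBlocking {
    val shared = MutableSharedFlow<Int>()
    val job = launch {
        // Suspends here indefinitely: a SharedFlow has no terminal event,
        // so collect never returns and nothing after it would run.
        shared.collect { println("got $it") }
    }
    delay(100)     // give the collector a chance to subscribe
    shared.emit(1) // prints "got 1"
    delay(100)
    job.cancel()   // cancellation is the only way to stop collecting
}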

Mattias Flodin

01/14/2021, 10:51 AM
So do you have any other suggestions? The latter part isn't important, the important thing is that the collect() is initiated so it allocates a read position in the SharedFlow.

Marc Knaup

01/14/2021, 10:52 AM
Sure the latter is important, otherwise you write to a channel you cannot read from 🤔
I don't have a solution for that yet, just the same problem. It's not an easy problem to solve.

Mattias Flodin

01/14/2021, 10:57 AM
I rewrote it to do collectFromHereAsync(changedIdsFlow.take(1)) so the flow will be returned as soon as there is an event available. I suppose there's really no point in returning a flow at all though, I should just use first() and return the event right away.

Marc Knaup

01/14/2021, 10:58 AM
I'm confused what you're trying to achieve. If there's only one element you care about then why do you need to return a Flow at all?

Mattias Flodin

01/14/2021, 11:00 AM
That's what I'm saying, I don't need to return a flow. 🙂 What I do need to do is check whether there are stored events available; if there are none, I create a publisher for the next event obtained from the flow. If there are stored events, I just return those.

Marc Knaup

01/14/2021, 11:00 AM
My understanding is that you need to start collecting (= more than one element) AND know the state from which you're collecting (first element of that Flow). If the initial state is emitted as part of the Flow you're collecting then good. Just put that into a CompletableDeferred
What are "stored events"? How does your code know what a stored event is?
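(A hypothetical sketch of the CompletableDeferred idea; collectWithInitialState and the parameter names are made up for illustration.)
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical helper; the name collectWithInitialState is made up.
// Keep collecting in the background, but hand the first element (the "initial
// state") back separately through a CompletableDeferred.
fun <T> Flow<T>.collectWithInitialState(scope: CoroutineScope): CompletableDeferred<T> {
    val initialState = CompletableDeferred<T>()
    onEach { value ->
        initialState.complete(value) // only the first completion wins
        // ...handle every collected value here...
    }.launchIn(scope)
    return initialState
}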

Mattias Flodin

01/14/2021, 11:05 AM
Not sure if you know of Apache Kafka but it's similar to that. There's a log of events, persisted to database. The client makes a request saying "give me everything that's happened since event X". If there are persisted events after event X then those are returned. If there are no more events then the request blocks until a new event comes in, and that is returned.

Marc Knaup

01/14/2021, 11:06 AM
So you know X at the time you start your Flow and you can compare if an event is > or < X?

Mattias Flodin

01/14/2021, 11:06 AM
Yes
Let's say I do it like this: 1. Read the log to see if there are new events, return those. 2. If not, wait for the event flow to produce an event. Then, between steps 1 and 2, an event may be triggered. Because the collection starts after that, I will miss the event.
So in order to not miss any events that occur during this process, the SharedFlow subscription needs to initiate before step 1.

Marc Knaup

01/14/2021, 11:08 AM
Can't you use .dropWhile { it < X }.onEach { println(it) }.launchIn(scope)?
Or you don't have X before you start collecting?
You could also simply use a SharedFlow that starts eagerly and has some replay set. And filter out old events later when you collect.
That means using .shareIn
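(A rough sketch of that shareIn idea; the function name, the Long event type, and the replay size are placeholders, not from the thread.)
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Sketch only: eventsAfter, the Long event type, and replay = 64 are placeholders.
// Start collecting eagerly and keep a replay backlog, then filter out events
// the client already has when it finally collects.
fun eventsAfter(upstream: Flow<Long>, scope: CoroutineScope, lastSeenId: Long): Flow<Long> =
    upstream
        .shareIn(scope, started = SharingStarted.Eagerly, replay = 64)
        .filter { eventId -> eventId > lastSeenId }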

Mattias Flodin

01/14/2021, 11:15 AM
I do know X before I start collecting. But I'm not sure how .dropWhile would help if no event is ever received in the first place. I mean collect() or dropWhile() becomes the same problem - I need to call dropWhile() before checking the database, but I'm not interested in waiting on it unless I find that there's nothing in the database that I can return. So... maybe you're onto something with launchIn? Is flow.launchIn(GlobalScope) different in terms of scheduling from async(GlobalScope) { flow.collect(...) }?
I could use a replay set but that seems kind of fragile - I would have to decide how much backlog I need to expect to happen. If my replay value is too low I can still miss events, and if it's too high I will spend needless time dropping events every time.

Marc Knaup

01/14/2021, 11:17 AM
Avoid GlobalScope. Not good except in very few cases :)
launchIn is only useful with onEach
Do you need events you've already collected after X to be collectible again? E.g. by a new Flow collector

Mattias Flodin

01/14/2021, 11:19 AM
no that's why I have the database. Events that have already happened will be in the database (or cache). The flow is only used for signalling new events.
so what can I use instead of GlobalScope? I don't understand how the scope is supposed to come in when I'm in a non-coroutine context.

Mattias Flodin

01/14/2021, 11:23 AM
I don't really envision any actual concurrency happening here. It's only about allocating a read position. The rest of the execution happens later in whatever scope Apache Coyote has for waiting on the reactive publisher, in the case that I don't have any data from the database to return.

Marc Knaup

01/14/2021, 11:26 AM
What happens to changedIdsFlow when the collector is too slow and there’s backpressure?
Doesn’t changedIdsFlow do the needed buffering?

Mattias Flodin

01/14/2021, 11:28 AM
That's a good point. Yes I've set an extraBufferCapacity for the MutableSharedFlow, but if that runs out it drops the oldest event. Which could theoretically cause the client to miss events... and dealing with that would probably require exactly the same logic as dealing with race conditions.
hm... with a significant difference though, in that my original problem might result in an empty flow, hence the client would wait indefinitely even though something already happened. In the case of dropped events there are always new events available, so you wouldn't get stuck waiting for them. But even a replay of 1 could probably solve that. You'd get an event, discover that it is "too new", then go to the database again to backtrack and find the lost events.

Marc Knaup

01/14/2021, 11:33 AM
How many events per second are we talking about anyway?

Mattias Flodin

01/14/2021, 11:35 AM
I don't know exactly as this is all new stuff, I hope not more than 100 events per second. But it's kind of not relevant as I still want to understand Kotlin coroutines/flows well enough to handle this in future situations when it might be much more.

Marc Knaup

01/14/2021, 11:45 AM
So like before maybe
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.flow.*


fun <T> Flow<T>.bufferIn(scope: CoroutineScope, capacity: Int = Channel.BUFFERED): Flow<T> =
    Channel<T>(capacity)
        .also { channel ->
            // close the channel when the upstream completes so the consumer can complete too
            onEach(channel::send).onCompletion { channel.close(it) }.launchIn(scope)
        }
        .consumeAsFlow()


suspend fun main() {
    val events = flow {
        repeat(10) {
            println("emit $it")
            emit(it)
            delay(1000)
        }
    }

    coroutineScope {
        val flow = events
            .bufferIn(this, capacity = Channel.UNLIMITED)
            .dropWhile { it <= 1 }

        println("wait")
        delay(5000)

        println("collect")
        flow.collect { println(it) }
    }
}

Mattias Flodin

01/14/2021, 11:52 AM
I'm not in a coroutine, this is a callback from graphql-java and I'm expected to return a reactive streams publisher. From where would I get the coroutine scope?

Mattias Flodin

01/14/2021, 11:55 AM
that uses GlobalScope though 🙂

Marc Knaup

01/14/2021, 11:58 AM
Maybe, but it makes sure that errors and cancelations are properly propagated to their reactive counterparts.

Mattias Flodin

01/14/2021, 12:39 PM
what is the function of coroutineScope here? What would be different if it wasn't there?

Marc Knaup

01/14/2021, 12:41 PM
The function won’t return until all coroutines in that scope have completed. And any errors in coroutines within that scope become errors of the entire scope instead of getting lost in an unhandled exception handler of GlobalScope. But that’s only useful if you actually use that scope. If you create a reactive producer scope and use that one instead for bufferIn(scope, …) then you don’t need the additional scope.

Mattias Flodin

01/14/2021, 3:09 PM
So bufferIn launches a new coroutine to collect the flow (and as a side effect calls channel::send). But what, if anything, guarantees that that collect() is scheduled before the delay occurs?

Marc Knaup

01/14/2021, 3:17 PM
What delay?

Mattias Flodin

01/14/2021, 3:17 PM
The delay(5000) that you have there. That's the point where I would query the database to see if there are persisted events that I should return
I need to guarantee that the coroutine started by launchIn() has invoked collect() before that happens

Marc Knaup

01/14/2021, 3:18 PM
Then let’s add that 🙂
suspend fun <T> Flow<T>.bufferIn(scope: CoroutineScope, capacity: Int = Channel.BUFFERED): Flow<T> {
    val startedFlow = CompletableDeferred<Flow<T>>()

    Channel<T>(capacity).also { channel ->
        onStart { startedFlow.complete(channel.consumeAsFlow()) }
            .onEach(channel::send)
            .launchIn(scope)
    }

    return startedFlow.await()
}
What about this?
Hmm, no. That suspends, and you’re in non-coroutine code.
I guess you can’t avoid runBlocking in your code for that, because you want to wait with your DB query.
So:
val flow = runBlocking {
   upstream.bufferIn(…)
}
Just be careful that runBlocking introduces a new scope. So you wouldn’t want to use bufferIn(this, …)
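(A sketch of that caveat, using the suspend bufferIn from above; appScope and bufferedFromBlockingCode are placeholders. The point is that the buffering coroutine should live in a longer-lived scope, not in the scope that runBlocking creates.)
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Sketch: appScope and bufferedFromBlockingCode are placeholders.
// A long-lived scope owned by the application, not by runBlocking.
val appScope = CoroutineScope(SupervisorJob() + Dispatchers.Default)

fun <T> bufferedFromBlockingCode(upstream: Flow<T>): Flow<T> =
    runBlocking {
        upstream.bufferIn(appScope) // not bufferIn(this, …): runBlocking would wait for that coroutine forever
    }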

Mattias Flodin

01/14/2021, 3:25 PM
The db query will always run. If I don't get any events there, that's when I wait for the flow to produce an event instead. The only problem is that an event may be emitted to the flow during the db query and then my wait won't see that event.

Marc Knaup

01/14/2021, 3:26 PM
Just make sure that bufferIn is run before you set up your DB query

Mattias Flodin

01/14/2021, 3:29 PM
Hmm
The [action] is called before the upstream flow is started, so if it is used with a [SharedFlow] there is **no guarantee** that emissions from the upstream flow that happen inside or immediately after this `onStart` action will be collected (see [onSubscription] for an alternative operator on shared flows).

Marc Knaup

01/14/2021, 3:30 PM
oh, good catch
I guess in that case there’s no way around modifying the upstream Flow to emit an initial “ready” element.

Mattias Flodin

01/14/2021, 3:42 PM
Well I can use onSubscription, I guess? Here's what I have now in my actual code.
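(Not the actual code from this message, which isn't shown here; just a sketch of what an onSubscription-based variant of bufferIn could look like for a SharedFlow. onSubscription runs after the subscription is registered, so events emitted from that point on should land in the channel.)
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.flow.*

// Sketch only, not the actual code referenced above.
suspend fun <T> SharedFlow<T>.bufferIn(
    scope: CoroutineScope,
    capacity: Int = Channel.BUFFERED,
): Flow<T> {
    val subscribed = CompletableDeferred<Unit>()
    val channel = Channel<T>(capacity)
    onSubscription { subscribed.complete(Unit) }
        .onEach(channel::send)
        .launchIn(scope)
    subscribed.await() // don't return until the subscription is registered
    return channel.consumeAsFlow()
}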

Marc Knaup

01/14/2021, 3:44 PM
I don’t know enough about Reactive for that

Mattias Flodin

01/14/2021, 3:49 PM
The key point is that startedFlow.await() will wait for changedIdsFlow.first() to begin collecting. I'm just unsure what guarantees I have that the async code will ever run. Seems that it does, though.

Marc Knaup

01/14/2021, 3:50 PM
.first() won’t just begin collecting. It will begin and then end again after the first collected element 🤔

Mattias Flodin

01/14/2021, 3:50 PM
yes but I only need one event
the flow never completes anyway

Marc Knaup

01/14/2021, 3:51 PM
I guess I don’t know enough about your architecture anyway 🙂

Mattias Flodin

01/14/2021, 3:55 PM
I would guess SharedFlows typically never really complete; a SharedFlow just signals whenever a change happens, and changes won't stop happening for as long as the system is running. My task is to notify the GraphQL subscription as soon as a change happens (and if the database already has recorded changes from earlier, it will notify about those immediately).
Here's where I ended up in the end. Seems to work and catches the event even if it happens during the 10-second delay.
👍 1