ursus
01/05/2022, 4:28 AM
BUFFERED, and it's expected for consumers to use the buffer operator?
bezrukov
01/05/2022, 6:24 AM
.buffer doesn't create a new channel in this case, but modifies the underlying channel.
Joffrey
01/05/2022, 9:38 AM
callbackFlow and channelFlow say it all:
> A channel with the default buffer size is used. Use the buffer operator on the resulting flow to specify a user-defined value and to control what happens when data is produced faster than consumed, i.e. to control the back-pressure behavior.
So yes, to change the default you should use .buffer() on the flow returned by callbackFlow.
Most of those channel-based operators are implemented in a smart way to perform operator "fusion". In those operators, there is a special paragraph mentioning how it's fused with others. Here is for instance the leading sentence of that paragraph in the doc of buffer:
> Adjacent applications of channelFlow, flowOn, buffer, and produceIn are always fused so that only one properly configured channel is used for execution.
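
To make that concrete, a minimal sketch of a callbackFlow whose channel is resized by the consumer with .buffer(); the EventSource/Listener API here is made up purely for illustration:

import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.*

// Hypothetical listener-based API, invented just for this example.
interface Listener {
    fun onEvent(value: Int)
}

class EventSource {
    fun register(listener: Listener) { /* ... */ }
    fun unregister(listener: Listener) { /* ... */ }
}

// Bridge the callbacks into a Flow. callbackFlow backs this with a channel
// of the default capacity (Channel.BUFFERED).
fun EventSource.events(): Flow<Int> = callbackFlow {
    val listener = object : Listener {
        override fun onEvent(value: Int) {
            trySend(value) // push each callback value into the flow's channel
        }
    }
    register(listener)
    awaitClose { unregister(listener) }
}

// Consumer side: .buffer() reconfigures that same channel thanks to operator
// fusion, it does not add a second one.
val resized: Flow<Int> = EventSource().events().buffer(capacity = 64)
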
ursus
01/05/2022, 12:48 PM

ursus
01/05/2022, 12:54 PM

Joffrey
01/05/2022, 1:26 PM
It doesn't use channelFlow or callbackFlow, it's an implementation that manually creates its own channel, so operator fusion cannot happen on the resulting flow in this case. I'm surprised it's implemented this way, this could be written as a callbackFlow.
ursus
01/05/2022, 1:29 PM

ursus
01/05/2022, 1:30 PM

ursus
01/05/2022, 1:31 PM

Joffrey
01/05/2022, 2:56 PM
> this op fusion is new to me, how "far" downstream does it work? anywhere?
It only applies to adjacent operators, at least that's what the doc says. It's technically just an optimization to avoid having multiple coroutines and channels when it's not necessary.
Joffrey
01/05/2022, 2:57 PM
> if upstream loses the values, then downstream is sol
What do you mean by "sol"?
ursus
01/05/2022, 3:02 PM

Joffrey
01/05/2022, 3:04 PM
> If I were to change the lib to callbackFlow + conflate, and then in userspace add a buffer(unlimited) operator, then will the userspace buffer win?
Yes, because callbackFlow, conflate and buffer would be adjacent and thus "fused".
ursus
01/05/2022, 3:08 PM
callbackFlow { ... }
    .conflate()          // library, for performance
    .buffer(UNLIMITED)   // me, because of a certain use case
Just to be super sure, before I send them a PR: you mean the adjacent operators can be an infinite list? Therefore the last in the list wins? Would this work? Or once conflated, always conflated?
Joffrey
01/05/2022, 3:22 PM
> you mean the adjacent operators can be an infinite list?
Yes, that's how I understand it.
> therefore the last in the list wins?
Not necessarily, I think it depends on the semantics of each operator. They must be merged in a way that's consistent with the behaviour they would have without the fusion. Without talking about operator fusion (which is an optimization), using buffer(UNLIMITED) after conflate should launch a separate coroutine collecting elements from the conflated flow and putting them in the infinite buffer. If fast enough, this coroutine should never let conflate actually drop events. So if we have a buffer(UNLIMITED) after conflate, it makes sense that we could optimize it by just ignoring conflate completely.
For other fusions, I don't know if that holds. For instance, using a smaller buffer after a big buffer would not make the resulting buffer small (I believe). The biggest buffer should win, I guess.
Mmmh, that's a good question. Sorry if I was a bit quick to answer, but indeed operator fusion should just mean optimization (using a single coroutine + channel). It should not change the behaviour. So I would find it strange that an extra…
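
A rough sketch of that non-fused mental model (purely illustrative, not how kotlinx.coroutines actually implements buffer): a middleman coroutine drains the upstream into an unlimited channel as fast as it can, so an upstream conflate never gets a chance to drop anything.

import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.launch

// Illustrative "buffer-like" operator, not the library's real implementation.
fun <T> Flow<T>.unlimitedBufferSketch(): Flow<T> = flow {
    coroutineScope {
        val channel = Channel<T>(Channel.UNLIMITED)
        launch {
            // Drain the upstream eagerly; if this keeps up, an upstream
            // conflate() never actually has anything to drop.
            this@unlimitedBufferSketch.collect { channel.send(it) }
            channel.close()
        }
        for (element in channel) {
            emit(element) // downstream consumes at its own pace
        }
    }
}
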
Joffrey
01/05/2022, 3:23 PM

ursus
01/05/2022, 3:25 PM
buffer a MutableStateFlow then? I tried that in an unrelated case, and it didn't work, as if buffer itself had sort of a backpressure.
Joffrey
01/05/2022, 3:28 PM

ursus
01/05/2022, 3:48 PM

Joffrey
01/05/2022, 4:15 PM
callbackFlow was experimental 🤔 but might be worth asking anyway, you're right
Joffrey
01/05/2022, 4:15 PM

ursus
01/05/2022, 4:22 PM

Joffrey
01/05/2022, 4:26 PM

Joffrey
01/05/2022, 4:26 PM
> so if I understand this correctly, if I went from "user -> null -> user" very quickly, the collector might not see the null, right?
Correct
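
A small self-contained illustration of that "user -> null -> user" case (assumed values and timings, not the library code under discussion); with a slow collector, the intermediate null is typically skipped:

import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

fun main() = runBlocking {
    val users = flow {
        emit("user")
        emit(null)     // the intermediate value
        emit("user")
    }

    users
        .conflate()
        .collect { value ->
            delay(100) // slow collector forces conflation
            println(value)
        }
    // Typically prints "user" twice; the null gets overwritten before
    // the collector is ready for it.
}
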
ursus
01/05/2022, 4:53 PM

Joffrey
01/05/2022, 4:58 PM
callbackFlow, and I guess it was indeed to use stable coroutines, which I think is an obsolete comment given that this is stable now.
Joffrey
01/05/2022, 4:59 PM

ursus
01/05/2022, 5:00 PM

ursus
01/05/2022, 5:07 PM
map doesn't support backpressure? i.e. if notifications themselves are not concurrent, but maps might, they would get sequentialized, not conflated, correct? Or is the map somehow included in the actions that contribute to causing backpressure at the notification level?
ursus
01/05/2022, 5:11 PM
sqldelightQuery.asFlow()
    .map { query ->
        withContext(IO) {
            query.executeAsOneOrNull()
        }
    }
ursus
01/05/2022, 5:19 PM
map in this case has not yet "taken" the emit, but the body of the map itself doesn't count into this time, right? i.e. the channel backpressure only cares about the hand-off, not the processing?
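
A small experiment suggesting an answer (assumed names and timings, not the SQLDelight code above): the map body runs in the collecting coroutine, so the time it spends delays the next receive from the underlying channel, and the producer's send starts suspending accordingly.

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking {
    val source = channelFlow {
        repeat(5) {
            println("sending $it")
            send(it) // suspends whenever the (fused) channel is full
        }
        println("producer done")
    }.buffer(0) // rendezvous hand-off, to make the effect easy to see

    source
        .map { value ->
            delay(100)  // slow work inside map, running in the collecting coroutine
            value * 2
        }
        .collect { println("collected $it") }
    // The "sending" and "collected" lines interleave: the producer is held
    // back while the map body is busy.
}
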
Joffrey
01/05/2022, 6:48 PM

ursus
01/05/2022, 7:10 PM
buffer, right?

Joffrey
01/05/2022, 7:10 PM

ursus
01/05/2022, 7:10 PM

ursus
01/05/2022, 7:18 PM

Joffrey
01/06/2022, 9:07 AM

Nick Allen
01/06/2022, 6:49 PM

Joffrey
01/06/2022, 8:02 PM
> Fusion is not as simple as trying to preserve the behavior
How is preserving the behaviour simple?
> For example, if you conflate, then it's assumed that you only care about the latest value so all other buffering is abandoned
Not true, I actually experimented with .conflate().buffer() and you can get elements in a way that's equivalent to no conflation. Conflation is a behaviour that depends on consumers, so nothing is really guaranteed either way.
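
A sketch of that point about consumers (assumed timings): with a consumer that is never busy when the next element arrives, conflate() drops nothing at all.

import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

fun main() = runBlocking {
    val upstream = flow {
        repeat(5) {
            emit(it)
            delay(50)  // producer is slower than the collector below
        }
    }

    val collected = upstream
        .conflate()
        .toList()      // fast consumer: never busy when the next element arrives

    println(collected) // expected [0, 1, 2, 3, 4], i.e. no conflation in practice
}
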
Nick Allen
01/06/2022, 8:25 PM
> How is preserving the behaviour simple?
It'd be simple to document/explain. I was not referring to implementation.
> Not true, I actually experimented with .conflate().buffer() and you can get elements in a way that's equivalent to no conflation.
I meant that conflate().buffer() is equivalent to conflate(). If you are disputing that, could you share a repro?
Nick Allen
01/06/2022, 8:26 PM

Joffrey
01/06/2022, 9:08 PM
> I meant that conflate().buffer() is equivalent to conflate(). If you are disputing that, could you share a repro?
I understand, and I indeed dispute it. Here is a repro: https://pl.kotl.in/SLuY8UKA
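
For readers without the playground link handy, a minimal experiment in the same spirit (the exact code behind the link may differ); running it shows which elements survive the conflate().buffer() combination under a slow collector:

import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

fun main() = runBlocking {
    flow {
        repeat(10) {
            emit(it)
            delay(10)   // producer noticeably faster than the collector
        }
    }
        .conflate()
        .buffer()       // the disputed combination
        .collect {
            delay(50)   // slow collector
            println("collected $it")
        }
}
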
Joffrey
01/06/2022, 9:37 PM

Nick Allen
01/06/2022, 10:23 PM
buffer() with no arguments is effectively thrown away by the fusion operation.
> For example, if you conflate, then it's assumed that you only care about the latest value so all other buffering is abandoned.
I was wrong here, only buffer() with no args is thrown away.
> I believe fusion should just be considered an optimization,
I agree, but that doesn't mean it preserves behavior. I added to your example: https://pl.kotl.in/ncpiNF-4c. .map {it} should be a no-op, but here it changes the output significantly.
ursus
01/07/2022, 2:00 PM
map and upstream? or why is it different that the 2nd flow..
Joffrey
01/07/2022, 2:02 PM
map prevents the fusion of the operators, but even without fusion, buffer should prevent backpressure from the collector from reaching the conflate part, so I don't see the reason why conflate() is dropping events.
ursus
01/07/2022, 2:03 PM

ursus
01/10/2022, 11:34 AM

Nick Allen
01/10/2022, 8:37 PM
> Adjacent applications of channelFlow, flowOn, buffer, and produceIn are always fused so that only one properly configured channel is used for execution.
There is no "limit" and library boundaries do not factor in.
> Note that conflate operator is a shortcut for buffer with capacity of Channel.CONFLATED, which is, in turn, a shortcut to a buffer that only keeps the latest element as created by .buffer(onBufferOverflow = BufferOverflow.DROP_OLDEST)
This is actually incorrect, I think, and should be buffer(capacity = 0, onBufferOverflow = BufferOverflow.DROP_OLDEST), but it doesn't affect the result because:
> Explicitly specified buffer capacity takes precedence over buffer() or buffer(Channel.BUFFERED) calls, which effectively requests a buffer of any size. Multiple requests with a specified buffer size produce a buffer with the sum of the requested buffer sizes.
So the explicit buffer sizes are added together. So conflate().buffer(Channel.UNLIMITED) is the same as buffer(Channel.UNLIMITED, onBufferOverflow = BufferOverflow.DROP_OLDEST), which will never drop anything.
Nick Allen
01/10/2022, 8:47 PM
With conflate().buffer(Channel.UNLIMITED) the upstream is able to push directly into the unlimited channel.
With conflate().map {it}.buffer(Channel.UNLIMITED) the upstream is pushing into a rendezvous channel with DROP_OLDEST (so send will never block). Another coroutine is reading from that rendezvous channel, calling the map lambda, and then pushing into the unlimited buffer. Basically, in the example, all the elements are emitted before the coroutine reading the rendezvous channel even starts.
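
A sketch in the spirit of those two playground examples (the exact code behind the links may differ), contrasting the fused and non-fused chains:

import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

fun main() = runBlocking {
    val upstream = flow { repeat(10) { emit(it) } }

    // Fused: conflate + buffer collapse into one UNLIMITED channel, so the
    // producer pushes straight into it and nothing can be dropped.
    val fused = upstream
        .conflate()
        .buffer(Channel.UNLIMITED)
        .toList()

    // Not fused: the innocent-looking map breaks adjacency, so a DROP_OLDEST
    // rendezvous channel plus an extra coroutine sit between conflate and buffer.
    val notFused = upstream
        .conflate()
        .map { it }
        .buffer(Channel.UNLIMITED)
        .toList()

    println("fused:     $fused")
    println("not fused: $notFused") // elements may be missing here
}
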
Joffrey
01/10/2022, 11:34 PM
> Another coroutine is reading from that rendezvous channel, calling the map lambda, and then pushing into the unlimited buffer.
My question is: why isn't this buffer coroutine started at the same time as the flow's producer? And why isn't it fast enough to prevent conflation?
Nick Allen
01/10/2022, 11:50 PM
> why isn't this buffer coroutine started at the same time as the flow's producer?
Who says it doesn't? Even if it starts first it'll suspend to receive.
> And why isn't it fast enough to prevent conflation?
emit is calling send on a non-suspending Channel. It doesn't suspend. So it's able to emit all the values before the reading coroutine resumes. runBlocking is single-threaded by default, so the reading coroutine can't resume until the emitting is all done.
If I switch to Dispatchers.Default and bump it to repeat(1000) with take(50).onEach, then I see some elements make it through sometimes. Even then it's all a race condition.
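
A bare-bones illustration of that scheduling point, using a conflated channel directly rather than the flow code from the links: send never suspends, and under single-threaded runBlocking the reader only gets to run once the sender is completely done.

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel

fun main() = runBlocking {
    val channel = Channel<Int>(Channel.CONFLATED)

    val reader = launch {
        for (value in channel) println("received $value")
    }

    repeat(10) {
        channel.send(it)   // never suspends on a conflated channel
        println("sent $it")
    }
    channel.close()

    reader.join() // typically prints only "received 9": earlier values were overwritten
}
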
Joffrey
01/10/2022, 11:57 PM
> Who says it doesn't?
I guess I misread your message: "all the elements are emitted before the coroutine reading the rendezvous channel even starts". The coroutine is scheduled but never actually gets the chance to run.
> emit is calling send on a non-suspending Channel
Oh good point, I forgot about this. Even though it's calling the suspending send(), it doesn't actually return COROUTINE_SUSPENDED, so it doesn't actually suspend in this case. Thanks for pointing this out.
ursus
01/12/2022, 9:13 PM

Nick Allen
01/12/2022, 9:18 PM
Flow.

ursus
01/12/2022, 9:26 PM

ursus
01/13/2022, 2:25 PM