# coroutines
u
Is it possible to tweak channel settings in `callbackFlow` or `channelFlow`? Or is it hardcoded to `BUFFERED` and expected for consumers to use the `buffer` operator?
b
Yes. In this case `.buffer` doesn't create a new channel, but modifies the underlying channel
j
The docs of `callbackFlow` and `channelFlow` say it all:

> A channel with the default buffer size is used. Use the buffer operator on the resulting flow to specify a user-defined value and to control what happens when data is produced faster than consumed, i.e. to control the back-pressure behavior.
So yes, to change the default you should use `.buffer()` on the flow returned by `callbackFlow`.
Most of those channel-based operators are implemented in a smart way to perform operator "fusion". In those operators, there is a special paragraph mentioning how they're fused with others. Here is for instance the leading sentence of that paragraph in the doc of `buffer`:

> Adjacent applications of channelFlow, flowOn, buffer, and produceIn are always fused so that only one properly configured channel is used for execution
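To make the fusion concrete, here is a minimal sketch (the `Listener` interface and `events` function are hypothetical, invented for illustration): the `buffer(64)` call doesn't add a second channel, it just configures the one that `callbackFlow` already creates.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.*

// Hypothetical listener-based API, for illustration only
fun interface Listener { fun onEvent(value: Int) }

fun events(register: (Listener) -> Unit, unregister: (Listener) -> Unit): Flow<Int> =
    callbackFlow {
        val listener = Listener { value -> trySend(value) }
        register(listener)
        awaitClose { unregister(listener) }
    }
        // Fused with the callbackFlow above: the flow's single underlying
        // channel gets capacity 64 instead of the default BUFFERED
        .buffer(64)

fun main(): Unit = runBlocking {
    var registered: Listener? = null
    val results = mutableListOf<Int>()
    val job = launch {
        events({ registered = it }, { registered = null }).take(3).toList(results)
    }
    while (registered == null) yield() // let the collector register the callback
    registered!!.onEvent(1)
    registered!!.onEvent(2)
    registered!!.onEvent(3)
    job.join()
    check(results == listOf(1, 2, 3)) // buffered and delivered in order
}
```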
u
so does it really change it? is that what operator fusion is? or does it append and, fingers crossed, the downstream buffer can keep up so the upstream buffer won't overflow?
j
The implementation you linked doesn't use `channelFlow` or `callbackFlow`; it's an implementation that manually creates its own channel, so the operator fusion cannot happen on the resulting flow in this case. I'm surprised it's implemented this way, this could be written as a `callbackFlow`.
u
yes im aware, i was just getting at the fact that in general, if upstream loses the values, then downstream is sol (i.e. that sqldelight implementation, or channelFlow if its buffer overflows)
but if you can actually change the underlying channel in channelFlow, then that statement doesn't apply (i.e. downstream can guarantee upstream won't lose anything, right?) or is it just fingers crossed?
also, this opfusion is new to me, how "far" downstream does it work? anywhere? I presume they conflate as an optimization. If I were to change the lib to callbackFlow+conflate, and then in userspace add a buffer(unlimited) operator, then will the userspace buffer win?
j
> this opfusion is new to me, how "far" downstream does it work? anywhere?

It only applies to adjacent operators, at least that's what the doc says. It's technically just an optimization to avoid having multiple coroutines and channels when it's not necessary
> if upstream loses the values, then downstream is sol

What do you mean by "sol"?
u
shit out of luck 😁
j
> If I were to change the lib to callbackFlow+conflate; and then in userspace add buffer(unlimited) operator, then will userspace buffer win?

Yes, because `callbackFlow`, `conflate` and `buffer` would be adjacent and thus "fused"
u
```kotlin
callbackFlow
    .conflate() // library for performance
    .buffer(UNLIMITED) // me because of certain use case
```
just to be super sure, before I send them a PR, you mean the adjacent operators can be an infinite list? therefore the last in the list wins? would this work? or once conflated, always conflated?
j
> you mean the adjacent operators can be an infinite list?

Yes, that's how I understand it.
> therefore the last in the list wins?

Not necessarily, I think it depends on the semantics of each operator. They must be merged in a way that's consistent with the behaviour they could have without the fusion. Without talking about operator fusion (which is an optimization), using `buffer(UNLIMITED)` after `conflate` should launch a separate coroutine collecting elements from the conflated flow and putting them in the infinite buffer. If fast enough, this coroutine should never let `conflate` actually drop events. So if we have a `buffer(UNLIMITED)` after `conflate`, it makes sense that we could optimize it by just ignoring `conflate` completely. For other fusions, I don't know if that holds. For instance, using a smaller buffer after a big buffer would not make the resulting buffer small (I believe). The biggest buffer should win I guess.
Mmmh, that's a good question. Sorry if I was a bit quick to answer, but indeed operator fusion should just mean optimization (using a single coroutine+channel). It should not change the behaviour. So I would find it strange that an extra…
Note that even without changing the SqlDelight implementation, using a fast enough coroutine should prevent the underlying channel from conflating. But it won't be optimized away as fusion indeed.
u
hm, wouldn't that mean that I could `buffer` a MutableStateFlow then? I tried that in an unrelated case, and it didn't work, as if `buffer` itself had sort of a backpressure
j
I think in theory it should be possible, but the coroutine collecting the state flow would have to have a chance to "be fast enough" - which is not easy to guarantee given that setting the value of a state flow is not a suspending operation. This means that setting the value several times in a row from a coroutine in a single threaded context never suspends and thus never gives a chance to consumers to get all the values
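A minimal sketch of this (assuming a single-threaded `runBlocking`): even with `buffer(Channel.UNLIMITED)` downstream, non-suspending writes to a `MutableStateFlow` outrun the collector, which then only sees the latest value.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.*

fun main(): Unit = runBlocking {
    val state = MutableStateFlow(0)
    val seen = mutableListOf<Int>()
    val job = launch {
        // buffer() starts a coroutine that collects the StateFlow, but on
        // this single-threaded dispatcher it only runs after the writes below
        state.buffer(Channel.UNLIMITED).take(1).toList(seen)
    }
    yield()         // let the collector start
    state.value = 1 // value = ... never suspends, so nothing
    state.value = 2 // hands control back to the collector in between
    state.value = 3
    job.join()
    check(seen == listOf(3)) // 0, 1 and 2 were never seen despite the buffer
}
```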
u
hm, right; guess I'll open an issue with sqldelight asking what their reasoning is, because there is this commit https://github.com/cashapp/sqldelight/commit/d3f404ba07dd9294055ac745855109a0c36f367a#diff-6e93b8a7a1264e7321daff71[…]f86a2e7fdd8974c5069f1c9a19c22b so they did exactly that, and changed to the hardcoded thing
j
It looks like it was because `callbackFlow` was experimental 🤔 but might be worth asking anyway, you're right.
Btw, why exactly would you want to prevent conflation in this case? I believe the query is expected to be idempotent, right?
u
I have this use case where I need to infer that something happened based on the last 2 states of the database, i.e. "user was not null, now it is null, therefore logout happened". so if I understand this correctly, if I went from "user -> null -> user" very quickly, the collector might not see the null, right?
j
Is this really a reliable way of detecting events? It seems it should be the other way around: you have a logout event that triggers both the DB update and whatever you need to do in addition to that. What if some conflation happens before DB writes?

> so if I understand this correctly, if I went from "user -> null -> user" very quickly, the collector might not see the null, right?

Correct
u
I was just trying to be smart by inferring it from the last two states, as you in theory can if you see all changes, and save myself the explicit shared flow of Logout events. https://github.com/cashapp/sqldelight/issues/2767 here is their reasoning: the channel only emits notifications to requery, it doesn't carry data; so im back to explicit events (my exact case was "if logout happened, then reset the ui to login screen", so I needed it to be done after logout happened)
j
Yeah, as I said, you shouldn't use the DB as a source of events. Also, you wouldn't be able to get all DB updates in the flow, only ticks to requery, so yeah you can't use it for your purpose. My question was more about why not `callbackFlow`, and I guess it was indeed to use stable coroutines APIs, which I think is an obsolete concern given that this is stable now.
Thanks for checking this though 😉 and good luck with the event management
u
I hate it btw 😄 now I need to check context at the call site whether the user is really not there when the event arrives. 2 "sources" of truth, which by definition could get mismatched, etc.
to finish, as backpressure tangles my brain: this is all related to backpressure of the original listener notifications, right? if there are mappings downstream, which might be heavy, backpressure happening there would just suspend the incoming map until the ongoing one finishes, since `map` doesn't support backpressure? i.e. if notifications themselves are not concurrent, but maps might be, they would get sequentialized, not conflated, correct? or is the map somehow included in the actions that contribute to causing backpressure at the notification level

```kotlin
sqldelightQuery.asFlow()
    .map { query ->
        withContext(IO) {
            query.executeAsOneOrNull()
        }
    }
```

Oh god I'm questioning everything now again 😄 backpressure on their channel happens when a downstream operator, `map` in this case, has not yet "taken" the emit, but the body of the map itself doesn't count into this time, right? i.e. the channel backpressure only cares about the hand-off, not the processing?
j
The processing of one element delays the handoff of the next, so the distinction may not matter depending on the case. But yeah if there is a buffer/channel in between, the bodies of 2 operators can be concurrent. If not, they are sequential
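A sketch of both cases (single-threaded `runBlocking` assumed), logging the interleaving:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.*

fun main(): Unit = runBlocking {
    val log = mutableListOf<String>()

    // No buffer: emit() directly invokes the map body, so producer and
    // transformation run strictly in lockstep
    flow { repeat(2) { log += "emit $it"; emit(it) } }
        .map { log += "map $it"; it }
        .collect()
    check(log == listOf("emit 0", "map 0", "emit 1", "map 1"))

    // With a buffer in between, a separate coroutine feeds a channel,
    // so the producer can run ahead of the map body
    log.clear()
    flow { repeat(2) { log += "emit $it"; emit(it) } }
        .buffer(Channel.UNLIMITED)
        .map { log += "map $it"; it }
        .collect()
    check(log == listOf("emit 0", "emit 1", "map 0", "map 1"))
}
```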
u
unless there is a `buffer`, right?
j
I was about to edit to say just that :)
u
k got it, thank you very much!
okay sorry, to triple down: the top most source (db notifications) emits 1, and the whole chain is processing it; during the processing, db creates a change notification, so it attempts to emit 2; this notification WAITS till the downstream maps etc. finish. conflation only means that during this wait it might get overwritten by emit 3, which shall be processed when emit 1 is finished processing. i.e. conflation doesn't mean that when notification 2 wants to be processed, it cancels the ongoing processing of emit 1. That's what mapLatest/flatMapLatest etc. are for, am I correct?
j
Seems correct to me
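That distinction can be sketched like this (the `delay` stands in for heavy processing):

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

@OptIn(ExperimentalCoroutinesApi::class)
fun main(): Unit = runBlocking {
    val processed = mutableListOf<Int>()
    flow {
        emit(1)
        emit(2) // arrives while 1 is still being "processed"
    }.mapLatest { value ->
        delay(10) // simulated heavy work; cancelled when a newer value arrives
        processed += value
        value
    }.collect()
    check(processed == listOf(2)) // processing of 1 was cancelled, not queued
}
```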
n
@ursus @Joffrey Fusion is not as simple as trying to preserve the behavior. Read the docs on each fusible operator. They try to optimize for intention. For example, if you conflate, then it's assumed that you only care about the latest value so all other buffering is abandoned. Also, the fused buffer size is not max wins.
j
> Fusion is not as simple as trying to preserve the behavior

How is preserving the behaviour simple?

> For example, if you conflate, then it's assumed that you only care about the latest value so all other buffering is abandoned

Not true, I actually experimented with `.conflate().buffer()` and you can get elements in a way that's equivalent to no conflation. Conflation is a behaviour that depends on consumers, so nothing is really guaranteed either way
n
> How is preserving the behaviour simple?

It'd be simple to document/explain. I was not referring to the implementation.

> Not true, I actually experimented with `.conflate().buffer()` and you can get elements in a way that's equivalent to no conflation.

I meant that `conflate().buffer()` is equivalent to `conflate()`. If you are disputing that, could you share a repro?
j
> I meant that `conflate().buffer()` is equivalent to `conflate()`. If you are disputing that, could you share a repro?

I understand, and I indeed dispute it. Here is a repro: https://pl.kotl.in/SLuY8UKA_
And that makes sense to me. I believe fusion should just be considered an optimization; I don't believe it would be correct to not preserve the original behaviour of the fused operator chain
n
Thanks for the example! I thought you meant `buffer()` with no arguments, which is effectively thrown away by the fusion operation.

> For example, if you conflate, then it's assumed that you only care about the latest value so all other buffering is abandoned.

I was wrong here, only `buffer()` with no args is thrown away.

> I believe fusion should just be considered an optimization,

I agree, but that doesn't mean it preserves behavior. I added to your example: https://pl.kotl.in/ncpiNF-4c. `.map { it }` should be a no-op, but here it changes the output significantly.
u
I don't understand. Does the conflation kick in in the last flow because there is some inherent overhead to operators, therefore backpressure can kick in between `map` and upstream? or why is it different than the 2nd flow?
j
I honestly don't know, I'm pretty puzzled as well. Of course `map` prevents the fusion of the operators, but even without fusion, `buffer` should prevent backpressure from the collector from reaching the `conflate` part, so I don't see the reason why `conflate()` is dropping events
u
maybe you can bump my https://github.com/Kotlin/kotlinx.coroutines/issues/3123 so the shamans can pitch in
could you?
n
The docs already cover your questions there:

> Adjacent applications of channelFlow, flowOn, buffer, and produceIn are always fused so that only one properly configured channel is used for execution.

There is no "limit" and library boundaries do not factor in.

> Note that conflate operator is a shortcut for buffer with capacity of Channel.CONFLATED, which is, in turn, a shortcut to a buffer that only keeps the latest element as created by buffer(onBufferOverflow = BufferOverflow.DROP_OLDEST).

This is actually incorrect, I think, and should be `buffer(capacity = 0, onBufferOverflow = BufferOverflow.DROP_OLDEST)`, but it doesn't affect the result because:

> Explicitly specified buffer capacity takes precedence over buffer() or buffer(Channel.BUFFERED) calls, which effectively requests a buffer of any size. Multiple requests with a specified buffer size produce a buffer with the sum of the requested buffer sizes.

So the explicit buffer sizes are added together. So `conflate().buffer(Channel.UNLIMITED)` is the same as `buffer(Channel.UNLIMITED, onBufferOverflow = BufferOverflow.DROP_OLDEST)`, which will never drop anything.
With `conflate().buffer(Channel.UNLIMITED)` the upstream is able to push directly into the unlimited channel. With `conflate().map { it }.buffer(Channel.UNLIMITED)` the upstream is pushing into a rendezvous channel with DROP_OLDEST (so send will never block). Another coroutine is reading from that rendezvous channel, calling the map lambda, and then pushing into the unlimited buffer. Basically, in the example, all the elements are emitted before the coroutine reading the rendezvous channel even starts.
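This can be sketched as follows (single-threaded `runBlocking` assumed; exactly which elements get dropped in the unfused case depends on scheduling, so the second assertion is deliberately loose):

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.*

fun main(): Unit = runBlocking {
    // Fused: conflate() + buffer(UNLIMITED) become a single unlimited
    // DROP_OLDEST channel, and an unlimited buffer never overflows,
    // so nothing is ever dropped
    val fused = flow { repeat(5) { emit(it) } }
        .conflate()
        .buffer(Channel.UNLIMITED)
        .toList()
    check(fused == listOf(0, 1, 2, 3, 4))

    // map breaks the fusion: the emitter now pushes into a real
    // rendezvous DROP_OLDEST channel, overwriting values before the
    // intermediate coroutine gets a chance to read them
    val unfused = flow { repeat(5) { emit(it) } }
        .conflate()
        .map { it }
        .buffer(Channel.UNLIMITED)
        .toList()
    check(unfused.size < 5) // some intermediate values were conflated away
}
```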
j
> Another coroutine is reading from that rendezvous channel, calling the map lambda, and then pushing into the unlimited buffer.

My question is: why isn't this buffer coroutine started at the same time as the flow's producer? And why isn't it fast enough to prevent conflation?
n
> why isn't this buffer coroutine started at the same time as the flow's producer?

Who says it doesn't? Even if it starts first, it'll suspend to receive.

> And why isn't it fast enough to prevent conflation?

`emit` is calling `send` on a non-suspending `Channel`. It doesn't suspend. So it's able to emit all the values before the reading coroutine resumes. `runBlocking` is single-threaded by default, so the reading coroutine can't resume until the emitting is all done. If I switch to `Dispatchers.Default` and bump it to `repeat(1000)` with `take(50).onEach` then I see some elements make it through sometimes. Even then it's all a race condition.
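The non-suspending send is easy to see in isolation (a small sketch with a raw conflated channel):

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.runBlocking

fun main(): Unit = runBlocking {
    val ch = Channel<Int>(Channel.CONFLATED)
    // On a conflated channel, send() never actually suspends: each call
    // completes immediately, overwriting the previously buffered value
    ch.send(1)
    ch.send(2)
    ch.send(3)
    check(ch.receive() == 3) // only the latest value survived
}
```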
j
> Who says it doesn't?

I guess I misread your message: "all the elements are emitted before the coroutine reading the rendezvous channel even starts". The coroutine is scheduled but never actually gets the chance to run.

> emit is calling send on a non-suspending Channel

Oh good point, I forgot about this. Even though it's calling the suspending `send()`, it doesn't actually return `COROUTINE_SUSPENDED`, so it doesn't actually suspend in this case. Thanks for pointing this out
u
Okay so the gist of this discussion is, there is no way to undo upstream conflation, the end, right?
n
Basically yeah. I mean, if you don't want conflated data, don't collect from a conflated `Flow`.
u
thx