ursus
01/05/2022, 4:28 AM
Is it possible to tweak the channel settings in `callbackFlow` or `channelFlow`? Or is it hardcoded to `BUFFERED`, with consumers expected to use the `buffer` operator?

bezrukov
01/05/2022, 6:24 AM
Yes, the following `.buffer` doesn't create a new channel in this case, but modifies the underlying channel.

Joffrey
01/05/2022, 9:38 AM
The docs of `callbackFlow` and `channelFlow` say it all:
> A channel with the default buffer size is used. Use the buffer operator on the resulting flow to specify a user-defined value and to control what happens when data is produced faster than consumed, i.e. to control the back-pressure behavior.
So yes, to change the default you should use `.buffer()` on the flow returned by `callbackFlow`. Most of those channel-based operators are implemented in a smart way to perform operator "fusion". In the docs of those operators, there is a special paragraph mentioning how each is fused with the others. Here is, for instance, the leading sentence of that paragraph in the doc of `buffer`:
> Adjacent applications of channelFlow, flowOn, buffer, and produceIn are always fused so that only one properly configured channel is used for execution.

ursus
So does it really change it? Is that what operator fusion is? Or does it just append, and it's fingers crossed that the downstream buffer can keep up so the upstream buffer won't overflow?

Joffrey
01/05/2022, 1:26 PM
The implementation you linked doesn't use `channelFlow` or `callbackFlow`; it manually creates its own channel, so operator fusion cannot happen on the resulting flow in this case. I'm surprised it's implemented this way; it could be written as a `callbackFlow`.

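As a sketch of what that could look like. The listener API below is a hypothetical stand-in for illustration, not SQLDelight's actual `Query.Listener` interface:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.*

// Hypothetical listener-based query, standing in for SQLDelight's Query.
class FakeQuery {
    private val listeners = mutableListOf<() -> Unit>()
    fun addListener(listener: () -> Unit) { listeners += listener }
    fun removeListener(listener: () -> Unit) { listeners -= listener }
    fun notifyDataChanged() = listeners.toList().forEach { it() }
}

// Change notifications as a Flow. The channel behind callbackFlow is created
// with the default (BUFFERED) capacity, and a downstream .buffer()/.conflate()
// fuses with it instead of creating a second channel.
fun FakeQuery.changes(): Flow<Unit> = callbackFlow {
    val listener = { trySend(Unit); Unit }
    addListener(listener)
    trySend(Unit) // initial emission so collectors query once up front
    awaitClose { removeListener(listener) }
}

fun main() = runBlocking {
    val query = FakeQuery()
    query.changes().first() // completes on the initial emission
    println("received initial change notification")
}
```

Since the consumer controls the buffering via fusion, a library exposing such a flow would not need to hardcode a capacity.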
ursus
01/05/2022, 1:29 PM
Yes, I'm aware. I was just getting at the fact that, in general, if the upstream loses values then the downstream is sol (i.e. that SQLDelight implementation, or `channelFlow` if the buffer overflows).
But if you can actually change the underlying channel in `channelFlow`, then that statement doesn't apply (i.e. the downstream can guarantee the upstream won't lose anything, right?). Or is it just fingers crossed?
Also, this operator fusion is new to me. How "far" downstream does it work? Anywhere? I presume they conflate as an optimization. If I were to change the lib to `callbackFlow` + `conflate`, and then in userspace add a `buffer(UNLIMITED)` operator, would the userspace buffer win?

Joffrey
01/05/2022, 2:56 PM
> this operator fusion is new to me, how "far" downstream does it work? anywhere?
It only applies to adjacent operators, at least that's what the doc says. It's technically just an optimization to avoid having multiple coroutines and channels when it's not necessary.
> if upstream loses the values, then downstream is sol
What do you mean by "sol"?

ursus
01/05/2022, 3:02 PM
Shit out of luck 😁
😆 1

Joffrey
01/05/2022, 3:04 PM
> If I were to change the lib to callbackFlow + conflate, and then in userspace add a buffer(UNLIMITED) operator, would the userspace buffer win?
Yes, because `callbackFlow`, `conflate` and `buffer` would be adjacent and thus "fused".

ursus
01/05/2022, 3:08 PM
```kotlin
callbackFlow
    .conflate()          // library, for performance
    .buffer(UNLIMITED)   // me, because of a certain use case
```
Just to be super sure before I send them a PR: you mean the adjacent operators can be an infinite list, and therefore the last in the list wins? Would this work? Or is it "once conflated, always conflated"?

Joffrey
01/05/2022, 3:22 PM
> you mean the adjacent operators can be an infinite list?
Yes, that's how I understand it.
> therefore the last in the list wins?
Not necessarily; I think it depends on the semantics of each operator. They must be merged in a way that's consistent with the behaviour they would have without the fusion. Without talking about operator fusion (which is an optimization), using `buffer(UNLIMITED)` after `conflate` should launch a separate coroutine collecting elements from the conflated flow and putting them in the infinite buffer. If fast enough, this coroutine should never let `conflate` actually drop events. So if we have a `buffer(UNLIMITED)` after `conflate`, it makes sense that we could optimize it by just ignoring `conflate` completely. For other fusions, I don't know if that holds. For instance, using a smaller buffer after a big buffer would not make the resulting buffer small (I believe); the biggest buffer should win, I guess. Mmmh, that's a good question. Sorry if I was a bit quick to answer, but operator fusion should indeed just mean optimization (using a single coroutine + channel); it should not change the behaviour.
Note that even without changing the SQLDelight implementation, a fast enough collecting coroutine should prevent the underlying channel from conflating. But it won't be optimized away as fusion, indeed.

ursus
01/05/2022, 3:25 PM
Hm, wouldn't that mean I could `buffer` a `MutableStateFlow` then? I tried that in an unrelated case and it didn't work, as if `buffer` itself had a sort of backpressure.

Joffrey
01/05/2022, 3:28 PM
I think in theory it should be possible, but the coroutine collecting the state flow would have to have a chance to "be fast enough", which is not easy to guarantee given that setting the value of a state flow is not a suspending operation. This means that setting the value several times in a row from a coroutine in a single-threaded context never suspends, and thus never gives consumers a chance to see all the values.

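A minimal sketch of that failure mode, assuming `runBlocking`'s single-threaded dispatcher: two non-suspending writes in a row leave the collector no chance to observe the intermediate state, even with an unlimited buffer downstream.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.*

fun main() = runBlocking {
    val user = MutableStateFlow<String?>("alice")
    val seen = mutableListOf<String?>()

    val collector = launch {
        // buffer() cannot help here: StateFlow only ever hands out the
        // current value, so an overwritten value is already gone.
        user.buffer(Channel.UNLIMITED).collect { seen += it }
    }
    delay(50) // let the collector subscribe and record the initial value

    user.value = null    // logout...
    user.value = "bob"   // ...new login, with no suspension point in between

    delay(100)           // give the collector time to catch up
    collector.cancelAndJoin()
    println(seen)        // the intermediate null is missed
    check(null !in seen)
}
```

The `check` passes because between the two assignments the single thread never yields, so the collecting coroutine only observes the value that is current when it next runs.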
ursus
01/05/2022, 3:48 PM
Hm, right. Guess I'll open an issue with SQLDelight asking their reasoning, because there is this commit https://github.com/cashapp/sqldelight/commit/d3f404ba07dd9294055ac745855109a0c36f367a#diff-6e93b8a7a1264e7321daff71[…]f86a2e7fdd8974c5069f1c9a19c22b, so they did exactly that, and then changed to the hardcoded thing.

Joffrey
01/05/2022, 4:15 PM
It looks like it was because `callbackFlow` was experimental 🤔 but it might be worth asking anyway, you're right.
Btw, why exactly would you want to prevent conflation in this case? I believe the query is expected to be idempotent, right?

ursus
01/05/2022, 4:22 PM
I have a use case where I need to infer that something happened based on the last 2 states of the database, i.e. "user was not null, now it is null, therefore a logout happened". So if I understand this correctly, if the state went "user -> null -> user" very quickly, the collector might not see the null, right?

Joffrey
01/05/2022, 4:26 PM
Is this really a reliable way of detecting events? It seems it should be the other way around: you have a logout event that triggers both the DB update and whatever else you need to do. What if some conflation happens before the DB writes?
> so if I understand this correctly, if I went from "user -> null -> user" very quickly, the collector might not see the null, right?
Correct.

ursus
01/05/2022, 4:53 PM
I was just trying to be smart by inferring it from the last two states, as in theory you can if you see all changes, and save myself the explicit shared flow of Logout events. Here is their reasoning (https://github.com/cashapp/sqldelight/issues/2767): the channel only emits notifications to requery, it doesn't carry data. So I'm back to explicit events (my exact case was "if a logout happened, reset the UI to the login screen", so I needed it to run after the logout happened).

Joffrey
01/05/2022, 4:58 PM
Yeah, as I said, you shouldn't use the DB as a source of events. Also, you wouldn't be able to get all DB updates in the flow anyway, only ticks to requery, so you can't use it for your purpose. My question was more about why not `callbackFlow`, and I guess it was indeed to use only stable coroutines APIs, which I think is an obsolete concern given that `callbackFlow` is stable now.
Thanks for checking this though 😉 and good luck with the event management.

ursus
01/05/2022, 5:00 PM
I hate it btw 😄 Now when the event arrives I need to check context at the call site whether the user is really gone: 2 "sources" of truth, which by definition could get mismatched, etc.
To finish, since backpressure tangles my brain: this is all about backpressure on the original listener notifications, right? If there are mappings downstream which might be heavy, backpressure happening there would just suspend the incoming `map` until the ongoing one finishes, since `map` doesn't support backpressure? I.e. if the notifications themselves are not concurrent but the maps might be, they would get sequentialized, not conflated, correct? Or is the map somehow included in the actions that contribute to causing backpressure at the notification level?
```kotlin
sqldelightQuery.asFlow()
    .map { query ->
        withContext(IO) {
            query.executeAsOneOrNull()
        }
    }
```
Oh god, I'm questioning everything again 😄 Backpressure on their channel happens when a downstream operator (`map` in this case) has not yet "taken" the emit, but the body of the map itself doesn't count into this time, right? I.e. the channel backpressure only cares about the hand-off, not the processing?

Joffrey
01/05/2022, 6:48 PM
The processing of one element delays the hand-off of the next, so the distinction may not matter depending on the case. But yeah, if there is a buffer/channel in between, the bodies of 2 operators can run concurrently. If not, they are sequential.

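The difference can be made visible by timing a two-stage pipeline with and without a buffer between the stages (a sketch; exact timings will vary by machine):

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.system.measureTimeMillis

fun main() = runBlocking {
    val source = flow {
        repeat(3) {
            delay(100) // simulate producing an element
            emit(it)
        }
    }

    // No buffer: emit() suspends until the collector body finishes,
    // so production and processing run fully sequentially, roughly 3 * (100 + 100) ms.
    val sequential = measureTimeMillis {
        source.collect { delay(100) } // simulate processing
    }

    // With a buffer, the producer runs in its own coroutine and can produce
    // the next element while the previous one is still being processed.
    val concurrent = measureTimeMillis {
        source.buffer().collect { delay(100) }
    }

    println("sequential took ${sequential}ms, buffered took ${concurrent}ms")
    check(concurrent < sequential) // pipelining saves time
}
```

This mirrors the `buffer` example in the kotlinx.coroutines docs: the buffer decouples the two bodies, making them concurrent rather than sequential.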
ursus
01/05/2022, 7:10 PM
Unless there is a `buffer`, right?

Joffrey
01/05/2022, 7:10 PM
I was about to edit to say just that :)

ursus
01/05/2022, 7:10 PM
K, got it, thank you very much!
Okay, sorry, to triple down: the topmost source (DB notifications) emits 1, and the whole chain is processing it. During the processing, the DB creates a change notification, so it attempts to emit 2; this notification WAITS until the downstream maps etc. finish. Conflation only means that during this wait it might get overwritten by emit 3, which will be processed once emit 1 has finished processing. I.e. conflation doesn't mean that when notification 2 wants to be processed it cancels the ongoing processing of emit 1; that's what `mapLatest`/`flatMapLatest` etc. are for. Am I correct?

Joffrey
01/06/2022, 9:07 AM
Seems correct to me.

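A sketch contrasting the two behaviours just described: `conflate` lets the element currently being processed finish and drops the ones that queued up behind it, while `mapLatest` cancels in-flight processing whenever a new element arrives.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

@OptIn(ExperimentalCoroutinesApi::class)
fun main() = runBlocking {
    val source = flow {
        emit(1)
        delay(30)
        emit(2)
        delay(30)
        emit(3)
    }

    // conflate: the slow consumer fully processes 1; 2 is overwritten by 3
    // while the consumer is busy, so the result is [1, 3].
    val conflated = source.conflate().map { delay(100); it }.toList()

    // mapLatest: processing of 1 and 2 is cancelled mid-flight when the next
    // element arrives; only 3 completes, so the result is [3].
    val latest = source.mapLatest { delay(100); it }.toList()

    println("conflated=$conflated latest=$latest")
    check(latest == listOf(3))
}
```

The `delay(100)` in the transform stands in for heavy processing; the 30 ms gaps between emissions are what let conflation overwrite the waiting element.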
Nick Allen
01/06/2022, 6:49 PM
@ursus @Joffrey Fusion is not as simple as trying to preserve the behavior. Read the docs on each fusible operator: they try to optimize for intention. For example, if you conflate, then it's assumed that you only care about the latest value, so all other buffering is abandoned. Also, the fused buffer size is not "max wins".

Joffrey
01/06/2022, 8:02 PM
> Fusion is not as simple as trying to preserve the behavior
How is preserving the behaviour simple?
> For example, if you conflate, then it's assumed that you only care about the latest value so all other buffering is abandoned
Not true, actually. I experimented with `.conflate().buffer()` and you can get elements in a way that's equivalent to no conflation. Conflation is a behaviour that depends on consumers, so nothing is really guaranteed either way.

Nick Allen
01/06/2022, 8:25 PM
> How is preserving the behaviour simple?
It'd be simple to document/explain. I was not referring to the implementation.
> Not true, I actually experimented with `.conflate().buffer()` and you can get elements in a way that's equivalent to no conflation.
I meant that `conflate().buffer()` is equivalent to `conflate()`. If you are disputing that, could you share a repro?

Joffrey
01/06/2022, 9:08 PM
> I meant that `conflate().buffer()` is equivalent to `conflate()`. If you are disputing that, could you share a repro?
I understand, and I indeed dispute it. Here is a repro: https://pl.kotl.in/SLuY8UKA_
And that makes sense to me. I believe fusion should just be considered an optimization; I don't believe it would be correct to not preserve the original behaviour of the fused operator chain.

Nick Allen
01/06/2022, 10:23 PM
Thanks for the example! I thought you meant `buffer()` with no arguments, which is effectively thrown away by the fusion operation.
> For example, if you conflate, then it's assumed that you only care about the latest value so all other buffering is abandoned.
I was wrong here; only `buffer()` with no args is thrown away.
> I believe fusion should just be considered an optimization
I agree, but that doesn't mean it preserves behavior. I added to your example: https://pl.kotl.in/ncpiNF-4c. A `.map { it }` should be a no-op, but here it changes the output significantly.
🤯 1

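This is not the linked playground, but a sketch reconstructing the effect under discussion: with fusion, `conflate().buffer(UNLIMITED)` collapses into a single unlimited DROP_OLDEST buffer and drops nothing, while an interposed `map { it }` blocks the fusion and lets conflation actually drop elements.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.*

fun main() = runBlocking {
    // Emits 0..9 back to back, with no suspension between emissions.
    val fast = flow { repeat(10) { emit(it) } }

    // Fused into buffer(UNLIMITED, DROP_OLDEST): an unlimited buffer never
    // overflows, so DROP_OLDEST never fires and every element survives.
    val fused = fast.conflate().buffer(Channel.UNLIMITED).toList()

    // map {} sits between conflate and buffer, so they cannot fuse: conflate
    // keeps its own rendezvous DROP_OLDEST channel and really drops elements,
    // because the single-threaded producer floods it before the reader runs.
    val unfused = fast.conflate().map { it }.buffer(Channel.UNLIMITED).toList()

    println("fused=$fused")
    println("unfused=$unfused")
    check(fused.size == 10)
    check(unfused.size < 10)
}
```

The exact contents of `unfused` are a race (typically the first and last elements survive), which is why the checks only compare sizes.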
ursus
01/07/2022, 2:00 PM
I don't understand. Does the conflation kick in in the last flow because there is some inherent overhead to operators, so backpressure can kick in between `map` and the upstream? Or why is it different from the 2nd flow?

Joffrey
01/07/2022, 2:02 PM
I honestly don't know, I'm pretty puzzled as well. Of course the `map` prevents the fusion of the operators, but even without fusion, `buffer` should prevent backpressure from the collector from reaching the `conflate` part, so I don't see why `conflate()` is dropping events.

ursus
01/07/2022, 2:03 PM
Maybe you can bump my https://github.com/Kotlin/kotlinx.coroutines/issues/3123 so the shamans can pitch in. Could you?

Nick Allen
01/10/2022, 8:37 PM
The docs already cover your questions there:
> Adjacent applications of channelFlow, flowOn, buffer, and produceIn are always fused so that only one properly configured channel is used for execution.
There is no "limit", and library boundaries do not factor in.
> Note that conflate operator is a shortcut for buffer with capacity of Channel.CONFLATED, which is, in turn, a shortcut to a buffer that only keeps the latest element as created by buffer(onBufferOverflow = BufferOverflow.DROP_OLDEST).
This is actually incorrect, I think, and should be `buffer(capacity = 0, onBufferOverflow = BufferOverflow.DROP_OLDEST)`, but it doesn't affect the result because:
> Explicitly specified buffer capacity takes precedence over buffer() or buffer(Channel.BUFFERED) calls, which effectively requests a buffer of any size. Multiple requests with a specified buffer size produce a buffer with the sum of the requested buffer sizes.
So the explicit buffer sizes are added together, and `conflate().buffer(Channel.UNLIMITED)` is the same as `buffer(Channel.UNLIMITED, onBufferOverflow = BufferOverflow.DROP_OLDEST)`, which will never drop anything.
With `conflate().buffer(Channel.UNLIMITED)` the upstream is able to push directly into the unlimited channel. With `conflate().map { it }.buffer(Channel.UNLIMITED)` the upstream is pushing into a rendezvous channel with DROP_OLDEST (so send will never block). Another coroutine is reading from that rendezvous channel, calling the map lambda, and then pushing into the unlimited buffer. Basically, in the example, all the elements are emitted before the coroutine reading the rendezvous channel even starts.

Joffrey
01/10/2022, 11:34 PM
> Another coroutine is reading from that rendezvous channel, calling the map lambda, and then pushing into the unlimited buffer.
My question is: why isn't this buffering coroutine started at the same time as the flow's producer? And why isn't it fast enough to prevent conflation?

Nick Allen
01/10/2022, 11:50 PM
> why isn't this buffer coroutine started at the same time as the flow's producer?
Who says it doesn't? Even if it starts first, it'll suspend to receive.
> And why isn't it fast enough to prevent conflation?
`emit` is calling `send` on a non-suspending `Channel`. It doesn't suspend, so it's able to emit all the values before the reading coroutine resumes. `runBlocking` is single-threaded by default, so the reading coroutine can't resume until the emitting is all done. If I switch to `Dispatchers.Default` and bump it to `repeat(1000)` with `take(50).onEach`, then I see some elements make it through sometimes. Even then it's all a race condition.

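The non-suspending `send` behaviour can be seen directly at the channel level (a sketch):

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.runBlocking

fun main() = runBlocking {
    val channel = Channel<Int>(Channel.CONFLATED)

    // send() on a CONFLATED channel never actually suspends: each value just
    // overwrites the previous one, so this loop runs to completion before any
    // receiver gets a chance to run on runBlocking's single thread.
    repeat(5) { channel.send(it) }
    channel.close()

    val received = mutableListOf<Int>()
    for (x in channel) received += x
    println(received) // [4]: only the latest value survived
    check(received == listOf(4))
}
```

Because no suspension point separates the sends, a conflated (or DROP_OLDEST) channel can be flooded wholesale before its reader ever resumes, which is exactly what happens inside the `conflate()` stage of the earlier repro.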
Joffrey
01/10/2022, 11:57 PM
> Who says it doesn't?
I guess I misread your message: "all the elements are emitted before the coroutine reading the rendezvous channel even starts". The coroutine is scheduled but never actually gets the chance to run.
> `emit` is calling `send` on a non-suspending `Channel`
Oh, good point, I forgot about this. Even though it's calling the suspending `send()`, it doesn't actually return `COROUTINE_SUSPENDED`, so it doesn't actually suspend in this case. Thanks for pointing this out.

ursus
01/12/2022, 9:13 PM
Okay, so the gist of this discussion is: there is no way to undo upstream conflation, the end. Right?

Nick Allen
01/12/2022, 9:18 PM
Basically, yeah. I mean, if you don't want conflated data, don't collect from a conflated `Flow`.

ursus
01/12/2022, 9:26 PM
thx