https://kotlinlang.org logo
Channels
100daysofcode
100daysofkotlin
100daysofkotlin-2021
advent-of-code
aem
ai
alexa
algeria
algolialibraries
amsterdam
android
android-architecture
android-databinding
android-studio
androidgithubprojects
androidthings
androidx
androidx-xprocessing
anime
anko
announcements
apollo-kotlin
appintro
arabic
argentina
arkenv
arksemdevteam
armenia
arrow
arrow-contributors
arrow-meta
ass
atlanta
atm17
atrium
austin
australia
austria
awesome-kotlin
ballast
bangladesh
barcelona
bayarea
bazel
beepiz-libraries
belgium
benchmarks
berlin
big-data
books
boston
brazil
brikk
budapest
build
build-tools
bulgaria
bydgoszcz
cambodia
canada
carrat
carrat-dev
carrat-feed
chicago
chile
china
chucker
cincinnati-user-group
cli
clikt
cloudfoundry
cn
cobalt
code-coverage
codeforces
codemash-precompiler
codereview
codingame
codingconventions
coimbatore
collaborations
colombia
colorado
communities
competitive-programming
competitivecoding
compiler
compose
compose-android
compose-desktop
compose-hiring
compose-ios
compose-mp
compose-ui-showcase
compose-wear
compose-web
confetti
connect-audit-events
corda
cork
coroutines
couchbase
coursera
croatia
cryptography
cscenter-course-2016
cucumber-bdd
cyprus
czech
dagger
data2viz
databinding
datascience
dckotlin
debugging
decompose
decouple
denmark
deprecated
detekt
detekt-hint
dev-core
dfw
docs-revamped
dokka
domain-driven-design
doodle
dsl
dublin
dutch
eap
eclipse
ecuador
edinburgh
education
effective-kotlin
effectivekotlin
emacs
embedded-kotlin
estatik
event21-community-content
events
exposed
failgood
fb-internal-demo
feed
firebase
flow
fluid-libraries
forkhandles
forum
fosdem
fp-in-kotlin
framework-elide
freenode
french
fritz2
fuchsia
functional
funktionale
gamedev
ge-kotlin
general-advice
georgia
geospatial
german-lang
getting-started
github-workflows-kt
glance
godot-kotlin
google-io
gradle
graphic
graphkool
graphql
graphql-kotlin
graviton-browser
greece
grpc
gsoc
gui
hackathons
hacktoberfest
hamburg
hamkrest
helios
helsinki
hexagon
hibernate
hikari-cp
hire-me
hiring
hiring-french
hongkong
hoplite
http4k
hungary
hyderabad
image-processing
india
indonesia
inkremental
intellij
intellij-plugins
intellij-tricks
internships
introduce-yourself
io
ios
iran
israel
istanbulcoders
italian
jackson-kotlin
jadx
japanese
jasync-sql
java-to-kotlin-refactoring
javadevelopers
javafx
javalin
javascript
jdbi
jhipster-kotlin
jobsworldwide
jpa
jshdq
juul-libraries
jvm-ir-backend-feedback
jxadapter
k2-early-adopters
kaal
kafka
kakao
kalasim
kapt
karachi
karg
karlsruhe
kash_shell
kaskade
kbuild
kdbc
kgen-doc-tools
kgraphql
kinta
klaxon
klock
kloudformation
kmdc
kmm-español
kmongo
knbt
knote
koalaql
koans
kobalt
kobweb
kodein
kodex
kohesive
koin
koin-dev
komapper
kondor-json
kong
kontent
kontributors
korau
korean
korge
korim
korio
korlibs
korte
kotest
kotest-contributors
kotless
kotlick
kotlin-asia
kotlin-beam
kotlin-by-example
kotlin-csv
kotlin-data-storage
kotlin-foundation
kotlin-fuel
kotlin-in-action
kotlin-inject
kotlin-latam
kotlin-logging
kotlin-multiplatform-contest
kotlin-mumbai
kotlin-native
kotlin-pakistan
kotlin-plugin
kotlin-pune
kotlin-roadmap
kotlin-samples
kotlin-sap
kotlin-serbia
kotlin-spark
kotlin-szeged
kotlin-website
kotlinacademy
kotlinbot
kotlinconf
kotlindl
kotlinforbeginners
kotlingforbeginners
kotlinlondon
kotlinmad
kotlinprogrammers
kotlinsu
kotlintest
kotlintest-devs
kotlintlv
kotlinultimatechallenge
kotlinx-datetime
kotlinx-files
kotlinx-html
kotrix
kotson
kovenant
kprompt
kraph
krawler
kroto-plus
ksp
ktcc
ktfmt
ktlint
ktor
ktp
kubed
kug-leads
kug-torino
kvision
kweb
lambdaworld_cadiz
lanark
language-evolution
language-proposals
latvia
leakcanary
leedskotlinusergroup
lets-have-fun
libgdx
libkgd
library-development
lincheck
linkeddata
lithuania
london
losangeles
lottie
love
lychee
macedonia
machinelearningbawas
madrid
malaysia
mathematics
meetkotlin
memes
meta
metro-detroit
mexico
miami
micronaut
minnesota
minutest
mirror
mockk
moko
moldova
monsterpuzzle
montreal
moonbean
morocco
motionlayout
mpapt
mu
multiplatform
mumbai
munich
mvikotlin
mvrx
myndocs-oauth2-server
naming
navigation-architecture-component
nepal
new-mexico
new-zealand
newname
nigeria
nodejs
norway
npm-publish
nyc
oceania
ohio-kotlin-users
oldenburg
oolong
opensource
orbit-mvi
osgi
otpisani
package-search
pakistan
panamá
pattern-matching
pbandk
pdx
peru
philippines
phoenix
pinoy
pocketgitclient
polish
popkorn
portugal
practical-functional-programming
proguard
prozis-android-backup
pyhsikal
python
python-contributors
quasar
random
re
react
reaktive
realm
realworldkotlin
reductor
reduks
redux
redux-kotlin
refactoring-to-kotlin
reflect
refreshversions
reports
result
rethink
revolver
rhein-main
rocksdb
romania
room
rpi-pico
rsocket
russian
russian_feed
russian-kotlinasfirst
rx
rxjava
san-diego
science
scotland
scrcast
scrimage
script
scripting
seattle
serialization
server
sg-user-group
singapore
skia-wasm-interop-temp
skrape-it
slovak
snake
sofl-user-group
southafrica
spacemacs
spain
spanish
speaking
spek
spin
splitties
spotify-mobius
spring
spring-security
squarelibraries
stackoverflow
stacks
stayhungrystayfoolish
stdlib
stlouis
strife-discord-lib
strikt
students
stuttgart
sudan
swagger-gradle-codegen
swarm
sweden
swing
swiss-user-group
switzerland
talking-kotlin
tallinn
tampa
teamcity
tegal
tempe
tensorflow
terminal
test
testing
testtestest
texas
tgbotapi
thailand
tornadofx
touchlab-tools
training
tricity-kotlin-user-group
trójmiasto
truth
tunisia
turkey
turkiye
twitter-feed
uae
udacityindia
uk
ukrainian
uniflow
unkonf
uruguay
utah
uuid
vancouver
vankotlin
vertx
videos
vienna
vietnam
vim
vkug
vuejs
web-mpp
webassembly
webrtc
wimix_sentry
wwdc
zircon
Powered by
Title
d

diesieben07

01/08/2020, 10:14 AM
I'm converting a callback-based API into a flow. I need to signal to the API if the flow is cancelled or fails with an exception. I know about
onCompletion
, but it has no information about if the flow was completely consumed, or if consumption was cancelled using e.g.
take
operator.
onCompletion
also has no context information:
flow {
  val callback = MyCallbackHandler(this@flow)
  api.call(callback)
}.onCompletion { t ->
  // Was the stream cancelled? I don't know.
  // and how do I access callback here to (maybe) tell it to cancel the request
}
d

Dominaezzz

01/08/2020, 11:28 AM
You might have to use
callbackFlow
with
awaitClose
. How are you able to keep your flow suspended btw?
Oh it's you. You can't wrap your while loop in a try/finally. Assuming you haven't changed your code.
d

diesieben07

01/08/2020, 12:16 PM
Yes, I know I can't do that. well,
finally
is okay, but not
catch
, because that breaks flow cancellation. But with
finally
I still don't know whether the stream got cancelled or completed fully. And I don't have the exception that may have occurred either.
callbackFlow
+
awaitClose
does not work. The
awaitClose
code is not called if the flow is cancelled (e.g. using
take(1)
)
d

Dominaezzz

01/08/2020, 12:24 PM
It's not called?!
d

diesieben07

01/08/2020, 12:24 PM
val myFlow = callbackFlow {
        send(1)
        send(2)
        delay(200)
        send(3)
        close()
        awaitClose {
            println("await close")
        }
    }.onCompletion {
        println("on completion!")
        it?.printStackTrace()
    }

    myFlow.take(1).collect {
        println("got value $it")
    }
Prints:
got value 1
on completion!
d

diesieben07

01/08/2020, 12:26 PM
I know the documentation. It seems to be wrong for channels in flows...
d

Dominaezzz

01/08/2020, 12:26 PM
Yeah, but you close it though, before awaitClose was called.
d

diesieben07

01/08/2020, 12:26 PM
If I remove
close
the flow never finishes.
Which makes sense, because the channel does not get closed.
d

Dominaezzz

01/08/2020, 12:27 PM
Even with the
take
?
d

diesieben07

01/08/2020, 12:28 PM
Well,
take
cancels the flow so it will finish. But
awaitClose
is still not called (how could it, the producing lambda hasn't even gotten there...
I can surround my producer with
try-finally
, but then I am back to square one (don't know if it was cancelled, don't have the failing exception).
d

Dominaezzz

01/08/2020, 12:33 PM
Use try/catch and rethrow the exception.
d

diesieben07

01/08/2020, 12:34 PM
Still no way to tell if it was simply cancelled or if it's a genuine exception. There is no way to check for the cancellation exception, because it's class is marked
internal
.
val myFlow = callbackFlow {
        try {
            send(1)
            send(2)
            delay(200)
            send(3)
            close()
        } catch (e: Exception) {
            println("Failure! ... not really")
            e.printStackTrace()
        }
    }

    myFlow.take(1).collect {
        println("got value $it")
    }
Shows:
got value 1
Failure! ... not really
kotlinx.coroutines.flow.internal.AbortFlowException: Flow was aborted, no more elements needed
And there is no way to check for
AbortFlowException
, because it's internal.
d

Dominaezzz

01/08/2020, 12:37 PM
On second thought, does the difference matter? Why does the producer need to know how (as supposed to just when) the consumer failed?
d

diesieben07

01/08/2020, 12:37 PM
apiCall.cancel()
as opposed to
apiCall.fail(exception)
.
The first one is a perfectly normal condition "I don't need more elements, thanks". The 2nd one is "something went wrong"
d

Dominaezzz

01/08/2020, 12:40 PM
I still don't understand why it needs to know? As far as
Flow
is concerned here, only the first one is possible.
Flow
just doesn't provide the second one.
I mean, what if your
Flow
had multiple consumers and one fails, should the rest fail because of it?
d

diesieben07

01/08/2020, 12:41 PM
Each flow only has one collector... You can't call two terminal operators on one flow.
Well, you can, but they will just make your emitter code run twice.
d

Dominaezzz

01/08/2020, 12:43 PM
flow {}.broadcastIn() ......
, which can have multiple consumers. There will be a
share
operator coming soon also, that will basically allow flows have multiple collectors.
d

diesieben07

01/08/2020, 12:43 PM
Yes, but
broadcastIn
is still one terminal operator, which collects the flow once.
As far as the flow is concerned, there is one consumer, which just happens to broadcast things.
d

Dominaezzz

01/08/2020, 12:47 PM
Well, yes, but you could say that about literally every operator. Every operator is a consumer which happens to emit things as another
Flow
.
Regardless, why should you get downstream exceptions?
d

diesieben07

01/08/2020, 12:49 PM
I don't know. Should I? I am not sure I understand the point you are making.
d

Dominaezzz

01/08/2020, 12:51 PM
Sorry. I'm not very good at explaining things.
Flow implementations never catch or handle exceptions that occur in downstream flows.
You shouldn't handle the exceptions.
d

diesieben07

01/08/2020, 12:53 PM
Okay. So just cancel without any information
d

Dominaezzz

01/08/2020, 12:54 PM
Yes
d

diesieben07

01/08/2020, 12:55 PM
This logs a warning from the API...
Cancelling without a message or cause is suboptimal
d

Dominaezzz

01/08/2020, 12:59 PM
🤷🏼 You can pass the cancellation exception as a cause I guess, idk. Weird API imo.
d

diesieben07

01/08/2020, 1:00 PM
grpc 😄
val myFlow = flow {
        var done = false
        try {
            emit(1)
            emit(2)
            delay(200)
            emit(3)
            println("closed")
            done = true
        } finally {
            if (!done) {
                println("cancelled!")
            }
        }
    }
Seems to work