diesieben07
01/08/2020, 10:14 AM
onCompletion, but it has no information about whether the flow was completely consumed or whether consumption was cancelled, e.g. by the take operator.
onCompletion also has no context information:
flow {
    val callback = MyCallbackHandler(this@flow)
    api.call(callback)
}.onCompletion { t ->
    // Was the stream cancelled? I don't know.
    // And how do I access callback here to (maybe) tell it to cancel the request?
}
Dominaezzz
01/08/2020, 11:28 AM
callbackFlow with awaitClose.
How are you able to keep your flow suspended btw?
diesieben07
01/08/2020, 12:16 PM
finally is okay, but not catch, because that breaks flow cancellation.
But with finally I still don't know whether the stream got cancelled or completed fully. And I don't have the exception that may have occurred either.
callbackFlow + awaitClose does not work. The awaitClose code is not called if the flow is cancelled (e.g. using take(1))
Dominaezzz
01/08/2020, 12:24 PM
diesieben07
01/08/2020, 12:24 PM
val myFlow = callbackFlow {
    send(1)
    send(2)
    delay(200)
    send(3)
    close()
    awaitClose {
        println("await close")
    }
}.onCompletion {
    println("on completion!")
    it?.printStackTrace()
}
myFlow.take(1).collect {
    println("got value $it")
}
Prints:
got value 1
on completion!
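In the snippet above the producer block is cancelled while it is still suspended before awaitClose, so the cleanup lambda is never registered. For comparison, here is a minimal sketch in which the block reaches awaitClose right away. FakeApi is a hypothetical callback API invented for this illustration, and trySend assumes kotlinx.coroutines 1.5 or newer. Under those assumptions the cleanup lambda should run when take(1) cancels the flow.

```kotlin
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical callback API, invented for this sketch.
class FakeApi {
    var cancelled = false
        private set

    // Delivers two values synchronously to the listener.
    fun call(listener: (Int) -> Unit) {
        listener(1)
        listener(2)
    }

    fun cancel() {
        cancelled = true
    }
}

fun values(api: FakeApi): Flow<Int> = callbackFlow {
    // trySend does not suspend, so the block cannot be cancelled mid-send.
    api.call { value -> trySend(value) }
    // Reached immediately; the lambda runs when the flow is closed or cancelled.
    awaitClose { api.cancel() }
}

fun main() = runBlocking {
    val api = FakeApi()
    values(api).take(1).collect { println("got value $it") }
    println("api cancelled: ${api.cancelled}")
}
```

This still does not tell the cleanup lambda why the flow ended, which is the limitation discussed in the rest of the thread.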
Dominaezzz
01/08/2020, 12:25 PM
diesieben07
01/08/2020, 12:26 PM
Dominaezzz
01/08/2020, 12:26 PM
diesieben07
01/08/2020, 12:26 PM
close the flow never finishes.
Dominaezzz
01/08/2020, 12:27 PM
take?
diesieben07
01/08/2020, 12:28 PM
take cancels the flow so it will finish. But awaitClose is still not called (how could it, the producing lambda hasn't even gotten there...
try-finally, but then I am back to square one (don't know if it was cancelled, don't have the failing exception).
Dominaezzz
01/08/2020, 12:33 PM
diesieben07
01/08/2020, 12:34 PM
internal.
val myFlow = callbackFlow {
    try {
        send(1)
        send(2)
        delay(200)
        send(3)
        close()
    } catch (e: Exception) {
        println("Failure! ... not really")
        e.printStackTrace()
    }
}
myFlow.take(1).collect {
    println("got value $it")
}
Shows:
got value 1
Failure! ... not really
kotlinx.coroutines.flow.internal.AbortFlowException: Flow was aborted, no more elements needed
AbortFlowException, because it's internal.
Dominaezzz
01/08/2020, 12:37 PM
diesieben07
01/08/2020, 12:37 PM
apiCall.cancel() as opposed to apiCall.fail(exception).
Dominaezzz
01/08/2020, 12:40 PM
Flow is concerned here, only the first one is possible. Flow just doesn't provide the second one.
Flow had multiple consumers and one fails, should the rest fail because of it?
diesieben07
01/08/2020, 12:41 PM
Dominaezzz
01/08/2020, 12:43 PM
flow {}.broadcastIn() ......, which can have multiple consumers. There will be a share operator coming soon also, that will basically allow flows to have multiple collectors.
diesieben07
01/08/2020, 12:43 PM
broadcastIn is still one terminal operator, which collects the flow once.
Dominaezzz
01/08/2020, 12:47 PM
Flow.
diesieben07
01/08/2020, 12:49 PM
Dominaezzz
01/08/2020, 12:51 PM
Flow implementations never catch or handle exceptions that occur in downstream flows.
You shouldn't handle the exceptions.
diesieben07
01/08/2020, 12:53 PM
Dominaezzz
01/08/2020, 12:54 PM
diesieben07
01/08/2020, 12:55 PM
Cancelling without a message or cause is suboptimal
Dominaezzz
01/08/2020, 12:59 PM
diesieben07
01/08/2020, 1:00 PM
val myFlow = flow {
    var done = false
    try {
        emit(1)
        emit(2)
        delay(200)
        emit(3)
        println("closed")
        done = true
    } finally {
        if (!done) {
            println("cancelled!")
        }
    }
}
Seems to work
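The done-flag idea can be taken one step further to also capture the exception. A minimal sketch, assuming kotlinx.coroutines is on the classpath; numbers and the log list are hypothetical names used only for illustration. It relies on cancellation, including the internal AbortFlowException thrown by take, surfacing as a CancellationException, and it rethrows everything it catches so that flow cancellation keeps working.

```kotlin
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical flow that records how it ended in the given log.
fun numbers(log: MutableList<String>): Flow<Int> = flow {
    try {
        emit(1)
        emit(2)
        delay(200)
        emit(3)
        log += "completed"
    } catch (e: CancellationException) {
        // Covers cancellation by operators such as take.
        log += "cancelled"
        throw e // always rethrow, otherwise cancellation breaks
    } catch (e: Throwable) {
        // Any other failure that passes through this block.
        log += "failed: ${e.message}"
        throw e
    }
}

fun main() = runBlocking {
    val log = mutableListOf<String>()
    numbers(log).take(1).collect { println("got value $it") }
    numbers(log).collect { }
    println(log)
}
```

Note that exceptions thrown by the collector propagate through emit, so the last catch branch also sees downstream failures, not only failures of the producer itself.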