diesieben07
01/08/2020, 10:14 AM
onCompletion, but it has no information about whether the flow was completely consumed or whether consumption was cancelled, e.g. by the take operator.
onCompletion also has no context information:
flow {
    val callback = MyCallbackHandler(this@flow)
    api.call(callback)
}.onCompletion { t ->
    // Was the stream cancelled? I don't know.
    // And how do I access callback here to (maybe) tell it to cancel the request?
}
Dominaezzz
01/08/2020, 11:28 AM
callbackFlow with awaitClose.
How are you able to keep your flow suspended btw?
Dominaezzz
01/08/2020, 11:40 AM
diesieben07
01/08/2020, 12:16 PM
finally is okay, but not catch, because that breaks flow cancellation.
But with finally I still don't know whether the stream got cancelled or completed fully. And I don't have the exception that may have occurred either.
callbackFlow + awaitClose does not work. The awaitClose code is not called if the flow is cancelled (e.g. using take(1)).
Dominaezzz
01/08/2020, 12:24 PM
diesieben07
01/08/2020, 12:24 PM
val myFlow = callbackFlow {
    send(1)
    send(2)
    delay(200)
    send(3)
    close()
    awaitClose {
        println("await close")
    }
}.onCompletion {
    println("on completion!")
    it?.printStackTrace()
}
myFlow.take(1).collect {
    println("got value $it")
}
Prints:
got value 1
on completion!
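For comparison with the snippet above, here is a hedged sketch of the more conventional callbackFlow arrangement: the builder body only registers a callback and then suspends in awaitClose straight away, so the cleanup block is already pending when take(1) cancels the collection. FakeApi and its register, push and cancel members are hypothetical stand-ins for the real callback API, and trySend assumes kotlinx.coroutines 1.5 or later (older versions used offer).

```kotlin
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical callback-based API, invented for this sketch.
class FakeApi {
    var cancelled = false
        private set
    private var listener: ((Int) -> Unit)? = null

    fun register(l: (Int) -> Unit) { listener = l }
    fun cancel() { cancelled = true; listener = null }
    fun push(value: Int) { listener?.invoke(value) }
}

fun apiFlow(api: FakeApi): Flow<Int> = callbackFlow {
    // Values arrive through the callback, not from the builder body itself.
    api.register { value -> trySend(value) }
    // Suspends until the channel is closed or the collector is cancelled,
    // then runs the cleanup block.
    awaitClose { api.cancel() }
}

fun main(): Unit = runBlocking {
    val api = FakeApi()
    // Simulates the external source delivering three values over time.
    val pusher = launch { repeat(3) { delay(50); api.push(it + 1) } }
    apiFlow(api).take(1).collect { println("got value $it") }
    pusher.join()
    println("api cancelled: ${api.cancelled}")
}
```

This only tells the API that the collector went away; it does not by itself say whether that was a cancellation or a failure.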
Dominaezzz
01/08/2020, 12:25 PM
diesieben07
01/08/2020, 12:26 PM
Dominaezzz
01/08/2020, 12:26 PM
diesieben07
01/08/2020, 12:26 PM
close the flow never finishes.
diesieben07
01/08/2020, 12:27 PM
Dominaezzz
01/08/2020, 12:27 PM
take?
diesieben07
01/08/2020, 12:28 PM
take cancels the flow so it will finish. But awaitClose is still not called (how could it, the producing lambda hasn't even gotten there...)
diesieben07
01/08/2020, 12:32 PM
try-finally, but then I am back to square one (don't know if it was cancelled, don't have the failing exception).
Dominaezzz
01/08/2020, 12:33 PM
diesieben07
01/08/2020, 12:34 PM
internal.
diesieben07
01/08/2020, 12:35 PM
val myFlow = callbackFlow {
    try {
        send(1)
        send(2)
        delay(200)
        send(3)
        close()
    } catch (e: Exception) {
        println("Failure! ... not really")
        e.printStackTrace()
    }
}
myFlow.take(1).collect {
    println("got value $it")
}
Shows:
got value 1
Failure! ... not really
kotlinx.coroutines.flow.internal.AbortFlowException: Flow was aborted, no more elements needed
diesieben07
01/08/2020, 12:35 PM
AbortFlowException, because it's internal.
Dominaezzz
01/08/2020, 12:37 PM
diesieben07
01/08/2020, 12:37 PM
apiCall.cancel() as opposed to apiCall.fail(exception).
diesieben07
01/08/2020, 12:38 PM
Dominaezzz
01/08/2020, 12:40 PM
Flow is concerned here, only the first one is possible. Flow just doesn't provide the second one.
Dominaezzz
01/08/2020, 12:41 PM
Flow had multiple consumers and one fails, should the rest fail because of it?
diesieben07
01/08/2020, 12:41 PM
diesieben07
01/08/2020, 12:42 PM
Dominaezzz
01/08/2020, 12:43 PM
flow {}.broadcastIn() ......, which can have multiple consumers. There will also be a share operator coming soon, which will basically allow flows to have multiple collectors.
diesieben07
01/08/2020, 12:43 PM
broadcastIn is still one terminal operator, which collects the flow once.
diesieben07
01/08/2020, 12:44 PM
Dominaezzz
01/08/2020, 12:47 PM
Flow.
Dominaezzz
01/08/2020, 12:48 PM
diesieben07
01/08/2020, 12:49 PM
Dominaezzz
01/08/2020, 12:51 PM
Dominaezzz
01/08/2020, 12:51 PM
Flow implementations never catch or handle exceptions that occur in downstream flows.
You shouldn't handle the exceptions.
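The quoted rule can be illustrated with a small sketch. The function name collectWithFailingCollector is made up for this example. It relies on the catch operator only handling upstream failures, so an exception thrown by the collector passes through it and surfaces from collect.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical helper: returns what the catch operator saw and what collect threw.
suspend fun collectWithFailingCollector(): Pair<List<String>, Throwable?> {
    val seenByCatch = mutableListOf<String>()
    val result = runCatching {
        flow { emit(1) }
            // Handles exceptions from the upstream flow only.
            .catch { e -> seenByCatch.add(e.message ?: "") }
            // The failure happens downstream of catch, in the collector.
            .collect { throw IllegalStateException("downstream failure") }
    }
    return seenByCatch to result.exceptionOrNull()
}

fun main(): Unit = runBlocking {
    val (seenByCatch, thrown) = collectWithFailingCollector()
    println("catch operator saw: $seenByCatch")
    println("collect threw: $thrown")
}
```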
diesieben07
01/08/2020, 12:53 PM
Dominaezzz
01/08/2020, 12:54 PM
diesieben07
01/08/2020, 12:55 PM
Cancelling without a message or cause is suboptimal
Dominaezzz
01/08/2020, 12:59 PM
diesieben07
01/08/2020, 1:00 PM
diesieben07
01/08/2020, 1:03 PM
val myFlow = flow {
    var done = false
    try {
        emit(1)
        emit(2)
        delay(200)
        emit(3)
        println("closed")
        done = true
    } finally {
        if (!done) {
            println("cancelled!")
        }
    }
}
Seems to work
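Building on the done flag, here is a sketch that also separates cancellation from failure and keeps the exception. It assumes the internal AbortFlowException is a subclass of the public CancellationException, and it rethrows everything it catches so the flow stays transparent to exceptions. Outcome and numbers are hypothetical names, not part of the library.

```kotlin
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical outcome type, introduced only for this sketch.
enum class Outcome { COMPLETED, CANCELLED, FAILED }

fun numbers(onDone: (Outcome, Throwable?) -> Unit): Flow<Int> = flow {
    try {
        emit(1)
        emit(2)
        delay(200)
        emit(3)
        onDone(Outcome.COMPLETED, null)
    } catch (e: CancellationException) {
        // Covers take(1) aborting the flow as well as ordinary coroutine cancellation.
        onDone(Outcome.CANCELLED, e)
        throw e // always rethrow, otherwise cancellation is swallowed
    } catch (e: Throwable) {
        // An exception thrown by the collector propagates out of emit and lands here.
        onDone(Outcome.FAILED, e)
        throw e // rethrow so the downstream failure is not hidden
    }
}

fun main(): Unit = runBlocking {
    numbers { outcome, _ -> println("take(1): $outcome") }
        .take(1)
        .collect { println("got value $it") }

    numbers { outcome, _ -> println("full collection: $outcome") }
        .collect { }

    runCatching {
        numbers { outcome, e -> println("failing collector: $outcome (${e?.message})") }
            .collect { error("boom") }
    }
}
```

In a real callback wrapper, the CANCELLED branch would be the place for something like apiCall.cancel() and the FAILED branch for apiCall.fail(exception).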