diesieben07
01/08/2020, 10:14 AMonCompletion, but it has no information about if the flow was completely consumed, or if consumption was cancelled using e.g. take operator.
onCompletion also has no context information:
flow {
val callback = MyCallbackHandler(this@flow)
api.call(callback)
}.onCompletion { t ->
// Was the stream cancelled? I don't know.
// and how do I access callback here to (maybe) tell it to cancel the request
}Dominaezzz
01/08/2020, 11:28 AMcallbackFlow with awaitClose.
How are you able to keep your flow suspended btw?Dominaezzz
01/08/2020, 11:40 AMdiesieben07
01/08/2020, 12:16 PMfinally is okay, but not catch, because that breaks flow cancellation.
But with finally I still don't know whether the stream got cancelled or completed fully. And I don't have the exception that may have occurred either.
callbackFlow + awaitClose does not work. The awaitClose code is not called if the flow is cancelled (e.g. using take(1))Dominaezzz
01/08/2020, 12:24 PMdiesieben07
01/08/2020, 12:24 PMval myFlow = callbackFlow {
send(1)
send(2)
delay(200)
send(3)
close()
awaitClose {
println("await close")
}
}.onCompletion {
println("on completion!")
it?.printStackTrace()
}
myFlow.take(1).collect {
println("got value $it")
}
Prints:
got value 1
on completion!Dominaezzz
01/08/2020, 12:25 PMdiesieben07
01/08/2020, 12:26 PMDominaezzz
01/08/2020, 12:26 PMdiesieben07
01/08/2020, 12:26 PMclose the flow never finishes.diesieben07
01/08/2020, 12:27 PMDominaezzz
01/08/2020, 12:27 PMtake ?diesieben07
01/08/2020, 12:28 PMtake cancels the flow so it will finish. But awaitClose is still not called (how could it, the producing lambda hasn't even gotten there...diesieben07
01/08/2020, 12:32 PMtry-finally, but then I am back to square one (don't know if it was cancelled, don't have the failing exception).Dominaezzz
01/08/2020, 12:33 PMdiesieben07
01/08/2020, 12:34 PMinternal.diesieben07
01/08/2020, 12:35 PMval myFlow = callbackFlow {
try {
send(1)
send(2)
delay(200)
send(3)
close()
} catch (e: Exception) {
println("Failure! ... not really")
e.printStackTrace()
}
}
myFlow.take(1).collect {
println("got value $it")
}
Shows:
got value 1
Failure! ... not really
kotlinx.coroutines.flow.internal.AbortFlowException: Flow was aborted, no more elements neededdiesieben07
01/08/2020, 12:35 PMAbortFlowException, because it's internal.Dominaezzz
01/08/2020, 12:37 PMdiesieben07
01/08/2020, 12:37 PMapiCall.cancel() as opposed to apiCall.fail(exception).diesieben07
01/08/2020, 12:38 PMDominaezzz
01/08/2020, 12:40 PMFlow is concerned here, only the first one is possible. Flow just doesn't provide the second one.Dominaezzz
01/08/2020, 12:41 PMFlow had multiple consumers and one fails, should the rest fail because of it?diesieben07
01/08/2020, 12:41 PMdiesieben07
01/08/2020, 12:42 PMDominaezzz
01/08/2020, 12:43 PMflow {}.broadcastIn() ...... , which can have multiple consumers. There will be a share operator coming soon also, that will basically allow flows have multiple collectors.diesieben07
01/08/2020, 12:43 PMbroadcastIn is still one terminal operator, which collects the flow once.diesieben07
01/08/2020, 12:44 PMDominaezzz
01/08/2020, 12:47 PMFlow.Dominaezzz
01/08/2020, 12:48 PMdiesieben07
01/08/2020, 12:49 PMDominaezzz
01/08/2020, 12:51 PMDominaezzz
01/08/2020, 12:51 PMFlow implementations never catch or handle exceptions that occur in downstream flows.You shouldn't handle the exceptions.
diesieben07
01/08/2020, 12:53 PMDominaezzz
01/08/2020, 12:54 PMdiesieben07
01/08/2020, 12:55 PMCancelling without a message or cause is suboptimalDominaezzz
01/08/2020, 12:59 PMdiesieben07
01/08/2020, 1:00 PMdiesieben07
01/08/2020, 1:03 PMval myFlow = flow {
var done = false
try {
emit(1)
emit(2)
delay(200)
emit(3)
println("closed")
done = true
} finally {
if (!done) {
println("cancelled!")
}
}
}
Seems to work