I'm converting a callback-based API into a flow. I...
# coroutines
d
I'm converting a callback-based API into a flow. I need to signal to the API if the flow is cancelled or fails with an exception. I know about
onCompletion
, but it has no information about if the flow was completely consumed, or if consumption was cancelled using e.g.
take
operator.
onCompletion
also has no context information:
Copy code
flow {
  val callback = MyCallbackHandler(this@flow)
  api.call(callback)
}.onCompletion { t ->
  // Was the stream cancelled? I don't know.
  // and how do I access callback here to (maybe) tell it to cancel the request
}
d
You might have to use
callbackFlow
with
awaitClose
. How are you able to keep your flow suspended btw?
Oh it's you. You can't wrap your while loop in a try/finally. Assuming you haven't changed your code.
d
Yes, I know I can't do that. well,
finally
is okay, but not
catch
, because that breaks flow cancellation. But with
finally
I still don't know whether the stream got cancelled or completed fully. And I don't have the exception that may have occurred either.
callbackFlow
+
awaitClose
does not work. The
awaitClose
code is not called if the flow is cancelled (e.g. using
take(1)
)
d
It's not called?!
d
Copy code
val myFlow = callbackFlow {
        send(1)
        send(2)
        delay(200)
        send(3)
        close()
        awaitClose {
            println("await close")
        }
    }.onCompletion {
        println("on completion!")
        it?.printStackTrace()
    }

    myFlow.take(1).collect {
        println("got value $it")
    }
Prints:
Copy code
got value 1
on completion!
d
I know the documentation. It seems to be wrong for channels in flows...
d
Yeah, but you close it though, before awaitClose was called.
d
If I remove
close
the flow never finishes.
Which makes sense, because the channel does not get closed.
d
Even with the
take
?
d
Well,
take
cancels the flow so it will finish. But
awaitClose
is still not called (how could it, the producing lambda hasn't even gotten there...
I can surround my producer with
try-finally
, but then I am back to square one (don't know if it was cancelled, don't have the failing exception).
d
Use try/catch and rethrow the exception.
d
Still no way to tell if it was simply cancelled or if it's a genuine exception. There is no way to check for the cancellation exception, because it's class is marked
internal
.
Copy code
val myFlow = callbackFlow {
        try {
            send(1)
            send(2)
            delay(200)
            send(3)
            close()
        } catch (e: Exception) {
            println("Failure! ... not really")
            e.printStackTrace()
        }
    }

    myFlow.take(1).collect {
        println("got value $it")
    }
Shows:
Copy code
got value 1
Failure! ... not really
kotlinx.coroutines.flow.internal.AbortFlowException: Flow was aborted, no more elements needed
And there is no way to check for
AbortFlowException
, because it's internal.
d
On second thought, does the difference matter? Why does the producer need to know how (as supposed to just when) the consumer failed?
d
apiCall.cancel()
as opposed to
apiCall.fail(exception)
.
The first one is a perfectly normal condition "I don't need more elements, thanks". The 2nd one is "something went wrong"
d
I still don't understand why it needs to know? As far as
Flow
is concerned here, only the first one is possible.
Flow
just doesn't provide the second one.
I mean, what if your
Flow
had multiple consumers and one fails, should the rest fail because of it?
d
Each flow only has one collector... You can't call two terminal operators on one flow.
Well, you can, but they will just make your emitter code run twice.
d
flow {}.broadcastIn() ......
, which can have multiple consumers. There will be a
share
operator coming soon also, that will basically allow flows have multiple collectors.
d
Yes, but
broadcastIn
is still one terminal operator, which collects the flow once.
As far as the flow is concerned, there is one consumer, which just happens to broadcast things.
d
Well, yes, but you could say that about literally every operator. Every operator is a consumer which happens to emit things as another
Flow
.
Regardless, why should you get downstream exceptions?
d
I don't know. Should I? I am not sure I understand the point you are making.
d
Sorry. I'm not very good at explaining things.
Flow implementations never catch or handle exceptions that occur in downstream flows.
You shouldn't handle the exceptions.
d
Okay. So just cancel without any information
d
Yes
d
This logs a warning from the API...
Copy code
Cancelling without a message or cause is suboptimal
d
🤷🏼 You can pass the cancellation exception as a cause I guess, idk. Weird API imo.
d
grpc 😄
Copy code
val myFlow = flow {
        var done = false
        try {
            emit(1)
            emit(2)
            delay(200)
            emit(3)
            println("closed")
            done = true
        } finally {
            if (!done) {
                println("cancelled!")
            }
        }
    }
Seems to work