Hello everyone, Is that by design that hot flows ...
# coroutines
ł
Hello everyone, is it by design that hot flows created via `stateIn` or `shareIn` over a `callbackFlow`-based upstream never call `awaitClose` if the `started` argument is `SharingStarted.Lazily` or `SharingStarted.Eagerly`? For example:
callbackFlow {
   // some logic creating a callback
   // and registering it.

   // then the mandatory
   awaitClose {
      // code to unregister the callback, but it's never called
   }
}.stateIn(
   scope = myScope,
   started = SharingStarted.Eagerly,
   initialValue = someValue,
)
Even when I cancel `myScope`, `awaitClose` is not called. The only way to make it work is to use `started = SharingStarted.WhileSubscribed()`.
s
Sounds intriguing, but I can't reproduce the problem you're describing. If I run the code you shared, `awaitClose` is always executed. Are you able to share a complete runnable program that shows the issue?
ł
Hi Sam, thanks for verifying. For me it was also a surprising "discovery". I will try to prepare a working example and share it here.
@Sam, please have a look when you get a chance: Link to Kotlin Playground
👀 1
s
Thanks Łukasz! The explanation lies in the order in which the code inside the `callbackFlow {…}` block is executed. The `awaitClose()` function runs its cleanup block if the coroutine was cancelled during the call to `awaitClose()`. But in your example, the instruction to cancel the scope happens before `awaitClose()` has actually been invoked. After you cancel the scope, the loop attempts to continue, running its next `delay()`. Because the scope has been cancelled, that `delay()` immediately throws a `CancellationException`, exiting the `callbackFlow {…}` block before it ever reaches the call to `awaitClose()`.
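The ordering described above can be reproduced with a small sketch. It is not the Playground program from this thread; the function name and the two flags are made up for illustration, and it assumes `kotlinx-coroutines-core` on the JVM. The outer `try`/`finally` is added only to show that the block exits through cancellation without passing through `awaitClose`.

```kotlin
import java.util.concurrent.atomic.AtomicBoolean
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.SharingStarted
import kotlinx.coroutines.flow.callbackFlow
import kotlinx.coroutines.flow.stateIn
import kotlinx.coroutines.runBlocking

// Hypothetical demo: returns (awaitCloseBlockRan, finallyRan) after the scope is cancelled.
fun cancelBeforeAwaitClose(): Pair<Boolean, Boolean> = runBlocking {
    val awaitCloseBlockRan = AtomicBoolean(false)
    val finallyRan = AtomicBoolean(false)
    val producerStarted = CompletableDeferred<Unit>()
    val scope = CoroutineScope(Job() + Dispatchers.Default)

    callbackFlow<Int> {
        try {
            producerStarted.complete(Unit)
            // Long-running work placed BEFORE awaitClose (about 10 s in total).
            repeat(1_000) { i ->
                trySend(i)
                delay(10) // throws CancellationException once the scope is cancelled
            }
            // Not reached in this run: the loop above is cancelled first.
            awaitClose { awaitCloseBlockRan.set(true) }
        } finally {
            finallyRan.set(true)
        }
    }.stateIn(scope, SharingStarted.Eagerly, initialValue = -1)

    producerStarted.await()
    // Cancel the sharing scope and wait for the producer to finish unwinding.
    scope.coroutineContext[Job]!!.cancelAndJoin()
    awaitCloseBlockRan.get() to finallyRan.get()
}

fun main() {
    println(cancelBeforeAwaitClose()) // (false, true)
}
```

In this sketch the cleanup lambda never runs because the call to `awaitClose` is never made, while the surrounding `finally` still does.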
The `awaitClose()` function is typically useful when the `callbackFlow {…}` doesn't contain any long-running code of its own, and is just waiting indefinitely for the consumer (or the callback) to go away.
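For contrast, here is a minimal sketch of that typical shape: register a listener, then suspend in `awaitClose` straight away. `Sensor`, `readings()` and `listenersAfterCancel()` are hypothetical names invented for this example, and `kotlinx-coroutines-core` on the JVM is assumed.

```kotlin
import java.util.concurrent.CopyOnWriteArrayList
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.SharingStarted
import kotlinx.coroutines.flow.callbackFlow
import kotlinx.coroutines.flow.stateIn
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeout

// Hypothetical callback-based API, defined here only for the demo.
class Sensor {
    private val listeners = CopyOnWriteArrayList<(Int) -> Unit>()
    val listenerCount: Int get() = listeners.size
    fun register(listener: (Int) -> Unit) { listeners += listener }
    fun unregister(listener: (Int) -> Unit) { listeners -= listener }
}

fun Sensor.readings(): Flow<Int> = callbackFlow {
    val listener: (Int) -> Unit = { trySend(it) }
    register(listener)
    // No long-running code of its own: the block suspends here until closed or cancelled.
    awaitClose { unregister(listener) }
}

// Returns the number of listeners still registered after the sharing scope is cancelled.
fun listenersAfterCancel(): Int = runBlocking {
    val sensor = Sensor()
    val scope = CoroutineScope(Job() + Dispatchers.Default)
    sensor.readings().stateIn(scope, SharingStarted.Eagerly, initialValue = 0)

    // Wait until the producer has actually registered its listener.
    withTimeout(5_000) { while (sensor.listenerCount == 0) delay(5) }

    scope.coroutineContext[Job]!!.cancelAndJoin()
    sensor.listenerCount
}

fun main() {
    println(listenersAfterCancel()) // 0
}
```

Here the scope is cancelled while the producer is suspended inside `awaitClose`, so the cleanup block runs and the listener is removed, even with `SharingStarted.Eagerly`.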
ł
My real (production) code is exactly as you described. It does not have any long-running producer like the one I presented on the Playground. After work, I will try to come up with a better example. Your explanation is clear; however, in my case the producer scope reaches the `awaitClose` suspend function, but its block is still never called. In your very last sentence you wrote:
and is just waiting indefinitely for the consumer (or the callback) to go away.
Indeed, my callback-based API does not have a call like `onCompleted` that would do the trick for me. Now, when it comes to the consumer of that flow: please note it is the `SharedFlow` started `Eagerly`, and according to the Kotlin spec it never stops collecting the upstream. You have shed new light on the problem, but I still need to verify the production usage on my side and eventually come back with a more relevant example or... identify the problem on our side. Anyway, really helpful feedback. Much appreciated. I will keep you posted.
👍 1
Based on your feedback, Sam, I have re-reviewed the spec and found the following pitfall
It's exactly what happens to my code, especially reproducible in unit tests where I control the test dispatcher and cancel it when the test ends. The `awaitClose` throws `CancellationException` instead of executing its body.
It looks like the bullet-proof solution is for callback-based APIs to provide not only methods like `subscribe(...)` and `unsubscribe` but also `onCompleted`, so I can unregister listeners and avoid leaks. Relying on `awaitClose` is unreliable in some scenarios with hot flows started `Eagerly` or `Lazily`.
k
Rather than using a `callbackFlow`, you could use a channel directly?
val upstream = flow { 
  val channel = Channel(...)
  val callback = { channel.trySend(it) }
  thing.registerCallback(callback)
  try {
    for (item in channel) {
      emit(item)
    }
  } finally {
    thing.unregister(callback)
  }
}

val stateFlow = upstream.stateIn(...)
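A runnable variant of the snippet above might look like the sketch below. The `Thing` class, the `events()` extension, the `Channel.BUFFERED` capacity and the `stateIn` arguments are all assumptions filled in for illustration (the original leaves them elided); `kotlinx-coroutines-core` on the JVM is assumed.

```kotlin
import java.util.concurrent.CopyOnWriteArrayList
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.SharingStarted
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.stateIn
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeout

// Hypothetical callback source standing in for `thing` in the snippet above.
class Thing {
    private val callbacks = CopyOnWriteArrayList<(Int) -> Unit>()
    val callbackCount: Int get() = callbacks.size
    fun registerCallback(callback: (Int) -> Unit) { callbacks += callback }
    fun unregister(callback: (Int) -> Unit) { callbacks -= callback }
    fun fire(value: Int) = callbacks.forEach { it(value) }
}

fun Thing.events(): Flow<Int> = flow {
    val channel = Channel<Int>(Channel.BUFFERED) // capacity is an assumption
    val callback: (Int) -> Unit = { channel.trySend(it) }
    registerCallback(callback)
    try {
        // Forward everything the callback pushes into the channel.
        for (item in channel) emit(item)
    } finally {
        // Runs on any exit from the block above, including cancellation.
        unregister(callback)
    }
}

// Returns (value observed through the StateFlow, callbacks left after cancellation).
fun runChannelDemo(): Pair<Int, Int> = runBlocking {
    val thing = Thing()
    val scope = CoroutineScope(Job() + Dispatchers.Default)
    val state = thing.events().stateIn(scope, SharingStarted.Eagerly, initialValue = 0)

    withTimeout(5_000) { while (thing.callbackCount == 0) delay(5) }
    thing.fire(42)
    val seen = withTimeout(5_000) { state.first { it == 42 } }

    scope.coroutineContext[Job]!!.cancelAndJoin()
    seen to thing.callbackCount
}

fun main() {
    println(runChannelDemo()) // (42, 0)
}
```

The cleanup sits in a plain `finally`, so it runs whenever the code after `registerCallback` is left, whether the coroutine is cancelled while suspended on the channel or while emitting.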
ł
Hi Kevin, that's a good idea! For the time being, we use `callbackFlow` + `started = WhileSubscribed`, which seems to always work, and the `awaitClose {}` block is always called. Thanks