Łukasz Tokarski
02/06/2025, 6:58 AMstateIn
or shareIn
, over callbackFlow
-based upstream, never call awaitClose
if the started
argument is SharingStarted.Lazily
, SharingStarted.Eagerly
.
For example:
callbackFlow {
// some logic creating a callback
// and registering it.
// then the mandatory
awaitClose {
// code to unregister the callback, but it's never called
}
}.stateIn(
scope = myScope,
started = SharingStarted.Eagerly,
initialValue = someValue,
)
Even when I cancel the myScope
, the awaitClose
is not called.
The only way to make it work is to use started = SharingStarted.WhileSubscribed()
Sam
02/06/2025, 7:17 AMawaitClose
is always executed. Are you able to share a complete runnable program that shows the issue?Łukasz Tokarski
02/06/2025, 8:15 AMŁukasz Tokarski
02/06/2025, 8:40 AMSam
02/06/2025, 8:57 AMcallbackFlow {…}
block is executed. The awaitClose()
function runs its cleanup block if the coroutine was cancelled during the call to awaitClose()
. But in your example, the instruction to cancel the scope happens before awaitClose()
has actually been invoked. After you cancel the scope, the loop attempts to continue, running its next delay()
. Because the scope has been cancelled, that delay()
immediately throws a CancellationException
, exiting the callbackFlow {…}
block before it ever reaches the call to awaitClose()
.Sam
02/06/2025, 9:02 AMawaitClose()
function is typically useful when the callbackFlow {…}
doesn't contain any long-running code of its own, and is just waiting indefinitely for the consumer (or the callback) to go away.Łukasz Tokarski
02/06/2025, 9:16 AMawaitClose
suspend function but it's still never called.
In your very last sentence you wrote:
and is just waiting indefinitely for the consumer (or the callback) to go away.Indeed, my callback based API, indeed does not have a call like:
onCompleted
, that would do the trick for me.
Now when it comes to the consumer of that flow. Please note it is the SharedFlow
started Eagerly
and according to the Kotlin spec. it never stops collecting the upstream.
You have shed new light on the problem, but I still need to verify the production usage on my side and eventually come back with more relevant example or... gonna identify the problem on our side.
Anyway, really helpful feedback. Much appreciated. I will keep you posted.Łukasz Tokarski
02/06/2025, 9:58 AMŁukasz Tokarski
02/06/2025, 9:59 AMawaitClose
throws CancellationException
instead of executing its body.Łukasz Tokarski
02/06/2025, 10:01 AMsubscribe(...)
and unsubscribe
but also onCompleted
so I can unregister listeners and avoid leaks.
Relying on awaitClose
is unreliable in some scenarios with hot flows started Eagerly
or Lazily
.kevin.cianfarini
02/07/2025, 3:22 AMcallbackFlow
you could use a channel directly?
val upstream = flow {
val channel = Channel(...)
val callback = { channel.trySend(it) }
thing.registerCallback(callback)
try {
for (item in channel) {
emit(item)
}
} finally {
thing.unregister(callback)
}
}
val stateFlow = upstream.stateIn(...)
Łukasz Tokarski
02/07/2025, 6:55 AMcallbackFlow
+ started = WhileSubscribed
which seems to always work and the awaitClose {}
is always called.
Thanks