Łukasz Tokarski
02/06/2025, 6:58 AMstateIn or shareIn , over callbackFlow-based upstream, never call awaitClose if the started argument is SharingStarted.Lazily, SharingStarted.Eagerly.
For example:
callbackFlow {
// some logic creating a callback
// and registering it.
// then the mandatory
awaitClose {
// code to unregister the callback, but it's never called
}
}.stateIn(
scope = myScope,
started = SharingStarted.Eagerly,
initialValue = someValue,
)
Even when I cancel the myScope, the awaitClose is not called.
The only way to make it work is to use started = SharingStarted.WhileSubscribed()Sam
02/06/2025, 7:17 AMawaitClose is always executed. Are you able to share a complete runnable program that shows the issue?Łukasz Tokarski
02/06/2025, 8:15 AMŁukasz Tokarski
02/06/2025, 8:40 AMSam
02/06/2025, 8:57 AMcallbackFlow {…} block is executed. The awaitClose() function runs its cleanup block if the coroutine was cancelled during the call to awaitClose(). But in your example, the instruction to cancel the scope happens before awaitClose() has actually been invoked. After you cancel the scope, the loop attempts to continue, running its next delay(). Because the scope has been cancelled, that delay() immediately throws a CancellationException, exiting the callbackFlow {…} block before it ever reaches the call to awaitClose().Sam
02/06/2025, 9:02 AMawaitClose() function is typically useful when the callbackFlow {…} doesn't contain any long-running code of its own, and is just waiting indefinitely for the consumer (or the callback) to go away.Łukasz Tokarski
02/06/2025, 9:16 AMawaitClose suspend function but it's still never called.
In your very last sentence you wrote:
and is just waiting indefinitely for the consumer (or the callback) to go away.Indeed, my callback based API, indeed does not have a call like:
onCompleted, that would do the trick for me.
Now when it comes to the consumer of that flow. Please note it is the SharedFlow started Eagerly and according to the Kotlin spec. it never stops collecting the upstream.
You have shed new light on the problem, but I still need to verify the production usage on my side and eventually come back with more relevant example or... gonna identify the problem on our side.
Anyway, really helpful feedback. Much appreciated. I will keep you posted.Łukasz Tokarski
02/06/2025, 9:58 AMŁukasz Tokarski
02/06/2025, 9:59 AMawaitClose throws CancellationException instead of executing its body.Łukasz Tokarski
02/06/2025, 10:01 AMsubscribe(...) and unsubscribe but also onCompleted so I can unregister listeners and avoid leaks.
Relying on awaitClose is unreliable in some scenarios with hot flows started Eagerly or Lazily.kevin.cianfarini
02/07/2025, 3:22 AMcallbackFlow you could use a channel directly?
val upstream = flow {
val channel = Channel(...)
val callback = { channel.trySend(it) }
thing.registerCallback(callback)
try {
for (item in channel) {
emit(item)
}
} finally {
thing.unregister(callback)
}
}
val stateFlow = upstream.stateIn(...)Łukasz Tokarski
02/07/2025, 6:55 AMcallbackFlow + started = WhileSubscribed which seems to always work and the awaitClose {} is always called.
Thanks