Is there an operator for `SharedFlow` that cancels...
# coroutines
l
Is there an operator for
SharedFlow
that cancels the flow automatically after all subscribers have left? Something like
refCount()
in RxJava.
b
use
SharingStarted.WhileSubscribed()
in
.shareIn
operator
l
Thank you!
Will
onCompletion
then be called?
I need to close some underlying resources when the flow is completed.
To be precise: I don't have a cold upstream which I convert to hot one using
shareIn
. I create a hot flow directly, with
MutableSharedFlow
and I need to complete this flow and close underlying resource.
The flow source is a Websocket channel.
e
In this case you'll have to subscribe to
subscriptionCount
on your hot
MutableSharedFlow
(https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/-mutable-shared-flow/subscription-count.html) and track it yourself to implement the corresponding logic. However, it is asynchronous. I don't think there is a reliable way to distinguish the case of "subscribers have not appeared yet" from the case "subscribers have already disappeared".
l
Yes, that's what I tried and as you wrote it doesn't really solve the problem. Currently I just implemented a
leaveChannel()
method and every subscriber has to call it, and I will track the subscriber number internally. Are there any plans to introduce something like
refCount()
for
SharedFlow
? http://reactivex.io/documentation/operators/refcount.html
e
It is similar to refCount. But unlike Rx the whole flow is asynchronous. It has no synchronous operations.
l
Thank you for the explanation.
u
Im new to flow, but coming from rx, why dont you just wrap your websocket coldly, and then just turn it hot via shareIn / refCount? youd then have invokeOnCancel or whatever is called, to clear the resources in the
callbackFlow { }
builder
l
We have a problem with the underlying websocket library as it somehow doesn't work well with multiple subscribers. That's why we cannot create an underlying cold flow (which would resubscribe to the resource for every new collector), but have to directly use a single hot flow.
u
I get that, just keep the cold stuff private, so you can have the resource clearing callback, and then expose only hot flow via shareIn