https://kotlinlang.org logo
#coroutines
Title
# coroutines
l

Lukasz Kalnik

03/29/2021, 3:29 PM
Is there an operator for
SharedFlow
that cancels the flow automatically after all subscribers have left? Something like
refCount()
in RxJava.
b

bezrukov

03/29/2021, 3:30 PM
use
SharingStarted.WhileSubscribed()
in
.shareIn
operator
l

Lukasz Kalnik

03/29/2021, 3:43 PM
Thank you!
Will
onCompletion
then be called?
I need to close some underlying resources when the flow is completed.
To be precise: I don't have a cold upstream which I convert to hot one using
shareIn
. I create a hot flow directly, with
MutableSharedFlow
and I need to complete this flow and close underlying resource.
The flow source is a Websocket channel.
e

elizarov

03/29/2021, 4:20 PM
In this case you'll have to subscribe to
subscriptionCount
on your hot
MutableSharedFlow
(https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/-mutable-shared-flow/subscription-count.html) and track it yourself to implement the corresponding logic. However, it is asynchronous. I don't think there is a reliable way to distinguish the case of "subscribers have not appeared yet" from the case "subscribers have already disappeared".
l

Lukasz Kalnik

03/29/2021, 7:56 PM
Yes, that's what I tried and as you wrote it doesn't really solve the problem. Currently I just implemented a
leaveChannel()
method and every subscriber has to call it, and I will track the subscriber number internally. Are there any plans to introduce something like
refCount()
for
SharedFlow
? http://reactivex.io/documentation/operators/refcount.html
e

elizarov

03/29/2021, 8:03 PM
It is similar to refCount. But unlike Rx the whole flow is asynchronous. It has no synchronous operations.
l

Lukasz Kalnik

03/29/2021, 8:06 PM
Thank you for the explanation.
u

ursus

03/30/2021, 4:05 AM
Im new to flow, but coming from rx, why dont you just wrap your websocket coldly, and then just turn it hot via shareIn / refCount? youd then have invokeOnCancel or whatever is called, to clear the resources in the
callbackFlow { }
builder
l

Lukasz Kalnik

03/30/2021, 8:38 AM
We have a problem with the underlying websocket library as it somehow doesn't work well with multiple subscribers. That's why we cannot create an underlying cold flow (which would resubscribe to the resource for every new collector), but have to directly use a single hot flow.
u

ursus

03/30/2021, 9:21 AM
I get that, just keep the cold stuff private, so you can have the resource clearing callback, and then expose only hot flow via shareIn
4 Views