#coroutines

George

07/22/2022, 1:15 PM
Hi guys, I have a question. If I have a SharedFlow and want to apply filter, what is the simplest way to get a SharedFlow back? I can't just cast the result back to SharedFlow, since filter returns a Flow. Thanks in advance for any help.
Hmm, I was thinking of using the shareIn() operator, but my flow is actually a hot one rather than a cold one, so it is not ideal to use it, right?
After some digging, I kind of dropped the idea of converting back and forth between SharedFlow and Flow (not sure if I'm going in the right direction or not). I came up with two versions:
private val sender: MutableSharedFlow<Message> = MutableSharedFlow()
override fun stream(client: Client): Flow<Message> {
    return sender
        .onSubscription { emitAll(latest(client)) }
        .filter { getClient(it.client) == client }
}
1️⃣ Con: the filter is also applied to the elements emitted in onSubscription, which I have already made sure are the right ones.
private val messageCache: ConcurrentMap<Client, MutableSharedFlow<Message>> =
    ConcurrentHashMap<Client, MutableSharedFlow<Message>>().apply {
        put(Csp1, MutableSharedFlow())
        put(Csp2, MutableSharedFlow())
    }

override fun stream(client: Client): Flow<Message> {
    return messageCache[client]!!.onSubscription {
        latest(client)
    }
}
2️⃣ Con: not sure if it really is a con, but I need a new MutableSharedFlow for every new client (there are not that many of them, tbh). Pro: the filter operator is no longer needed at all. PS: I really don't like the !! 😛

Tijl

07/22/2022, 3:00 PM
the resulting Flow from your filter is cold, not hot. Not sure why you would want to avoid shareIn if you need a SharedFlow after your filter. Especially since you don’t seem to use any replay or buffer.
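A minimal sketch of the shareIn route mentioned above, assuming kotlinx.coroutines is on the classpath. The `Message` type here is a hypothetical stand-in with a plain string client id, and `SharingStarted.Eagerly` is just one possible start policy, not something the thread settled on:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical stand-in for the Message type used in the thread.
data class Message(val client: String, val text: String)

fun firstForClientA(): Message = runBlocking {
    val sender = MutableSharedFlow<Message>()
    // Separate scope that owns the sharing coroutine; cancelling it stops the sharing.
    val sharingScope = CoroutineScope(coroutineContext + Job())

    // filter returns a plain Flow; shareIn turns the result back into a SharedFlow.
    val forA: SharedFlow<Message> = sender
        .filter { it.client == "A" }
        .shareIn(sharingScope, SharingStarted.Eagerly)

    // Subscribe to the filtered SharedFlow before anything is emitted.
    val first = async(start = CoroutineStart.UNDISPATCHED) { forA.first() }
    // Wait until the sharing coroutine has subscribed to the upstream flow.
    sender.subscriptionCount.first { it > 0 }

    sender.emit(Message("B", "ignored"))
    sender.emit(Message("A", "hello"))

    val result = first.await()
    sharingScope.cancel()
    result
}

fun main() {
    println(firstForClientA()) // Message(client=A, text=hello)
}
```

The cost of this route is one extra long-lived coroutine per shared, filtered flow, plus a scope whose lifetime has to be managed.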

George

07/22/2022, 3:02 PM
Hmm, how come it is cold? I thought the underlying flow was still hot, since it is based on MutableSharedFlow.

Tijl

07/22/2022, 3:12 PM
filter doesn’t care or know whether the upstream flow is cold or hot; it will only collect the upstream flow when it is itself collected. E.g. see this example: https://pl.kotl.in/gja9cjw0Z
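The behaviour described above can be sketched as follows. This is not a copy of the linked playground, only an illustration under the assumption that `subscriptionCount` on the upstream MutableSharedFlow is a fair way to observe when filter actually subscribes:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun observeSubscriptions(): List<Int> = runBlocking {
    val source = MutableSharedFlow<Int>()
    // Building the filtered flow does not subscribe to the source.
    val evens: Flow<Int> = source.filter { it % 2 == 0 }
    val before = source.subscriptionCount.value

    // Collecting the filtered flow is what subscribes to the source.
    val firstEven = async(start = CoroutineStart.UNDISPATCHED) { evens.first() }
    val during = source.subscriptionCount.value

    source.emit(1) // dropped by the filter
    source.emit(2) // passes the filter, so first() completes
    val value = firstEven.await()

    // first() has completed, so the subscription is released again.
    val after = source.subscriptionCount.value
    listOf(before, during, value, after)
}

fun main() {
    println(observeSubscriptions()) // [0, 1, 2, 0]
}
```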

George

07/22/2022, 3:16 PM
I thought the completion signal would never be sent and the filter operator would filter forever.
Actually I get what you mean now, yeah, that's true, the resulting flow is cold. I confused cold vs. hot flows for a bit 😛. Tbh I don't mind it being cold as a result; the more critical part is that I don't lose the properties of the MutableSharedFlow, which I don't, right? Specifically: if there are no subscribers, all emitted messages are dropped.

Tijl

07/22/2022, 3:25 PM
the filter flow completes, e.g. here I use first so it completes. I could also cancel the scope I am collecting in, etc.
yes, if no one is collecting your filter, it’s like no one is collecting your SharedFlow (if there are no other collectors).
🙏 1
the filter flow only becomes a collector when it is collected
it kind of sounds like you just need filter and that is it TBH (no need to turn it back into a SharedFlow after), but of course I don’t know your exact use case
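The point above about emissions with no collector can be sketched like this, assuming a MutableSharedFlow with the default configuration (no replay, no extra buffer), as in the original snippets. The even-number filter is only an illustrative predicate:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun collectAfterLateSubscribe(): List<Int> = runBlocking {
    val source = MutableSharedFlow<Int>() // replay = 0, no extra buffer
    val evens = source.filter { it % 2 == 0 }

    // Nobody is collecting yet: emit returns immediately and the value is dropped.
    source.emit(2)

    // From here on the filtered flow is collected, so the source has a subscriber.
    val collected = async(start = CoroutineStart.UNDISPATCHED) { evens.take(2).toList() }

    source.emit(3) // reaches the filter, rejected
    source.emit(4) // reaches the filter, accepted
    source.emit(6) // accepted, take(2) completes

    collected.await()
}

fun main() {
    println(collectAfterLateSubscribe()) // [4, 6]
}
```

The value emitted before the subscription never shows up, so applying filter keeps the "dropped when there are no subscribers" property of the underlying MutableSharedFlow.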

George

07/22/2022, 3:30 PM
Yup, indeed I only need filter with the way my code is now. Beforehand I was using onSubscription after the filter, so I was looking for a way to turn it back. But I think the code is a bit cleaner like this. If you don't mind one more question: do you prefer option 1 or 2? I think I prefer the 2nd one.

Tijl

07/22/2022, 3:45 PM
both feel a bit off, but I’m not sure what latest does.

Francesc

07/22/2022, 4:48 PM
there is a feature request for this here https://github.com/Kotlin/kotlinx.coroutines/issues/2631

George

07/22/2022, 5:20 PM
latest brings all the messages from the database as a cold flow. PS: the 2nd example is missing the emitAll.
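For reference, a hedged sketch of how the 2nd version might look with the missing emitAll added and the `!!` avoided. The `MessageHub` class, the `send` function, the enum form of `Client`, and `latest` passed in as a lambda are all assumptions made for illustration. `computeIfAbsent` is one possible way to drop the `!!`, not something proposed in the thread:

```kotlin
import java.util.concurrent.ConcurrentHashMap
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical stand-ins for the thread's Client and Message types.
enum class Client { Csp1, Csp2 }
data class Message(val client: Client, val text: String)

// latest is assumed to return the stored messages for a client as a cold flow.
class MessageHub(private val latest: (Client) -> Flow<Message>) {
    private val messageCache = ConcurrentHashMap<Client, MutableSharedFlow<Message>>()

    // Creates the per-client flow on first use, so no !! is needed on lookup.
    private fun flowFor(client: Client): MutableSharedFlow<Message> =
        messageCache.computeIfAbsent(client) { MutableSharedFlow<Message>() }

    suspend fun send(message: Message) = flowFor(message.client).emit(message)

    // Stored messages are emitted first, once the subscription is already active.
    fun stream(client: Client): Flow<Message> =
        flowFor(client).onSubscription { emitAll(latest(client)) }
}

fun streamDemo(): List<String> = runBlocking {
    val hub = MessageHub { client -> flowOf(Message(client, "stored")) }
    val firstTwo = async(start = CoroutineStart.UNDISPATCHED) {
        hub.stream(Client.Csp1).take(2).toList()
    }
    hub.send(Message(Client.Csp2, "other")) // different client, never reaches Csp1
    hub.send(Message(Client.Csp1, "live"))
    firstTwo.await().map { it.text }
}

fun main() {
    println(streamDemo()) // [stored, live]
}
```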