# flow
j
Hi all. I'm trying to build a flow that emits the results of another flow, but it also needs to trigger some side-effect code once we're ready to start emitting that flow's results. This is what I've done:
return channelFlow {
    // Start collecting right away, before turnOnMessages() is called
    launch(start = CoroutineStart.UNDISPATCHED) {
        messageSourceFlow.collect { send(it) }
    }
    try {
        turnOnMessages()
        awaitCancellation()
    } finally {
        turnOffMessages()
    }
}
The key element is the
launch(start = CoroutineStart.UNDISPATCHED) {
because I want to make sure I'm ready to receive messages before I call the
turnOnMessages
method. Does this make sense? Is there an easier way to achieve this goal?
e
messageSourceFlow
    .onStart { turnOnMessages() }
    .onCompletion { turnOffMessages() }
j
Unfortunately, that would mean I can miss the first message, since I'm not yet collecting from the messageSourceFlow while onStart runs.
s
Can you explain more about where
messageSourceFlow
comes from? How/why did it come to be a
Flow
? I ask because if there's something behind it that's capable of sending messages while the flow isn't being collected, there's almost certainly a buffer in there too. For example, if it's a
callbackFlow
, it has a buffer.
And if there's a buffer, you already have somewhere for your messages to sit and wait, meaning that adding a second buffer in the form of the
channelFlow
is redundant.
j
messageSourceFlow
is a
SharedFlow
that is being populated from a background thread that's connected to some external service that sends me messages. I could set up a buffer on that shared flow, but then I'd also potentially receive old messages, since I can have multiple flows turning messages on and off (I basically reference count the turn on/off calls to control the server-side flow). For my question, I'm really wondering whether launching with an UNDISPATCHED start is the best way to ensure I'm collecting before calling turnOnMessages.
s
If it's a shared flow, you can use
onSubscription
instead of
onStart
and that should solve your issue
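A minimal sketch of what that could look like, assuming turnOnMessages and turnOffMessages are plain non-suspending functions (passed in here as lambdas). The names whileMessagesOn and demo are hypothetical, and the MutableSharedFlow stands in for the real messageSourceFlow. Note that onSubscription is only available on a SharedFlow, so it has to be applied before any other operator.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical helper: turnOnMessages/turnOffMessages are stand-ins
// for the functions mentioned in the thread.
fun <T> SharedFlow<T>.whileMessagesOn(
    turnOnMessages: () -> Unit,
    turnOffMessages: () -> Unit,
): Flow<T> = this
    // Runs after this collector has been registered as a subscriber,
    // so values emitted from here on are not missed.
    .onSubscription { turnOnMessages() }
    // Runs when collection ends, including on cancellation or failure.
    .onCompletion { turnOffMessages() }

// Small demo: "turning on" messages causes the source to emit one value.
fun demo(): List<String> = runBlocking {
    val events = mutableListOf<String>()
    val source = MutableSharedFlow<String>() // no replay, no extra buffer
    val first = source.whileMessagesOn(
        turnOnMessages = {
            events += "on"
            // Simulates the external service responding with a message.
            launch { source.emit("hello") }
        },
        turnOffMessages = { events += "off" },
    ).first()
    events += "received $first"
    events
}
```

If turnOffMessages were a suspending function, it would probably need to run inside withContext(NonCancellable), because onCompletion can be invoked while the collecting coroutine is already cancelled.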
j
Ahh thank you! I didn't know that existed
Out of curiosity, do you know why that's only on shared flows?
s
That lifecycle phase just doesn't exist for a cold flow. When you start collecting a cold flow, you immediately pass control to the upstream, and don't get it back again until the upstream emits a value to you. The act of starting the upstream flow is the same as the act of collecting its first value. It's only with a shared flow that the upstream flow can run concurrently with the downstream collector. And it's only when they're running concurrently that the upstream has the possibility to emit values before the collector has actually started collecting.
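The difference can be sketched with two small experiments. The function names here are hypothetical, and the 200 ms timeout is an arbitrary choice that only stops the second experiment from waiting forever.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeoutOrNull

// Cold flow: the upstream body does not run until collection reaches it,
// so nothing can be emitted (or missed) while onStart is running.
fun coldFlowOrder(): List<String> = runBlocking {
    val log = mutableListOf<String>()
    flow {
        log += "upstream started"
        emit(1)
    }
        .onStart { log += "onStart" }
        .collect { log += "collected $it" }
    log
}

// Shared flow: emissions happen independently of this collector.
// A value emitted while onStart runs is dropped, because the collector
// has not been registered as a subscriber yet.
fun sharedFlowOnStartResult(): Int? = runBlocking {
    val shared = MutableSharedFlow<Int>() // no replay, no extra buffer
    withTimeoutOrNull(200) {
        shared
            .onStart { shared.emit(1) } // no subscribers yet: value is lost
            .first()                    // waits for a value that never arrives
    }
}
```

In the first function, onStart is logged before the upstream body runs. In the second, the call times out and returns null, which is the missed-message case that onSubscription is meant to avoid.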