reline
02/05/2024, 7:04 PMlaunch
here even though I'm inside a suspending function?
private val scope: CoroutineScope = ..
private val connectionState = MutableStateFlow<Connection?>(null)
suspend fun connect(): Connection = withContext(scope.coroutineContext) {
val sharedFlow: SharedFlow<Connection> = connectionFlow()
.shareIn(this, SharingStarted.Eagerly, replay = 1)
launch { sharedFlow.collect(connectionState) }
sharedFlow.first()
}
streetsofboston
02/05/2024, 7:10 PMcoroutineScope { ..... launch { ..... }. }
inside your connect
function instead, making sure that the launch
is part of the same coroutine-scope that called your connect
function.
when using structured concurrency, note that the call to your connect
function only returns(\resumes) after all the child coroutines (ie your launch) have returned/completed!
That means that the sharedFlow.collect
must have completed/returned. And that the call to sharedFlow.first()
must have completed/returned.streetsofboston
02/05/2024, 7:12 PMcoroutineScope { .... launch { ....} or .... val result = async { ...}
is perfectly already if you want to do some parallel/concurrent work within your suspend function (collect
)
But as i noted, the collect
will only return/resume after all these parallel/concurrent tasks have completed as well due to the structured concurrency of coroutines.reline
02/05/2024, 7:14 PMasync
and ran into the issue where my function wasn't returning due to the `launch`/`collect` still running so I switched to withContext
. I missed the coroutineScope
though, so I should be having the same "problem". Thanks!
Do you have a suggestion on how I could redesign this so that I can suspend for the first value but collect on the shared flow even after the function returns?reline
02/05/2024, 7:16 PMsuspend fun connect(): Connection {
val flow = ..
scope.launch { .. }
return flow.first()
}
streetsofboston
02/05/2024, 7:16 PMsuspend fun connect(): Connection = coroutineScope {
val sharedFlow: SharedFlow<Connection> = connectionFlow()
.shareIn(this, SharingStarted.Eagerly, replay = 1)
launch { sharedFlow.collect(connectionState) }
sharedFlow.first()
}
streetsofboston
02/05/2024, 7:17 PMstreetsofboston
02/05/2024, 7:18 PMfirst
is a terminating function... no need to call 'collect' as well, even asynchronouslystreetsofboston
02/05/2024, 7:20 PMconnectionFlow().shareIn(someScope, haringStarted.Eagerly, replay = 1).first()
should do it.reline
02/05/2024, 7:21 PMreline
02/05/2024, 7:24 PMinit
block and collect on the state flow, then just query the state flow
init {
scope.launch { connectionFlow().collect(stateFlow) }
}
fun connect(): Connection = stateFlow.filterNotNull().first()
but I don't want to begin collecting the flow until connect()
is calledreline
02/05/2024, 7:37 PMprivate val actor = scope.watchState()
fun CoroutineScope.watchState() = actor {
for (item in channel) {
// connectionFlow is a channelFlow which closes upon disconnect
connectionFlow().collect(stateFlow)
}
}
suspend fun connect(): Connection {
return stateFlow.value ?: run {
actor.send(Unit) // could also pass a `CompletableDeferred` and `await()`
stateFlow.filterNotNull().first()
}
}