Mark
01/04/2021, 1:17 PM
[...] `suspend fun foo(progressLambda: (Long) -> Unit)` and I want to convert this to a function returning a `StateFlow<State>`, where `State` is a sealed class providing `Progress`, `Success`, etc. Initially I thought of using `callbackFlow`, but that seems to be more for functions that run asynchronously. Is there a more appropriate way?
gildor
01/05/2021, 12:12 AM
Mark
01/05/2021, 2:52 AM
[...] `emit` from within the lambda because it is not a suspend lambda.
Mark
01/05/2021, 3:16 AM
[...] `callbackFlow`) would be something like:
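// Written from foo's non-suspending lambda, so a plain property assignment is enough
val progressFlow = MutableStateFlow<State>(State.Progress(0L))
return flow<State?> {
    emit(null) // null means "still running": fall back to the progress state
    foo { progressValue ->
        progressFlow.value = State.Progress(progressValue)
    }
    emit(State.Success)
}.combine(progressFlow) { invokeState, progressState ->
    invokeState ?: progressState
}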
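The snippet above depends on types that the thread does not show. A minimal runnable sketch follows, assuming a `State` sealed class with the `Progress` and `Success` members named earlier, a made-up body for `foo`, and a hypothetical wrapper name `fooAsFlow`; none of these come from the original code.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Assumed shape of State; only the member names appear in the thread.
sealed class State {
    data class Progress(val value: Long) : State()
    object Success : State()
}

// Hypothetical stand-in for foo: reports three progress values, then returns.
suspend fun foo(progressLambda: (Long) -> Unit) {
    for (i in 1L..3L) {
        delay(10)
        progressLambda(i)
    }
}

fun fooAsFlow(): Flow<State> {
    val progressFlow = MutableStateFlow<State>(State.Progress(0L))
    val invokeFlow = flow<State?> {
        emit(null) // still running
        foo { progressValue -> progressFlow.value = State.Progress(progressValue) }
        emit(State.Success)
    }
    return invokeFlow.combine(progressFlow) { invokeState, progressState ->
        invokeState ?: progressState
    }
}

fun main(): Unit = runBlocking {
    // A StateFlow never completes, so the combined flow does not either;
    // first { } stops collecting at the first non-progress state.
    val terminal = fooAsFlow().first { it !is State.Progress }
    println(terminal === State.Success)
}
```

Collecting with `first { ... }` is only one way to stop; it is used here because the combined flow stays open after `Success`.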
Mark
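01/05/2021, 3:29 AM
flow<State> {
    val progressFlow = MutableStateFlow<State>(State.Progress(0L))
    coroutineScope {
        // Caution: flow { } rejects emit() from a launched child coroutine at runtime
        // (flow invariant violation); channelFlow with send() supports this pattern.
        val progressJob = launch {
            progressFlow.collect { progress ->
                emit(progress)
            }
        }
        foo { progressValue ->
            progressFlow.value = State.Progress(progressValue)
        }
        progressJob.cancel()
        emit(State.Success)
    }
}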
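The idea in the snippet above (forward progress from a side coroutine, then emit the final state) can be sketched with `channelFlow`, whose `send` may be called from child coroutines, unlike `emit` in `flow { }`. The `State` definition, the body of `foo`, and the name `fooAsFlow` are assumptions made for illustration.

```kotlin
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Assumed shape of State; only the member names appear in the thread.
sealed class State {
    data class Progress(val value: Long) : State()
    object Success : State()
}

// Hypothetical stand-in for foo.
suspend fun foo(progressLambda: (Long) -> Unit) {
    for (i in 1L..3L) {
        delay(10)
        progressLambda(i)
    }
}

fun fooAsFlow(): Flow<State> = channelFlow {
    val progressFlow = MutableStateFlow<State>(State.Progress(0L))
    // Forward progress from a child coroutine; send() is allowed here.
    val progressJob = launch {
        progressFlow.collect { progress -> send(progress) }
    }
    foo { progressValue -> progressFlow.value = State.Progress(progressValue) }
    // Wait until the forwarding coroutine has fully stopped, so no progress
    // value can be sent after the terminal state.
    progressJob.cancelAndJoin()
    send(State.Success)
}

fun main(): Unit = runBlocking {
    fooAsFlow().collect { println(it) }
}
```

`cancelAndJoin()` replaces the plain `cancel()` of the original so that the forwarding coroutine is known to be finished before `Success` is sent. Intermediate progress values may still be skipped, because a `StateFlow` only keeps its latest value.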
Mark
01/05/2021, 3:34 AM
[...] `combine` approach, because I am more confident there will never be a progress state emitted after the success state.
gildor
01/05/2021, 4:49 AM
Mark
01/05/2021, 4:49 AM
gildor
01/05/2021, 4:49 AM
gildor
01/05/2021, 4:49 AM
gildor
01/05/2021, 4:50 AM
Mark
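01/05/2021, 4:51 AM
callbackFlow {
    send(State.Progress(0L))
    try {
        foo { progressValue ->
            // Non-suspending send; offer is deprecated in favor of trySend since kotlinx.coroutines 1.5
            offer(State.Progress(progressValue))
        }
        send(State.Success)
        close()
    } catch (e: Exception) {
        close(e) // the collector sees e as a thrown exception
    }
}.conflate() // we only care about the latest value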
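With `close(e)`, a failure in `foo` reaches the collector as an exception rather than as a `State`. The sketch below shows one way a collector could turn it back into a state with the `catch` operator. The `State.Failure` member, the `work` parameter (used so that a failing `foo` can be passed in), and the name `fooAsFlow` are assumptions, and `trySend` is used in place of `offer`.

```kotlin
import java.io.IOException
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Assumed shape of State.
sealed class State {
    data class Progress(val value: Long) : State()
    object Success : State()
    data class Failure(val message: String?) : State()
}

// `work` is a hypothetical parameter standing in for foo.
fun fooAsFlow(work: suspend ((Long) -> Unit) -> Unit): Flow<State> = callbackFlow {
    send(State.Progress(0L))
    try {
        work { progressValue -> trySend(State.Progress(progressValue)) }
        send(State.Success)
        close()
    } catch (e: Exception) {
        close(e) // closes the channel with a cause; the collector rethrows it
    }
}.conflate()

fun main(): Unit = runBlocking {
    val states = fooAsFlow { throw IOException("disk full") }
        .catch { e -> emit(State.Failure(e.message)) } // map the exception to a state
        .toList()
    println(states.last())
}
```

Whether to surface errors as exceptions or as a `Failure` value is a design choice; this version leaves the mapping to each collector.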
Mark
01/05/2021, 4:53 AM
callbackFlow {
    send(State.Progress(0L))
    try {
        foo { progressValue ->
            // Non-suspending send; offer is deprecated in favor of trySend since kotlinx.coroutines 1.5
            offer(State.Progress(progressValue))
        }
        send(State.Success)
    } catch (e: Exception) {
        // Report the failure as a regular value instead of an exception
        send(State.Failure(e.message))
    }
    // we need to close() or awaitClose(), but nothing to wait for
    close()
}.conflate() // we only care about the latest value
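A self-contained variant of the snippet above, written as a sketch under several assumptions: the `State` definition, the `work` parameter standing in for `foo`, and the name `fooAsFlow` are hypothetical; `trySend` replaces `offer`; `CancellationException` is rethrown so that cancellation is not reported as a failure; and `flowOn(Dispatchers.IO)` is added on the assumption that the work may block its thread.

```kotlin
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Assumed shape of State.
sealed class State {
    data class Progress(val value: Long) : State()
    object Success : State()
    data class Failure(val message: String?) : State()
}

// `work` is a hypothetical parameter standing in for foo.
fun fooAsFlow(work: suspend ((Long) -> Unit) -> Unit): Flow<State> = callbackFlow {
    send(State.Progress(0L))
    try {
        work { progressValue -> trySend(State.Progress(progressValue)) }
        send(State.Success)
    } catch (e: CancellationException) {
        throw e // let cancellation propagate
    } catch (e: Exception) {
        send(State.Failure(e.message))
    }
    close() // nothing to wait for, so close instead of awaitClose()
}
    .conflate()               // only the latest value matters
    .flowOn(Dispatchers.IO)   // run the builder body off the collector's thread

fun main(): Unit = runBlocking {
    val ok = fooAsFlow { report ->
        for (i in 1L..3L) {
            delay(10)
            report(i)
        }
    }.toList()
    val failed = fooAsFlow { throw IllegalStateException("boom") }.toList()
    println(ok.last())
    println(failed.last())
}
```

Because of `conflate()`, intermediate progress values may be dropped, but the terminal state is sent last on the same channel and is therefore the last element the collector receives.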
Mark
01/05/2021, 4:55 AM
gildor
01/05/2021, 7:51 AM
gildor
01/05/2021, 7:55 AM
[...] `foo` is a blocking function, so you probably also want to add `.flowOn(IO/Computation)`.
Mark
01/05/2021, 7:57 AM
[...] `foo` does not launch another coroutine, so I think it’s impossible for the lambda to be called or completed after `foo` returns. So `offer` can only be called before `foo` returns, and the question comes down to: if `offer` is called before `send` (potentially using a different dispatcher), is the order of emitted values guaranteed to be the same?
gildor
01/05/2021, 8:13 AM
gildor
01/05/2021, 8:14 AM
gildor
01/05/2021, 8:14 AM
gildor
01/05/2021, 8:14 AM
Mark
01/05/2021, 8:25 AM
[...] `combine` solution. You get a clear priority of the ‘invoke’ flow over the ‘progress’ flow: `invokeState ?: progressState`
val progressFlow = MutableStateFlow<State>(State.Progress(0L))
val invokeFlow = flow<State?> {
    emit(null) // null means "still running": fall back to the progress state
    try {
        foo { progressValue ->
            progressFlow.value = State.Progress(progressValue)
        }
        emit(State.Success)
    } catch (e: Exception) {
        emit(State.Failure(e.message))
    }
}
// A non-null invoke state (Success or Failure) always wins over progress
return invokeFlow.combine(progressFlow) { invokeState, progressState ->
    invokeState ?: progressState
}
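One property of the `combine` solution is that the resulting flow stays open after the terminal state, because a `MutableStateFlow` never completes. A possible way to end it, sketched here with a hypothetical `completeOnTerminal` helper built on `transformWhile`; the `State` definition, the `work` parameter standing in for `foo`, and the name `fooAsFlow` are also assumptions. The `try` wraps only the call, not `emit`, so exceptions thrown by the collector are not mistaken for failures of the work.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Assumed shape of State.
sealed class State {
    data class Progress(val value: Long) : State()
    object Success : State()
    data class Failure(val message: String?) : State()
}

// Hypothetical helper: pass every state through, then complete
// right after the first state that is not Progress.
fun Flow<State>.completeOnTerminal(): Flow<State> =
    transformWhile { state ->
        emit(state)
        state is State.Progress
    }

// `work` is a hypothetical parameter standing in for foo.
fun fooAsFlow(work: suspend ((Long) -> Unit) -> Unit): Flow<State> {
    val progressFlow = MutableStateFlow<State>(State.Progress(0L))
    val invokeFlow = flow<State?> {
        emit(null) // still running
        val result: State = try {
            work { progressValue -> progressFlow.value = State.Progress(progressValue) }
            State.Success
        } catch (e: Exception) {
            State.Failure(e.message)
        }
        emit(result)
    }
    return invokeFlow
        .combine(progressFlow) { invokeState, progressState -> invokeState ?: progressState }
        .completeOnTerminal()
}

fun main(): Unit = runBlocking {
    val ok = fooAsFlow { report ->
        for (i in 1L..3L) {
            delay(10)
            report(i)
        }
    }.toList()
    val failed = fooAsFlow { throw IllegalStateException("boom") }.toList()
    println(ok)
    println(failed)
}
```

With the helper in place, terminal operators such as `toList()` return once `Success` or `Failure` has been emitted.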
gildor
01/05/2021, 8:39 AM
Mark
01/05/2021, 8:46 AM
[...] `StateFlow` will swallow duplicates. Also, using a separate progress flow allows us to do things like debounce the progress.
gildor
01/05/2021, 9:02 AM