Mark
01/22/2025, 3:25 AMStateFlow
from another StateFlow
without requiring a CoroutineScope
(so no new coroutine):
private class MappedStateFlow<T, R>(
private val original: StateFlow<T>,
private val transform: (T) -> R
) : StateFlow<R> {
override val replayCache: List<R>
get() = original.replayCache.map(transform)
override val value: R
get() = transform(original.value)
override suspend fun collect(collector: FlowCollector<R>): Nothing {
original.collect { collector.emit(transform(it)) }
}
}
fun <T, R> StateFlow<T>.mapState(transform: (T) -> R): StateFlow<R> {
return MappedStateFlow(this, transform)
}
ephemient
01/22/2025, 4:54 AMNot stable for inheritance
The StateFlow interface is not stable for inheritance in 3rd party libraries
Mark
01/22/2025, 4:55 AMephemient
01/22/2025, 4:56 AMephemient
01/22/2025, 4:57 AMephemient
01/22/2025, 5:01 AMmapped.collect {
it === mapped.value // may be false
}
and it may also violate the documented conclusion equality-based conflationephemient
01/22/2025, 5:02 AMStateFlow
Mark
01/22/2025, 6:55 AMprivate class MappedStateFlow<T, R>(
private val original: StateFlow<T>,
private val transform: (T) -> R,
) : StateFlow<R> {
@Volatile
private var _value: R = transform(original.value)
override val replayCache: List<R>
get() = listOf(_value)
override val value: R
get() = _value
override suspend fun collect(collector: FlowCollector<R>): Nothing {
original.collect {
_value = transform(it)
collector.emit(_value)
}
}
}
or with using a mutex:
private val mutex = Mutex()
override suspend fun collect(collector: FlowCollector<R>): Nothing {
original.collectLatest {
val transformedValue = transform(it)
mutex.withLock { // to ensure the value and emitted values are consistent
_value = transformedValue
collector.emit(transformedValue)
}
}
throw IllegalStateException("Should not get here")
}
ephemient
01/22/2025, 7:18 AMStateFlow
is documented. which is maybe fine for your use case but it should not be a StateFlow
.Mark
01/22/2025, 7:54 AMprivate class MappedStateFlow<T, R>(
private val originalFlow: StateFlow<T>,
private val transform: (T) -> R,
) : StateFlow<R> {
@Volatile
private var valuePair: Pair<T, R> = originalFlow.value to transform(originalFlow.value)
override val value: R
get() = valuePair.second
override val replayCache: List<R>
get() = listOf(value)
private val mutex = Mutex()
override suspend fun collect(collector: FlowCollector<R>): Nothing {
var latestValue: R? = null
var isFirstValue = true
suspend fun emitIfNecessary(value: R) {
if (isFirstValue || latestValue !== value) {
isFirstValue = false
latestValue = value
collector.emit(value)
}
}
originalFlow.collectLatest { originalValue ->
mutex.withLock {
if (originalValue === valuePair.first) {
emitIfNecessary(valuePair.second)
return@collectLatest
}
}
val transformedValue = transform(originalValue)
mutex.withLock {
valuePair = originalValue to transformedValue
emitIfNecessary(transformedValue)
}
}
throw IllegalStateException("Should not get here")
}
}
ephemient
01/22/2025, 8:55 AMephemient
01/22/2025, 8:56 AMMark
01/22/2025, 9:26 AMprivate class MappedStateFlow<T, R>(
private val originalFlow: StateFlow<T>,
private val transform: (T) -> R,
) : StateFlow<R> {
@Volatile
private var latestValuePair: Pair<T, R> = originalFlow.value to transform(originalFlow.value)
private fun transformValueIfNecessary(originalValue: T): R {
val (latestOriginalValue, latestTransformedValue) = latestValuePair
return if (latestOriginalValue === originalValue) {
latestTransformedValue
} else {
transform(originalValue).also { transformedValue ->
// only update the latest pair if originalFlow value unchanged
if (originalValue === originalFlow.value) {
latestValuePair = originalValue to transformedValue
}
}
}
}
override val value: R
get() = transformValueIfNecessary(originalFlow.value)
override val replayCache: List<R>
get() = listOf(value)
private val transformedFlow: Flow<R> = originalFlow
.map(::transformValueIfNecessary)
.distinctUntilChanged()
override suspend fun collect(collector: FlowCollector<R>): Nothing {
transformedFlow.collect { transformedValue ->
collector.emit(transformedValue)
}
error("Should not get here")
}
}
Ideally the transformedFlow
would be a SharedFlow
, but we can’t have that because no CoroutineScope
.ephemient
01/22/2025, 10:15 AMso overall you can do this but I'd not call it ayou can create a custom type that provides whatever you actually need, and then you aren't breaking anything with non-conformance to the StateFlow APIStateFlow
Mark
01/22/2025, 10:31 AMAbstractFlow
and override collectSafely()
instead?ephemient
01/22/2025, 10:33 AMMark
01/22/2025, 10:33 AM/**
* Discussion: <https://kotlinlang.slack.com/archives/C1H43FDRB/p1737516319105979>
*/
fun <T, R> StateFlow<T>.mapState(transform: (T) -> R): ValuedFlow<R> {
return MappedStateFlow(this, transform)
}
interface ValuedFlow<T>: Flow<T> {
val value: T
}
@OptIn(ExperimentalCoroutinesApi::class)
private class MappedStateFlow<T, R>(
private val originalFlow: StateFlow<T>,
private val transform: (T) -> R,
) : AbstractFlow<R>(), ValuedFlow<R> {
@Volatile
private var latestValuePair: Pair<T, R> = originalFlow.value to transform(originalFlow.value)
private fun transformIfNecessary(originalValue: T): R {
val (latestOriginalValue, latestTransformedValue) = latestValuePair
return if (latestOriginalValue === originalValue) {
latestTransformedValue
} else {
transform(originalValue).also { transformedValue ->
// only update the latest pair if originalFlow value unchanged
if (originalValue === originalFlow.value) {
latestValuePair = originalValue to transformedValue
}
}
}
}
override val value: R
get() = transformIfNecessary(originalFlow.value)
private val transformedFlow: Flow<R> = originalFlow
.map(::transformIfNecessary)
.distinctUntilChanged()
override suspend fun collectSafely(collector: FlowCollector<R>) {
transformedFlow.collect { transformedValue ->
collector.emit(transformedValue)
}
}
}
Mark
01/22/2025, 10:44 AMValuedFlow
isn’t a thing (and extended by StateFlow
), because otherwise I need to have my own:
@Composable
fun <T> ValuedFlow<T>.collectAsStateWithLifecycle(
lifecycleOwner: LifecycleOwner = LocalLifecycleOwner.current,
minActiveState: Lifecycle.State = Lifecycle.State.STARTED,
context: CoroutineContext = EmptyCoroutineContext
): State<T> = collectAsStateWithLifecycle(
initialValue = this.value,
lifecycle = lifecycleOwner.lifecycle,
minActiveState = minActiveState,
context = context
)
Although I suppose you could argue that this implementation only works properly with a real StateFlow
.Mark
01/22/2025, 12:04 PMclass MappedStateFlow<T, R>(
private val originalFlow: StateFlow<T>,
private val transform: (T) -> R,
) : AbstractFlow<R>(), ValuedFlow<R>
ephemient
01/22/2025, 12:46 PMinterface ValuedFlow<out T> : Flow<T> {
val value: T
}
private class MappedStateFlow<T, R>(...) : ValuedFlow<R>, AbstractFlow<R>() {
...
}
fun <T, R> StateFlow<T>.map(block: (T) -> R): ValuedFlow<R> =
MappedStateFlow(this, block)
Mark
01/22/2025, 12:47 PMephemient
01/22/2025, 12:48 PMMark
01/22/2025, 12:48 PMephemient
01/22/2025, 12:49 PMephemient
01/22/2025, 12:49 PMMark
01/22/2025, 12:50 PMephemient
01/22/2025, 12:50 PMMark
01/22/2025, 12:50 PMMark
01/22/2025, 12:51 PM