For producing a `StateFlow` from another `StateFlo...
# codereview
m
For producing a
StateFlow
from another
StateFlow
without requiring a
CoroutineScope
(so no new coroutine):
Copy code
private class MappedStateFlow<T, R>(
    private val original: StateFlow<T>,
    private val transform: (T) -> R
) : StateFlow<R> {
    override val replayCache: List<R>
        get() = original.replayCache.map(transform)

    override val value: R
        get() = transform(original.value)

    override suspend fun collect(collector: FlowCollector<R>): Nothing {
        original.collect { collector.emit(transform(it)) }
    }
}

fun <T, R> StateFlow<T>.mapState(transform: (T) -> R): StateFlow<R> {
    return MappedStateFlow(this, transform)
}
e
https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/-state-flow/
Not stable for inheritance
The StateFlow interface is not stable for inheritance in 3rd party libraries
m
Yes I saw that, but can't I just deal with that as and when it arises?
e
if you have any downstream consumers, they may end up with a new version of kotlinx.coroutines which your implementation would be incompatible with
if you're only in a single project then it's not so much of a concern
your transform may run multiple times on the same value
Copy code
mapped.collect {
    it === mapped.value // may be false
}
and it may also violate the documented conclusion equality-based conflation
so overall you can do this but I'd not call it a
StateFlow
m
Good point regarding the transform. How about this? Should a Mutex be used?
Copy code
private class MappedStateFlow<T, R>(
    private val original: StateFlow<T>,
    private val transform: (T) -> R,
) : StateFlow<R> {
    @Volatile
    private var _value: R = transform(original.value)

    override val replayCache: List<R>
        get() = listOf(_value)

    override val value: R
        get() = _value

    override suspend fun collect(collector: FlowCollector<R>): Nothing {
        original.collect {
            _value = transform(it)
            collector.emit(_value)
        }
    }
}
or with using a mutex:
Copy code
private val mutex = Mutex()
    
    override suspend fun collect(collector: FlowCollector<R>): Nothing {
        original.collectLatest {
            val transformedValue = transform(it)
            mutex.withLock { // to ensure the value and emitted values are consistent
                _value = transformedValue
                collector.emit(transformedValue)
            }
        }
        throw IllegalStateException("Should not get here")
    }
e
that doesn't fix it when you have multiple subscribers and still doesn't handle conflation the same way as
StateFlow
is documented. which is maybe fine for your use case but it should not be a
StateFlow
.
m
regarding multiple subscribers, couldn’t you also store the original value and compare that in order to avoid unnecessary transform? And for conflation, keep track of the latest value for each subscriber.
Copy code
private class MappedStateFlow<T, R>(
    private val originalFlow: StateFlow<T>,
    private val transform: (T) -> R,
) : StateFlow<R> {
    @Volatile
    private var valuePair: Pair<T, R> = originalFlow.value to transform(originalFlow.value)

    override val value: R
        get() = valuePair.second

    override val replayCache: List<R>
        get() = listOf(value)

    private val mutex = Mutex()

    override suspend fun collect(collector: FlowCollector<R>): Nothing {
        var latestValue: R? = null
        var isFirstValue = true
        suspend fun emitIfNecessary(value: R) {
            if (isFirstValue || latestValue !== value) {
                isFirstValue = false
                latestValue = value
                collector.emit(value)
            }
        }
        originalFlow.collectLatest { originalValue ->
            mutex.withLock {
                if (originalValue === valuePair.first) {
                    emitIfNecessary(valuePair.second)
                    return@collectLatest
                }
            }
            val transformedValue = transform(originalValue)
            mutex.withLock {
                valuePair = originalValue to transformedValue
                emitIfNecessary(transformedValue)
            }
        }
        throw IllegalStateException("Should not get here")
    }
}
e
you could, and that may deadlock if a collector is on a dispatcher that pauses (like Android's lifecycle scopes) or if a collector re-emits
the original implementation is structured to avoid those issues and it's complex
m
Probably not particularly safe, but:
Copy code
private class MappedStateFlow<T, R>(
    private val originalFlow: StateFlow<T>,
    private val transform: (T) -> R,
) : StateFlow<R> {
    @Volatile
    private var latestValuePair: Pair<T, R> = originalFlow.value to transform(originalFlow.value)

    private fun transformValueIfNecessary(originalValue: T): R {
        val (latestOriginalValue, latestTransformedValue) = latestValuePair
        return if (latestOriginalValue === originalValue) {
            latestTransformedValue
        } else {
            transform(originalValue).also { transformedValue ->
                // only update the latest pair if originalFlow value unchanged
                if (originalValue === originalFlow.value) {
                    latestValuePair = originalValue to transformedValue
                }
            }
        }
    }

    override val value: R
        get() = transformValueIfNecessary(originalFlow.value)

    override val replayCache: List<R>
        get() = listOf(value)

    private val transformedFlow: Flow<R> = originalFlow
        .map(::transformValueIfNecessary)
        .distinctUntilChanged()

    override suspend fun collect(collector: FlowCollector<R>): Nothing {
        transformedFlow.collect { transformedValue ->
            collector.emit(transformedValue)
        }
        error("Should not get here")
    }
}
Ideally the
transformedFlow
would be a
SharedFlow
, but we can’t have that because no
CoroutineScope
.
e
to expand on my earlier statement,
so overall you can do this but I'd not call it a
StateFlow
you can create a custom type that provides whatever you actually need, and then you aren't breaking anything with non-conformance to the StateFlow API
m
Makes sense. So just extend
AbstractFlow
and override
collectSafely()
instead?
e
that would be a safe way to handle it, yes
m
Thanks for your help
Copy code
/**
 * Discussion: <https://kotlinlang.slack.com/archives/C1H43FDRB/p1737516319105979>
 */
fun <T, R> StateFlow<T>.mapState(transform: (T) -> R): ValuedFlow<R> {
    return MappedStateFlow(this, transform)
}

interface ValuedFlow<T>: Flow<T> {
    val value: T
}

@OptIn(ExperimentalCoroutinesApi::class)
private class MappedStateFlow<T, R>(
    private val originalFlow: StateFlow<T>,
    private val transform: (T) -> R,
) : AbstractFlow<R>(), ValuedFlow<R> {
    @Volatile
    private var latestValuePair: Pair<T, R> = originalFlow.value to transform(originalFlow.value)

    private fun transformIfNecessary(originalValue: T): R {
        val (latestOriginalValue, latestTransformedValue) = latestValuePair
        return if (latestOriginalValue === originalValue) {
            latestTransformedValue
        } else {
            transform(originalValue).also { transformedValue ->
                // only update the latest pair if originalFlow value unchanged
                if (originalValue === originalFlow.value) {
                    latestValuePair = originalValue to transformedValue
                }
            }
        }
    }

    override val value: R
        get() = transformIfNecessary(originalFlow.value)

    private val transformedFlow: Flow<R> = originalFlow
        .map(::transformIfNecessary)
        .distinctUntilChanged()

    override suspend fun collectSafely(collector: FlowCollector<R>) {
        transformedFlow.collect { transformedValue ->
            collector.emit(transformedValue)
        }
    }
}
Actually, it’s a shame
ValuedFlow
isn’t a thing (and extended by
StateFlow
), because otherwise I need to have my own:
Copy code
@Composable
fun <T> ValuedFlow<T>.collectAsStateWithLifecycle(
    lifecycleOwner: LifecycleOwner = LocalLifecycleOwner.current,
    minActiveState: Lifecycle.State = Lifecycle.State.STARTED,
    context: CoroutineContext = EmptyCoroutineContext
): State<T> = collectAsStateWithLifecycle(
    initialValue = this.value,
    lifecycle = lifecycleOwner.lifecycle,
    minActiveState = minActiveState,
    context = context
)
Although I suppose you could argue that this implementation only works properly with a real
StateFlow
.
As a side note, isn’t it subpar that <T> has to be exposed here because once the class is instantiated, T is no longer relevant externally:
Copy code
class MappedStateFlow<T, R>(
    private val originalFlow: StateFlow<T>,
    private val transform: (T) -> R,
) : AbstractFlow<R>(), ValuedFlow<R>
e
Copy code
interface ValuedFlow<out T> : Flow<T> {
    val value: T
}

private class MappedStateFlow<T, R>(...) : ValuedFlow<R>, AbstractFlow<R>() {
    ...
}

fun <T, R> StateFlow<T>.map(block: (T) -> R): ValuedFlow<R> =
    MappedStateFlow(this, block)
m
Isn’t that the same but with different names (as well as the “out”)?
e
you don't need to expose the class
m
Right, and I’m not doing that because of that limitation. But do you see my point that T and R are quite different?
e
no
the built in functions do the same things
m
once the class is instantiated, the T generic (not the R generic) is meaningless externally and this forces us to use an interface.
e
and so the return type can be an interface which doesn't mention T
m
yes, that’s the workaround. But sub par
what you want is for the language to support an internal generic