Is there an easy way convert a `SharedFlow` only t...
# flow
a
Is there an easy way to convert a `SharedFlow` so that it emits only to its first subscriber?
m
What’s the purpose behind doing this?
a
The flow is a sort of side-effect flow: it takes data from one repository and deposits it in another. The subscribers are UI elements, and they keep this data transfer alive and emitting. I use `callbackFlow` and `shareIn` to keep the listeners registered as long as there are subscribers, but I would like only one subscriber to collect at a time.
m
Perhaps sharing some of the code would help 🙂
a
```kotlin
val locationFlow: Flow<Either<GeoLocationEntity, LocationFailure>> =
    callbackFlow<Either<GeoLocationEntity, LocationFailure>> {
        // Forward every location update from the engine into the flow.
        val onLocationUpdateListener = OnLocationUpdateListener { location ->
            trySend(location.toGeoLocationEntity().toOk())
        }
        locationEngine.addOnLocationUpdateListener(onLocationUpdateListener)
        locationEngine.enable()
        // Runs once the upstream is cancelled, i.e. after the last subscriber is gone.
        awaitClose {
            locationEngine.removeOnLocationUpdateListener(onLocationUpdateListener)
            locationEngine.disable()
        }
    }.shareIn(
        shareInScope,
        replay = 0,
        started = SharingStarted.WhileSubscribed(replayExpirationMillis = 0)
    )

private fun keepEngineRunning(locationFlow: Flow<Either<GeoLocationEntity, LocationFailure>>): Flow<Either<Unit, Failure>> =
    locationFlow.map { location ->
        location.andThen { geoLocationEntity ->
            engine.updateLocation(geoLocationEntity)
        }
    }

// UI element 1
fun uiElement1() = runBlocking {
    launch {
        keepEngineRunning(locationFlow).collect()
    }
}

// UI element 2
fun uiElement2() = runBlocking {
    launch {
        keepEngineRunning(locationFlow).collect()
    }
}
```
The UI elements (here they are view models) do not use any data from `keepEngineRunning`; they just keep collecting it to make sure the engine is running. But I would like to prevent the engine from getting the same location twice.
n
Use `shareIn` on `keepEngineRunning` as well. It'll only run once for any number of subscribers since it's shared.
a
In my app, `keepEngineRunning` is in the domain layer (clean architecture) and doesn't receive a scope for `shareIn`. I could move it to the repository level, but then I would need the `engine` and `location` repositories to be one.
m
Yeah, using the UI elements to collect the flow directly (rather than within a piece of business logic or a view model) seems odd.
Using a `Channel` with `receiveAsFlow` will ensure only one collector gets each emission, but the emissions will be distributed among the collectors in a fan-out fashion.
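A small sketch of the fan-out behaviour, using plain `Int` values instead of the location types (`fanOutDemo` is a hypothetical name). Each value sent to the channel should end up in exactly one of the two collectors; which collector gets which value is not something to rely on.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.*

// Returns the values each of the two collectors received.
fun fanOutDemo(): List<List<Int>> = runBlocking {
    val channel = Channel<Int>()                 // rendezvous channel
    val flow = channel.receiveAsFlow()           // every collector receives from the same channel

    val collectors = List(2) { async { flow.toList() } }

    repeat(6) { channel.send(it) }               // each send is handed to one collector only
    channel.close()                              // closing the channel completes both collectors

    collectors.awaitAll()
}
```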
a
Does this mean I shouldn't be using a shared flow at all? Is there an easy way to map a `SharedFlow` to a `Channel`? Then I could reuse the existing `SharedFlow` for other purposes.
m
Using a channel might look like
```kotlin
private val locationChannel = Channel<Either<GeoLocationEntity, LocationFailure>>()

// Flow will emit one element to one collector only
val locationFlow: Flow<Either<GeoLocationEntity, LocationFailure>> = locationChannel.receiveAsFlow()

init {
    callbackFlow<Either<GeoLocationEntity, LocationFailure>> {
        val onLocationUpdateListener = OnLocationUpdateListener { location ->
            trySend(location.toGeoLocationEntity().toOk())
        }
        locationEngine.addOnLocationUpdateListener(onLocationUpdateListener)
        locationEngine.enable()
        awaitClose {
            locationEngine.removeOnLocationUpdateListener(onLocationUpdateListener)
            locationEngine.disable()
        }
    }
        // Using shareIn ensures you're only setting up this listener once, and sharing the emissions
        .shareIn(
            scope = shareInScope,
            replay = 0,
            started = SharingStarted.WhileSubscribed()
        )
        .onEach(locationChannel::send) // This sends each emission into the channel
}
```
There may be a better way using just flows though.
bump in case you didn’t see my edit 🙂
a
Hmm. This way `SharingStarted.WhileSubscribed()` can't be used, because there are no subscribers for the `callbackFlow`. I tried `SharingStarted.Eagerly`, but then location updates happen even when there are no subscribers.
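To make the `WhileSubscribed` behaviour concrete, here is a sketch with a dummy `callbackFlow` that only records when its "listener" is registered and removed. The event strings and the name `whileSubscribedDemo` are made up for illustration. With `SharingStarted.Eagerly` the registration would instead happen as soon as `shareIn` is called, subscribers or not.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.*

// Returns whether the upstream was active before anyone subscribed, plus the recorded events.
fun whileSubscribedDemo(): Pair<Boolean, List<String>> = runBlocking {
    val events = mutableListOf<String>()
    val registered = CompletableDeferred<Unit>()
    val removed = CompletableDeferred<Unit>()
    val scope = CoroutineScope(coroutineContext + Job())

    val shared = callbackFlow<Int> {
        events += "listener registered"
        registered.complete(Unit)
        awaitClose {
            events += "listener removed"
            removed.complete(Unit)
        }
    }.shareIn(scope, SharingStarted.WhileSubscribed(), replay = 0)

    delay(50)                                         // nobody has subscribed yet
    val activeBeforeSubscribe = registered.isCompleted

    val subscriber = launch { shared.collect() }      // first subscriber starts the upstream
    registered.await()
    subscriber.cancelAndJoin()                        // last subscriber leaves
    removed.await()                                   // upstream is cancelled, awaitClose runs

    scope.cancel()
    activeBeforeSubscribe to events.toList()
}
```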
m
Okay this is hacky but what if you did
```kotlin
private val locationChannel = Channel<Either<GeoLocationEntity, LocationFailure>>()

// Flow will emit one element to one collector only
val locationFlow: Flow<Either<GeoLocationEntity, LocationFailure>> = locationChannel.receiveAsFlow()

// Subscribe to this to start collection, but get the actual result from the other flow
val locationFlowToSubscribeTo = callbackFlow<Either<GeoLocationEntity, LocationFailure>> {
    val onLocationUpdateListener = OnLocationUpdateListener { location ->
        trySend(location.toGeoLocationEntity().toOk())
    }
    locationEngine.addOnLocationUpdateListener(onLocationUpdateListener)
    locationEngine.enable()
    awaitClose {
        locationEngine.removeOnLocationUpdateListener(onLocationUpdateListener)
        locationEngine.disable()
    }
}
    .onEach(locationChannel::send) // This sends each emission into the channel
    // Using shareIn ensures you're only setting up this listener once, and sharing the emissions
    .shareIn(
        scope = shareInScope,
        replay = 0,
        started = SharingStarted.WhileSubscribed()
    )

// UI element 1
fun uiElement1() = runBlocking {
    // Collecting a SharedFlow never completes, so each collection gets its own coroutine.
    launch { locationFlowToSubscribeTo.collect() }
    launch { keepEngineRunning(locationFlow).collect() }
}
```
You might be able to use `combine` to make it look like a single flow to the subscribers.
a
```kotlin
private val locationChannel = Channel<Either<GeoLocationEntity, LocationFailure>>()

// Flow will emit one element to one collector only
val locationFlow: Flow<Either<GeoLocationEntity, LocationFailure>> = locationChannel.receiveAsFlow()

// Subscribe to this to start collection, but get the actual result from the other flow
val locationFlowToSubscribeTo = callbackFlow<Either<GeoLocationEntity, LocationFailure>> {
    val onLocationUpdateListener = OnLocationUpdateListener { location ->
        trySend(location.toGeoLocationEntity().toOk())
    }
    locationEngine.addOnLocationUpdateListener(onLocationUpdateListener)
    locationEngine.enable()
    awaitClose {
        locationEngine.removeOnLocationUpdateListener(onLocationUpdateListener)
        locationEngine.disable()
    }
}
    .onEach(locationChannel::trySend) // This sends each emission into the channel
    // Using shareIn ensures you're only setting up this listener once, and sharing the emissions
    .shareIn(
        scope = shareInScope,
        replay = 0,
        started = SharingStarted.WhileSubscribed()
    )

// Emits from the channel-backed flow (one collector per element) while a background
// subscription keeps the shared callbackFlow, and with it the listener, alive.
private fun keepEngineRunning(locationFlow: Flow<Either<GeoLocationEntity, LocationFailure>>): Flow<Either<Unit, Failure>> =
    locationFlow.flowAlso(locationFlowToSubscribeTo).map { location ->
        location.andThen { geoLocationEntity ->
            engine.updateLocation(geoLocationEntity)
        }
    }

/**
 * Extension to run another flow as long as this flow is collecting
 */
fun <S, T> Flow<S>.flowAlso(another: Flow<T>): Flow<S> = flow {
    coroutineScope {
        // Collect the other flow in the background and discard its values.
        launch {
            another.collect {}
        }
        this@flowAlso.collect {
            emit(it)
        }
    }
}
```
This works! Just needed to use `onEach(locationChannel::trySend)`.
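The thread doesn't say why `trySend` made the difference, so take this as an assumption: on a rendezvous `Channel`, `send` suspends until a receiver shows up, which would stall the shared upstream, while `trySend` returns immediately and simply drops the value when nobody is receiving. A tiny sketch of that difference (`trySendDemo` is a hypothetical name):

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel

// Returns whether trySend succeeded without a receiver and with a waiting receiver.
fun trySendDemo(): Pair<Boolean, Boolean> = runBlocking {
    val channel = Channel<Int>()                          // rendezvous channel, no buffer

    // No receiver is waiting, so trySend fails right away instead of suspending.
    val withoutReceiver = channel.trySend(1).isSuccess

    val receiver = async { channel.receive() }
    yield()                                               // let the receiver suspend in receive()

    // A receiver is waiting now, so the value is handed over immediately.
    val withReceiver = channel.trySend(2).isSuccess
    receiver.await()

    withoutReceiver to withReceiver
}
```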