Arun Joseph
04/03/2022, 12:51 PM
Is it possible for a SharedFlow to emit only to its first subscriber?
Michael Marshall
04/05/2022, 1:48 AM
Arun Joseph
04/05/2022, 4:30 AM
I use callbackFlow and shareIn to keep the listeners registered as long as there are subscribers. But I would like only one subscriber to collect at a time.
Michael Marshall
04/05/2022, 4:37 AM
Arun Joseph
04/05/2022, 4:45 AM
val locationFlow: Flow<Either<GeoLocationEntity, LocationFailure>> =
    callbackFlow<Either<GeoLocationEntity, LocationFailure>> {
        val onLocationUpdateListener = OnLocationUpdateListener { location ->
            trySend(location.toGeoLocationEntity().toOk())
        }
        locationEngine.addOnLocationUpdateListener(onLocationUpdateListener)
        locationEngine.enable()
        awaitClose {
            locationEngine.removeOnLocationUpdateListener(onLocationUpdateListener)
            locationEngine.disable()
        }
    }.shareIn(
        shareInScope,
        replay = 0,
        started = SharingStarted.WhileSubscribed(replayExpirationMillis = 0)
    )
private fun keepEngineRunning(locationFlow: Flow<Either<GeoLocationEntity, LocationFailure>>): Flow<Either<Unit, Failure>> =
    locationFlow.map { location ->
        location.andThen { geoLocationEntity ->
            engine.updateLocation(geoLocationEntity)
        }
    }
fun uiElement1() = runBlocking {
    launch {
        keepEngineRunning(locationFlow).collect()
    }
}
fun uiElement2() = runBlocking {
    launch {
        keepEngineRunning(locationFlow).collect()
    }
}
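As a rough illustration of why two collectors of keepEngineRunning lead to repeated engine updates: map is a cold operator, so its lambda runs once per collector even when the upstream flow is shared. The sketch below is not the code from this thread. It assumes a plain MutableSharedFlow<Int> as a stand-in for locationFlow and a counter (engineUpdates, a hypothetical name) in place of engine.updateLocation.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical helper: counts how often the map lambda runs for one emitted value.
fun countEngineUpdates(collectors: Int): Int = runBlocking {
    val source = MutableSharedFlow<Int>() // stand-in for the shared locationFlow
    var engineUpdates = 0                 // stand-in for calls to engine.updateLocation

    // Cold operator on top of the hot flow: the lambda runs for every collector.
    val keepRunning = source.map { value ->
        engineUpdates++
        value
    }

    // Each collector takes one value and then completes.
    val jobs = List(collectors) {
        launch { keepRunning.take(1).collect() }
    }

    // Wait until every collector has subscribed, then emit a single value.
    source.subscriptionCount.first { it == collectors }
    source.emit(42)
    jobs.joinAll()

    engineUpdates
}

fun main() {
    println(countEngineUpdates(collectors = 1))
    println(countEngineUpdates(collectors = 2))
}
```

With one collector the counter ends at 1, and with two collectors it ends at 2 for the same single emission, which matches the "same location twice" problem described here.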
Each UI element ignores the values from keepEngineRunning but keeps collecting it to make sure the engine is running. But I would like to prevent the engine from getting the same location twice.
Nick Allen
04/05/2022, 6:23 AM
You could use shareIn on keepEngineRunning as well. It'll only run once for any number of subscribers since it's shared.
Arun Joseph
04/05/2022, 6:43 AM
keepEngineRunning is in the domain layer (clean arch) and doesn't receive a scope for shareIn. ... engine and location repository as one.
Michael Marshall
04/05/2022, 6:46 AM
A Channel with receiveAsFlow will ensure only one engine gets each emission, but the emissions will be distributed among the engines in a fan-out fashion.
Arun Joseph
04/05/2022, 6:56 AM
... SharedFlow to Channel. Then I can reuse the existing SharedFlow for other purposes.
Michael Marshall
04/05/2022, 6:57 AM
private val locationChannel = Channel<Either<GeoLocationEntity, LocationFailure>>()
// Flow will emit one element to one collector only
val locationFlow: Flow<Either<GeoLocationEntity, LocationFailure>> = locationChannel.receiveAsFlow()
init {
    callbackFlow<Either<GeoLocationEntity, LocationFailure>> {
        val onLocationUpdateListener = OnLocationUpdateListener { location ->
            trySend(location.toGeoLocationEntity().toOk())
        }
        locationEngine.addOnLocationUpdateListener(onLocationUpdateListener)
        locationEngine.enable()
        awaitClose {
            locationEngine.removeOnLocationUpdateListener(onLocationUpdateListener)
            locationEngine.disable()
        }
    } // Using shareIn ensures you're only setting up this listener once, and sharing the emissions
        .shareIn(
            scope = shareInScope,
            replay = 0,
            started = SharingStarted.WhileSubscribed()
        )
        .onEach(locationChannel::send) // This sends each emission into the channel
    // Note: onEach is lazy, so as written nothing collects this chain and the shared flow has no subscribers
}
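To make the "one element to one collector" comment concrete, here is a minimal sketch that is independent of the location code above. It assumes a Channel<Int> and two collectors of receiveAsFlow(); fanOutDemo is a hypothetical name. How the values are split between the two collectors is not something the sketch relies on, only that each value is delivered exactly once.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.*

// Hypothetical demo: two collectors share one channel-backed flow.
// Returns the values each collector received.
fun fanOutDemo(): Pair<List<Int>, List<Int>> = runBlocking {
    val channel = Channel<Int>()
    val flow = channel.receiveAsFlow()

    // Both collectors read from the same channel until it is closed.
    val first = async { flow.toList() }
    val second = async { flow.toList() }

    for (i in 1..6) channel.send(i)
    channel.close() // completes both collectors

    first.await() to second.await()
}

fun main() {
    val (a, b) = fanOutDemo()
    println("collector 1: $a")
    println("collector 2: $b")
    // Taken together, the two lists contain every value exactly once.
    check((a + b).sorted() == (1..6).toList())
}
```

A SharedFlow, by contrast, would hand every value to both collectors, which is why the channel is used as the delivery path here.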
There may be a better way using just flows though.
Arun Joseph
04/05/2022, 7:39 AM
SharingStarted.WhileSubscribed() can't be used because there are no subscribers for callbackFlow. SharingStarted.Eagerly would work, but then location updates happen even when there are no subscribers.
Michael Marshall
04/05/2022, 8:13 AM
private val locationChannel = Channel<Either<GeoLocationEntity, LocationFailure>>()
// Flow will emit one element to one collector only
val locationFlow: Flow<Either<GeoLocationEntity, LocationFailure>> = locationChannel.receiveAsFlow()
// Subscribe to this to start collection, but get the actual result from the other flow
val locationFlowToSubscribeTo = callbackFlow<Either<GeoLocationEntity, LocationFailure>> {
    val onLocationUpdateListener = OnLocationUpdateListener { location ->
        trySend(location.toGeoLocationEntity().toOk())
    }
    locationEngine.addOnLocationUpdateListener(onLocationUpdateListener)
    locationEngine.enable()
    awaitClose {
        locationEngine.removeOnLocationUpdateListener(onLocationUpdateListener)
        locationEngine.disable()
    }
} // Using shareIn ensures you're only setting up this listener once, and sharing the emissions
    .onEach(locationChannel::send) // This sends each emission into the channel
    .shareIn(
        scope = shareInScope,
        replay = 0,
        started = SharingStarted.WhileSubscribed()
    )
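fun uiElement1() = runBlocking {
    // Collect in separate coroutines: collect() on the shared flow never returns
    launch { locationFlowToSubscribeTo.collect() }
    launch { keepEngineRunning(locationFlow).collect() }
}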
You could use combine to make it look like a single flow to the subscribers.
Arun Joseph
04/05/2022, 12:52 PM
private val locationChannel = Channel<Either<GeoLocationEntity, LocationFailure>>()
// Flow will emit one element to one collector only
val locationFlow: Flow<Either<GeoLocationEntity, LocationFailure>> = locationChannel.receiveAsFlow()
// Subscribe to this to start collection, but get the actual result from the other flow
val locationFlowToSubscribeTo = callbackFlow<Either<GeoLocationEntity, LocationFailure>> {
    val onLocationUpdateListener = OnLocationUpdateListener { location ->
        trySend(location.toGeoLocationEntity().toOk())
    }
    locationEngine.addOnLocationUpdateListener(onLocationUpdateListener)
    locationEngine.enable()
    awaitClose {
        locationEngine.removeOnLocationUpdateListener(onLocationUpdateListener)
        locationEngine.disable()
    }
} // Using shareIn ensures you're only setting up this listener once, and sharing the emissions
    .onEach(locationChannel::trySend) // This sends each emission into the channel
    .shareIn(
        scope = shareInScope,
        replay = 0,
        started = SharingStarted.WhileSubscribed()
    )
private fun keepEngineRunning(locationFlow: Flow<Either<GeoLocationEntity, LocationFailure>>): Flow<Either<Unit, Failure>> =
    // Emit from the channel-backed locationFlow; keep the shared flow subscribed alongside it
    locationFlow.flowAlso(locationFlowToSubscribeTo).map { location ->
        location.andThen { geoLocationEntity ->
            engine.updateLocation(geoLocationEntity)
        }
    }
/**
 * Extension to run another flow as long as this flow is collecting
 */
fun <S, T> Flow<S>.flowAlso(another: Flow<T>) = flow {
    coroutineScope {
        launch {
            another.collect {}
        }
        this@flowAlso.collect {
            emit(it)
        }
    }
}
onEach(locationChannel::trySend)
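The last snippet replaces send with trySend inside onEach. As a rough sketch of how the two differ on a default (rendezvous) Channel, using hypothetical helper names and Int in place of the location type: send suspends until a receiver takes the value, while trySend returns immediately and reports failure when no receiver is waiting, in which case the value is dropped.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel

// trySend never suspends; with no receiver waiting on a rendezvous channel it fails.
fun trySendWithoutReceiver(): Boolean {
    val channel = Channel<Int>()
    return channel.trySend(1).isSuccess
}

// send suspends until someone receives; nobody does here, so the timeout fires.
fun sendWithoutReceiver(): Boolean = runBlocking {
    val channel = Channel<Int>()
    withTimeoutOrNull(100L) { channel.send(1) } != null
}

// With a receiver already suspended in receive(), trySend hands the value over.
fun trySendWithWaitingReceiver(): Boolean = runBlocking {
    val channel = Channel<Int>()
    val received = async { channel.receive() }
    yield() // let the receiver start and suspend in receive()
    val delivered = channel.trySend(1).isSuccess
    // Avoid waiting forever if the value was not delivered.
    if (delivered) received.await() else received.cancel()
    delivered
}

fun main() {
    println("trySend, no receiver: ${trySendWithoutReceiver()}")
    println("send, no receiver: ${sendWithoutReceiver()}")
    println("trySend, waiting receiver: ${trySendWithWaitingReceiver()}")
}
```

In the setup from this thread, that would mean a location is silently skipped whenever no collector of locationFlow is waiting at that moment, whereas send would hold up the upstream until a collector is ready.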