https://kotlinlang.org logo
Channels
100daysofcode
100daysofkotlin
100daysofkotlin-2021
advent-of-code
aem
ai
alexa
algeria
algolialibraries
amsterdam
android
android-architecture
android-databinding
android-studio
androidgithubprojects
androidthings
androidx
androidx-xprocessing
anime
anko
announcements
apollo-kotlin
appintro
arabic
argentina
arkenv
arksemdevteam
armenia
arrow
arrow-contributors
arrow-meta
ass
atlanta
atm17
atrium
austin
australia
austria
awesome-kotlin
ballast
bangladesh
barcelona
bayarea
bazel
beepiz-libraries
belgium
benchmarks
berlin
big-data
books
boston
brazil
brikk
budapest
build
build-tools
bulgaria
bydgoszcz
cambodia
canada
carrat
carrat-dev
carrat-feed
chicago
chile
china
chucker
cincinnati-user-group
cli
clikt
cloudfoundry
cn
cobalt
code-coverage
codeforces
codemash-precompiler
codereview
codingame
codingconventions
coimbatore
collaborations
colombia
colorado
communities
competitive-programming
competitivecoding
compiler
compose
compose-android
compose-desktop
compose-hiring
compose-ios
compose-mp
compose-ui-showcase
compose-wear
compose-web
confetti
connect-audit-events
corda
cork
coroutines
couchbase
coursera
croatia
cryptography
cscenter-course-2016
cucumber-bdd
cyprus
czech
dagger
data2viz
databinding
datascience
dckotlin
debugging
decompose
decouple
denmark
deprecated
detekt
detekt-hint
dev-core
dfw
docs-revamped
dokka
domain-driven-design
doodle
dsl
dublin
dutch
eap
eclipse
ecuador
edinburgh
education
effective-kotlin
effectivekotlin
emacs
embedded-kotlin
estatik
event21-community-content
events
exposed
failgood
fb-internal-demo
feed
firebase
flow
fluid-libraries
forkhandles
forum
fosdem
fp-in-kotlin
framework-elide
freenode
french
fritz2
fuchsia
functional
funktionale
gamedev
ge-kotlin
general-advice
georgia
geospatial
german-lang
getting-started
github-workflows-kt
glance
godot-kotlin
google-io
gradle
graphic
graphkool
graphql
graphql-kotlin
graviton-browser
greece
grpc
gsoc
gui
hackathons
hacktoberfest
hamburg
hamkrest
helios
helsinki
hexagon
hibernate
hikari-cp
hire-me
hiring
hiring-french
hongkong
hoplite
http4k
hungary
hyderabad
image-processing
india
indonesia
inkremental
intellij
intellij-plugins
intellij-tricks
internships
introduce-yourself
io
ios
iran
israel
istanbulcoders
italian
jackson-kotlin
jadx
japanese
jasync-sql
java-to-kotlin-refactoring
javadevelopers
javafx
javalin
javascript
jdbi
jhipster-kotlin
jobsworldwide
jpa
jshdq
juul-libraries
jvm-ir-backend-feedback
jxadapter
k2-early-adopters
kaal
kafka
kakao
kalasim
kapt
karachi
karg
karlsruhe
kash_shell
kaskade
kbuild
kdbc
kgen-doc-tools
kgraphql
kinta
klaxon
klock
kloudformation
kmdc
kmm-español
kmongo
knbt
knote
koalaql
koans
kobalt
kobweb
kodein
kodex
kohesive
koin
koin-dev
komapper
kondor-json
kong
kontent
kontributors
korau
korean
korge
korim
korio
korlibs
korte
kotest
kotest-contributors
kotless
kotlick
kotlin-asia
kotlin-beam
kotlin-by-example
kotlin-csv
kotlin-data-storage
kotlin-foundation
kotlin-fuel
kotlin-in-action
kotlin-inject
kotlin-latam
kotlin-logging
kotlin-multiplatform-contest
kotlin-mumbai
kotlin-native
kotlin-pakistan
kotlin-plugin
kotlin-pune
kotlin-roadmap
kotlin-samples
kotlin-sap
kotlin-serbia
kotlin-spark
kotlin-szeged
kotlin-website
kotlinacademy
kotlinbot
kotlinconf
kotlindl
kotlinforbeginners
kotlingforbeginners
kotlinlondon
kotlinmad
kotlinprogrammers
kotlinsu
kotlintest
kotlintest-devs
kotlintlv
kotlinultimatechallenge
kotlinx-datetime
kotlinx-files
kotlinx-html
kotrix
kotson
kovenant
kprompt
kraph
krawler
kroto-plus
ksp
ktcc
ktfmt
ktlint
ktor
ktp
kubed
kug-leads
kug-torino
kvision
kweb
lambdaworld_cadiz
lanark
language-evolution
language-proposals
latvia
leakcanary
leedskotlinusergroup
lets-have-fun
libgdx
libkgd
library-development
lincheck
linkeddata
lithuania
london
losangeles
lottie
love
lychee
macedonia
machinelearningbawas
madrid
malaysia
mathematics
meetkotlin
memes
meta
metro-detroit
mexico
miami
micronaut
minnesota
minutest
mirror
mockk
moko
moldova
monsterpuzzle
montreal
moonbean
morocco
motionlayout
mpapt
mu
multiplatform
mumbai
munich
mvikotlin
mvrx
myndocs-oauth2-server
naming
navigation-architecture-component
nepal
new-mexico
new-zealand
newname
nigeria
nodejs
norway
npm-publish
nyc
oceania
ohio-kotlin-users
oldenburg
oolong
opensource
orbit-mvi
osgi
otpisani
package-search
pakistan
panamá
pattern-matching
pbandk
pdx
peru
philippines
phoenix
pinoy
pocketgitclient
polish
popkorn
portugal
practical-functional-programming
proguard
prozis-android-backup
pyhsikal
python
python-contributors
quasar
random
re
react
reaktive
realm
realworldkotlin
reductor
reduks
redux
redux-kotlin
refactoring-to-kotlin
reflect
refreshversions
reports
result
rethink
revolver
rhein-main
rocksdb
romania
room
rpi-pico
rsocket
russian
russian_feed
russian-kotlinasfirst
rx
rxjava
san-diego
science
scotland
scrcast
scrimage
script
scripting
seattle
serialization
server
sg-user-group
singapore
skia-wasm-interop-temp
skrape-it
slovak
snake
sofl-user-group
southafrica
spacemacs
spain
spanish
speaking
spek
spin
splitties
spotify-mobius
spring
spring-security
squarelibraries
stackoverflow
stacks
stayhungrystayfoolish
stdlib
stlouis
strife-discord-lib
strikt
students
stuttgart
sudan
swagger-gradle-codegen
swarm
sweden
swing
swiss-user-group
switzerland
talking-kotlin
tallinn
tampa
teamcity
tegal
tempe
tensorflow
terminal
test
testing
testtestest
texas
tgbotapi
thailand
tornadofx
touchlab-tools
training
tricity-kotlin-user-group
trójmiasto
truth
tunisia
turkey
turkiye
twitter-feed
uae
udacityindia
uk
ukrainian
uniflow
unkonf
uruguay
utah
uuid
vancouver
vankotlin
vertx
videos
vienna
vietnam
vim
vkug
vuejs
web-mpp
webassembly
webrtc
wimix_sentry
wwdc
zircon
Powered by
Title
a

Arun Joseph

04/03/2022, 12:51 PM
Is there an easy way convert a
SharedFlow
only to emit to its first subscriber?
m

Michael Marshall

04/05/2022, 1:48 AM
What’s the purpose behind doing this?
a

Arun Joseph

04/05/2022, 4:30 AM
The flow is sort of a side effect flow, taking data from one repository and deposits in another repository, the subscribers are UI elements and keeps this data transfer alive and emitting. I use
callbackFlow
and
shareIn
to keep the listeners registered as long as there are subscribers. But only would like to have only one subscriber collects at a time.
m

Michael Marshall

04/05/2022, 4:37 AM
Perhaps sharing some of the code would help 🙂
a

Arun Joseph

04/05/2022, 4:45 AM
val locationFlow: Flow<Either<GeoLocationEntity, LocationFailure>> =
    callbackFlow<Either<GeoLocationEntity, LocationFailure>> {
        val onLocationUpdateListener = OnLocationUpdateListener { location ->
            trySend(location.toGeoLocationEntity().toOk())
        }
        locationEngine.addOnLocationUpdateListener(onLocationUpdateListener)
        locationEngine.enable()
        awaitClose {
            locationEngine.removeOnLocationUpdateListener(onLocationUpdateListener)
            locationEngine.disable()
        }
    }.shareIn(shareInScope,
        replay = 0,
        started = SharingStarted.WhileSubscribed(replayExpirationMillis = 0)
    )

private fun keepEngineRunning(locationFlow: Flow<Either<GeoLocationEntity, LocationFailure>>): Flow<Either<Unit, Failure>> =
    locationFlow.map { location ->
        location.andThen { geoLocationEntity ->
            engine.updateLocation(
                geoLocationEntity
            )
        }
    }


fun UI element 1() = runBlocking {
    launch{
        keepEngineRunning().collect()
    }
}

fun UI element 2() = runBlocking {
    launch{
        keepEngineRunning().collect()
    }
}
UI elements(Here UI elements are view models) do not use any data from
keepEngineRunning
but keeps collecting it to make sure engine is running. But I would like to prevent engine gets same location twice.
n

Nick Allen

04/05/2022, 6:23 AM
Use
shareIn
on
keepEngineRunning
as well. It'll only run once for any number of subscribers since it’s shared.
a

Arun Joseph

04/05/2022, 6:43 AM
I my app,
keepEngineRunning
is in domain layer(clean arch) and doesn't receive a scope for
shareIn
I could move to Repository level then i would need
engine
and
location
repository as one.
m

Michael Marshall

04/05/2022, 6:46 AM
Yeah using the UI elements to collect the flow directly (rather than within a piece of business logic or viewmodel) seems odd
Using a
Channel
with
receiveAsFlow
will ensure only one engine gets each emission, but they will be evenly distributed among engines using a fan out method.
a

Arun Joseph

04/05/2022, 6:56 AM
Does this mean i shouldn't be using shared flow at all? Is there any easy method to map
SharedFlow
to
Channel
. Then i can reuse existing
SharedFlow
for other purposes
m

Michael Marshall

04/05/2022, 6:57 AM
Using a channel might look like
private val locationChannel = Channel<Either<GeoLocationEntity, LocationFailure>>()
    
    // Flow will emit one element to one collector only 
    val locationFlow: Flow<Either<GeoLocationEntity, LocationFailure>> = locationChannel.receiveAsFlow()

    init {
        callbackFlow<Either<GeoLocationEntity, LocationFailure>> {
            val onLocationUpdateListener = OnLocationUpdateListener { location ->
                trySend(location.toGeoLocationEntity().toOk())
            }
            locationEngine.addOnLocationUpdateListener(onLocationUpdateListener)
            locationEngine.enable()
            awaitClose {
                locationEngine.removeOnLocationUpdateListener(onLocationUpdateListener)
                locationEngine.disable()
            }
        } // Using shareIn ensures you're only setting up this listener once, and sharing the emissions
            .shareIn(
                scope = shareInScope,
                replay = 0,
                started = SharingStarted.WhileSubscribed()
            )
            .onEach(locationChannel::send) // This sends each emission into the channel
    }
There may be a better way using just flows though.
bump in case you didn’t see my edit 🙂
a

Arun Joseph

04/05/2022, 7:39 AM
Hmmm. This way
SharingStarted.WhileSubscribed()
can't be used because there are no subscribers for callbackFlow
I tried
SharingStarted.Eagerly
but then location updates happen even when there are no subscribers.
m

Michael Marshall

04/05/2022, 8:13 AM
Okay this is hacky but what if you did
private val locationChannel = Channel<Either<GeoLocationEntity, LocationFailure>>()
    
    // Flow will emit one element to one collector only 
    val locationFlow: Flow<Either<GeoLocationEntity, LocationFailure>> = locationChannel.receiveAsFlow()

    // Subscribe to this to start collection, but get the actual result from the other flow    
    val locationFlowToSubscribeTo = callbackFlow<Either<GeoLocationEntity, LocationFailure>> {
            val onLocationUpdateListener = OnLocationUpdateListener { location ->
                trySend(location.toGeoLocationEntity().toOk())
            }
            locationEngine.addOnLocationUpdateListener(onLocationUpdateListener)
            locationEngine.enable()
            awaitClose {
                locationEngine.removeOnLocationUpdateListener(onLocationUpdateListener)
                locationEngine.disable()
            }
        } // Using shareIn ensures you're only setting up this listener once, and sharing the emissions
            .onEach(locationChannel::send) // This sends each emission into the channel
            .shareIn(
                scope = shareInScope,
                replay = 0,
                started = SharingStarted.WhileSubscribed()
            )

fun UI element 1() = runBlocking {
    launch{
        locationFlowToSubscribeTo.collect()
        keepEngineRunning().collect()
    }
}
You might be able to use
combine
to make it look like a single flow to the subscribers
a

Arun Joseph

04/05/2022, 12:52 PM
private val locationChannel = Channel<Either<GeoLocationEntity, LocationFailure>>()
    
    // Flow will emit one element to one collector only 
    val locationFlow: Flow<Either<GeoLocationEntity, LocationFailure>> = locationChannel.receiveAsFlow()

    // Subscribe to this to start collection, but get the actual result from the other flow    
    val locationFlowToSubscribeTo = callbackFlow<Either<GeoLocationEntity, LocationFailure>> {
            val onLocationUpdateListener = OnLocationUpdateListener { location ->
                trySend(location.toGeoLocationEntity().toOk())
            }
            locationEngine.addOnLocationUpdateListener(onLocationUpdateListener)
            locationEngine.enable()
            awaitClose {
                locationEngine.removeOnLocationUpdateListener(onLocationUpdateListener)
                locationEngine.disable()
            }
        } // Using shareIn ensures you're only setting up this listener once, and sharing the emissions
            .onEach(locationChannel::trySend) // This sends each emission into the channel
            .shareIn(
                scope = shareInScope,
                replay = 0,
                started = SharingStarted.WhileSubscribed()
            )

private fun keepEngineRunning(locationFlow: Flow<Either<GeoLocationEntity, LocationFailure>>): Flow<Either<Unit, Failure>> =
    locationFlowToSubscribeTo.flowAlso(locationFlow).map { location ->
        location.andThen { geoLocationEntity ->
            engine.updateLocation(
                geoLocationEntity
            )
        }
    }

/**
 * Extension to run another flow as long as this flow is collecting
 */
fun <S,T> Flow<S>.flowAlso(another: Flow<T>) = flow {
    coroutineScope {
        launch {
            another.collect {}
        }
        this@flowAlso.collect {
            emit(it)
        }
    }
}
This works! Just needed to use
onEach(locationChannel::trySend)
🙌 1