https://kotlinlang.org logo
Docs
Join the conversationJoin Slack
Channels
100daysofcode
100daysofkotlin
100daysofkotlin-2021
advent-of-code
aem
ai
alexa
algeria
algolialibraries
amsterdam
android
android-architecture
android-databinding
android-studio
androidgithubprojects
androidthings
androidx
androidx-xprocessing
anime
anko
announcements
apollo-kotlin
appintro
arabic
argentina
arkenv
arksemdevteam
armenia
arrow
arrow-contributors
arrow-meta
ass
atlanta
atm17
atrium
austin
australia
austria
awesome-kotlin
ballast
bangladesh
barcelona
bayarea
bazel
beepiz-libraries
belgium
berlin
big-data
books
boston
brazil
brikk
budapest
build
build-tools
bulgaria
bydgoszcz
cambodia
canada
carrat
carrat-dev
carrat-feed
chicago
chile
china
chucker
cincinnati-user-group
cli
clikt
cloudfoundry
cn
cobalt
code-coverage
codeforces
codemash-precompiler
codereview
codingame
codingconventions
coimbatore
collaborations
colombia
colorado
communities
competitive-programming
competitivecoding
compiler
compose
compose-android
compose-desktop
compose-hiring
compose-ios
compose-mp
compose-ui-showcase
compose-wear
compose-web
connect-audit-events
corda
cork
coroutines
couchbase
coursera
croatia
cryptography
cscenter-course-2016
cucumber-bdd
cyprus
czech
dagger
data2viz
databinding
datascience
dckotlin
debugging
decompose
decouple
denmark
deprecated
detekt
detekt-hint
dev-core
dfw
docs-revamped
dokka
domain-driven-design
doodle
dsl
dublin
dutch
eap
eclipse
ecuador
edinburgh
education
effective-kotlin
effectivekotlin
emacs
embedded-kotlin
estatik
event21-community-content
events
exposed
failgood
fb-internal-demo
feed
firebase
flow
fluid-libraries
forkhandles
forum
fosdem
fp-in-kotlin
framework-elide
freenode
french
fritz2
fuchsia
functional
funktionale
gamedev
ge-kotlin
general-advice
georgia
geospatial
german-lang
getting-started
github-workflows-kt
glance
godot-kotlin
google-io
gradle
graphic
graphkool
graphql
graphql-kotlin
graviton-browser
greece
grpc
gsoc
gui
hackathons
hacktoberfest
hamburg
hamkrest
helios
helsinki
hexagon
hibernate
hikari-cp
hire-me
hiring
hongkong
hoplite
http4k
hungary
hyderabad
image-processing
india
indonesia
inkremental
intellij
intellij-plugins
intellij-tricks
internships
introduce-yourself
io
ios
iran
israel
istanbulcoders
italian
jackson-kotlin
jadx
japanese
jasync-sql
java-to-kotlin-refactoring
javadevelopers
javafx
javalin
javascript
jdbi
jhipster-kotlin
jobsworldwide
jpa
jshdq
juul-libraries
jvm-ir-backend-feedback
jxadapter
k2-early-adopters
kaal
kafka
kakao
kalasim
kapt
karachi
karg
karlsruhe
kash_shell
kaskade
kbuild
kdbc
kgen-doc-tools
kgraphql
kinta
klaxon
klock
kloudformation
kmdc
kmm-español
kmongo
knbt
knote
koalaql
koans
kobalt
kobweb
kodein
kodex
kohesive
koin
koin-dev
komapper
kondor-json
kong
kontent
kontributors
korau
korean
korge
korim
korio
korlibs
korte
kotest
kotest-contributors
kotless
kotlick
kotlin-asia
kotlin-beam
kotlin-by-example
kotlin-csv
kotlin-data-storage
kotlin-foundation
kotlin-fuel
kotlin-in-action
kotlin-inject
kotlin-latam
kotlin-logging
kotlin-multiplatform-contest
kotlin-mumbai
kotlin-native
kotlin-pakistan
kotlin-plugin
kotlin-pune
kotlin-roadmap
kotlin-samples
kotlin-sap
kotlin-serbia
kotlin-spark
kotlin-szeged
kotlin-website
kotlinacademy
kotlinbot
kotlinconf
kotlindl
kotlinforbeginners
kotlingforbeginners
kotlinlondon
kotlinmad
kotlinprogrammers
kotlinsu
kotlintest
kotlintest-devs
kotlintlv
kotlinultimatechallenge
kotlinx-datetime
kotlinx-files
kotlinx-html
kotrix
kotson
kovenant
kprompt
kraph
krawler
kroto-plus
ksp
ktcc
ktfmt
ktlint
ktor
ktp
kubed
kug-leads
kug-torino
kvision
kweb
lambdaworld_cadiz
lanark
language-evolution
language-proposals
latvia
leakcanary
leedskotlinusergroup
lets-have-fun
libgdx
libkgd
library-development
linkeddata
lithuania
london
losangeles
lottie
love
lychee
macedonia
machinelearningbawas
madrid
malaysia
mathematics
meetkotlin
memes
meta
metro-detroit
mexico
miami
micronaut
minnesota
minutest
mirror
mockk
moko
moldova
monsterpuzzle
montreal
moonbean
morocco
motionlayout
mpapt
mu
multiplatform
mumbai
munich
mvikotlin
mvrx
myndocs-oauth2-server
naming
navigation-architecture-component
nepal
new-mexico
new-zealand
newname
nigeria
nodejs
norway
npm-publish
nyc
oceania
ohio-kotlin-users
oldenburg
oolong
opensource
orbit-mvi
osgi
otpisani
package-search
pakistan
panamá
pattern-matching
pbandk
pdx
peru
philippines
phoenix
pinoy
pocketgitclient
polish
popkorn
portugal
practical-functional-programming
proguard
prozis-android-backup
pyhsikal
python
python-contributors
quasar
random
re
react
reaktive
realm
realworldkotlin
reductor
reduks
redux
redux-kotlin
refactoring-to-kotlin
reflect
refreshversions
reports
result
rethink
revolver
rhein-main
rocksdb
romania
room
rpi-pico
rsocket
russian
russian_feed
russian-kotlinasfirst
rx
rxjava
san-diego
science
scotland
scrcast
scrimage
script
scripting
seattle
serialization
server
sg-user-group
singapore
skia-wasm-interop-temp
skrape-it
slovak
snake
sofl-user-group
southafrica
spacemacs
spain
spanish
speaking
spek
spin
splitties
spotify-mobius
spring
spring-security
squarelibraries
stackoverflow
stacks
stayhungrystayfoolish
stdlib
stlouis
strife-discord-lib
strikt
students
stuttgart
sudan
swagger-gradle-codegen
swarm
sweden
swing
swiss-user-group
switzerland
talking-kotlin
tallinn
tampa
teamcity
tegal
tempe
tensorflow
terminal
test
testing
testtestest
texas
tgbotapi
thailand
tornadofx
touchlab-tools
training
tricity-kotlin-user-group
trójmiasto
truth
tunisia
turkey
turkiye
twitter-feed
uae
udacityindia
uk
ukrainian
uniflow
unkonf
uruguay
utah
uuid
vancouver
vankotlin
vertx
videos
vienna
vietnam
vim
vkug
vuejs
web-mpp
webassembly
webrtc
wimix_sentry
wwdc
zircon
Powered by Linen
coroutines
  • c

    christophsturm

    11/29/2021, 8:59 AM
    is there a kotlin pool library like commons-pool, maybe one that uses coroutines?
    g
    s
    +2
    • 5
    • 28
  • m

    Maciek

    11/29/2021, 3:09 PM
    what would be a recommended way of registering a callback for the scope cancellation (or all children)? Would the empty
    callbackFlow
    with
    awaitClose
    work here?
    fun CoroutineScope.onCancel(action: () -> Unit) {
            callbackFlow<Unit> { 
                awaitClose(action)
            }.launchIn(this)
        }
    j
    • 2
    • 5
  • m

    mcpiroman

    11/29/2021, 5:59 PM
    I need a simple way to send stateless notifications, currently I have:
    val focusRequests = MutableSharedFlow<Unit>(onBufferOverflow = BufferOverflow.DROP_LATEST, extraBufferCapacity = 1)
    focusRequests.tryEmit(Unit) // on frontend
    focusRequests.collect { ... } // on backend
    but I feel like this is an abuse of the SharedFlow. Is there more specific construct I should use in such cases?
    d
    j
    j
    • 4
    • 11
  • f

    frankelot

    11/29/2021, 7:01 PM
    Hi! How how would you go about creating an
    actor
    of buffer 1 that DROPS_OLD when the buffer is overflown. I’ve read the docs but I’m not sure I get it Is this it?
    actor<Int>(capacity = Channel.CONFLATED)
    • 1
    • 2
  • n

    natario1

    11/30/2021, 10:30 AM
    I often wish there was a channel with input and output types, something like
    Queue<In, Out>(process: suspend (In) -> Out, onBufferOverflow, capacity)
    where I can send input and suspend until output is ready:
    val out = queue.send(in)
    , use
    trySend
    and so on. I'm curious if it's just me, do you ever find yourself needing something similar and what do you use?
    j
    e
    • 3
    • 13
  • a

    Anvith

    11/30/2021, 12:33 PM
    What’s the equivalent of a UnicastSubject in Flow? SharedFlow with a buffer suspend might mimic emission behaviour however what about ensuring emissions aren’t missed if there are no collectors?
    j
    • 2
    • 1
  • s

    scana

    12/01/2021, 3:23 PM
    Could anyone please help me with understanding why
    MutableStateFlow
    would only emit last item in this example (running from Kotlin scratch file)?
    val x = MutableStateFlow("Test")
    CoroutineScope(<http://Dispatchers.IO|Dispatchers.IO>).launch {
        x.collect {
            println(it)
        }
    }
    
    x.value = "First"
    x.value = "Second"
    x.value = "Fourth"
    
    println("Waiting...")
    Thread.sleep(1000)
    Result:
    Waiting...
    Fourth
    j
    u
    • 3
    • 4
  • a

    ansman

    12/01/2021, 10:29 PM
    Is there any way using the new test helpers to not skip delays during a test but rather controlling them using a
    TestCoroutineScheduler
    ? I see
    TestCoroutineScheduler
    being used but delays are still being skipped which is super confusing to me as I don’t see the point of it if delays aren’t skipped
    a
    n
    • 3
    • 2
  • s

    Sergio Crespo Toubes

    12/02/2021, 10:45 AM
    Hello my locationmanager is using a kotlin channel with this code
    private val myLocationsChannel = Channel<MyLocation>(Channel.BUFFERED)
    val myLocationsFlow = myLocationsChannel.receiveAsFlow()
    How can i send an error to the channel? For example when i havent location permissions. Thanks
    j
    h
    • 3
    • 4
  • t

    Tim Malseed

    12/02/2021, 11:28 AM
    Hey, just wanted to run this by you guys and see what comes out.. I wanted a way to be able to define some ‘flags’ in the app - basically a bunch of compile-time defined, observable state holders. So, I came up with the following:
    data class Flag<T : Any>(val key: String, val description: String, val defaultValue: T)
    
    class FlagManager() {
    
        private val flagMap = mutableMapOf<Flag<out Any>, MutableStateFlow<Any>>()
    
        fun registerFlag(flag: Flag<Any>) {
            if (flagMap.containsKey(flag)) {
                throw IllegalStateException("Flag cannot be registered more than once")
            }
            flagMap[flag] = MutableStateFlow(flag.defaultValue)
        }
    
        fun <T : Any> getFlagState(flag: Flag<T>): StateFlow<T> {
            if (!flagMap.containsKey(flag)) {
                throw IllegalStateException("Flag not registered")
            }
    
            return (flagMap[flag] as MutableStateFlow<T>).asStateFlow()
        }
    
        fun <T : Any> updateFlagState(flag: Flag<T>, value: T) {
            if (!flagMap.containsKey(flag)) {
                throw IllegalStateException("Flag not registered")
            }
    
            (flagMap[flag] as MutableStateFlow<T>).value = value
        }
    }
    t
    • 2
    • 4
  • u

    ursus

    12/02/2021, 2:26 PM
    Im confused about the testing apis Am I expected to always inject the coroutine scope to stuff I want to test, even if I dont want to control time progress? Or is the correct way to make stuff synchronous/blocking with injecting Undispatched dispatcher driving my private scope? Asking for a friend who wants to test something like this
    class Syncer {
       private val scope = CoroutineScope(SupervisorJob() + <http://Dispatcher.Io|Dispatcher.Io>)
       private val _state = MutableStateFlow<State>(Idle)
       val state: Flow<State> get() = _state
       
       fun sync() {
          scope.launch {
             _state.value = Syncing
             someSuspendingApiAndDbStuff()
             _state.value = Synced
          }
       }
    }
    I want to test if I see correct
    state
    emissions if I call
    sync()
    s
    m
    • 3
    • 30
  • l

    Leonardo Borges

    12/02/2021, 3:04 PM
    I'm relative new to Kotlin and I'm trying to test a use case which has a suspend function. The function calls an API, which I'm mocking using
    mockK
    .
    class MyUseCase {
    	operator suspend fun invoke(val someId: UUID) = withContext(<http://Dispatchers.IO|Dispatchers.IO>) {
    		val response = someAPI.requestDataById(someId)
    		// process the response...
    	}
    }
    Using JUnit5, there are a few exceptions I want to test, such as below:
    @Test
    fun testCustomExceptionIsThrown() {
    	coEvery { someAPI.requestDataById(someId) returns someData }
    	val thrown = assertThrows<CustomException> { 
    		runBlocking { myUseCase(someId) }
    	}
    	assertEquals("customExceptionMessage", thrown.message)
    }
    This is always returning an
    JobCancellationException: Parent job is Completed;
    and I actually can't figure out why. Also, is this the right way to use those libraries?
    x
    • 2
    • 1
  • u

    ursus

    12/03/2021, 1:08 AM
    do I need to cancel my coroutine scopes in tests if it seems the test process gets killed? or does it?
    n
    • 2
    • 9
  • y

    yschimke

    12/03/2021, 12:06 PM
    In general is it safe to assume the following are roughly similar in cost?
    suspend fun getX() : X
    fun getXFlow(): Flow<X>
    
    val a = getX()
    val b = getXFlow().first()
    Obviously the implementations may be different (one off query vs query and subscribe). But can the implementation optimise the call to first() which is the when the subscription happens and short circuit it? Othewise it's tempting to put both variants on most repository methods. But it would be nice if there was an expectation that first() should optimise to the suspend fun case.
    a
    • 2
    • 5
  • u

    ursus

    12/03/2021, 1:19 PM
    How would you test this?
    class Syncer(private val db: Db) {
       private val scope = CoroutineScope(SupervisorJob + Dispatchers Io)
       
       fun sync() {
          scope.launch {
             ...
             db.insertSomething()
          }
       }
    }
    I'd like to assert that
    db.readSomething()
    reads what it should after I call
    Syncer.sync()
    There is nothing to suspend on to know when the sync actually finished (like some status flow) Is injecting
    Dispatchers.Undispatched
    instead of
    Io
    to make it synchronous, so I can assert right after
    sync()
    returns, idiomatic? Is there something better?
    k
    s
    • 3
    • 5
  • v

    Victor Rendina

    12/03/2021, 5:12 PM
    Hello all. We make extensive use of flows and will allow exceptions to be thrown into a flow to notify downstream consumers that a problem has occurred. We have recently been noticing some crash reports where the stacktrace indicates the location that the exception was constructed in the flow, but not where the collector is that received the exception. Typically we will handle exceptions by applying a catch {} operator to the flow near the collection site, but it seems in this case we have not added a catch and are having difficulty determining exactly where the collector is that initiated the exceptional flow. Does anyone have any advice about how to trace the site of the collector that received the exception?
    l
    • 2
    • 5
  • d

    dimsuz

    12/03/2021, 5:35 PM
    Supposing I have a
    Flow<T>
    what would be the most idiomatic way to to the following:
    someFlow.collect { v ->
      process(v)
      if (isVeryFirstItem) {
        someSharedFlow.emit("received first item")
      }
    }
    I.e. I want to do a side-effect only once after receiving the first emission. I could make
    isVeryFirstFirstItem
    some kind of a volatile or AtomicBoolean, but somehow I don't like this.
    v
    • 2
    • 2
  • d

    David W

    12/03/2021, 10:05 PM
    Why does the first block not print anything, but the second prints "launch launch"?
    fun main() {
        val scope = CoroutineScope(Job())
        scope.launch { println("launch") }
    }
    fun main() {
        val scope = CoroutineScope(Job())
        scope.launch { println("launch") }
        scope.launch { println("launch") }
    }
    j
    • 2
    • 21
  • d

    David W

    12/03/2021, 11:14 PM
    Am I right in thinking that this, which I totally didn't write, will permanently tie up one of the few Default threads, and that four of these running on a 4-core machine would prevent any coroutines from running on Default?
    val someStateFlow: StateFlow<Unit> = ...
    GlobalScope.launch(Dispatchers.Default) { someStateFlow.collect { ... } }
    j
    • 2
    • 6
  • d

    David W

    12/03/2021, 11:30 PM
    My coroutine education continues...any ideas on why this code prints nothing:
    refreshScope = CoroutineScope(Job())
                    refreshScope.launch(Dispatchers.Default) {
                        println("Clicked Refresh button pntln.")
                    }
    but this produces text?
    refreshScope = CoroutineScope(Job())
                    refreshScope.launch(<http://Dispatchers.IO|Dispatchers.IO>) {
                        println("Clicked Refresh button pntln.")
                    }
    The only difference is the Dispatcher (IO works, Default doesn't). The issue is somewhere else in my code, I expect, and there's far too much to include; both code snippets work in a vacuum. I'm just hoping for a direction to start looking.
    j
    n
    • 3
    • 22
  • t

    tseisel

    12/04/2021, 8:52 PM
    Hi everyone ! I have a large number of flows and only want to collect up to N flows at once. Is there a way to apply some kind of LRU caching of flows and stop collecting them after being evicted from the cache ? I previously implemented that using `BroadcastChannel`s but since they are deprecated I have to find a solution with
    SharedFlow
    , which has no
    close
    method...
    d
    j
    • 3
    • 2
  • a

    Ayfri

    12/06/2021, 6:19 AM
    Hi, is is possible to uses coroutines to perform big operations like comparing every pixels colors of one image to multiple to find the closest image from all the other ones in a parallel way ? (Sort of multi-threading)
    :yes: 2
    z
    j
    • 3
    • 15
  • u

    ursus

    12/07/2021, 3:14 AM
    With the new dispatcher slicing
    <http://Dispatchers.IO|Dispatchers.IO>.limitedParallelism(1)
    does it guaranteed I'll get the same thread always? (Its for testing purposes, where I want to replace Main dispatcher with something single threaded)
    g
    • 2
    • 4
  • m

    MRSasko

    12/07/2021, 7:44 AM
    How can I emit the result to the consumer from a different function. When I try the following nothing happens?
    fun fetchInfo(): Flow<State<SportInfo>> =
        flow {
          val result = memoryInfoResult
          if (result != null) {
            emit(result)
          } else {
            fetchSportInfo()
            .onEach { result ->
              when (result) {
                is Data -> {
                  memoryInfoResult = result
                  emit(result)
                }
                is Error, is Loading -> {
                  emit(result)
                }
              }
            }
          }
        }
    n
    • 2
    • 6
  • m

    Mikołaj Karwowski

    12/07/2021, 11:22 AM
    I'm somewhat confused by 'advanceTimeBy' mechanics in kotlin tests using runBlockingTest Modifying a bit code present here, in the documentation: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-test/kotlinx.coroutines.test/-delay-controller/advance-time-by.html To something like that:
    @Test
        fun advanceTimeTest() = runBlockingTest {
            foo()
            advanceTimeBy(2_000)  // advanceTimeBy(2_000) will progress through the first two delays
            println("Main 1")
            // virtual time is 2_000, next resume is at 2_001
            advanceTimeBy(2)      // progress through the last delay of 501 (note 500ms were already advanced)
            println("Main 2")
            // virtual time is 2_0002
        }
    
        fun CoroutineScope.foo() {
            launch {
                println("Foo 1")
                delay(1_000)    // advanceTimeBy(2_000) will progress through this delay (resume @ virtual time 1_000)
                // virtual time is 1_000
                println("Foo 2")
                delay(500)      // advanceTimeBy(2_000) will progress through this delay (resume @ virtual time 1_500)
                // virtual time is 1_500
                println("Foo 3")
                delay(501)      // advanceTimeBy(2_000) will not progress through this delay (resume @ virtual time 2_001)
                // virtual time is 2_001
            }
        }
    The result is:
    Foo 1
    Foo 2
    Foo 3
    Main 1
    Main 2
    Shouldn't it be: Foo1, Foo2, Main1, Foo3, Main2 ?
    j
    • 2
    • 3
  • p

    Pedro Alberto

    12/07/2021, 2:24 PM
    on question not sure if you know the reason: so in android we use StateFlow and usually we use MutableStateFlow but the fragment only has the readonly so we created a function like this
    val <T> MutableStateFlow<T>.readOnly
        get() = this as StateFlow<T>
    now I realised there is a function called asStateFlow() but when you go into asStateFlow the function does something different
    public fun <T> MutableStateFlow<T>.asStateFlow(): StateFlow<T> =
        ReadonlyStateFlow(this, null)
    m
    a
    • 3
    • 14
  • m

    mkrussel

    12/07/2021, 5:13 PM
    I'm seeing strange behavior with suspend functions that return
    Result
    . I can see that the body of the suspend function is returning what I expect. The
    toString
    return
    Success(null)
    or something like that. The caller of the suspend function is then getting an object with a toString of
    Success(Success(null))
    . When I return that from another suspend function it does not get wrapped a third time and still logs as
    Success(Success(null))
    . This is running on Android with Kotlin
    1.5.31
    and coroutines
    1.5.2-native-mt
    i
    • 2
    • 2
  • g

    groostav

    12/07/2021, 8:58 PM
    So my nightmare: • added a library that needs kotlinx-coroutines 1.4, was previously on 1.3. IntelliJ can no longer run the debugger because "Failed to find Premain-Class manifest attribute on kotlinx-coroutines-1.4.2" • "what the hell, I'm on java 13 with some half baked libs, time for an update", so I move to kotlin 1.5, kotlinx-coroutines 1.5, Java 17, | ◦ this blows up for many reasons. I work though most problems. ◦ im using zulu 17, a commuinty build of openJDK that bundles javafx as a jmod. IntelliJ's own highlighting understands this, and my ant build scripts understand it, but intelliJ's build button does not. what do I need to do to get intelliJ to build a java+kotlin project where javafx is from a jmod in the SDK? I've got an old cyclic dependency in my project that I'm sure isn't helping, so I've reverted all my work to sort that out first. any help is much appreciated, this is really really not fun.
    l
    • 2
    • 1
  • d

    dimsuz

    12/08/2021, 12:57 PM
    I have a "service" which has a CoroutineScope inside and I want clients to be able to "dispose" this service, but I want Scope to be an implementation detail. But I'm not against returning some kind of other disposable object. I'd settle with
    interface Service {
      fun start(): Job
    }
    
    val job = myServiceInstance.start()
    job.cancel()
    is it ok to implement start like
    fun start() { val scope = createScope(); return scope.coroutineContext[Job] }
    Or is there some better way?
    s
    e
    • 3
    • 8
  • d

    dimsuz

    12/08/2021, 2:46 PM
    I happened to check sources of
    Flow.collectIndexed
    and it's implemented like this:
    collect(object : FlowCollector<T> {
        private var index = 0
        override suspend fun emit(value: T) = action(checkIndexOverflow(index++), value)
    })
    I'm curious, why is this thread-safe? Can't it happen so that
    index
    is initialized on one thread and incremented in another? (if some
    withContext
    will somehow be used by client) Or are there some invariants which are at play here?
    l
    • 2
    • 2
Powered by Linen
Title
d

dimsuz

12/08/2021, 2:46 PM
I happened to check sources of
Flow.collectIndexed
and it's implemented like this:
collect(object : FlowCollector<T> {
    private var index = 0
    override suspend fun emit(value: T) = action(checkIndexOverflow(index++), value)
})
I'm curious, why is this thread-safe? Can't it happen so that
index
is initialized on one thread and incremented in another? (if some
withContext
will somehow be used by client) Or are there some invariants which are at play here?
l

louiscad

12/08/2021, 4:29 PM
Because flow collection is all sequential, there's no parallel calls happening, and emit can only be called in the same coroutineContext, can't call it from another place on the same FlowCollector instance.
d

dimsuz

12/08/2021, 4:42 PM
oh, right. it's all "coupled" together here.
View count: 7