https://kotlinlang.org logo
Docs
Join the conversationJoin Slack
Channels
100daysofcode
100daysofkotlin
100daysofkotlin-2021
advent-of-code
aem
ai
alexa
algeria
algolialibraries
amsterdam
android
android-architecture
android-databinding
android-studio
androidgithubprojects
androidthings
androidx
androidx-xprocessing
anime
anko
announcements
apollo-kotlin
appintro
arabic
argentina
arkenv
arksemdevteam
armenia
arrow
arrow-contributors
arrow-meta
ass
atlanta
atm17
atrium
austin
australia
austria
awesome-kotlin
ballast
bangladesh
barcelona
bayarea
bazel
beepiz-libraries
belgium
berlin
big-data
books
boston
brazil
brikk
budapest
build
build-tools
bulgaria
bydgoszcz
cambodia
canada
carrat
carrat-dev
carrat-feed
chicago
chile
china
chucker
cincinnati-user-group
cli
clikt
cloudfoundry
cn
cobalt
code-coverage
codeforces
codemash-precompiler
codereview
codingame
codingconventions
coimbatore
collaborations
colombia
colorado
communities
competitive-programming
competitivecoding
compiler
compose
compose-android
compose-desktop
compose-hiring
compose-ios
compose-mp
compose-ui-showcase
compose-wear
compose-web
connect-audit-events
corda
cork
coroutines
couchbase
coursera
croatia
cryptography
cscenter-course-2016
cucumber-bdd
cyprus
czech
dagger
data2viz
databinding
datascience
dckotlin
debugging
decompose
decouple
denmark
deprecated
detekt
detekt-hint
dev-core
dfw
docs-revamped
dokka
domain-driven-design
doodle
dsl
dublin
dutch
eap
eclipse
ecuador
edinburgh
education
effective-kotlin
effectivekotlin
emacs
embedded-kotlin
estatik
event21-community-content
events
exposed
failgood
fb-internal-demo
feed
firebase
flow
fluid-libraries
forkhandles
forum
fosdem
fp-in-kotlin
framework-elide
freenode
french
fritz2
fuchsia
functional
funktionale
gamedev
ge-kotlin
general-advice
georgia
geospatial
german-lang
getting-started
github-workflows-kt
glance
godot-kotlin
google-io
gradle
graphic
graphkool
graphql
graphql-kotlin
graviton-browser
greece
grpc
gsoc
gui
hackathons
hacktoberfest
hamburg
hamkrest
helios
helsinki
hexagon
hibernate
hikari-cp
hire-me
hiring
hongkong
hoplite
http4k
hungary
hyderabad
image-processing
india
indonesia
inkremental
intellij
intellij-plugins
intellij-tricks
internships
introduce-yourself
io
ios
iran
israel
istanbulcoders
italian
jackson-kotlin
jadx
japanese
jasync-sql
java-to-kotlin-refactoring
javadevelopers
javafx
javalin
javascript
jdbi
jhipster-kotlin
jobsworldwide
jpa
jshdq
juul-libraries
jvm-ir-backend-feedback
jxadapter
k2-early-adopters
kaal
kafka
kakao
kalasim
kapt
karachi
karg
karlsruhe
kash_shell
kaskade
kbuild
kdbc
kgen-doc-tools
kgraphql
kinta
klaxon
klock
kloudformation
kmdc
kmm-español
kmongo
knbt
knote
koalaql
koans
kobalt
kobweb
kodein
kodex
kohesive
koin
koin-dev
komapper
kondor-json
kong
kontent
kontributors
korau
korean
korge
korim
korio
korlibs
korte
kotest
kotest-contributors
kotless
kotlick
kotlin-asia
kotlin-beam
kotlin-by-example
kotlin-csv
kotlin-data-storage
kotlin-foundation
kotlin-fuel
kotlin-in-action
kotlin-inject
kotlin-latam
kotlin-logging
kotlin-multiplatform-contest
kotlin-mumbai
kotlin-native
kotlin-pakistan
kotlin-plugin
kotlin-pune
kotlin-roadmap
kotlin-samples
kotlin-sap
kotlin-serbia
kotlin-spark
kotlin-szeged
kotlin-website
kotlinacademy
kotlinbot
kotlinconf
kotlindl
kotlinforbeginners
kotlingforbeginners
kotlinlondon
kotlinmad
kotlinprogrammers
kotlinsu
kotlintest
kotlintest-devs
kotlintlv
kotlinultimatechallenge
kotlinx-datetime
kotlinx-files
kotlinx-html
kotrix
kotson
kovenant
kprompt
kraph
krawler
kroto-plus
ksp
ktcc
ktfmt
ktlint
ktor
ktp
kubed
kug-leads
kug-torino
kvision
kweb
lambdaworld_cadiz
lanark
language-evolution
language-proposals
latvia
leakcanary
leedskotlinusergroup
lets-have-fun
libgdx
libkgd
library-development
linkeddata
lithuania
london
losangeles
lottie
love
lychee
macedonia
machinelearningbawas
madrid
malaysia
mathematics
meetkotlin
memes
meta
metro-detroit
mexico
miami
micronaut
minnesota
minutest
mirror
mockk
moko
moldova
monsterpuzzle
montreal
moonbean
morocco
motionlayout
mpapt
mu
multiplatform
mumbai
munich
mvikotlin
mvrx
myndocs-oauth2-server
naming
navigation-architecture-component
nepal
new-mexico
new-zealand
newname
nigeria
nodejs
norway
npm-publish
nyc
oceania
ohio-kotlin-users
oldenburg
oolong
opensource
orbit-mvi
osgi
otpisani
package-search
pakistan
panamá
pattern-matching
pbandk
pdx
peru
philippines
phoenix
pinoy
pocketgitclient
polish
popkorn
portugal
practical-functional-programming
proguard
prozis-android-backup
pyhsikal
python
python-contributors
quasar
random
re
react
reaktive
realm
realworldkotlin
reductor
reduks
redux
redux-kotlin
refactoring-to-kotlin
reflect
refreshversions
reports
result
rethink
revolver
rhein-main
rocksdb
romania
room
rpi-pico
rsocket
russian
russian_feed
russian-kotlinasfirst
rx
rxjava
san-diego
science
scotland
scrcast
scrimage
script
scripting
seattle
serialization
server
sg-user-group
singapore
skia-wasm-interop-temp
skrape-it
slovak
snake
sofl-user-group
southafrica
spacemacs
spain
spanish
speaking
spek
spin
splitties
spotify-mobius
spring
spring-security
squarelibraries
stackoverflow
stacks
stayhungrystayfoolish
stdlib
stlouis
strife-discord-lib
strikt
students
stuttgart
sudan
swagger-gradle-codegen
swarm
sweden
swing
swiss-user-group
switzerland
talking-kotlin
tallinn
tampa
teamcity
tegal
tempe
tensorflow
terminal
test
testing
testtestest
texas
tgbotapi
thailand
tornadofx
touchlab-tools
training
tricity-kotlin-user-group
trójmiasto
truth
tunisia
turkey
turkiye
twitter-feed
uae
udacityindia
uk
ukrainian
uniflow
unkonf
uruguay
utah
uuid
vancouver
vankotlin
vertx
videos
vienna
vietnam
vim
vkug
vuejs
web-mpp
webassembly
webrtc
wimix_sentry
wwdc
zircon
Powered by Linen
coroutines
  • d

    Dmitry Khalanskiy [JB]

    06/20/2022, 12:18 PM
    📣 📣 📣 kotlinx.coroutines 1.6.3 is here! This is a tiny release that mostly just fixes an issue with building JS IR projects that use coroutines 1.6.2.
    :party-parrot: 8
    👍 4
    👍🏾 2
  • k

    Ky

    06/20/2022, 7:05 PM
    Does Ktor’s
    HttpClient
    kick off it’s own coroutine to make requests? And do we have any control over that coroutineContext? I’m having trouble getting ktors HttpClient to behave nicely in regards to synchronization. Unless I wrap the request in
    withContext(runTest.coroutineContext)
    my tests do not wait for ktor request to return and the test will complete prematurely
  • n

    Norbi

    06/21/2022, 7:12 AM
    Is it OK to call
    runBlocking()
    on each HTTP request? Does it degrade performance a lot? Thanks.
    i
    y
    4 replies · 3 participants
  • n

    Nino

    06/21/2022, 12:02 PM
    Hey, I'm trying to unit test a callbackFlow and I feel like I'm missing something. It's the kind of "snake biting its tail" problem. I found a working solution with
    onStart
    and
    delay
    but it's smelly. Source code:
    // I don't control the ConnectivityManager, it's from Android
    class ConnectivityRepository(private val connectivityManager: ConnectivityManager) {
    
        fun isInternetAvailableFlow(): Flow<Boolean> = callbackFlow {
            val networkCallback = object : ConnectivityManager.NetworkCallback() {
                override fun onAvailable(network: Network) {
                    trySend(true)
                }
    
                override fun onLost(network: Network) {
                    trySend(false)
                }
            }
    
            connectivityManager.registerDefaultNetworkCallback(networkCallback)
    
            awaitClose { connectivityManager.unregisterNetworkCallback(networkCallback) }
        }
    }
    Unit test:
    @Test
        fun `happy path`() = runTest {
            // Given
            val connectivityRepository = ConnectivityRepository(connectivityManager)
    
            val networkCallbackSlot = slot<ConnectivityManager.NetworkCallback>() // Mocking stuff : I can 'capture' something during the test execution with this
            val connectivityManager = mockk<ConnectivityManager>() // Mocking the ConnectivityManager
            justRun { connectivityManager.registerDefaultNetworkCallback(capture(networkCallbackSlot)) } // This function is mocked and "wired" with the slot
            justRun { connectivityManager.unregisterNetworkCallback(any<ConnectivityManager.NetworkCallback>()) }
    
            // When
            val result = connectivityRepositoryImpl.isInternetAvailableFlow().onStart { // Ugly solution but nothing else works...
                launch {
                    delay(1)
                    networkCallbackSlot.captured.onAvailable(mockk())
                }
            }.first()
    
            // Then
            assertThat(result).isTrue()
        }
    I need to capture the anonymous class extending
    ConnectivityManager.NetworkCallback
    in order to call it with either
    onAvaible
    or
    onLost
    during my test, but at the same time, since this is a cold flow, it won't run until I collect it. But it won't emit something until I run
    onAvaible
    or
    onLost
    .
    onStart
    is too early, and
    collect
    is never called. I'd need a "afterStart" callback on my Flow or something like that ?
    m
    e
    8 replies · 3 participants
  • y

    Yuriy Dynnikov

    06/21/2022, 3:36 PM
    Hi everyone. Please point towards the truth. For some reason (I'll fix that later) an exception is thrown inside a coroutine. What's interesting for me is that after this exception all my coroutines, including the ones launched with
    GlobalScope.launch {...}
    , stop working. Looks like an exception gets its way to CoroutineScheduler and breaks it. How can I protect myself from this kind of behavior? Exception stacktrace:
    java.util.concurrent.CancellationException: The task was rejected
    	at kotlinx.coroutines.ExceptionsKt.CancellationException(Exceptions.kt:22)
    	at kotlinx.coroutines.ExecutorCoroutineDispatcherImpl.cancelJobOnRejection(Executors.kt:169)
    	at kotlinx.coroutines.ExecutorCoroutineDispatcherImpl.dispatch(Executors.kt:131)
    	at kotlinx.coroutines.DispatchedTaskKt.dispatch(DispatchedTask.kt:159)
    	at kotlinx.coroutines.CancellableContinuationImpl.dispatchResume(CancellableContinuationImpl.kt:397)
    	at kotlinx.coroutines.CancellableContinuationImpl.completeResume(CancellableContinuationImpl.kt:513)
    	at kotlinx.coroutines.AwaitAll$AwaitAllNode.invoke(Await.kt:115)
    	at kotlinx.coroutines.JobSupport.notifyCompletion(JobSupport.kt:1519)
    	at kotlinx.coroutines.JobSupport.completeStateFinalization(JobSupport.kt:323)
    	at kotlinx.coroutines.JobSupport.finalizeFinishingState(JobSupport.kt:240)
    	at kotlinx.coroutines.JobSupport.tryMakeCompletingSlowPath(JobSupport.kt:906)
    	at kotlinx.coroutines.JobSupport.tryMakeCompleting(JobSupport.kt:863)
    	at kotlinx.coroutines.JobSupport.makeCompletingOnce$kotlinx_coroutines_core(JobSupport.kt:828)
    	at kotlinx.coroutines.AbstractCoroutine.resumeWith(AbstractCoroutine.kt:100)
    	at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:46)
    	at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:233)
    	at kotlinx.coroutines.internal.LimitedDispatcher.run(LimitedDispatcher.kt:39)
    	at kotlinx.coroutines.scheduling.TaskImpl.run(Tasks.kt:95)
    	at kotlinx.coroutines.scheduling.CoroutineScheduler.runSafely(CoroutineScheduler.kt:571)
    	at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.executeTask(CoroutineScheduler.kt:750)
    	at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.runWorker(CoroutineScheduler.kt:678)
    	at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.run(CoroutineScheduler.kt:665)
    Caused by: java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask@47310ad2[Not completed, task = java.util.concurrent.Executors$RunnableAdapter@5f40bc28[Wrapped task = CancellableContinuation(DispatchedContinuation[java.util.concurrent.ScheduledThreadPoolExecutor@6cea1b4[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 20], Continuation at core.MLock$invoke$2.invokeSuspend(MLock.kt:35)@1a367720]){Completed}@6f01d5c3]] rejected from java.util.concurrent.ScheduledThreadPoolExecutor@6cea1b4[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 20]
    	at java.base/java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2070)
    	at java.base/java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:833)
    	at java.base/java.util.concurrent.ScheduledThreadPoolExecutor.delayedExecute(ScheduledThreadPoolExecutor.java:340)
    	at java.base/java.util.concurrent.ScheduledThreadPoolExecutor.schedule(ScheduledThreadPoolExecutor.java:562)
    	at java.base/java.util.concurrent.ScheduledThreadPoolExecutor.execute(ScheduledThreadPoolExecutor.java:705)
    	at kotlinx.coroutines.ExecutorCoroutineDispatcherImpl.dispatch(Executors.kt:128)
    	... 19 more
    j
    11 replies · 2 participants
  • l

    Lukas Lechner

    06/21/2022, 5:30 PM
    I created a new video about "Best Practices for using Kotlin Coroutines in Android Development". Maybe its helpful for someone!

    https://youtu.be/tVDCpjqQ1Ro▾

    🙌 3
    j
    s
    11 replies · 3 participants
  • i

    Igor Kolomiets

    06/21/2022, 5:45 PM
    Hello, can you please advise on good naming convention for extension functions that convert existing Java API based on CompletableFuture to Kotlin’s suspendible variant? For example, there is Java service:
    public interface FibonacciService {
        CompletableFuture<Long> nextFibonacci(Long number);
    }
    In my Kotlin code, I’d like to add extension function to this interface to expose
    nextFibonacci
    method as suspended function:
    suspend fun FibonacciService.nextFibonacci(number: Long): Long = nextFibonacci(number).asDeferred().await()
    Unfortunately, the code above doesn’t work, because
    nextFibonacci
    extension is shadowed by interface original method (I have to rename it to something different, e.g.
    coNextFibonacci
    ,
    suspndNextFibonacci
    ,
    deferredNextFibonacci
    ?). Is there a conventional prefix (or suffix) to distinguish suspendible variants? Also, would it be nice if Kotlin language allowed for
    suspend
    variants in cases like this (presence of CoroutineScope would allow to figure out which one to use)?
    e
    t
    +1
    5 replies · 4 participants
  • l

    liminal

    06/22/2022, 4:35 AM
    I am trying to test a repository function which returns Flow<List<item>>. I have a fake repository where I emit some data into MutableSharedFlow using myFlow.tryEmit(items). I wanted to test what happens if you throw exception while emitting into myFlow. How do I do that?
    n
    2 replies · 2 participants
  • a

    azabost

    06/22/2022, 7:40 PM
    Hey. In general, do I need to include a dependency on the coroutines library in a module to write a
    suspend fun
    ? I’m asking about it only because I’m trying to optimise some modules’ dependencies (e.g. get rid of unnecessary
    implementation
    /
    api
    dependencies)
    e
    n
    +1
    6 replies · 4 participants
  • c

    Chachako

    06/23/2022, 9:39 AM
    Hi everyone, I would like to know when the hang of
    channelFlow.collect
    will resume? If there is a task in the
    channelFlow
    block that doesn’t know when it will complete, when will the coroutines channel close? I’m a little curious what the criteria is for the channel to determine
    channelFlow
    completion, is it to wait a while without any
    send
    and then end the hang?
    s
    5 replies · 2 participants
  • f

    Fleshgrinder

    06/24/2022, 1:56 PM
    Hey 😊 maybe someone knows a nicer solution to a simple problem. I have two flows that are resolving things, the first one from the Internet, the second one from the filesystem. Hence, they are working with different hardware and can run both concurrently and in parallel. Now I need to wait for both to collect everything from their sources, because I want to perform some logic with the results of both. What I have:
    val result = awaitAll(
      async { networkFlow.toSet() },
      async { filesystemFlow.toList() },
    )
    val networkResult = result[0] as Set<NetworkResult>
    val filesystemResult = result[1] as List<FilesystemResult>
    This works exactly as intended, but the ceremony required to get it to work makes it seem even worse than threading and futures. I was hoping for at least …
    val (networkResult, filesystemResult) = awaitAll(
      async { networkFlow.toSet() },
      async { filesystemFlow.toList() },
    )
    … but this does not exist. Any other way to make this nicer? 🤔
    m
    y
    +3
    31 replies · 6 participants
  • g

    Grigory Panko

    06/24/2022, 8:37 PM
    Hi everyone. I'm in process of migrating my JVM application from Java 16 to Java 17. My application uses coroutines 1.6.2 and used to work fine on Java 16, but now it throws
    java.lang.IllegalAccessError: class kotlin.coroutines.jvm.internal.DebugProbesKt (in module kotlin.stdlib) cannot access class kotlinx.coroutines.debug.internal.DebugProbesImpl (in module kotlinx.coroutines.core.jvm) because module kotlin.stdlib does not read module kotlinx.coroutines.core.jvm
    on any coroutine launch. I guess it's because Java 17 removed support for
    --illegal-access
    option. I tried to use
    --add-reads kotlin.stdlib=kotlinx.coroutines.core.jvm
    option, but it shows
    WARNING: Unknown module: kotlin.stdlib specified to --add-reads
    on JVM init and still throws error on coroutines access. Any ideas how can I fix it?
  • s

    Slackbot

    06/25/2022, 9:17 AM
    This message was deleted.
    a
    n
    3 replies · 3 participants
  • s

    Sergei Grishchenko

    06/25/2022, 12:11 PM
    Hi, I am experimenting with coroutines and trying to implement some broadcast flow to separate work between few workers, here is what I have now
    private fun generateWork() = channelFlow {
        for (i in 1..10) {
            val page = "page$i"
            println("Generator sent $page")
            send(page)
        }
        close()
        println("Generator is closed")
    }
    
    private fun CoroutineScope.doWork(id: Int, flow: Flow<String>) = launch {
        flow.collect {
            println("Worker $id processed $it")
        }
        println("Worker $id finished")
    }
    
    suspend fun performWork() {
        try {
            coroutineScope {
                val workFlow: Flow<String?> = generateWork()
    
                val sharedWorkFlow = workFlow
                    .onCompletion { cause -> if (cause == null) emit(null) }
                    .shareIn(this, WhileSubscribed())
                    .takeWhile { it != null }
                    .filterNotNull()
    
                val workersCount = 10
    
                List(workersCount) { id ->
                    val workPartFlow = sharedWorkFlow
                        .withIndex()
                        .filter { (index, _) -> index % workersCount == id }
                        .map { (_, value) -> value }
    
                    doWork(id, workPartFlow)
                }.joinAll()
    
                cancel()
            }
        } catch (e: CancellationException) {
            println("Work is performed")
        }
    }
    So my questions are: 1. Is there possibility to implement it simpler? 2. Am I use Shared Flow correctly? 3. Are there ways to unsubscribe from Shared Flow aside from canceling of
    coroutineScope
    ? 4. Is there way to make Shared Flow finite aside from emitting some special value from
    onCompletion
    (it is null in my case) and use
    takeWhile
    to track it? Thank you
    n
    4 replies · 2 participants
  • a

    Arjan van Wieringen

    06/25/2022, 7:40 PM
    How do I test the following?
    @Test
        fun distributedTest2() = runTest {
            repeat(1000) {
                val (state, updates) = MergeableValue("Bar", Instant.fromEpochMilliseconds(0)).distribute()
                // state is mutablestateflow, updates is mutablesharedflow that also updates state
                
                val otherSource = uuid()
                updates.emit(DistributedMergeable.Update(otherSource, MergeableValue("Baz", Instant.fromEpochMilliseconds(1))))
                updates.emit(DistributedMergeable.Update(otherSource, MergeableValue("Bat", Instant.fromEpochMilliseconds(2))))
                
                /* What do do here to make sure I can assert `state` */
                assertEquals("Bat", state.value.value)
            }
        }
    Basically I have a MutableStateFlow
    state
    and a MutableSharedFlow
    updates
    . By emitting into
    updates
    I update, among other stuff, also the
    state
    . However, for the life of me I can not see how to test this. In an application this works as expected. Basically this is the
    distribute()
    function:
    data class DistributedMergeable<T : Mergeable<T>>(
        val states: MutableStateFlow<T>,
        val updates: MutableSharedFlow<Update<T>>
    ) {
        data class Update<T : Mergeable<T>>(val source: UUID, val value: T)
    }
    
    fun <T : Mergeable<T>> T.distribute(
        updates: MutableSharedFlow<DistributedMergeable.Update<T>>,
        scope: CoroutineScope = CoroutineScope(EmptyCoroutineContext)
    ): DistributedMergeable<T> {
        val source = uuid()
        val states = MutableStateFlow(this)
    
        states.onEach { newState ->
            println("$newState")
            updates.emit(DistributedMergeable.Update(source, newState))
        }.launchIn(scope)
    
        updates.onEach { update ->
            if (update.source == source) {
                println("Same source: $update")
                return@onEach
            }
            if (update.value == states.value) {
                println("Same state: $update")
                return@onEach
            }
            val merged = states.value.merge(update.value)
            println("Merged: $update -> $merged")
            states.value = merged
        }.launchIn(scope)
    
        return DistributedMergeable(states, updates)
    }
    j
    n
    7 replies · 3 participants
  • c

    CLOVIS

    06/26/2022, 8:28 AM
    One thing I never understood: if I'm writing a class that needs to internally launch coroutines, should the constructor take a
    Job
    , a
    CoroutineScope
    , a
    CoroutineContext
    or something else entirely? The guide is fairly clear on what functions should accept, but I didn't find anywhere it mentions classes.
    j
    5 replies · 2 participants
  • o

    oday

    06/26/2022, 9:34 PM
    how would I turn this into something that uses coroutines?
    class DetermineAuthStatus @Inject constructor(
        private val firebaseAuth: FirebaseAuth,
        private val populateUserPrefs: PopulateUserPrefs
    ) {
        operator fun invoke() = firebaseAuth.currentUser?.let { currentUser ->
            val subject = SingleSubject.create<Boolean>()
            subject
                .hide()
                .doOnSubscribe {
                    currentUser.getIdToken(false)
                        .addOnSuccessListener { task ->
                            populateUserPrefs(currentUser, task.token)
                            subject.onSuccess(true)
                        }
                        .addOnFailureListener {
                            subject.onError(it)
                        }
                }
        } ?: Single.just(false)
    }
    r
    12 replies · 2 participants
  • b

    Brian Estrada

    06/26/2022, 9:39 PM
    Hey guys I’m trying to find an implementation of coroutines that allows me to keep a buffer of 10 items and keeps this buffer of 10 items first in first out ordering… i’ve been trying to use
    SharedFlow
    and
    StateFlow
    but none of these seem to keep the items in a buffer once they’re read they’re gone. I would normally use a BehaviorSubject in RxJava for this but haven’t been able to find the equvilant in coroutines Edit: Nevermind, the question
    MutableSharedFlow
    has the ability to do this (you can set it in the constructor and I missed that somehow) thanks
    1 reply · 1 participant
  • f

    franztesca

    06/26/2022, 11:02 PM
    Ciao guys, I found a weird behavior with the
    conflate
    flow operator and throwables... Basically, when throwing an exception from a
    flow
    that passes through a conflated operator, the re-thrown exception is different from the original one. It's reproducible with the following code:
    @Test
    fun myTest() = runBlocking {
        val throwable = Throwable()
        try {
            flow<Unit> { throw throwable }.conflate().collect()
        } catch (caught: Throwable) {
            // We expect the same exception that we are throwing 
            assert(throwable === caught) { "Expected $throwable, found $caught" }
            // Prints "Expected java.lang.Throwable, found java.lang.Throwable"
        }
    }
    The same test without the
    .conflate
    operator passes. Can you confirm that what I expect is the actual intended behavior and therefore this is a bug? I'm on :kotlin:
    1.7.0
    and coroutines
    1.6.3
    :thank-you:
  • b

    Berkeli Alashov

    06/27/2022, 6:11 AM
    Hello. I'm having trouble testing a simple ViewModel that exposes a state by combining MutableStateflows. Updating MutableStateflow value doesn't cause combined state to emit new value during testing (while works as expected when manually testing on device) Code with failing test comments in the thread🧵
    a
    4 replies · 2 participants
  • g

    Gopal S Akshintala

    06/27/2022, 1:53 PM
    I read coroutines can be serialized and deserialized to resume later. Are there any examples of persisting coroutine continuation is a DB to resume later by a different process? My use case is, I have a batch Job made of steps, and each step spawns an async process via MQ and suspends. The deque handler is supposed to resume/resurrect the batch process back, which can only happen if I can persist the batchJob state.
  • a

    Arjan van Wieringen

    06/27/2022, 2:16 PM
    Is there an idiomatic way to wait until a job is active? When I collect a hot flow it can be the case that between the ‘launchIn’ of the collect and a new emission to the hot flow the collector job hasn't started. Assume I have no ownership over the replay :)
    j
    a
    +1
    10 replies · 4 participants
  • r

    ritesh

    06/27/2022, 8:11 PM
    Was exploring stream api's in co-routine. I have a use-case where, i want to logout from app once the auth sdk, i am dependent on emits some event. In auth sdk, i have a created a channel
    val channel = Channel<Boolen>
    which emit a event when certain action happens and in the consumer application, i have this code sitting there in
    BaseActivity
    lifecycleScope.launch {
       repeatOnLifecycle(Lifecycle.State.STARTED) {
         if(AuthSDK.channel.receive){
           // logout from app
         }
       }
    }
    It works great, i was wondering if it's the correct way of doing it or if there's a better solution in co-routine stream world. • Reason why i didn't go with
    SharedFlow
    - events can be lost when producer produces but consumer moves to STOPPED state. • Current channel i am using is
    RENDEZVOUS
    channels, in this scenario, there is a guarantee for my event to be received and emitted, irrespective of lifecycle changes. • I realised instead of
    receive
    ,
    consumeAsFlow
    or
    receiveAsFlow
    can be used. As this is one-off event does it matter if don't. • Both sender and receiver channel co-routines are on main thread, that also means, both
    send
    and
    receive
    will be in suspended state. Is it a concern, is there a trade-off in doing so.
    n
    3 replies · 2 participants
  • t

    tylerwilson

    06/28/2022, 3:04 PM
    I just noticed there was a new coroutines 1.6.3-native-mt, and so tried updating my KMP module to it, but when building macosX64, I get the following error:
    Compilation failed: Internal compiler error: no implementation found for FUN DEFAULT_PROPERTY_ACCESSOR name:<get-thread> visibility:internal modality:ABSTRACT <> ($this:kotlinx.coroutines.CloseableCoroutineDispatcher) returnType:kotlinx.coroutines.Thread
    when building vtable for CLASS CLASS name:MultiWorkerDispatcher modality:FINAL visibility:private superTypes:[kotlinx.coroutines.CloseableCoroutineDispatcher]
    at /Users/administrator/Documents/agent/work/8d547b974a7be21f/ktor-utils/posix/src/io/ktor/util/CoroutineUtils.kt (0:0)
    CLASS CLASS name:MultiWorkerDispatcher modality:FINAL visibility:private superTypes:[kotlinx.coroutines.CloseableCoroutineDispatcher]
    
     * Source files: 
     * Compiler version info: Konan: 1.6.21 / Kotlin: 1.6.21
     * Output kind: FRAMEWORK
    The 1.6.1-native-mt works fine for me. Perhaps this version removed some platforms?
    b
    3 replies · 2 participants
  • r

    Remy Benza

    06/28/2022, 5:11 PM
    Question about the limitedParallelism API. Is the following assumption correct?
    val singleDispatcher = Dispatchers.IO.limitedParallism(1)
    
    suspend fun codeBlockA() = withContext(singleDispatcher) { .. }
    
    suspend fun codeBlockB() = withContext(singleDispatcher) { .. }
    Never in time will code inside functions
    codeBlockA
    and
    codeBlockB
    be running at the same time. Irrespective of when / how often they are invoked and/or what parent coroutine there are launched from. Correct?
    :yes: 1
    j
    l
    13 replies · 3 participants
  • z

    Zhanna Gorelova

    06/29/2022, 11:11 AM
    Hello! I am a bit confused with the case, how can I cancel the flow job, since in tests sometimes happens that after a 30 seconds the flow is still alive and does println(), but now other test is in processing. Since I pass context without job, there is no job and ensureActive returns true and I can’t cancel the job with flow. And context.cancel() as well cancels job, but there is no such in context. I thought that launch with flow should inherit parents Job, but what if there is no such? Thanks!
    class CoroutineTest {
        @Test
        fun `test`() = withContext { context ->
            // when
            SomethingWithFlow(context)
    
            //then
            assertNotNull(context[Job])
        }
    
        private fun withContext(context: CoroutineContext = <http://Dispatchers.IO|Dispatchers.IO>, block: suspend (CoroutineContext) -> Unit) {
            runBlocking(context) { block(context) }
        }
    
        class SomethingWithFlow(override val coroutineContext: CoroutineContext) : CoroutineScope {
            private val flow = MutableStateFlow<String>("Initial")
    
            init {
                launch {
                    flow.debounce(30_000L).collectLatest {
                        ensureActive()
                        println(it)
                    }
                }
            }
        }
    }
    n
    1 reply · 2 participants
  • j

    Jan Skrasek

    06/30/2022, 9:46 AM
    We have a SharedFlow to avoid duplicated resource fetching (network, parsing). When some runtime non-domain exception happen, we let the exception (e.g. IOException) to be thrown and processed in ViewModels (e.g. show no internet connection toast - i.e. a general handling is reused here). But SharedFlow does not distribute the exception downstream and rather crashes (throws it on its coroutine scope). What is the motivation here? Why not rethrow the exception for particular downstreams? We can wrap the stuff to some Result type but we wanted to avoid this for general runtime errors, we use it just for domain state modeling (e.g. error type is for ShortPassword error)
    m
    j
    +1
    5 replies · 4 participants
  • a

    Arjan van Wieringen

    07/01/2022, 5:23 PM
    What would be a good solution for bidirectional operations on
    MutableSharedFlow
    ? What I mean is, that I have a routine that both subscribes and emits to the flow. Why would I want this? Well, for instance when I want to send/receive from a Browser
    BroadcastChannel
    :
    inline fun <reified T> MutableSharedFlow<T>.broadcast(
        channel: BroadcastChannel,
        scope: CoroutineScope,
        serializersModule: SerializersModule = EmptySerializersModule
    ) {
        val jsonSerializer = Json {
            this.serializersModule = serializersModule
        }
    
        channel.onmessage = { event ->
            val deserialized = jsonSerializer.decodeFromString<T>(event.data as String)
            scope.launch {
               this@broadcast.emit(deserialized.data)
            }
        }
    
        this.onEach { data ->
            val serialized = jsonSerializer.encodeToString(data)
            channel.postMessage(serialized)
        }.launchIn(scope)
    }
    Of course, this will not work and will generate an endless loop, since it receives its own updates. A few options came to my mind: • remember the last send message - will not work because it can be the case that it receives something again from the broadcastchannel • create a new wrapper class that overrides 'emit' so that I get a split between internal and external emits and can handle it according to this • wrapping the sharedflow with some sort of ID and include this ID in the message in order to distinguish own messages, however this dirties the messaging classes, but IMHO I can't find another idea. It leaks the implementation of broadcasting out of the MutableSharedFlow. Any ideas out there? I can't imagine I am the only one doing this, or I am barking up the completely wrong tree by using MutableSharedFlow 😉
    e
    3 replies · 2 participants
  • l

    lesincs

    07/02/2022, 5:02 AM
    async {}
    throws exception not until
    .await()
    be called.
    l
    6 replies · 2 participants
  • e

    Exerosis

    07/02/2022, 11:24 AM
    I've got a framework with a scheduler somewhat like this one: https://srcb.in/Re5YK27o2Z And I'm trying to make a dispatcher that works with this, so with the above dispatcher I could do:
    class MyWrapper(init: suspend () -> (Unit)) : ExampleFrameworkApp {
      override fun onInit() = blocking { init() }
    }
    As well as other framework calls (like events off the event bus etc.) I would like to be able to say something like:
    class MyApplication : MyWrapper({
      withContext(IO) {
        http.get("..")
      }
      delay(1.second)
      .. etc.
    })
    However the issue is that when withContext resumes it will call dispatch from an IO thread that schedules the resume, however, we are currently blocking the main thread so that never happens and we deadlock. I think I need a local event loop or something along those lines, is there a builtin construct I could use or do I need to make my own system here?
    n
    1 reply · 2 participants
Powered by Linen
Title
e

Exerosis

07/02/2022, 11:24 AM
I've got a framework with a scheduler somewhat like this one: https://srcb.in/Re5YK27o2Z And I'm trying to make a dispatcher that works with this, so with the above dispatcher I could do:
class MyWrapper(init: suspend () -> (Unit)) : ExampleFrameworkApp {
  override fun onInit() = blocking { init() }
}
As well as other framework calls (like events off the event bus etc.) I would like to be able to say something like:
class MyApplication : MyWrapper({
  withContext(IO) {
    http.get("..")
  }
  delay(1.second)
  .. etc.
})
However the issue is that when withContext resumes it will call dispatch from an IO thread that schedules the resume, however, we are currently blocking the main thread so that never happens and we deadlock. I think I need a local event loop or something along those lines, is there a builtin construct I could use or do I need to make my own system here?
n

Nick Allen

07/05/2022, 4:52 PM
so with the above dispatcher I could do
Avoid any solution that involves blocking on the main thread. Launch your coroutines instead.
withContext(IO) {
Moving off the main thread only helps when you are not already blocking it. This is why you need to launch from the main thread callbacks.
I think I need a local event loop or something along those lines, is there a builtin construct
Yes, if your dispatcher returns false for
isDispatchNeeded
, then a shared thread local loop is used. That won't fix your dead-lock, though. You can look at existing integrations for inspiration for wrapping your scheduler: https://github.com/Kotlin/kotlinx.coroutines/blob/master/ui/kotlinx-coroutines-swing/src/SwingDispatcher.kt There's ones for swing, javafx, and android.
View count: 11