https://kotlinlang.org logo
Join the conversationJoin Slack
Channels
100daysofcode
100daysofkotlin
100daysofkotlin-2021
advent-of-code
aem
ai
alexa
algeria
algolialibraries
amsterdam
android
android-architecture
android-databinding
android-studio
androidgithubprojects
androidthings
androidx
androidx-xprocessing
anime
anko
announcements
apollo-kotlin
appintro
arabic
argentina
arkenv
arksemdevteam
armenia
arrow
arrow-contributors
arrow-meta
ass
atlanta
atm17
atrium
austin
australia
austria
awesome-kotlin
ballast
bangladesh
barcelona
bayarea
bazel
beepiz-libraries
belgium
berlin
big-data
books
boston
brazil
brikk
budapest
build
build-tools
bulgaria
bydgoszcz
cambodia
canada
carrat
carrat-dev
carrat-feed
chicago
chile
china
chucker
cincinnati-user-group
cli
clikt
cloudfoundry
cn
cobalt
code-coverage
codeforces
codemash-precompiler
codereview
codingame
codingconventions
coimbatore
collaborations
colombia
colorado
communities
competitive-programming
competitivecoding
compiler
compose
compose-android
compose-desktop
compose-hiring
compose-ios
compose-mp
compose-ui-showcase
compose-wear
compose-web
connect-audit-events
corda
cork
coroutines
couchbase
coursera
croatia
cryptography
cscenter-course-2016
cucumber-bdd
cyprus
czech
dagger
data2viz
databinding
datascience
dckotlin
debugging
decompose
decouple
denmark
deprecated
detekt
detekt-hint
dev-core
dfw
docs-revamped
dokka
domain-driven-design
doodle
dsl
dublin
dutch
eap
eclipse
ecuador
edinburgh
education
effective-kotlin
effectivekotlin
emacs
embedded-kotlin
estatik
event21-community-content
events
exposed
failgood
fb-internal-demo
feed
firebase
flow
fluid-libraries
forkhandles
forum
fosdem
fp-in-kotlin
framework-elide
freenode
french
fritz2
fuchsia
functional
funktionale
gamedev
ge-kotlin
general-advice
georgia
geospatial
german-lang
getting-started
github-workflows-kt
glance
godot-kotlin
google-io
gradle
graphic
graphkool
graphql
graphql-kotlin
graviton-browser
greece
grpc
gsoc
gui
hackathons
hacktoberfest
hamburg
hamkrest
helios
helsinki
hexagon
hibernate
hikari-cp
hire-me
hiring
hongkong
hoplite
http4k
hungary
hyderabad
image-processing
india
indonesia
inkremental
intellij
intellij-plugins
intellij-tricks
internships
introduce-yourself
io
ios
iran
israel
istanbulcoders
italian
jackson-kotlin
jadx
japanese
jasync-sql
java-to-kotlin-refactoring
javadevelopers
javafx
javalin
javascript
jdbi
jhipster-kotlin
jobsworldwide
jpa
jshdq
juul-libraries
jvm-ir-backend-feedback
jxadapter
k2-early-adopters
kaal
kafka
kakao
kalasim
kapt
karachi
karg
karlsruhe
kash_shell
kaskade
kbuild
kdbc
kgen-doc-tools
kgraphql
kinta
klaxon
klock
kloudformation
kmdc
kmm-español
kmongo
knbt
knote
koalaql
koans
kobalt
kobweb
kodein
kodex
kohesive
koin
koin-dev
komapper
kondor-json
kong
kontent
kontributors
korau
korean
korge
korim
korio
korlibs
korte
kotest
kotest-contributors
kotless
kotlick
kotlin-asia
kotlin-beam
kotlin-by-example
kotlin-csv
kotlin-data-storage
kotlin-foundation
kotlin-fuel
kotlin-in-action
kotlin-inject
kotlin-latam
kotlin-logging
kotlin-multiplatform-contest
kotlin-mumbai
kotlin-native
kotlin-pakistan
kotlin-plugin
kotlin-pune
kotlin-roadmap
kotlin-samples
kotlin-sap
kotlin-serbia
kotlin-spark
kotlin-szeged
kotlin-website
kotlinacademy
kotlinbot
kotlinconf
kotlindl
kotlinforbeginners
kotlingforbeginners
kotlinlondon
kotlinmad
kotlinprogrammers
kotlinsu
kotlintest
kotlintest-devs
kotlintlv
kotlinultimatechallenge
kotlinx-datetime
kotlinx-files
kotlinx-html
kotrix
kotson
kovenant
kprompt
kraph
krawler
kroto-plus
ksp
ktcc
ktfmt
ktlint
ktor
ktp
kubed
kug-leads
kug-torino
kvision
kweb
lambdaworld_cadiz
lanark
language-evolution
language-proposals
latvia
leakcanary
leedskotlinusergroup
lets-have-fun
libgdx
libkgd
library-development
linkeddata
lithuania
london
losangeles
lottie
love
lychee
macedonia
machinelearningbawas
madrid
malaysia
mathematics
meetkotlin
memes
meta
metro-detroit
mexico
miami
micronaut
minnesota
minutest
mirror
mockk
moko
moldova
monsterpuzzle
montreal
moonbean
morocco
motionlayout
mpapt
mu
multiplatform
mumbai
munich
mvikotlin
mvrx
myndocs-oauth2-server
naming
navigation-architecture-component
nepal
new-mexico
new-zealand
newname
nigeria
nodejs
norway
npm-publish
nyc
oceania
ohio-kotlin-users
oldenburg
oolong
opensource
orbit-mvi
osgi
otpisani
package-search
pakistan
panamá
pattern-matching
pbandk
pdx
peru
philippines
phoenix
pinoy
pocketgitclient
polish
popkorn
portugal
practical-functional-programming
proguard
prozis-android-backup
pyhsikal
python
python-contributors
quasar
random
re
react
reaktive
realm
realworldkotlin
reductor
reduks
redux
redux-kotlin
refactoring-to-kotlin
reflect
refreshversions
reports
result
rethink
revolver
rhein-main
rocksdb
romania
room
rpi-pico
rsocket
russian
russian_feed
russian-kotlinasfirst
rx
rxjava
san-diego
science
scotland
scrcast
scrimage
script
scripting
seattle
serialization
server
sg-user-group
singapore
skia-wasm-interop-temp
skrape-it
slovak
snake
sofl-user-group
southafrica
spacemacs
spain
spanish
speaking
spek
spin
splitties
spotify-mobius
spring
spring-security
squarelibraries
stackoverflow
stacks
stayhungrystayfoolish
stdlib
stlouis
strife-discord-lib
strikt
students
stuttgart
sudan
swagger-gradle-codegen
swarm
sweden
swing
swiss-user-group
switzerland
talking-kotlin
tallinn
tampa
teamcity
tegal
tempe
tensorflow
terminal
test
testing
testtestest
texas
tgbotapi
thailand
tornadofx
touchlab-tools
training
tricity-kotlin-user-group
trójmiasto
truth
tunisia
turkey
turkiye
twitter-feed
uae
udacityindia
uk
ukrainian
uniflow
unkonf
uruguay
utah
uuid
vancouver
vankotlin
vertx
videos
vienna
vietnam
vim
vkug
vuejs
web-mpp
webassembly
webrtc
wimix_sentry
wwdc
zircon
Powered by Linen
coroutines
  • e

    eygraber

    07/15/2021, 6:23 AM
    If I'm in a
    callbackFlow
    what's the best way to determine if I should still emit?
    isActive
    or
    !isClosedForSend
    (or both)?
    b
    • 2
    • 2
  • t

    Timo Gruen

    07/15/2021, 10:06 AM
    Hi Folks, I’m currently running into an issue while bootstrapping my application. While initializing an object, I would like to create indices in my mongodb collection. When using the following snippet:
    runBlocking {
                collection.createIndexes(indexes)
    }
    I keep get issues because the
    awaitSingle()
    (which is inside the
    createIndexes(...)
    ) is being stopped, and no value is being returned. Any help there?
    n
    • 2
    • 1
  • n

    Nikiizvorski

    07/15/2021, 1:45 PM
    Hey guys, i have the following snippet and would really appreciate some help. So basically this function is being called from another coroutine but this one is blocking even after everything else returns and it’s done so it’s like it never finish. Do you guys see something wrong with it? When i comment it out everything else works fine.
    override suspend fun waitForFix(timeoutMsec: Long, scope: CoroutineScope): Location? {
    return getLocation() ?: let {
    val channel = Channel<Location?>(1)
    val timeout = scope.launch {
    delay(timeoutMsec)
    channel.send(null)
    }
    getUpdates { fix ->
    scope.launch {
    timeout.cancel()
    channel.send(fix)
    }
    }
    channel.receive()
    }
    }
    o
    • 2
    • 7
  • o

    Omkar Amberkar

    07/15/2021, 4:42 PM
    Hey guys, I am trying to create a polling mechanism which fetches data when at least one Subscriber is active and would stop otherwise, I started with sharedflow but no luck there so I tried to work it with channelflow but the problem is that the
    isClosedForSend
    flag is still false in case I cancel the job, and there is no other way to close the channelflow that I am aware of. need some help 🙂
    fun poll(gameId: String, dispatcher: CoroutineDispatcher, ) = channelFlow {
                while (!isClosedForSend) {
                    try {
                        send(repository.getDetails(id))
                        delay(MIN_REFRESH_TIME_MS)
                    } catch (throwable: Throwable) {
                        Timber.e("Debug: error -> ${throwable.message}")
                    }
                    invokeOnClose { Timber.e("Debug: channel flow closed.") }
            }
        }
    n
    • 2
    • 7
  • m

    marcinmoskala

    07/16/2021, 2:51 PM
    Why
    Job::join
    does not resume when coroutine is just done? The documentation suggests that it should, but I cannot make in resume any other way but by crashing or cancelling a coroutine.
    import kotlinx.coroutines.*
    
    fun main(): Unit = runBlocking {
        val job = Job()
        launch(job) {
            delay(200)
        }
        job.join()
    }
    // Runs forever
    👍🏽 1
    n
    e
    l
    • 4
    • 4
  • f

    florent

    07/16/2021, 9:12 PM
    Hi, I got an issue with that code:
    import kotlinx.coroutines.channels.Channel
    import kotlinx.coroutines.flow.catch
    import kotlinx.coroutines.flow.map
    import kotlinx.coroutines.flow.onStart
    import kotlinx.coroutines.flow.receiveAsFlow
    import org.junit.Test
    
    class ExampleUnitTest {
    
        @Test
        fun `kotlin language error maybe`() {
            val channel = Channel<String>()
    
            channel.receiveAsFlow()
                .map { State.Content(it) as State }
                .catch { emit(State.Error) }
                .onStart { emit(State.Loading) }
        }
    
        sealed class State {
            object Loading : State()
            object Error : State()
            data class Content(val content: String) : State()
        }
    }
    If I keep the
    as State
    the compiler mark is as unnecessary, if I remove it the code doesn't compile. Is there an alternative way to write this so it compiles? I have tried using the <> syntax but it doesn't seems to work
    f
    e
    • 3
    • 5
  • m

    marcinmoskala

    07/17/2021, 8:25 AM
    Another thing that troubles me - why
    SupervisorJob
    is designed in this way, that it needs to be a part of scope, and does not work as a simple context passed as an argument?
    fun main(): Unit = runBlocking(SupervisorJob()) {
        launch {
            delay(100)
            throw Error()
        }
        launch {
            delay(200)
            print("Done")
        }
    }
    // Just Error
    fun main(): Unit = runBlocking {
        with(CoroutineScope(coroutineContext + SupervisorJob())) {
            launch {
                delay(100)
                throw Error()
            }
            val job = launch {
                delay(200)
                print("Done")
            }
            job.join()
        }
    }
    // Error...
    // Done
    e
    • 2
    • 5
  • x

    xxfast

    07/19/2021, 11:17 PM
    Accidentally typed in this
    suspend { //some suspending calls here }
    and this compiles just fine. Is this the same as
    runBlocking{}
    🤔
    e
    • 2
    • 5
  • w

    William Reed

    07/21/2021, 12:51 PM
    hi guys - i have code that roughly looks like this were I poll on an endpoint until the result changes. My current code passes in the time interval as a parameter
    data class ApiResponse(
        val someId: Long,
        val success: Boolean,
    )
    
    suspend fun ApiResponse.poll(interval: Long) {
        var result = success
        while (!result) {
            result = sendSomeApiRequest(someId)
            delay(interval)
        }
    }
    A requirement I need to support is allowing users of this function to change the
    interval
    value while polling occurs based on outside information. would the most idiomatic way to do this be using a
    StateFlow
    ? so my above function would become
    suspend fun ApiResponse.poll(interval: StateFlow<Long>)
    d
    e
    • 3
    • 5
  • p

    Pitel

    07/22/2021, 6:50 AM
    What's the difference between parameter
    block: suspend () -> T
    and
    block: suspend CoroutineScope.() -> T
    ?
    e
    • 2
    • 3
  • p

    Pitel

    07/22/2021, 9:07 AM
    Is there some (even dumb) implementatio of
    runBlocking
    for JS? The project I'm working has this:
    expect fun <T> runBlocking(context: CoroutineContext = EmptyCoroutineContext, block: suspend CoroutineScope.() -> T): T
    I tried someting like:
    var result: T? = null
    val job = GlobalScope.launch { result = block }
    while (!job.isCompleted) {}
    return result!!
    Yeah, I know it has many mistakes, but the main problem is the while loop block the execution of the coroutine.
    b
    r
    • 3
    • 3
  • m

    Melih Aksoy

    07/22/2021, 9:29 AM
    Hey all ! A question / discussion 🙂
    private var job: Job? = null
            set(value) {
                println("Setting value")
                field = value
            }
    
            job = lifecycleScope.launch() {
                println("Started")
    
                try {
                    println("Try")
                } catch (e: CancellationException) {
                    println("Catch")
                } finally {
                    println("Finally")
                }
            }
    prints
    Started
    Try
    Catch
    Finally
    Setting value
    in order. Which means when using default coroutine start,
    launch
    returns
    job
    reference after contents are executed ( if execution is fast enough ). So there’s no guarantee when you’ll be handled
    job
    reference when you do the assignment. I didn’t find any mention of this in any docs, but I was expecting it to at least wait for reference to be handled before starting execution 🤔 This is probably troublesome in scenarios if you do an
    if ( job != null ) return
    check or similar ( clear in
    finally
    (
    finally { job = null }
    ). You may end up
    job
    being assigned and not getting cleared. P.S. Using
    CorotuineStart.LAZY
    followed by
    job?.start()
    resolves this issue, I’m just asking if is this expected from default implementation 🤔 ?
    w
    e
    • 3
    • 11
  • s

    Sudhir Singh Khanger

    07/22/2021, 12:45 PM
    val watcher = object :TextWatcher{
        private var searchFor = ""
    
        override fun onTextChanged(s: CharSequence?, start: Int, before: Int, count: Int) {
            val searchText = s.toString().trim()
            if (searchText == searchFor)
                return
    
            searchFor = searchText
            
            launch {       
                delay(300)  //debounce timeOut
                if (searchText != searchFor)
                    return@launch
    
                // do our magic here
            }
        }
    
        override fun afterTextChanged(s: Editable?) = Unit
        override fun beforeTextChanged(s: CharSequence?, start: Int, count: Int, after: Int) = Unit
    }
    I came across this piece of code as a sample of debouncing. But it seems to me that it basically launches a new coroutine every time
    onTextChanged
    is called and then it executes the code after 300 seconds. That sounds like even a worse idea.
    s
    • 2
    • 3
  • a

    andylamax

    07/22/2021, 6:43 PM
    Quick torrent of questions. 1. Can I use any dispatchers with native-mt? 2. does multithreading in coroutines still require to freeze objects before passing them to another coroutines with a switched contex?
    b
    • 2
    • 2
  • m

    Moses Mugisha

    07/23/2021, 7:13 PM
    import io.ktor.util.DispatcherWithShutdown
    import io.ktor.util.InternalAPI
    import kotlin.coroutines.CoroutineContext
    import kotlinx.coroutines.CompletableJob
    import kotlinx.coroutines.CoroutineScope
    import kotlinx.coroutines.Dispatchers
    import kotlinx.coroutines.Job
    import kotlinx.coroutines.SupervisorJob
    import kotlinx.coroutines.channels.ReceiveChannel
    import kotlinx.coroutines.channels.produce
    import kotlinx.coroutines.launch
    
    
    class SQSPoller(private val numWorkers: Int = 10
    ) : CoroutineScope {
    
        private val supervisorJob = SupervisorJob()
        override val coroutineContext: CoroutineContext
            get() = <http://Dispatchers.IO|Dispatchers.IO> + supervisorJob
    
        private val stopRequest: CompletableJob = Job()
    
        // this is limited to a pool size of 64 threads. or the number of cores(which ever is higher)
        @OptIn(InternalAPI::class)
        private val tasksDispatcher = DispatcherWithShutdown(<http://Dispatchers.IO|Dispatchers.IO>)
    
        private lateinit var rootJob: Job
    
        fun start() = launch(tasksDispatcher) {
            println("Starting SQS Queue Poller")
            println("starting")
            val sqsMessageProducer = launchSQSPoller()
            val workers = List(numWorkers) {
                worker(it, sqsMessageProducer)
            }
            // distribute work among numWorkers
            for (worker in workers) {
                worker.join()
            }
            stopRequest.join()
        }.apply {
            rootJob = this
        }
    
        /** [fetchMessages] long polls the SQS queue for new messages*/
        fun fetchMessages(): List<Int> {
    
            return arrayListOf(1,3)
        }
    
        /** [launchSQSPoller] fetch sqs messages` .*/
        private fun CoroutineScope.launchSQSPoller(): ReceiveChannel<Int> = produce {
            loopForever {
                    val messages = fetchMessages()
                    messages.forEach {
                        send(it)
                    }
            }
        }
    
        private fun processMessage(message: Int) {
            println("sqs: worker ${Thread.currentThread()}  processing messages")
            println(message)
            Thread.sleep(400)
        }
    
        /** [loopForever] triggers a function as long as the coroutine scope is active and the unleash flag is set */
        private suspend fun loopForever(block: suspend () -> Unit) {
            while (true) {
    
                    try {
                        block()
                        Thread.yield()
                    } catch (ex: Exception) {
                        println("coroutine on ${Thread.currentThread().name} cancelled")
                    } catch (ex: Exception) {
                        println("${Thread.currentThread().name} failed with {$ex}. Retrying...")
                        ex.printStackTrace()
                    }
                }
    
            println("coroutine on ${Thread.currentThread().name} exiting")
        }
    
        /** [worker] consumes the SQS messages */
        private fun CoroutineScope.worker(id: Int, channel: ReceiveChannel<Int>) = launch(tasksDispatcher) {
            println("sqs: worker $id processing messages")
            // fanout messages
            for (message in channel) {
                launch { processMessage(message) }
            }
        }
    }
    I am implementing a SQS polling job, that polls the queue and does fan out to process the messages, but I keep running into performance issues. What I am I doing wrong?
    j
    • 2
    • 1
  • d

    Dean Djermanović

    07/24/2021, 8:45 AM
    I have following layers in my app:
    UI
    ,
    Presentation
    ,
    Business
    and
    Data
    . I’m launching a coroutines from the
    Presentation
    layer where I have setup
    backgroundScope
    with an exception handler. I want to share a
    Flow
    in the data layer using
    shareIn
    operator. Which scope should I pass to it? If I pass different scope than the
    backgroundScope
    used in the presentation layer, the exceptions won’t be propagated up to the exception handler in that
    backgroundScope
    . Also I want to share that
    Flow
    globally, not in the scope of that
    ViewModel
    in the presentation layer. So the question is how to share a
    Flow
    on the data layer from some repository class which is a singleton and still propagate the exceptions to the exception handler that’s setup in the presentation layer?
    n
    • 2
    • 1
  • c

    camdenorrb

    07/25/2021, 8:53 PM
    package me.camdenorrb.generaltesting.coroutines
    
    import kotlinx.coroutines.*
    
    object TestBreaking1 {
    
        @JvmStatic
        fun main(args: Array<String>) {
    
            runBlocking {
                launch(<http://Dispatchers.IO|Dispatchers.IO>) {
                    delay(10000000)
                }
                println("Here1")
            }
    
            println("Here2")
        }
    
    }
    This doesn't print "Here2" until the delay is done, is this expected?
    :yes: 6
    a
    e
    j
    • 4
    • 7
  • w

    William Reed

    07/26/2021, 9:11 PM
    in my code I have
    fun foo(): Flow<Foo> = flow {
            while (context.isActive) {
                // ... process and emit ...
                delay(someMutableDelay) // wait until doing this again
            }
        }
    someMutableDelay
    is a mutable
    var
    in the same class as
    fun foo()
    - when it is changed I want the current running
    delay
    to be cancelled and the next iteration of the loop to start immediately. any suggestions for how to achieve this?
    e
    n
    • 3
    • 7
  • a

    Adrien Poupard

    07/27/2021, 9:23 AM
    As we can do:
    class Lambda: (String) -> String {
    	override fun invoke(p1: String): String {
    		TODO("Not yet implemented")
    	}
    }
    Would it be possible to add the feature to do?
    class SuspendLambda: suspend (String) -> String {
    	override suspend fun invoke(p1: String): String {
    		TODO("Not yet implemented")
    	}
    }
    m
    e
    • 3
    • 4
  • j

    jeff

    07/27/2021, 3:02 PM
    I read today that
    kotlinx.coroutines.delay
    on Android uses
    Handler.postDelayed
    under the hood. That surprised me -- is it true? For Dispatcher.Main only, or all dispatchers?
    z
    m
    a
    • 4
    • 4
  • s

    Scott Kruse

    07/27/2021, 9:36 PM
    Looking at some code where RxJava is called from a suspend function. TLDR on why this is a bad idea?
    viewModelScope.launch {
                withContext(<http://Dispatchers.IO|Dispatchers.IO>) {
                    doStuff()
                }
            }
    
            suspend fun doStuff() {
                doStuffWithRx()
            }
    
            fun doStuffWithRx() {
                Single.fromCallable {
                    // hit network 
                }
                        .subscribeOn(<http://Schedulers.io|Schedulers.io>())
                        .observeOn(AndroidSchedulers.mainThread())
                        .doOnSuccess { result ->
                        }
                        .subscribe()
            }
    c
    n
    • 3
    • 2
  • t

    TwoClocks

    07/28/2021, 1:45 AM
    if you have some normal function / code, is there a way to tell if your in a coroutine, or just a normal code path? For logging, I'd like to add the coroutine name (if it exits). but logging calls can be called from anywhere, inside or outside of coroutines.
    ➕ 1
    m
    • 2
    • 6
  • e

    enleur

    07/28/2021, 6:02 PM
    any best practices how gracefully shutdown scope (like custom GlobalScope)?
    withTimeout
    but for the whole scope?
    z
    • 2
    • 1
  • a

    azabost

    07/28/2021, 6:38 PM
    Hey 👋 I'm trying to use
    suspendCancellableCoroutine
    to bridge some 3rd party API in my app. It's working OK-ish but I'm wondering how reliable it is considering the callback I need to implement requires returning some values, e.g.
    suspend fun displayFoo() {
        val foo = getFoo()
        delay(10_000)
        println(foo)
    }
    
    suspend fun getFoo(): Foo = suspendCancellableCoroutine { continuation ->
        val callback = FooLoader
            .addListener(object : RequestListener<Foo> {
                override fun onLoadFailed(e: FooException): Boolean {
                    continuation.resumeWithException(e)
                    return false
                }
    
                override fun onLoadSuccessful(foo: Foo): Boolean {
                    continuation.resume(foo)
                    return false
                }
            })
            .load()
    
        continuation.invokeOnCancellation {
            callback.cancel()
        }
    }
    How is this actually working? Does the thread calling
    onLoadSuccessful
    execute the full
    delay(10_000)
    before returning
    return false
    or is it executed as soon as the calling thread is "free" (because
    delay
    is suspending, not blocking)? I'm also wondering what would happen if the continuation failed e.g.
    suspend fun displayFoo() {
        val foo = getFoo()
        delay(10_000)
        throw NPE
    }
    n
    • 2
    • 3
  • t

    tad

    07/29/2021, 7:42 PM
    So this definitely looks wrong, and raises the "Inappropriate blocking method call" warning, but it works. Is this actually dangerous?
    lifecycleScope.launch {
                repeatOnLifecycle(Lifecycle.State.STARTED) {
                    try {
                        session.load()
                        awaitCancellation()
                    } finally {
                        runBlocking {
                            // This is a suspending function that performs I/O and needs to complete.
                            session.recordBackgroundTime()
                        }
                    }
                }
            }
    u
    n
    +2
    • 5
    • 51
  • t

    Tristan

    07/30/2021, 4:56 PM
    Hello, I am trying to use Flow in an iOS project using objective-c. I am not so familiar with the iOS environment. Reading a bit some articles, I made the following code in Kotlin
    class IosDispatcher : CoroutineDispatcher() {
        override fun dispatch(context: CoroutineContext, block: Runnable) {
            dispatch_async(dispatch_get_main_queue()) { block.run() }
        }
    }
    
    actual class PrimaryScope : CoroutineScope {
        private val dispatcher = IosDispatcher()
        private val job = Job()
    
        override val coroutineContext: CoroutineContext
            get() = dispatcher + job
    }
    
    fun <T>observe(origin: Flow<T>, block: (T) -> Unit): PrimaryScope {
        val scope = PrimaryScope()
    
        origin.onEach {
            block(it)
        }.launchIn(scope)
    
        return scope
    }
    But when I do use it, I am getting the error from the screenshot. I use it like this (I tried with empty block in case it was related to a reference lost, or similar):
    [UnityKotlinCoroutineHelperKt observeOrigin: self.tokenStorage.events
                                              block:^(id result) {
                                              }];
    Do you have some example I could use? Or article explaining how to achieve this? Thanks a lot for your lights.
    m
    • 2
    • 4
  • g

    Gleno

    07/30/2021, 7:24 PM
    I’m sure this has been asked to death, but should I always rethrow the
    CancellationException
    ?
    e
    n
    • 3
    • 2
  • j

    Jan Skrasek

    07/30/2021, 10:10 PM
    A different between
    MutableSharedFlow()
    and
    Channel()
    with runBlockingTest I would expect both to behave the same; Yet when not using extraBufferaCapacity = 1, my following test fails for MutableSharedFlow and not for Channel. (See gist) https://gist.github.com/hrach/94ba0dc427ad2f08a73467d8972516a2 • why is there a difference? • is both behavior correct? • is correct that there is a difference?
    n
    • 2
    • 7
  • t

    Tuan Kiet

    07/31/2021, 4:35 AM
    Not really related to coroutine but does anyone know what "larger actions" in this doc mean? and how does "reads and writes" differ from "increment"
    e
    c
    • 3
    • 11
  • h

    Hank

    07/31/2021, 8:45 AM
    How to get the first response of calling 5 requests with Http call? The use case is sending 5 http requests then get the first response and cancel others. Any suggestions?
    :not-kotlin: 1
    j
    r
    n
    • 4
    • 6
Powered by Linen
Title
h

Hank

07/31/2021, 8:45 AM
How to get the first response of calling 5 requests with Http call? The use case is sending 5 http requests then get the first response and cancel others. Any suggestions?
:not-kotlin: 1
j

Jan Skrasek

07/31/2021, 9:40 AM
Use select function. Register all deferred in it and after first receive cancel others.
👍 2
h

Hank

07/31/2021, 9:42 AM
That's really helpful. Thank you so much.
r

Richard Gomez

08/01/2021, 12:34 AM
@Jan Skrasek: Would you mind sharing a snippet of what that would look like? (For learning purposes.) I'm assuming you're referring to "Selecting deferred values", but am not familiar with the
select { }
function. Perhaps something like:
// Ugly code
val requests = List<Deferred<String>>(5) {
    delay(Random.nextInt(0, 1000).toLong())
    async { "Response #$it" }
}
val response = select<String> {
    requests.withIndex().forEach { (index, deferred) ->
        deferred.onAwait { answer ->
            requests.forEach { it.cancel() }
            "Deferred $index produced answer '$answer'"
        }
    }
}
n

Nick Allen

08/01/2021, 4:19 AM
Alternative with
Flow
withTimeout(limit) {
 merge(::func1.asFlow(),::func2.asFlow(),... ). first()
} (formatting not working on phone)
🙌 2
j

Jan Skrasek

08/01/2021, 11:45 AM
@Richard Gomez your's example seems reasonable, what's not working? You may take a look at some official samples: https://github.com/Kotlin/kotlinx.coroutines/blob/master/kotlinx-coroutines-core/jvm/test/guide/example-select-04.kt
r

Richard Gomez

08/01/2021, 4:55 PM
your's example seems reasonable, what's not working?
Nothing in particular, just confirming that I have the right idea. 🙂 Is there a more idiomatic way to cancel a list of jobs than
requests.forEach(Job::cancel)
? I suppose you could run them in an ad-hoc context and cancel that once the first is completed.
View count: 4