coroutines
  • d

    Davide Giuseppe Farella

    01/15/2020, 8:40 PM
    Hello, can somebody explain me this? 🤨 ( Don’t mind about the typo 😄 )
  • j

    jeggy

    01/16/2020, 12:30 AM
    How does one catch an exception within a `launch` scope? Example code: https://pl.kotl.in/g3Gz4nH43
  • a

    Aaron Stacy

    01/16/2020, 3:01 AM
    Hi friends, I'd like to observe a coroutine scope's cancellation. Is there an idiomatic way to do that? My use case is I'm working on something that multiplexes I/O connections (specifically Android binder IPC) within an app. More specifically, if an Activity and a Service both connect to the same service (i.e. Google Play services' location service), I'd like them to share the same binder connection, and if both get onDestroy, I'd like to unbind. The best solution I've come up with is to launch a coroutine from the scope's context, delay forever, and catch the cancellation exception:
    scopeToBeObserved.coroutineContext.launch {
      try {
        delay(FOREVER)
      } catch (ignored: CancellationException) {
        // ... unbind the service connection ...
      }
    }
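One possible alternative to the delay-forever coroutine, sketched under assumptions: register a completion handler on the scope's `Job` with `invokeOnCompletion`. The extension name `onCancelled` is hypothetical. Note that the handler runs when the job completes (after its children have finished), and that it may be invoked on whichever thread completes the job.

```kotlin
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.cancel

// Hypothetical helper: runs `action` once the scope's Job completes because of cancellation.
fun CoroutineScope.onCancelled(action: () -> Unit) {
    val job = coroutineContext[Job] ?: error("Scope has no Job to observe")
    job.invokeOnCompletion { cause ->
        // `cause` is null on normal completion and a CancellationException on cancellation.
        if (cause is CancellationException) action()
    }
}

fun main() {
    val scopeToBeObserved = CoroutineScope(Job())
    scopeToBeObserved.onCancelled { println("unbind the service connection") }
    scopeToBeObserved.cancel()
}
```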
  • a

    Animesh Sahu

    01/16/2020, 6:44 AM
    How do you suspend a bunch of coroutines, (may be under a supervisor or a context) till given amount of time, i.e. to make sure changes have been saved. I don't want to run any of the job under a given period say 5ms. There is no delay function in coroutine context or a supervisor job.
  • l

    Luis Munoz

    01/16/2020, 4:20 PM
    I have a fixed thread pool of 8 threads (my CPU has 4 cores, 8 hyper-threads), and the processing is CPU-bound. When I look at the queue size it is between 0 and 1k, yet my CPU usage is never maxed out (20%-ish). Does that mean that adding more threads should necessarily keep my queue size from spiking to 1k? I don't understand how I can be getting queuing when my CPU isn't maxed out.
  • v

    v0ldem0rt

    01/17/2020, 3:51 AM
    I am writing a Proxy handler for a `suspend` function. How can I get the return `Class<*>` of a suspend function? So far I know the last parameter of a `suspend` function is a `Continuation<T>`. How can I get `Class<T>` from `Continuation<T>`?
  • e

    Esa

    01/17/2020, 2:55 PM
    Hi! Sorry if this is a silly question, but I sort of need to be sure about this. I’m looking to implement Channels for some script we’re using, but the Experimental annotation sort of makes me a bit anxious. How experimental are `CoroutineScope.produce()`, `ReceiveChannel<E>.consumeEach{}` and `ProducerScope.send()`? Is there any way to know? I’ve got a working implementation now, but knowing that it may break without warning is a bit scary. Also, is this the wrong place to ask this question? Are the discussion boards a better place?
  • t

    Thiyagu

    01/17/2020, 6:27 PM
    Hi All, can anyone help me how to write unit test for this below method?
    fun pollMessages(): Flow<Message> {
    
            return channelFlow {
    
                repeat(noOfReceivers) {
                    launch {
                        val messages = sqsClient.fetchMessages()
                        messages.forEach {
                            send(it)
                        }
    
                    }
                }
            }
        }
  • t

    Thiyagu

    01/17/2020, 8:34 PM
    Maybe a vague question for all: what benefits will I get if I rewrite my Kafka consumer using non-blocking I/O and coroutines? Will there be any difference in CPU usage or thread usage compared to my blocking version of the method?
  • a

    Antanas A.

    01/18/2020, 11:02 AM
    Hi, is it technically possible to resume continuation twice when calling suspendCoroutine { cont -> cont.resumeWith(...) }?
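For reference, a minimal experiment using only the standard library's `kotlin.coroutines` package: the continuation handed to `suspendCoroutine` rejects a second resume by throwing `IllegalStateException`. The function name `resumeTwice` and the returned strings are hypothetical, chosen for illustration only.

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.resume
import kotlin.coroutines.startCoroutine
import kotlin.coroutines.suspendCoroutine

// Starts a bare coroutine (no kotlinx.coroutines needed) and tries to resume it twice.
fun resumeTwice(): String {
    var outcome = "not run"
    val body: suspend () -> Unit = {
        outcome = try {
            suspendCoroutine<String> { cont ->
                cont.resume("first")
                cont.resume("second") // the safe wrapper refuses this second resume
            }
        } catch (e: IllegalStateException) {
            "second resume rejected"
        }
    }
    // The completion continuation ignores the final result of the coroutine.
    body.startCoroutine(Continuation<Unit>(EmptyCoroutineContext) { })
    return outcome
}

fun main() {
    println(resumeTwice())
}
```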
  • j

    Jérôme Gully

    01/18/2020, 11:32 AM
    Stupid question: using GlobalScope in the Application class of an Android app is the same as creating a specific scope in the Application class, isn't it?
  • b

    benny.huo

    01/18/2020, 2:13 PM
    PublisherAsFlow ignores the CoroutineContext, so it makes no sense to call flowOn on it. Is it a bug? I have opened an issue for this: https://github.com/Kotlin/kotlinx.coroutines/issues/1765
  • g

    georgiy.shur

    01/18/2020, 4:13 PM
    Hello, I'm starting to use `Flow` in my production code, but it seems that my understanding of it and coroutines in general isn't sufficient. I reduced my project code to a simple reproducible test example. I'm using a `TestCollector` class to collect values emitted by the `Flow`:
    class TestCollector<T>(scope: CoroutineScope, flow: Flow<T>) {
    
        private val collectedValues = mutableListOf<T>()
        private val job = scope.launch { flow.collect { collectedValues.add(it) } }
    
        fun assertValues(vararg values: T) = run {
            val valuesList = values.toList()
            if (valuesList != collectedValues) {
                fail("\nExpected values: $valuesList\nCollected values:$collectedValues")
            }
            this
        }
    
        fun dispose() = job.cancel()
    }
    
    fun <T> Flow<T>.test(scope: CoroutineScope) = TestCollector(scope, this)
    This is my test itself:
    @Before
        fun setup() {
            Dispatchers.setMain(Dispatchers.Unconfined)
        }
    
        @Test
        fun testFlowCollector() = runBlockingTest {
            var firstEmit = true
            val channel = ConflatedBroadcastChannel(0)
            val testCollector = channel.asFlow().onEach {
                if (firstEmit) {
                    launch {
                        channel.send(1)
                        channel.send(2)
                    }
                    firstEmit = false
                }
            }.test(this)
    
            testCollector.assertValues(0, 1, 2)
    
            testCollector.dispose()
        }
    So basically I'm using a blocking test, and what I'm trying to do is launch those `send`s only on the first emit. What I'm expecting to collect are all three numbers `0, 1, 2` sequentially. But the test fails:
    java.lang.AssertionError: 
    Expected values: [0, 1, 2]
    Collected values:[0, 2]
    For some reason, the emission of `1` is getting lost. I'm trying to understand what's going on. Most probably I'm just misunderstanding/misusing the flow and the channel. Maybe there are coroutine experts who may explain it to me. 🙂
  • a

    addamsson

    01/18/2020, 5:55 PM
    I'm trying to test some code which uses coroutines in my common project. How can I do so if I want to `.join()` a `Job`? What I have right now looks like this:
    @Test
    fun someTest() {
        myObj.doSomething().join()
    }
    My problem is that I can't call `join()` to wait for the coroutine to complete because I have no `CoroutineScope`, and there is no `runBlocking` in a common project, which is what I usually use in my JVM tests. What can I do instead to solve this problem?
  • g

    Gabriel Feo

    01/18/2020, 7:30 PM
    What'd be the most idiomatic way to have a regular, non-suspend function, launch a coroutine in the scope in which it was called?
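One commonly used convention, shown here as a sketch rather than a definitive answer, is to declare the function as an extension on `CoroutineScope` so that the caller's scope is the receiver. The name `startRefresh` and the placeholder work inside it are hypothetical.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// A regular (non-suspend) function: it returns immediately with the launched Job.
// The coroutine becomes a child of whichever scope the function is called on.
fun CoroutineScope.startRefresh(onDone: (String) -> Unit): Job = launch {
    delay(10) // placeholder for real suspending work
    onDone("refreshed")
}

fun main() = runBlocking {
    // Inside runBlocking, `this` is a CoroutineScope, so the call needs no explicit receiver.
    val job = startRefresh { println(it) }
    job.join()
}
```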
  • b

    Babacar Tall

    01/18/2020, 8:19 PM
    Hello, I'm a newcomer to Kotlin Flow (and my understanding is, unsurprisingly, limited) 🙂 ! I'm wondering if there's an elegant way (operators ?) to capture KeyboardInterrupt (Ctrl+C) exceptions and/or SIGKILL signals (and by scope) ? In order to implement treatments such as graceful reload for example. Thanks for your help 🙂
  • m

    Mauricio Barbosa

    01/19/2020, 4:52 PM
    Hi guys, can someone help me understand what’s going on with this code? I’m trying to create a flow that merges the responses of some requests and emits a new value every time a response is retrieved. For some reason this code crashes after the first request, printing the following log:
    Doing first request
    retrieving first request
    Exception in thread "main" kotlinx.coroutines.JobCancellationException: Parent job is Completed; job=ScopeCoroutine{Completed}@383534aa
    Does anyone have any idea what I am doing wrong?
    Untitled
  • s

    svenjacobs

    01/20/2020, 7:36 AM
    Hey, I'm wondering if there is an operator on `Flow` that achieves the following: if the Flow has not produced at least one value in a given timeframe (x milliseconds), produce a default value followed by any values the Flow might produce afterwards. In contrast to a solution with `debounce`, however, if the Flow has produced a value before the timeframe elapses, the default value will just be ignored and the value will be emitted immediately.
  • h

    hmole

    01/20/2020, 12:53 PM
    Is it normal that my ~10 lines of Flow code got decompiled(Show bytecode -> decompile) to 20k lines of code with billion nested calls?
  • d

    Drew Hamilton

    01/20/2020, 4:18 PM
    I wrote this simple extension function to endlessly subscribe to a channel (the subscriber’s scope is always <= the sender’s scope):
    internal suspend fun <E> ReceiveChannel<E>.forEach(action: (E) -> Unit) {
        val iterator = iterator()
        while (iterator.hasNext())
            action.invoke(iterator.next())
    }
    I was surprised that such an extension didn’t already exist; things like `consumeEach` only consume existing items in the Channel and then cancel. Am I either missing a function that does what I want, or missing a reason that I shouldn’t do this?
  • d

    diesieben07

    01/20/2020, 7:08 PM
    How do you deal with multiple "chained" asynchronous operations when using flow? Example: `fun foo(): Flow<ResultA>` and each `ResultA` has a `fun getElements(): Flow<SubResult>`. The `getElements` is tied to the same execution context as the containing flow and has to be cancelled with it.
  • e

    eygraber

    01/20/2020, 9:00 PM
    I recently started seeing a crash in Crashlytics for my Android app regarding a Job getting cancelled, but it has no stacktrace. It happens when backgrounding the app, but only very rarely, so it's very difficult to track down what's actually happening. Here's the exception (it's just one line):
    Fatal Exception: JobCancellationException: Job was cancelled
    Is there any sort of global debugging I can turn on to try and track this down? If so, are any of them safe for a production environment?
  • s

    svenjacobs

    01/21/2020, 8:33 AM
    Hey, I need help with `Flow` 🙂 I have a `Flow<State>` that represents an application (UI) state. Now what I want to do is convert this into a flow with certain "side effects" on parts of the state. Here's example code:
    data class State(
        val string: String? = null,
        val number: Long = 0
    )
    
    val flow = flowOf(
        State(
            string = "Hello world",
            number = 0
        ),
        State(
            string = "Hello world",
            number = 1
        ),
        State(
            string = null,
            number = 1
        ),
        State(
            string = "Hello world 2",
            number = 1
        )
    )
    
    val flow2 = flowOf(
        flow.mapNotNull { it.string }
            .onEach { println("string $it") },
        flow.map { it.number }
            .onEach { println("number $it") }
    ).flattenMerge()
    
    val job = flow2.launchIn(GlobalScope)
    The thing is, in the isolated example this works; however, in my real application I have the following behaviour:
    1. The first State object triggers both `onEach`, so the output is `string` and `number`.
    2. Any subsequent changes to `State` will always only trigger the second `onEach`, which is `number`. (It is always the last Flow `onEach` that is triggered here. So if there are three inner Flows then only the third will be called.)
    3. If I use `flattenConcat()` instead of `flattenMerge()` then only the first `onEach` will be called for every State change.
    I'm lost here. Any ideas? Why am I doing this? Imagine that later a `distinctUntilChanged()` is added before each `onEach` so that the side effect is only triggered when the selected value of the state has changed. But first of all this should even work without `distinctUntilChanged()`. Any ideas?
  • o

    orafaaraujo

    01/21/2020, 1:07 PM
    Hi all, shouldn't the `.catch { }` operator of `Flow` catch an `NPE` that might occur inside it?
  • d

    dave08

    01/21/2020, 5:18 PM
    Does it make sense to add the `flowOn` here?
    inline fun <reified T : Any, reified R : Any> Flow<T>.concurrentMap(
        dispatcher: CoroutineDispatcher = Dispatchers.IO,
        crossinline mapFun: suspend (T) -> R
    ): Flow<R> = flatMapMerge {
        flow { emit(mapFun(it)) }.flowOn(dispatcher)
    }
  • r

    RoqueRueda

    01/21/2020, 6:10 PM
    Hi guys, I have a question. I want to inject a suspend lambda like this using Dagger:
    suspend () -> String
    However, when I try to compile, it says: cannot provide dependency without provides annotation
  • c

    Christophe Smet

    01/21/2020, 6:34 PM
    Does anyone know the best practice way to do a flatmapMerge on a Flow that runs every map in parallel ? It seems to be sequential by default.
  • p

    Paul Woitaschek

    01/22/2020, 11:21 AM
    3️⃣ Just a regular coroutine function with async's? ✔️
  • m

    myanmarking

    01/22/2020, 4:44 PM
    what is the proper way of sharing a flow ?
  • p

    paulblessing

    01/22/2020, 9:12 PM
    Is there a way to configure `suspendCancellableCoroutine` to cause `.join()` after cancellation to suspend until the Continuation is resumed / the background work is finished, similar to how things work when using a `withContext` to switch threads? I'm curious if I can get these to behave the same:
    private suspend fun runViaThread(): String {
      return suspendCancellableCoroutine { continuation ->
        thread {
          println("Starting background work")
          Thread.sleep(2000)
          println("Finished background work")
          continuation.resume("done")
        }
      }
    }
    
    private suspend fun runViaWithContext(): String {
      return withContext(Dispatchers.IO) {
        println("Starting background work")
        Thread.sleep(2000)
        println("Finished background work")
        "done"
      }
    }
    
    @Test fun testViaThread() {
      /*
      Prints:
        Starting background work
        Before join "coroutine#2":StandaloneCoroutine{Cancelling}@5c7fa833
        After join "coroutine#2":StandaloneCoroutine{Cancelled}@5c7fa833
        Finished background work
       */
      runBlocking {
        val job = launch(Dispatchers.Default) { runViaThread() }
    
        delay(1000)
        job.cancel()
        println("Before join $job")
        job.join()
        println("After join $job")
    
        delay(3000)
      }
    }
    
    @Test fun testViaWithContext() {
      /*
      Prints:
        Starting background work
        Before join "coroutine#4":StandaloneCoroutine{Cancelling}@2b98378d
        Finished background work
        After join "coroutine#4":StandaloneCoroutine{Cancelled}@2b98378d
       */
      runBlocking {
        val job = launch(Dispatchers.Default) { runViaWithContext() }
    
        delay(1000)
        job.cancel()
        println("Before join $job")
        job.join()
        println("After join $job")
    
        delay(3000)
      }
    }
b

bezrukov

01/22/2020, 10:23 PM
If you switch to `suspendCoroutine` instead of `suspendCancellableCoroutine`, both methods will behave the same.
But it's recommended to support cooperative cancellation (you don't support it), otherwise you lose one of the biggest advantages of coroutines. E.g. with coroutines you need to use `delay` instead of `Thread.sleep`. When you need a non-cancellable piece of code, you can wrap it in `withContext(NonCancellable)` explicitly and it will work regardless of cancellation support inside the block.
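A minimal sketch of the two points above, assuming a single-threaded `runBlocking` test harness: `delay` is a cancellable suspension point, while code wrapped in `withContext(NonCancellable)` may still suspend after the job has been cancelled. The function name `cancellationDemo` and the log strings are hypothetical.

```kotlin
import kotlinx.coroutines.NonCancellable
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext

fun cancellationDemo(): List<String> = runBlocking {
    val log = mutableListOf<String>()
    val job = launch {
        try {
            delay(10_000)        // cancellable: throws CancellationException on cancel
            log += "finished"    // not reached when the job is cancelled first
        } finally {
            withContext(NonCancellable) {
                delay(50)        // suspending here is still allowed despite the cancellation
                log += "cleanup done"
            }
        }
    }
    delay(100)
    job.cancelAndJoin()          // join returns only after the finally block has completed
    log
}

fun main() {
    println(cancellationDemo())
}
```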
p

paulblessing

01/22/2020, 10:45 PM
The code I'm actually trying to implement is meant to support cancellation, and I'm definitely aware of using `delay`. I'm actually trying to do something more along the lines of this - was just trying to simplify as much as possible for asking the question.
suspend fun <T> ExecutorService.submitAndAwait(task: () -> T): T {
  return suspendCancellableCoroutine { continuation ->
    val future = submit<Unit> {
      try {
        val result = task()
        continuation.resume(result)
      } catch (e: Exception) {
        continuation.resumeWithException(e)
      }
    }
    continuation.invokeOnCancellation {
      future.cancel(true)
    }
  }
}
I have a lot of code that checks thread interruption for cooperative cancellation, and it would be nice to be able to re-use that code in the context of a coroutine, while still having it behave externally using this API the same as if it was written with `withContext` internally. But I can't seem to get the `join` to ever wait the same way it does when using `withContext`.
Ultimately, I'm trying to decide whether there is a way to fill in the body of this extension function:
suspend fun <T> ExecutorService.submitAndAwait(task: () -> T): T {
  // TODO: Is there something that can go here that
  //  - suspends the calling thread
  //  - runs the task on a thread provided by this ExecutorService
  //  - interrupts the worker thread when the coroutine context is cancelled, to interoperate nicely with legacy code
  //    that cooperates with cancellation by checking thread interruption
  //  - otherwise behaves the same as if we'd just used withContext, such that calling job.join() after job.cancel()
  //    will not resume until the task has finished
}
b

bezrukov

01/23/2020, 6:28 AM
When you are using `withContext`, none of the thread interruption checks work, so it's a simple blocking call, because the dispatcher's thread isn't marked as interrupted. That's why in your case the sleep wasn't interrupted.
p

paulblessing

01/23/2020, 11:37 AM
Perhaps the example with the sleeps was a bad one to use and I think it may be getting in the way. Feel free to ignore that. I'm simply looking for a way to be able to write a suspending function that can run existing code that is already coded to check for thread interruption (no sleeps anywhere) on threads provided by an ExecutorService. I essentially would like it to work externally same as withContext would, except that internally the worker thread can also get interrupted when the coroutine's job is cancelled.
b

bezrukov

01/23/2020, 12:03 PM
The main reason I use sleep in my comments is that it supports cooperative cancellation by checking interruption, and it can be replaced with your blocking code that checks thread interruption. So I see a difference between what you are trying to achieve and what you are calling "the same as withContext".
val job = GlobalScope.launch {
    withContext(someDispatcher) {
        while (!Thread.interrupted()) { // <-- your code that checks interruption
            // do nothing, just spin
        }
    }
}
delay(100)
println("cancel")
job.cancel()
println("after cancel")
job.join() // <-- never returns, because the block inside withContext never finishes
println("after join")
The `submitAndAwait` extension looks good, but its behavior differs from your withContext expectations.
p

paulblessing

01/23/2020, 2:08 PM
Here's a hopefully more realistic example.
// Let's say this is the existing code that I don't have control of.
fun existingCode(items: List<Item>) {
  for (item in items) {
    if (Thread.currentThread().isInterrupted) {
      println("[Worker] Interrupted, not processing any more items")
      return
    }

    println("[Worker] Start work for $item")
    nonTrivialWork(item)
    println("[Worker] Finished work for $item")
  }
}

@Test fun example() {
  val items = List(100) { index -> Item(id = index) }
  val job = GlobalScope.launch {
    try {
      executorService.submitAndAwait { existingCode(items) }
    } finally {
      // Would like this cleanup code to wait to execute until the worker had the chance to respond to interruption
      // i.e. would like to have the line "[Worker] Interrupted..." printed before the line "Cleaning up"
      withContext(NonCancellable) {
        println("Cleaning up")
        cleanUp()
      }
    }
  }
  runBlocking {
    delay(300) // Give the background work enough time to actually start before cancelling
    println("Cancelling the job")
    job.cancel()
    delay(1000)
  }
}
What I mean when I say I'd like it to behave similar to `withContext` is that if I did have the ability to update this existing code to take in the `Job` and use it as a cancellation token, I could use `withContext` and get it to behave the way I would like.
fun hypotheticalCode(items: List<Item>, job: Job) {
  for (item in items) {
    if (job.isCancelled) {
      println("[Worker] Job cancelled, not processing any more items")
      return
    }

    println("[Worker] Start work for $item")
    nonTrivialWork(item)
    println("[Worker] Finished work for $item")
  }
}

@Test fun example2() {
  val items = List(100) { index -> Item(id = index) }
  val job = GlobalScope.launch {
    try {
      withContext(executorService.asCoroutineDispatcher()) { hypotheticalCode(items, coroutineContext[Job]!!) }
    } finally {
      withContext(NonCancellable) {
        // In this example, the cleanup code does wait until the worker had the chance to respond to the job cancellation
        // So we _do_ get "[Worker] Interrupted..." printed before the line "Cleaning up"
        println("Cleaning up")
        cleanUp()
      }
    }
  }
  runBlocking {
    delay(300)
    println("Cancelling the job")
    job.cancel()
    delay(5000)
  }
}
I'm guessing it's probably not easy or possible. Even replacing that `withContext` call with an `async`/`await` (ignoring the IDE suggestion to replace it with `withContext`) causes the behavior to change such that the cleanup will happen before the worker had a chance to respond to cancellation. It's even pointed out in the docs for `await` that cancellation causes immediate resumption. Thanks for the help either way. If anyone can think of a way to pull off what I'm trying to do, I'm still all ears.
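One way the `submitAndAwait` body might be filled in, as a sketch under assumptions: releases of kotlinx.coroutines newer than this conversation (1.3.6 and later, to the best of my knowledge) provide `runInterruptible`, which runs a blocking block in the given context and interrupts its thread when the coroutine is cancelled. Because the block runs inside the dispatcher switch, the caller is not resumed until the block has returned or thrown. The busy-wait loop below is a hypothetical stand-in for the legacy interruption-checking code, and `interruptibleDemo` is a hypothetical name.

```kotlin
import java.util.Collections
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.asCoroutineDispatcher
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.runInterruptible

// Runs `task` on a thread of this executor; cancelling the caller interrupts that thread.
suspend fun <T> ExecutorService.submitAndAwait(task: () -> T): T =
    runInterruptible(asCoroutineDispatcher()) { task() }

fun interruptibleDemo(): List<String> = runBlocking {
    val executor = Executors.newSingleThreadExecutor()
    val events = Collections.synchronizedList(mutableListOf<String>())
    val job = launch(Dispatchers.Default) {
        try {
            executor.submitAndAwait {
                // Stand-in for existing code that polls the interrupt flag.
                while (!Thread.currentThread().isInterrupted) Thread.yield()
                events.add("worker saw interruption")
            }
        } finally {
            events.add("cleaning up")
        }
    }
    delay(300) // give the worker time to start before cancelling
    job.cancelAndJoin()
    executor.shutdown()
    events.toList()
}

fun main() {
    println(interruptibleDemo())
}
```

In this sketch the worker's entry is expected to be recorded before the cleanup entry, which is the ordering asked for above.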