uli
04/19/2018, 8:49 PM
deferredExports.forEach { it.await() }
and the first job takes the longest, I'd only get the exception of another job, after the first terminates.
groostav
04/19/2018, 11:35 PM
SequentialPoolExecutor, so I wrote one, and tested it. The idea is that every submit or execute call is serialized, such that no submitted job starts running before the previously submitted job completes. This is not necessarily single-threaded: one very simple optimization is to back this SequentialExecutor with a pool, where the first available thread is selected to run the job.
My goal for such a component would be to allow me to write code like this:
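class SomeStatefulComponent {
    private val data = MutableDataStructureThatIsntThreadSafe()

    // Every mutation goes through SequentialExecutor, so no two mutations run concurrently.
    suspend fun doMutation(args: Args): Double = run(SequentialExecutor) {
        data += transform(args)
        data.moreComputation() // the last expression is the lambda's result; a bare `return` is not allowed here
    }
}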
and not worry about using nasty @Volatile or Unsafe or AtomicReference or, more generally, CAS/locking strategies.
Instead such an executor would elegantly serialize everything for me.
But the problem, thinking back to concurrency in practice, is the one that @Volatile was originally designed to solve: if some mutable state on data is put into a thread-local cache, then even sequentially run jobs might have their correctness ruined by such a cache.
Effectively this boils down to an apocalyptic assumption: is it really the case that fields not marked for explicit thread sharing cannot ever be shared between threads?
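A minimal sketch of the executor described above, using only java.util.concurrent. The class name SequentialExecutor comes from the message; the implementation, runDemo, and the pool size are assumptions for illustration, not groostav's actual code. Tasks are queued under a lock and drained one at a time on whichever pool thread picks up the drain loop. On the visibility worry: every hand-off between tasks passes through the same monitor and through Executor.execute, and under the Java memory model both establish happens-before edges, so writes made by one task should be visible to the next without marking the fields @Volatile.

```kotlin
import java.util.ArrayDeque
import java.util.concurrent.CountDownLatch
import java.util.concurrent.Executor
import java.util.concurrent.Executors

// Hypothetical implementation: runs submitted tasks one at a time, in submission
// order, on whichever thread of the backing executor picks up the drain loop.
class SequentialExecutor(private val delegate: Executor) : Executor {
    private val queue = ArrayDeque<Runnable>()
    private var draining = false // guarded by `queue`

    override fun execute(command: Runnable) {
        synchronized(queue) {
            queue.add(command)
            if (draining) return // an active drain loop will reach this task
            draining = true
        }
        delegate.execute { drain() }
    }

    private fun drain() {
        while (true) {
            // Take the next task, or mark the loop as finished when the queue is empty.
            val next = synchronized(queue) {
                queue.poll() ?: run { draining = false; null }
            } ?: return
            try {
                next.run()
            } catch (t: Throwable) {
                // Keep draining so that one failing task does not stall the queue.
                t.printStackTrace()
            }
        }
    }
}

// Increments a plain, unsynchronized counter from many tasks and returns the total.
fun runDemo(tasks: Int = 1000): Int {
    val pool = Executors.newFixedThreadPool(4)
    val sequential: Executor = SequentialExecutor(pool)
    val done = CountDownLatch(tasks)
    var counter = 0 // deliberately not volatile or atomic
    repeat(tasks) {
        sequential.execute {
            counter++
            done.countDown()
        }
    }
    done.await()
    pool.shutdown()
    return counter
}

fun main() {
    println("counter = ${runDemo()}")
}
```

A passing run does not prove the absence of races, which is the point of the fuzz-testing question; the argument for correctness here rests on the memory-model guarantees rather than on the test.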
Does somebody have a clever way to turn this non-functional problem into a functional one, using fuzz testing or some other concurrency-testing strategy?
mp
04/20/2018, 12:40 AM
louiscad
04/20/2018, 7:00 AM
Channel(capacity = UNLIMITED) + offer(...) or a rendez-vous Channel() + launch { send(...) }?
louiscad
04/20/2018, 3:59 PM
"The default dispatcher for runBlocking coroutine, in particular, is confined to the invoker thread"
Is this still true? Isn't runBlocking now using DefaultDispatcher, which is equal to CommonPool on JVM/Android?
Quote from here, second paragraph, third sentence: https://github.com/Kotlin/kotlinx.coroutines/blob/master/coroutines-guide.md#unconfined-vs-confined-dispatcher
Dexter
04/20/2018, 6:33 PM
uli
04/20/2018, 7:58 PM
julioyg
04/23/2018, 1:47 PM
suspendCoroutine so:
suspend fun myFunc() = suspendCancellableCoroutine { continuation ->
    // Keep the subscription handle so that cancellation can dispose it.
    val disposable = myObservable.subscribe(onSuccess = { continuation.resume(it) }, onError = { continuation.resumeWithException(it) })
    continuation.invokeOnCompletion(onCancelling = true) { disposable.dispose() }
}
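A minimal sketch of the same pattern against a current kotlinx.coroutines release, where the cancellation hook on CancellableContinuation is named invokeOnCancellation rather than the invokeOnCompletion(onCancelling = true) used in the snippet above. FakeSubscription, awaitValue, and cancelWhileWaiting are hypothetical stand-ins for the Rx disposable, the wrapped observable, and the calling code. The handler is expected to run when the waiting coroutine is cancelled, without the wrapped code having to poll for cancellation.

```kotlin
import kotlinx.coroutines.*

// Hypothetical stand-in for an Rx Disposable.
class FakeSubscription {
    @Volatile var disposed = false
        private set

    fun dispose() { disposed = true }
}

// Suspends until cancelled. A real wrapper would also call resume or
// resumeWithException on `cont` from the source's callbacks.
suspend fun awaitValue(subscription: FakeSubscription): String =
    suspendCancellableCoroutine { cont ->
        cont.invokeOnCancellation { subscription.dispose() }
    }

// Starts a waiting coroutine, cancels it, and reports whether dispose() ran.
fun cancelWhileWaiting(): Boolean = runBlocking {
    val subscription = FakeSubscription()
    val job = launch { awaitValue(subscription) }
    delay(100)          // give the child time to start and suspend
    job.cancelAndJoin() // cancelling the caller triggers the handler
    subscription.disposed
}

fun main() {
    println("disposed after cancel: ${cancelWhileWaiting()}")
}
```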
The thing is that when I cancel the coroutine that calls the function, it doesn't dispose the observable. Is there any way to make the function cancel immediately? As the doc says, even if the coroutine is cancelled it doesn't mean that the code will be cancelled; instead we should check regularly inside that code whether it's cancelled or not. But is there any alternative?
julioyg
04/23/2018, 1:55 PM
voddan
04/23/2018, 2:36 PM
import org.springframework.util.concurrent.ListenableFuture
import org.springframework.util.concurrent.ListenableFutureCallback

public suspend fun <T> ListenableFuture<T>.await(): T = suspendCancellableCoroutine { cont: CancellableContinuation<T> ->
    val callback = ContinuationCallback(cont)
    this.addCallback(callback)
    cont.invokeOnCompletion {
        callback.cont = null // clear the reference to continuation from the future's callback
    }
}

private class ContinuationCallback<T>(@Volatile @JvmField var cont: CancellableContinuation<T>?) : ListenableFutureCallback<T> {
    @Suppress("UNCHECKED_CAST")
    override fun onSuccess(result: T?) { cont?.resume(result as T) }
    override fun onFailure(t: Throwable) { cont?.resumeWithException(t) }
}
petersommerhoff
04/24/2018, 7:58 AM
isActive yourself to stop execution -- all of Kotlin's suspending functions do that so they also enable cancellation
AJ Alt
04/24/2018, 5:20 PM
select that doesn't abort if one of the coroutines throws an exception?
dekans
04/25/2018, 9:32 AM
igorvd
04/25/2018, 2:40 PM
throttle operator using coroutines?
hannesstruss
04/25/2018, 8:11 PM
isActive works:
fun main(args: Array<String>) = runBlocking {
    val job = Job()
    val pool = newFixedThreadPoolContext(2, "Cool")
    repeat(2) {
        launch(pool, parent = job) {
            startLoop()
        }
    }
    delay(1000)
    println("Canceling")
    job.cancel()
    delay(1000)
    println("End")
    Unit
}

suspend fun startLoop() {
    while (isActive) {
        println("I'm active: ${isActive} in ${Thread.currentThread().name}, my job is cancelled: ${coroutineContext[Job]?.isCancelled}")
        Thread.sleep(500)
    }
}
calling job.cancel() will not terminate the loops. The complete output is:
I'm active: true in Cool-1, my job is cancelled: false
I'm active: true in Cool-2, my job is cancelled: false
I'm active: true in Cool-1, my job is cancelled: false
I'm active: true in Cool-2, my job is cancelled: false
I'm active: true in Cool-2, my job is cancelled: false
I'm active: true in Cool-1, my job is cancelled: false
Canceling
I'm active: true in Cool-1, my job is cancelled: true
I'm active: true in Cool-2, my job is cancelled: true
I'm active: true in Cool-1, my job is cancelled: true
I'm active: true in Cool-2, my job is cancelled: true
End
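For comparison, a minimal sketch assuming a current kotlinx.coroutines release rather than the experimental API used above; countUntilCancelled is a hypothetical helper. Here isActive is read from the launched coroutine's own CoroutineScope, and the loop blocks with Thread.sleep without ever suspending, yet it should still observe the cancellation on its next check. That suggests, without confirming it for the code above, that suspension is not what makes the flag change, and that it matters which isActive the name resolves to inside a plain suspend function.

```kotlin
import kotlinx.coroutines.*
import java.util.concurrent.atomic.AtomicInteger

// Runs a blocking loop that only checks isActive, cancels it, and returns
// how many iterations ran before the loop noticed the cancellation.
fun countUntilCancelled(): Int = runBlocking {
    val iterations = AtomicInteger()
    val job = launch(Dispatchers.Default) {
        // `isActive` here is the CoroutineScope extension for this coroutine's job.
        while (isActive) {
            iterations.incrementAndGet()
            Thread.sleep(20) // blocks the thread; never suspends
        }
    }
    delay(200)
    job.cancelAndJoin() // returns only once the loop has actually exited
    iterations.get()
}

fun main() {
    println("loop stopped after ${countUntilCancelled()} iterations")
}
```

The same check could sit between timed polls on a blocking queue, so that each timeout gives the loop a chance to notice cancellation.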
In my real life situation, instead of Thread.sleep, the coroutine is waiting to read from a blocking queue (poll with timeout). If I use delay(500) instead of sleeping, the coroutines are cancelled properly. Does a coroutine have to suspend in order for isActive to change?
albertgao
04/26/2018, 1:04 AM
return, then the statement after this return will not be executed. But what about inside a coroutine? You can’t return in a coroutine. It seems the answer is coroutineContext().cancel(), but with Anko it said coroutineContext of type CoroutineContext can't be invoked as a function
bj0
04/26/2018, 1:39 AM
CancellationException
marstran
04/26/2018, 10:36 AM
dknapp
04/26/2018, 2:02 PM
groostav
04/26/2018, 11:33 PM
@ReplaceWith for something like
launch {
    // replace
    staticallyKnownAsBlockingQueueInstance.take()
    // with
    run(BLOCKABLE_THREAD_POOL) { staticallyKnownAsBlockingQueueInstance.take() }
}
inducing a compiler error at BLOCKABLE_THREAD_POOL... unless kotlinx.coroutines can be updated to include such a dispatcher embedded in itself.
Dmytro Danylyk
04/27/2018, 1:21 PM
Dmytro Danylyk
04/27/2018, 1:22 PM
dekans
04/29/2018, 12:07 PM
CommonPool or custom from newSingleThreadContext inherit the normal priority level.
Which makes them equal to the main thread.
Is there another mechanism to make the UI thread not affected? I'd like to make sure to prioritize it.
jw
04/30/2018, 2:18 PM
runBlocking
dave08
05/01/2018, 4:43 AM
fun <T> ReceiveChannel<T>.toDeferred() = async { single() }, @spierce7 maybe, but it's probably not such good practice in coroutines to do such a thing though...
Marharyta Nedzelska
05/03/2018, 7:41 AM
louiscad
05/04/2018, 2:00 PM
cancel() methods in Channel and Job be optimized to not allocate a new CancellationException when the channel is already closed or the job is already cancelled or completed?
voddan
05/08/2018, 3:52 PM
newSingleThreadContext, and a bunch of coroutines launched with it,
a) will the coroutines execute in the order of their creation?
b) will the execution prefer the first coroutine over the last after some suspends and resumes?
adeln
05/09/2018, 10:26 AM
withoutclass
05/09/2018, 3:26 PM
withoutclass
05/09/2018, 3:26 PM
Vsevolod Tolstopyatov [JB]
05/10/2018, 1:14 PM
DelayChannel (https://github.com/Kotlin/kotlinx.coroutines/issues/327) will be added in the next release, which will simplify things for you. You can find examples of usage in the related pull request
withoutclass
05/10/2018, 8:16 PM