groostav
05/09/2018, 5:01 PM
A SingleThreadedDispatcher that would allow you to take a thread pool and put a facade on it such that any job submitted is serialized (read: only run after the previous job has completed). Such a dispatcher would get you a similar effect to a kotlinx.coroutines...sync.Mutex, as it relates to the behaviour of sequentialDispatcherFacade.submit versus mutex.withLock.
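For illustration, a minimal sketch of what such a facade could look like on the JVM, assuming a plain java.util.concurrent.ExecutorService underneath; the class name SerializingExecutorFacade and its submit signature are made up for this sketch:

import java.util.concurrent.CompletableFuture
import java.util.concurrent.ExecutorService

// Every submitted job is chained onto the completion of the previous one, so jobs never
// overlap even though the underlying pool has many threads.
// Error handling omitted: as written, a failed job would short-circuit the chain.
class SerializingExecutorFacade(private val pool: ExecutorService) {
    private var tail: CompletableFuture<*> = CompletableFuture.completedFuture(Unit)

    @Synchronized
    fun submit(job: () -> Unit): CompletableFuture<Void> {
        val next = tail.thenRunAsync(Runnable { job() }, pool)
        tail = next
        return next
    }
}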
This issue of using Mutex without the more traditional java.concurrent primitives sounds like it can induce some insidious problems with threads caching values, unless somebody here tells me I'm mistaken, which I'm desperately hoping they do. Consider a simple little class:
class SomeStatefulClass {
    val mutex: Mutex = Mutex()
    var state: Int = 0

    suspend fun doStatefulTransform() {
        mutex.withLock {
            val newState = state + 1
            state = newState // this is actually just state++, but it illustrates how we might get a stale read
        }
    }
}
@Test fun doStuff() = runBlocking<Unit> {
    val statefulThing = SomeStatefulClass()
    val job1 = launch(JavaFX) { for (i in 0 until 10_000) statefulThing.doStatefulTransform() }
    val job2 = launch(CommonPool) { for (i in 0 until 10_000) statefulThing.doStatefulTransform() }
    // a better test might be: val jobs = (0..10_000).map { launch { ... } }
    job1.join(); job2.join()
    assertThat(statefulThing.state).isEqualTo( ??????? )
}
Such a test would likely pass on x86 with a good deal of regularity, because my understanding is that x86 doesn't use CPU-local caches as aggressively as some platforms. But ARM, on the other hand, might release the mutex acquired by doStatefulTransform() before flushing its local change from its L2 cache (or similar) through to main memory, thus letting the other thread acquire the mutex but read a stale state value.
Is there something in "happens-before" semantics that I'm missing that would protect us from this? Or do you need to mark state with @Volatile (or use an AtomicInteger, or Unsafe, etc.)?
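For concreteness, the @Volatile variant being asked about would look like the sketch below; it is shown only to illustrate the alternative, not as a claim that Mutex alone is insufficient, which is exactly the open question here:

// same class, with the shared field explicitly marked @Volatile
class SomeStatefulClassWithVolatile {
    val mutex: Mutex = Mutex()

    @Volatile
    var state: Int = 0

    suspend fun doStatefulTransform() {
        mutex.withLock { state = state + 1 }
    }
}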
Similarly, could we leverage jcstress or something similar with a CI service on kotlinx.coroutines to discover this? Is it worth investing the time? If I could steal some time on JetBrains' quasi-public TeamCity infrastructure, I'd be happy to try to set this up, if the kotlinx.coroutines commander-in-chief thinks it's a good idea.

0rph3u
05/09/2018, 8:50 PM

ylemoigne
05/09/2018, 10:48 PM
window.asDynamic().coroutineDispatcher = window.asCoroutineDispatcher() +
    CoroutineExceptionHandler { coroutineContext, throwable ->
        console.error("Unhandled exception in coroutine", throwable, coroutineContext)
    }
?

ilya.gorbunov
05/13/2018, 11:45 PM
kotlin.coroutines.experimental - that's where it is. Also it has a SinceKotlin("1.1") restriction, so in case you use -apiVersion 1.0 (most likely not), it may be hidden.

groostav
05/14/2018, 3:20 AM
Is kotlinx.coroutines.experimental.withTimeout unit-testable?

Sorin
05/14/2018, 7:00 PM
experimental API.

araqnid
05/14/2018, 10:39 PM

brabo-hi
05/15/2018, 4:19 AM

Slackbot
05/15/2018, 5:18 AM

bitkid
05/15/2018, 11:32 AM

groostav
05/16/2018, 8:11 PM
java.time.Duration, so perhaps a pair of value: Long, unit: TimeUnit is better.

JoeHegarty
05/17/2018, 5:57 AM
CompletableFuture<Unit>, it's a little awkward, but it's not really any more awkward than `CompletableFuture<Void>` or using just CompletableFuture as a raw type, which is pretty nasty for other reasons (though arguably more readable). It's a shame Java doesn't have something equivalent to Job in the futures space.
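For reference, a quick sketch of the three shapes being compared (my own illustration, not from the thread):

import java.util.concurrent.CompletableFuture

// the three "no useful value" future shapes under discussion
val unitFuture: CompletableFuture<Unit> = CompletableFuture.supplyAsync { Unit }
val voidFuture: CompletableFuture<Void> = CompletableFuture.runAsync { }
val starFuture: CompletableFuture<*> = CompletableFuture.runAsync { } // effectively "raw" from Kotlin's side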
igorvd
05/17/2018, 2:45 PM
Using android architecture components with LiveData, is it still necessary to cancel my coroutines in the onDestroy lifecycle method?
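For context, a hedged sketch of the 2018-era pattern the question is about; the class and function names here are made up, and whether LiveData changes the answer is exactly what is being asked:

import android.support.v7.app.AppCompatActivity
import kotlinx.coroutines.experimental.Job
import kotlinx.coroutines.experimental.android.UI
import kotlinx.coroutines.experimental.launch

// tie launched coroutines to a parent Job and cancel that Job in onDestroy
class EventsActivity : AppCompatActivity() {
    private val parentJob = Job()

    private fun load() {
        launch(UI + parentJob) {
            // fetch data and post it to a LiveData
        }
    }

    override fun onDestroy() {
        super.onDestroy()
        parentJob.cancel() // cancels anything still running under parentJob
    }
}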
spand
05/18/2018, 8:11 AM

Hunter
05/21/2018, 1:44 AM

wickedev
05/22/2018, 8:06 AM

groostav
05/22/2018, 10:14 PM
Intel Fortran Runtime Error: IEEE754 is signalling. This made me realize I really don't know how signals work. They are synchronous and on-thread, correct? In the sense that if you looked at the call stack it would look something like this:
top:
1. java entry point
2. JNA
3. fortran entry point
4. fortran math
5. ????
6. fortran signal handler
Or are signals raised on other threads? I'm not sure where x86 ends and Windows begins here.

withoutclass
05/23/2018, 3:27 PM
Is there a reason forEach wasn't added to the coroutines library for channels? I'm considering putting up a PR to add forEach and forEachIndexed to complement consumeEach and consumeEachIndexed, so that people may avoid the same mistake I made with consumeEach.
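For illustration, roughly what such extensions might look like (my sketch, not the proposed PR); the key difference from consumeEach would be that these simply iterate and leave cancelling the channel to the caller:

suspend fun <E> ReceiveChannel<E>.forEach(action: suspend (element: E) -> Unit) {
    for (element in this) action(element) // iterate without consuming/cancelling the channel on failure
}

suspend fun <E> ReceiveChannel<E>.forEachIndexed(action: suspend (index: Int, element: E) -> Unit) {
    var index = 0
    for (element in this) action(index++, element)
}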
mp
05/23/2018, 7:29 PM

spand
05/24/2018, 9:11 AM
I keep using launch and friends without giving the explicit context that I actually intend to. How do you guys avoid this?
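One hedged idea (my sketch, not an answer from the thread): hide the library's default-context launch behind a project-local helper that forces an explicit dispatcher. The name launchIn is made up:

import kotlin.coroutines.experimental.CoroutineContext
import kotlinx.coroutines.experimental.CoroutineScope
import kotlinx.coroutines.experimental.Job

// no default value for `context`, so every call site has to say where the coroutine runs
fun launchIn(context: CoroutineContext, block: suspend CoroutineScope.() -> Unit): Job =
    kotlinx.coroutines.experimental.launch(context, block = block)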
pdegand
05/24/2018, 10:21 AM
suspend fun start(channel: ReceiveChannel<Event>) {
    launch(coroutineContext) {
        for (event in channel) {
            val view = // do something with each event
            withContext(context = UI) {
                _evenementLiveData.value = view
            }
        }
    }
}
and my test looks like this:

@Test
fun testStart() = runBlocking {
    //given
    val channel = ConflatedChannel<Event>()
    val observer = mock<Observer<EvenementDetailView>>()
    presenter.evenementLiveData.observeForever(observer)
    val event = // create an event
    //when
    presenter.start(channel)
    channel.send(event)
    //then
    assertThat(presenter.evenementLiveData.value).isEqualTo(expectedView)
}
However, since my start method launches a coroutine, the assertion in the unit test runs before the event sent into the channel is consumed.
I tried to remove the launch { } inside the start() function, but without it the runBlocking coroutine of the unit test stays suspended on presenter.start() because of the for loop inside the start() function.
Am I doing something really wrong in the start() function, or is this a design that is hard to unit test, or am I simply missing a point somewhere?
Thx
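For illustration only (my sketch, not an answer given in the thread): one direction is to have start() hand back the launched Job so the test can close the channel and wait for consumption before asserting; toView() below is a stand-in name for the event-to-view mapping:

// mostly the original start(), just returning the launched Job (hypothetical tweak)
suspend fun start(channel: ReceiveChannel<Event>): Job = launch(coroutineContext) {
    for (event in channel) {
        val view = event.toView()
        withContext(context = UI) { _evenementLiveData.value = view }
    }
}

// in the test:
//   val job = presenter.start(channel)
//   channel.send(event)
//   channel.close()  // lets the for loop finish after consuming the event
//   job.join()       // now the assertion runs after consumption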
dave08
05/24/2018, 1:30 PM

adjorno
05/25/2018, 11:59 AM

alex.krupa
05/27/2018, 3:18 PM
Is there a Channel that works like the RxJava Observable.sample() operator?
http://reactivex.io/RxJava/javadoc/io/reactivex/Observable.html#sample-io.reactivex.ObservableSource-boolean-
Use case:
I have a pair of suspend functions: write() and read(). Under the hood, writing fires an action and the action's result gets pushed to a Channel that I consume using receive() in read(). I could just as well combine them into a writeForResult() that returns the value, but I don't think it makes a difference here.
The problem is that while I can invoke write() many times in a short period (let's say 100/second), read() results are delayed by ~100ms each (so 10/second) in their Channel (which I consume).
Example steps:
1. Call write() and read() one after the other.
2. Repeat the above BEFORE ~100ms pass - write() is called before read() in 1 returned a value, so the second read() should return null immediately.
3. Repeat the above AFTER ~100ms pass - write() is called after read() in 1 returned a value, so the second read() should return a normal non-null value.
I guess I could achieve something similar with `wait()`/`notify()` on a Java object, but I'm trying to solve it using `Channel`s. And as in my initial question - Observable.sample() seems perfect for this case, but I don't want to import Rx just for this one issue.
I tried various experiments with a proxy channel that uses ConflatedChannel (which always holds the latest value). There's also a thing called Mutex, but I'm not sure if it's right to use here.
Any other ideas?
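For what it's worth, a hedged sketch of a sample()-like operator built from the pieces mentioned above (a ConflatedChannel plus a periodic loop); the name sampleEvery is made up, and termination/cancellation handling is omitted:

import kotlinx.coroutines.experimental.channels.ConflatedChannel
import kotlinx.coroutines.experimental.channels.ReceiveChannel
import kotlinx.coroutines.experimental.channels.produce
import kotlinx.coroutines.experimental.delay
import kotlinx.coroutines.experimental.launch

fun <E : Any> ReceiveChannel<E>.sampleEvery(periodMs: Long): ReceiveChannel<E> = produce {
    val latest = ConflatedChannel<E>()                         // always holds only the most recent element
    launch { for (element in this@sampleEvery) latest.send(element) }
    while (true) {
        delay(periodMs)
        latest.poll()?.let { send(it) }                        // emit the latest value, or nothing this period
    }
}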
spand
05/28/2018, 12:21 PM

voddan
05/28/2018, 2:10 PM

igorvd
05/28/2018, 5:52 PM

Paul Woitaschek
05/28/2018, 7:30 PM
AsyncTask.THREAD_POOL_EXECUTOR

igorvd
05/29/2018, 12:07 PM
In clean architecture, how do you guys model the coroutine execution for use cases?
Do you only mark the execute method with suspend and let the caller decide how to start the suspend function, or do you make the suspend function return a Deferred or a result?
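To make the two options concrete, a quick sketch (the interface names are mine, not from any library):

import kotlinx.coroutines.experimental.Deferred

// option 1: the use case is just a suspend function; the caller picks the dispatcher and launch style
interface SuspendingUseCase<in P, out R> {
    suspend fun execute(params: P): R
}

// option 2: the use case starts its own coroutine and hands back a Deferred result
interface DeferredUseCase<in P, out R> {
    fun execute(params: P): Deferred<R>
}

With option 1 the caller (presenter/ViewModel) owns the threading decision; with option 2 the use case does.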
voddan
05/29/2018, 2:17 PM

voddan
05/29/2018, 2:17 PM

elizarov
05/30/2018, 8:33 AM
Executors.newFixedThreadPool(n).asCoroutineDispatcher() does not work for you?
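For readers following along, a minimal usage sketch of the executor-to-dispatcher bridge being suggested (my own example):

import java.util.concurrent.Executors
import kotlinx.coroutines.experimental.asCoroutineDispatcher
import kotlinx.coroutines.experimental.launch
import kotlinx.coroutines.experimental.runBlocking

fun main(args: Array<String>) = runBlocking {
    val pool = Executors.newFixedThreadPool(4).asCoroutineDispatcher()
    val jobs = (1..8).map { i ->
        launch(pool) { println("task $i on ${Thread.currentThread().name}") }
    }
    jobs.forEach { it.join() }
}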
voddan
05/30/2018, 12:00 PM
[[a1, b1, c1], [a2, b2, c2], [a3, b3, c3], ...], where each stage is a roughly equivalent, expensive CPU computation.
With a ScheduledThreadPoolExecutor, the system will execute task stages in order of their submission: a1, a2, a3, a4, ..., b1, b2, b3, ..., c1, c2, .... Note that when (N >> 1) tasks come in simultaneously, all of them will progress through their stages "in parallel" and finish at about the same time. In other words, the time it takes to complete the 1st task (a1, b1, c1) is O(N) in this scenario.
What I want is for a1, b1, c1 to take about the same time for any N. That can happen if our scheduler gives higher priority to earlier tasks, so it never executes a3 if it can execute c1 instead. Makes sense?
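A hedged illustration of the contrast being described (my own sketch, not from the thread): submitting stages individually to a FIFO pool keeps all N tasks in flight at once, whereas running each task's stages back to back inside one coroutine on the same fixed pool lets early tasks finish early:

import java.util.concurrent.Executors
import kotlinx.coroutines.experimental.asCoroutineDispatcher
import kotlinx.coroutines.experimental.launch

val pool = Executors.newFixedThreadPool(4).asCoroutineDispatcher()

fun stage(name: String) { /* expensive CPU work */ }

// one coroutine per task: task i runs a_i, b_i, c_i in sequence, so its completion time
// does not grow with the number of tasks queued behind it
fun submitWholeTasks(n: Int) = (1..n).map { i ->
    launch(pool) {
        stage("a$i"); stage("b$i"); stage("c$i")
    }
}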
elizarov
05/30/2018, 3:55 PM

voddan
06/04/2018, 7:39 AM

elizarov
06/04/2018, 11:55 AM