https://kotlinlang.org logo
#coroutines
Title
# coroutines
o

Olekss

06/05/2018, 5:57 AM
hey do I understand correctly that launch { x = async {...};y=async{}; x.await(); y.await()} will try to take 3 threads from ForkJoinPool and in case of 1core computer 2nd async simply will not start until first finished even without "await"?
g

gildor

06/05/2018, 6:04 AM
No, all those coroutines run on CommonPool dispatcher which has
numCpu - 1
threads
Those coroutines are non-blocking, so probably handled by the same thread in this case even on multi-core CPI. Also both
async
will be run in parallel, but of course if you have only one thread both of them would be dispatch sequentially, but because there is no blocking it’s just a method call
On my 8 hyper thread machine both async handled by different threads from CommonPool
But if you code inside
async
is non blocking even with 1 thread both coroutines would be run in parallel in the same thread (with context switch of course) and
await()
doesn’t lock thread
You can easily check this, just put logs after each await with timestamp and thread name
o

Olekss

06/05/2018, 6:17 AM
my confusion with coroutines (still) is their relationship with physical hardware threads... because I have case where I have code something like launch { }
ooops
g

gildor

06/05/2018, 6:17 AM
Think about coroutines as about callbacks
You can create a lot of callbacks and invoke them at the same or different threads
main difference, that with coroutine builder you also specify where this callback should be dispatched (by default in CommonPool)
o

Olekss

06/05/2018, 6:19 AM
launch { in = inputchannel.receive outputWorkers.forEach { async{doWork(in)}.await()} } And I somewhat worried that if my doWork(in) of the first element takes long time, then other doWorks will wait in wain
g

gildor

06/05/2018, 6:19 AM
wat is
doWork
?
is that blocking operation?
o

Olekss

06/05/2018, 6:20 AM
doWork in general is call into ZeroMQ send
g

gildor

06/05/2018, 6:20 AM
And this call is blocking or not?
o

Olekss

06/05/2018, 6:20 AM
with greedy pirate algorithm, so it can take a while while it returns
g

gildor

06/05/2018, 6:21 AM
If this call to 0mq using some callbacks + coroutine adapter it’s non-blocking, otherwise it’s blocking and you shouldn’t wrap it to coroutines with CommonPool
o

Olekss

06/05/2018, 6:21 AM
if other end is down, then particular call my take like 12 seconds to finish
thats the problem, no callbacks
I feel I have to write my own wrapper around it to make it with callbacks
g

gildor

06/05/2018, 6:22 AM
see, “12 seconds to finish” doesn’t mean that something is blocking or not, it’s more about callbacks vs thread blocking
o

Olekss

06/05/2018, 6:22 AM
it is blocking... of course... zeromq is pretty strightforward...
g

gildor

06/05/2018, 6:22 AM
Yes, if 0mq lib has non-blocking API is the best choice in case of coroutines
but, even if you don’t have non-blocking API you can use it with coroutines
just use custom dispather that used only for such calls
o

Olekss

06/05/2018, 6:23 AM
currently I am using it with coroutines... only parts which are blocking - well they are blocking
g

gildor

06/05/2018, 6:24 AM
Something like this:
Copy code
val ZeroMQ = newFixedThreadPoolContext(ZERO_MQ_THREAD_POOL_SIZE, "ZeroMQ")
Copy code
launch {
in = inputchannel.receive
outputWorkers.forEach { async(ZeroMQ) {doWork(in)}.await()}
}
so now you don’t have problems with blocking of CommonPool
o

Olekss

06/05/2018, 6:24 AM
but async will dispatch into CommonPool, in case if I have several threadss... wouldn't it dispatch each blocking call onto it's own thread?
g

gildor

06/05/2018, 6:24 AM
async will dispatch into CommonPool
No, see my example
Yes, you right, blocking calls must be dispatched by some special dispatcher
and this dispatcher should be configured according your requirements (ammount of parallel calls, threads lifecycle etc)
o

Olekss

06/05/2018, 6:25 AM
so if I stick up my dispatcher and I have like don't know 4-5 threads reserved, then I am good?
g

gildor

06/05/2018, 6:26 AM
Yes
o

Olekss

06/05/2018, 6:26 AM
thanks... sounds about nice exercise 🙂
g

gildor

06/05/2018, 6:27 AM
See, CommonPool is default pool that used by default and intended to use only with non-blocking calls, even 1 thread usually is enough to dispatch all non-blocking calls (tho, there is an issue about this to allow configure minimal amount of threads in CommonPool)
o

Olekss

06/05/2018, 6:27 AM
but when we have callback based invocations of parallel tasks (think Firebase on Android) then this "dispatching" problem is problem of FB implementation, and I only write my Coroutine wrapper as per examples which wraps the callback, and I even don't worry about dispatching as it is hidden under the hood of FB
g

gildor

06/05/2018, 6:27 AM
But if you want to use some blocking API (blocking file io, blocking network, DB etc) it would be better to wrap such calls to own custom thread pool
Correct, usually libraries that provide some asyn API use own dispatching mechanism under the hood: 1. real non-blocking API, like java.nio 2. blocking API wrapped to some thread pool
o

Olekss

06/05/2018, 6:28 AM
yea that is what is this confusing part of Coroutines documentation, as all the examples start with fun doSomeBigWork() And then show how sticking this into async or launch servse the problem... But in reality, if it's blocking, it IS blocking... and coroutines won't save if you have long-running file-crunching anyway
g

gildor

06/05/2018, 6:29 AM
Coroutine cannot magically convert your blocking code to non-blocking
o

Olekss

06/05/2018, 6:29 AM
well stick this one sentence as first on coroutine documetnation 🙂 people get confused 🙂
ok... I think I understood how I need to tailor my code now
g

gildor

06/05/2018, 6:33 AM
BTW, You don’t need asunc/await here
outputWorkers.forEach { async(ZeroMQ) {doWork(in)}.await()}
withContext would be enough:
Copy code
outputWorkers.forEach {
    withContext(ZeroMQ) { doWork(in) }
}
l

louiscad

06/05/2018, 7:37 AM
I think this is more efficient:
Copy code
withContext(ZeroMQ) {
    outputWorkers.forEach {
        doWork(it); yield()
    }
}
g

gildor

06/05/2018, 8:19 AM
Yes, you right, better to consume on the same thread
l

louiscad

06/05/2018, 8:35 AM
Neither version consumes on a single thread since ZeroMQ dispatcher could be thread pool based, and both invoke a suspending function, which means it then resumes in the coroutineContext, on any of its threads. But my version avoids having two suspension points (withContext can suspend before and after its block) when you can have only one suspension point instead (yield makes it cooperative with other coroutines running on the same dispatcher)
g

gildor

06/05/2018, 8:40 AM
Good point about two suspension points, sorry for tautology 😅
o

Olekss

06/05/2018, 9:53 AM
I don't quite understand how it works, but looks like it's working
g

gildor

06/05/2018, 9:53 AM
What works? yield?
o

Olekss

06/05/2018, 9:54 AM
no worker dividing... I printed thread on which actual zeromq send happens, and they are different > I gave 5 threads to that newContext as you mentioned above
g

gildor

06/05/2018, 9:55 AM
It’s just a pool of tasks that should be dispatched, so this pull processed by coroutine dispatcher
o

Olekss

06/05/2018, 9:56 AM
pool?
g

gildor

06/05/2018, 9:57 AM
haha, yes, end of working day %)
even non-blocking coroutines should be dispatched, by current thread (using Unconfined dispathcher) or by any other thread or pool of threads
o

Olekss

06/06/2018, 7:13 AM
@gildor I am trying to understand yours and Louis code difference... What I want (and I can't quite test it on my case) to have like 5 workers (potentially) work in parallel (litearry) so that all of them start at the same time, but all finish when all are finished... So I am using coroutines not for starting in parallel, but be sure that when I exit the code block, all of them has finished... I don't care of order of execution, I care that noone blocks another for startup
g

gildor

06/06/2018, 7:15 AM
There is no real difference, version of Louis little bit more efficient, because have one less suspension point (point where coroutine dispatch callback and do some other work)
o

Olekss

06/06/2018, 7:17 AM
then I either don't undersatnd coroutines yet, or I am too embedded in Java thinking... If the worker.dowork is blocking code, then the execution of like (execution pointer) stops at the line no matter if it's in coroutine or outside? Isn't it
but withContext suspends while that blocking code runs... so in your case it suspends and outer coroutine will run the next forEach, but in Louis case it suspends on first cycle of worker, so the workers will be executed sequentially
y
g

gildor

06/06/2018, 7:24 AM
This is not about blocking or not blocking
o

Olekss

06/06/2018, 7:24 AM
yea looking at internals of withContext, it finally runs the executor, so the block passed to it is put on thread... your case, will 5 times put the code block on one thread, but Luis case puts all the workers sequentially to run on one dispatched thread... so if first worker takes long time, the next ones even won't start until first one finished
g

gildor

06/06/2018, 7:25 AM
is about what will be generated on coroutine side: how many states in state machine
o

Olekss

06/06/2018, 7:25 AM
ok
g

gildor

06/06/2018, 7:26 AM
again, it depends on what you want to achive, withContext is operations that waits for result
if you just need a pool of coroutines that work in parallel, than you need not neither withContext nor async{}.await
o

Olekss

06/06/2018, 7:26 AM
I want to achieve, that 5 workers START in parallel, but execution order is independent and they can each one take as much as they need, and I need to wait until all 5 finish eventually, and only then take next portion of data to give again for those 5 workers to work on
but each of the workers is blocking
g

gildor

06/06/2018, 7:28 AM
Yes, you need worker pool pattern with channel
o

Olekss

06/06/2018, 7:29 AM
yes, only how do I arrange it effectively with coroutine framework... I was thinking Ineed like 5 asyncs and then 5 awaits after those 1st ones started
but I don't need results. so here " await" is like artificial suspension
g

gildor

06/06/2018, 7:30 AM
You need async only if you want to get result
o

Olekss

06/06/2018, 7:30 AM
now I think I need something like launch { in = inputChannel.receive() launch { withContext(5ThreadPoolContext) { workers.foreach{doWork(in))} } }
g

gildor

06/06/2018, 7:31 AM
you don;’t need withContext there, you can pass context to launch directly
o

Olekss

06/06/2018, 7:33 AM
yea... but my thinking (where is it wrong) -> First launch in general runs in infinite loop which reads channel second launch is to "consolidate" inner coroutiens when they finish, so the 3rd one - probably can be launch too -> is the one which is run for each worker so somehow 3 launches one inside another to get that pooled effect?
g

gildor

06/06/2018, 7:34 AM
Check samples from this issue. There is example of
map
for ReceiveChannel where you can pass your IO context and do some operation (even blocking) and map some values with required parallelism
o

Olekss

06/06/2018, 7:35 AM
ahhh thanks... when you say so, I actually myself did paralell map some time ago, which was working... thanks will look at it
g

gildor

06/06/2018, 7:35 AM
Don’t forget to pass dispatcher suitable for blocking operations to context
actually pattern is simple, you run so many coroutines how many workers do you want (you also can tune dispatcher for those workers passing context), each of them reads value from an input channel, modifies and send modified value to another, output channel (or for simple version just do something with this value without returning result to another channel). When you run all the workers you just wait when work will be finished (join() on each of them)
o

Olekss

06/06/2018, 7:42 AM
Here is my blog post some time ago, where I achieved something similar to what is proposed in the issue... it was more like experiment, at how big data pool parallel map is effective
l

louiscad

06/06/2018, 12:34 PM
@Olekss I think this is what you're looking for:
Copy code
outputWorkers.map {
    launch(ZeroMQ) {
        doWork(it)
    }
}.forEach { it.join() }
g

gildor

06/06/2018, 2:07 PM
Yes, exactly, if you don't need results of work
5 Views