# coroutines
o
hey do I understand correctly that `launch { x = async {...}; y = async {...}; x.await(); y.await() }` will try to take 3 threads from ForkJoinPool, and in the case of a 1-core computer the 2nd async simply will not start until the first finishes, even without `await`?
g
No, all those coroutines run on the CommonPool dispatcher, which has `numCpu - 1` threads
Those coroutines are non-blocking, so they are probably handled by the same thread in this case, even on a multi-core CPU. Also, both `async` blocks will run in parallel; of course, if you have only one thread, both of them will be dispatched sequentially, but because there is no blocking it's just a method call
On my 8-hyper-thread machine both `async` blocks are handled by different threads from CommonPool
But if your code inside `async` is non-blocking, then even with 1 thread both coroutines will run in parallel on the same thread (with context switches of course), and `await()` doesn't block the thread
You can easily check this: just put a log line after each await with a timestamp and the thread name
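(A minimal sketch of that check, assuming the current kotlinx.coroutines API, where CommonPool has since become Dispatchers.Default; all names here are illustrative:)

```kotlin
import kotlinx.coroutines.*

// Two concurrent asyncs: delay() suspends without blocking a thread,
// so even a single-threaded dispatcher could run both concurrently,
// and await() suspends the caller rather than blocking its thread.
suspend fun sumConcurrently(): Int = coroutineScope {
    val x = async(Dispatchers.Default) {
        println("x on ${Thread.currentThread().name}")
        delay(100)
        1
    }
    val y = async(Dispatchers.Default) {
        println("y on ${Thread.currentThread().name}")
        delay(100)
        2
    }
    x.await() + y.await()
}

fun main() = runBlocking {
    // Total time is ~100 ms, not ~200 ms, because the delays overlap
    println("sum = ${sumConcurrently()}")
}
```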
o
my confusion with coroutines (still) is their relationship with physical hardware threads... because I have a case where I have code something like launch { }
ooops
g
Think about coroutines as about callbacks
You can create a lot of callbacks and invoke them at the same or different threads
main difference, that with coroutine builder you also specify where this callback should be dispatched (by default in CommonPool)
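(An illustrative sketch of that "where to dispatch" knob, using the current API in which CommonPool became Dispatchers.Default:)

```kotlin
import kotlinx.coroutines.*

fun main() = runBlocking {
    // The context argument of the builder decides where the "callback" runs
    launch { println("inherited:  ${Thread.currentThread().name}") }  // runBlocking's thread
    launch(Dispatchers.Default) { println("pool:       ${Thread.currentThread().name}") }
    launch(Dispatchers.Unconfined) { println("unconfined: ${Thread.currentThread().name}") }
}
```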
o
`launch { in = inputChannel.receive(); outputWorkers.forEach { async { doWork(in) }.await() } }` And I'm somewhat worried that if my doWork(in) for the first element takes a long time, then the other doWorks will wait in vain
g
what is `doWork`?
is that a blocking operation?
o
doWork in general is a call into ZeroMQ send
g
And this call is blocking or not?
o
with the greedy pirate algorithm, so it can take a while before it returns
g
If this call to 0mq uses callbacks + a coroutine adapter it's non-blocking; otherwise it's blocking and you shouldn't wrap it in a coroutine on CommonPool
o
if the other end is down, then a particular call may take like 12 seconds to finish
that's the problem, no callbacks
I feel I have to write my own wrapper around it to make it work with callbacks
g
see, "12 seconds to finish" doesn't tell you whether something is blocking or not; it's about callbacks vs blocking a thread
o
it is blocking... of course... zeromq is pretty straightforward...
g
Yes, if the 0mq lib has a non-blocking API, that is the best choice in the case of coroutines
but even if you don't have a non-blocking API you can use it with coroutines
just use a custom dispatcher that is used only for such calls
o
currently I am using it with coroutines... only parts which are blocking - well they are blocking
g
Something like this:
```kotlin
val ZeroMQ = newFixedThreadPoolContext(ZERO_MQ_THREAD_POOL_SIZE, "ZeroMQ")

launch {
    val input = inputChannel.receive()  // `in` is a keyword in Kotlin, so renamed
    outputWorkers.forEach { async(ZeroMQ) { doWork(input) }.await() }
}
```
so now you don’t have problems with blocking of CommonPool
o
but async will dispatch into CommonPool; in case I have several threads... wouldn't it dispatch each blocking call onto its own thread?
g
async will dispatch into CommonPool
No, see my example
Yes, you're right, blocking calls must be dispatched by some special dispatcher
and this dispatcher should be configured according to your requirements (amount of parallel calls, thread lifecycle, etc.)
o
so if I set up my own dispatcher and I have, I don't know, 4-5 threads reserved, then I am good?
g
Yes
o
thanks... sounds like a nice exercise 🙂
g
See, CommonPool is the pool used by default and is intended only for non-blocking calls; even 1 thread is usually enough to dispatch all non-blocking calls (though there is an open issue about allowing the minimal number of threads in CommonPool to be configured)
o
but when we have callback-based invocations of parallel tasks (think Firebase on Android), then this "dispatching" problem is a problem of the FB implementation, and I only write my coroutine wrapper, as per the examples, which wraps the callback, and I don't even worry about dispatching as it is hidden under the hood of FB
g
But if you want to use some blocking API (blocking file IO, blocking network, DB, etc.) it would be better to wrap such calls in your own custom thread pool
Correct, usually libraries that provide some async API use their own dispatching mechanism under the hood: 1. a real non-blocking API, like java.nio; 2. a blocking API wrapped in some thread pool
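(A minimal sketch of that wrapping; `blockingSend` is a hypothetical stand-in for a blocking ZeroMQ-style call, and Dispatchers.IO from the current API stands in for a custom fixed thread pool:)

```kotlin
import kotlinx.coroutines.*

// Hypothetical stand-in for a blocking library call (e.g. a ZeroMQ send)
fun blockingSend(msg: String): String {
    Thread.sleep(50)  // really blocks the calling thread
    return "sent:$msg"
}

// Wrap the blocking call in a dispatcher sized for blocking work, so the
// default (CPU-bound) dispatcher's threads are never tied up by it
suspend fun send(msg: String): String =
    withContext(Dispatchers.IO) { blockingSend(msg) }

fun main() = runBlocking {
    println(send("hello"))
}
```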
o
yea, that is the confusing part of the coroutines documentation, as all the examples start with fun doSomeBigWork() and then show how sticking this into async or launch solves the problem... But in reality, if it's blocking, it IS blocking... and coroutines won't save you if you have long-running file crunching anyway
g
Coroutine cannot magically convert your blocking code to non-blocking
o
well, stick that one sentence first in the coroutine documentation 🙂 people get confused 🙂
ok... I think I understood how I need to tailor my code now
g
BTW, you don't need async/await here
outputWorkers.forEach { async(ZeroMQ) { doWork(input) }.await() }
withContext would be enough:
```kotlin
outputWorkers.forEach {
    withContext(ZeroMQ) { doWork(input) }
}
```
l
I think this is more efficient:
```kotlin
withContext(ZeroMQ) {
    outputWorkers.forEach {
        doWork(it); yield()
    }
}
```
g
Yes, you're right, it's better to consume on the same thread
l
Neither version consumes on a single thread, since the ZeroMQ dispatcher could be thread-pool based, and both invoke a suspending function, which means it then resumes in the coroutineContext, on any of its threads. But my version avoids having two suspension points (withContext can suspend before and after its block) when you can have only one suspension point instead (yield makes it cooperative with other coroutines running on the same dispatcher)
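(A small illustrative sketch of that cooperation, with hypothetical names: two coroutines on one thread interleave at each yield(), the single suspension point in the loop:)

```kotlin
import kotlinx.coroutines.*

// Two coroutines sharing a single thread: without yield() each would run
// to completion before the other starts; with yield() they take turns
fun interleaved(): List<String> = runBlocking {
    val log = mutableListOf<String>()
    newSingleThreadContext("single").use { ctx ->
        val a = launch(ctx) { repeat(3) { log += "a$it"; yield() } }
        val b = launch(ctx) { repeat(3) { log += "b$it"; yield() } }
        a.join(); b.join()
    }
    log
}

fun main() = println(interleaved())
```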
g
Good point about the two suspension points, sorry for the tautology 😅
o
I don't quite understand how it works, but looks like it's working
g
What works? yield?
o
no, the worker dividing... I printed the thread on which the actual zeromq send happens, and they are different. I gave 5 threads to that newContext as you mentioned above
g
It's just a pool of tasks that should be dispatched, and this pool is processed by the coroutine dispatcher
o
pool?
g
haha, yes, end of working day %)
even non-blocking coroutines should be dispatched, either by the current thread (using the Unconfined dispatcher) or by any other thread or pool of threads
o
@gildor I am trying to understand the difference between your and Louis' code... What I want (and I can't quite test it in my case) is to have like 5 workers (potentially) work in parallel (literally), so that all of them start at the same time, but the block finishes only when all are finished... So I am using coroutines not for starting in parallel, but to be sure that when I exit the code block, all of them have finished... I don't care about the order of execution, I care that no one blocks another at startup
g
There is no real difference; Louis' version is a little bit more efficient because it has one less suspension point (a point where the coroutine dispatches a callback and can do some other work)
o
then I either don't understand coroutines yet, or I am too embedded in Java thinking... If worker.doWork is blocking code, then the execution (the execution pointer) stops at that line no matter whether it's in a coroutine or outside, doesn't it?
but withContext suspends while that blocking code runs... so in your case it suspends and the outer coroutine will run the next forEach iteration, but in Louis' case it suspends on the first cycle of the worker, so the workers will be executed sequentially
y
g
This is not about blocking or not blocking
o
yea, looking at the internals of withContext, it finally runs the executor, so the block passed to it is put on a thread... your case will put the code block on one thread 5 times, but Louis' case puts all the workers sequentially to run on one dispatched thread... so if the first worker takes a long time, the next ones won't even start until the first one finishes
g
it's about what will be generated on the coroutine side: how many states in the state machine
o
ok
g
again, it depends on what you want to achieve; withContext is an operation that waits for a result
if you just need a pool of coroutines that work in parallel, then you need neither withContext nor async{}.await()
o
I want to achieve that 5 workers START in parallel, but execution order is independent and each can take as much time as it needs, and I need to wait until all 5 eventually finish, and only then take the next portion of data to give again to those 5 workers to work on
but each of the workers is blocking
g
Yes, you need worker pool pattern with channel
o
yes, only how do I arrange it effectively with the coroutine framework... I was thinking I need like 5 asyncs and then 5 awaits after those first ones have started
but I don't need the results, so here "await" is like an artificial suspension
g
You need async only if you want to get a result
o
now I think I need something like
    launch {
        val input = inputChannel.receive()
        launch {
            withContext(fiveThreadPoolContext) {
                workers.forEach { doWork(input) }
            }
        }
    }
g
you don't need withContext there, you can pass the context to launch directly
o
yea... but my thinking (where is it wrong?) -> the first launch in general runs an infinite loop which reads the channel, the second launch is to "consolidate" the inner coroutines when they finish, so the 3rd one (probably can be a launch too) is the one which runs for each worker... so somehow 3 launches one inside another to get that pooled effect?
g
Check the samples from this issue. There is an example of `map` for ReceiveChannel where you can pass your IO context and do some operation (even blocking) and map values with the required parallelism
o
ahhh, thanks... when you say so, I actually did a parallel map myself some time ago, which was working... thanks, will look at it
g
Don't forget to pass a dispatcher suitable for blocking operations to the context
actually the pattern is simple: you run as many coroutines as you want workers (you can also tune the dispatcher for those workers by passing a context), each of them reads a value from an input channel, modifies it and sends the modified value to another, output channel (or for a simple version just does something with the value without returning a result to another channel). When you have run all the workers you just wait until the work is finished (join() on each of them)
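(A hedged sketch of that worker-pool pattern with the current API; `processWithWorkers` and all names are illustrative, and Dispatchers.IO stands in for a custom pool suited to blocking calls:)

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel

// N workers share one input channel; each pulls items until the channel
// closes, so one slow item only occupies one worker while the rest keep going
suspend fun <T, R> processWithWorkers(
    items: List<T>,
    workers: Int,
    dispatcher: CoroutineDispatcher,
    work: (T) -> R
): List<R> = coroutineScope {
    val input = Channel<T>(Channel.UNLIMITED)
    items.forEach { input.send(it) }
    input.close()  // workers' for-loops finish when the channel is drained
    val output = Channel<R>(Channel.UNLIMITED)
    val jobs = List(workers) {
        launch(dispatcher) {
            for (item in input) output.send(work(item))
        }
    }
    jobs.forEach { it.join() }  // wait until all workers are done
    output.close()
    val results = mutableListOf<R>()
    for (r in output) results += r
    results
}

fun main() = runBlocking {
    println(processWithWorkers((1..10).toList(), 5, Dispatchers.IO) { it * it })
}
```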
o
Here is my blog post from some time ago, where I achieved something similar to what is proposed in the issue... it was more of an experiment in how effective a parallel map is on a big data pool
l
@Olekss I think this is what you're looking for:
```kotlin
outputWorkers.map {
    launch(ZeroMQ) {
        doWork(it)
    }
}.forEach { it.join() }
```
g
Yes, exactly, if you don't need the results of the work