How can I tell if any continuations are in the que...
# coroutines
t
How can I tell if any continuations are in the queue to be run, or if there is a job waiting in delay() etc?
o
I think that's an implementation detail, you would have to reflect into the dispatcher to figure that out
t
I created my own context, and part of that in writing you own dispatcher...
but it's not clear where they actual queue is.
I extended ExecutorCourtineDispatcher().
o
there is no queue to my knowledge -- if you use e.g.
Executor.asDispatcher()
, your executor is the queue
t
hmmm... so if a job calls delay() where does that context live?
o
https://github.com/Kotlin/kotlinx.coroutines/blob/master/kotlinx-coroutines-core/jvm/src/Executors.kt#L94 looks like it passes off to the default executor's impl if you're not a ScheduledExecutorService
internally on the default executor, it appears to have its own queue of delayed tasks that it processes via event loop mechanics
but of course, that doesn't apply if you provide a ScheduledExecutorService
t
yeah.. I see. ScheduledExecutorService()
That's the problem. I'm running my own eventloop for a select() based app.
but I'm worried I'll miss jobs that have called delay() because I'm not checking the queue of timmed continuations.
o
you can implement
Delay
on your dispatcher if you would like to intercept that
... actually, that appears to be internal API right now
I'm curious what you mean by "miss jobs" though
t
so I launch{} all my "apps". They all end up calling the event loop and suspend on a network read or write or whatever. The network eventloop has a run() that just runs on the main thread and calls select().
but if one off the apps/jobs decides to call delay() and not suspend in the eventLoop, how would the eventLoop know?
I was starting to write my own version of delay() that plugged into my eventloop... but I was wondering if I could just make my eventLoop aware of kotlin's built in one...
o
why do you need to catch that delay?
I don't get why that's important -- it will still resume in your event loop
t
my event loop isn't running in a coroutine context. it's running on a real thread. It suspends and resumes other coroutines as they call IO that would block, but it isn't one itself.
o
right, that was my understanding
t
would it work if one of the coroutines called delay()? I guess maybe it would the next time the eventloop resumed some other coroutine, maybe it'd notice there is another one to be scheduled.
but the delay() time could be way off because my eventloop might be in select() longer that it should (because it's unaware of the scheduled delay)
o
hmmm, I see what you mean. personally I think that this a bit of a weird case -- you probably shouldn't be using select() in a dispatcher, it should be the implementation of how you
suspend
a blocking operation
t
this is all single threaded, if that wasn't clear.
the coroutines don't call select(). They ask the eventLoop for IO and it might suspend them if it isn't ready.
but someone needs to call select(). And it's always going to block.
or you burn a CPU in a busy loop and waste electricity.
o
right, that's what I'm saying, that "ask for IO" should be part of a
suspendCoroutine { ... }
block that hands the continuation over to a thread running
select()
t
that's the way it works. except there is only one thread.
o
right, but in this case you don't need to be a general dispatcher
you don't care about
delay
calls, they're handled by the actual dispatcher, whatever it is
t
yeah. but if the only thread is in select() the dispatcher can't dispatch when the time is up.
o
I'm confused how you're getting to that conclusion, the code inside
suspendCoroutine { ... }
will not be calling select, it is a hand-off to the single thread running select
once you call
cont.resume
in the select-thread, it goes back to the original dispatcher and starts executing there
t
so in my head I think of coroutines as old school cooperative multitasking with some neat syntactic sugar.
but if i don't call cont.resumt() in a "resonable" amount of time the delay() will "return" late. maybe very late.
actually. forget about my case. if you have a singleThreaded dispatches there is no guarantee delay() will every return at all. if some other coroutine does a while(1), yes?
o
yes, that's correct
what I don't get is you saying " if i don't call cont.resumt() in a "resonable" amount of time the delay() will "return" late. maybe very late."
in this scenario, you are in explicitly an IO call of some sort
t
select() == while(true)
o
I don't know how you handle it now, but I would imagine something like
suspend fun callIO() { suspendCoroutine { passToSelect() } }
when the coroutine calls
delay
, that never touches this code
t
hmm... upside down. All the coroutines are launched before the eventLoop starts. So they call end up getting suspendCoroutine() some where in the event loop. Which just stashes that context until later. Then after all the coroutines are launched the EventLoops's Run() is called, which is basically while(true) { select() }
there are stashed coroutines contexts assoicated w/ select keys, and some timer queues, etc)
so yes, any calls to delay are opaque to the eventLoop.
o
this sounds over-complicated?
t
I'm not sure how else it could be done?
o
can I see some of the actual code / how it is used?
t
it's single threaded network IO w/o callbacks. (or rather kotlin's compiler is generating the callbacks automagicaly)
so you can write what looks like blocking IO, that isn't.
o
right, so why do you need a dispatcher for this? stash the appropriate select handle in the TCPSocket instance, pass it to a single common thread inside the calls (if I recall how select() works properly)
t
that's what I'm doing.
but there is only one thread. total
o
so there is no dispatcher?
t
Not sure what you mean by a dispatcher
o
the thing that mounts a coroutine onto an actual thread
e.g.
Dispatchers.Default
,
<http://Dispatchers.IO|Dispatchers.IO>
t
oh... yeah.. sorry.. yeah. It's a single threaded one that just calls .run() on whatever it's handed.
o
right, you don't need that
t
I don't? how to I ensure that everything runs on the main thready only?
o
I mean for sure, you can use one separate from this discussion
but it shouldn't be managing the select calls too
t
how do I not manage the select() calls if there is only one thread?
o
what I'm saying is that this TCPSocket shouldn't care about which dispatcher it's on
you create a single global thread to process
select()
it is handed the
Continuation
from
suspendCancellableCoroutine
and the relevant info for
select()
when the data is ready, call
Continuation.resume
t
"handed" ? doesn't that imply another thread?
o
yes
the thread the coroutine is mounted on, and the select thread, are different threads (in my model here)
t
I can't have different threads.
o
because...?
t
the whole point is to only have one thread running in the entire VM ever. (except ones you can't control like GC, etc)
o
that seems highly restricting
t
scale and speed. context switches are eons for what I'm doing.
o
well, you're going to have a lot of trouble with select blocking other coroutines from doing anything at all
t
it is highly restricting... why would I bother to write another network framework besides the billions that are out there if it wasn't a restrictive problem?
correct. I've done this many times in the past, but it's all been callback based. and yes, you need to make sure the callbacks all return immediately. This is no different, except it's easier to read....
o
I don't think the coroutines library is designed to solve your problem, without hacking away at the internals
t
maybe not. But I thought I'd try because it looks nicer to use than callbacks.
o
right
you can still use
suspend
but I would forget about the coroutines library itself
t
not sure I understand the difference...?
you mean, just don't use delay()?
o
I mean remove the entire
coroutines
library from your classpath
suspend
is a kotlin compiler feature, you don't need the library to use it, see e.g.
Sequence
which is
suspend
-based, but not from the coroutine lib
t
isn't suspendCoroutine() in the library?
and launch{} ?
o
yes,
launch {}
is part of that library
no
suspendCoroutine
is not part of that library, it's part of the stdlib
t
I see... hmmm...intresting. I didn't know that.
how do you start a coroutine then?
see line 39,
iterator()
basically, you call
block.createCoroutineUnintercepted
, and then
resume
it wherever you want it to start
t
oh yeah... I see.
o
in this case, the sequence builder resumes it in
hasNext
t
yeah... this is way better... I'll have to write my own delay() if I want one, but.... I don't have to fight w/ coroutineContexts and stuff.
o
yes, exactly, it's very low level, basically just the suspend -> "callback" system
t
yeah... this is exactly what I want!
wow! so mega useful! Thank you so much!
now I can charge at my stupid windmill better!