Is there a known way to prevent such behaviour
# coroutines
v
Is there a known way to prevent such behaviour
o
I'm not sure what you mean by "not respecting the threadpool context", are you doing `withContext(Dispatchers.IO)` and expecting it not to switch?
v
I have something like:
override val executor = ThreadPoolExecutor(
    affinity,
    MAX_IO_THREADS,
    DEFAULT_TTL_SECONDS_THREAD,
    TimeUnit.SECONDS,
    SynchronousQueue()
) { runnable ->
    Thread(runnable).also {
        it.name = "io-pool-${it.id}"
        it.isDaemon = true
        it.priority = Thread.NORM_PRIORITY
    }
}
now if I do
withContext(executor.asCoroutineDispatcher()){
// ... 
}
I am expecting the body to always execute on the `ThreadPoolExecutor` I made, but I see that is not true
o
hmm, not sure -- personally I wouldn't expect that
v
Yep same here; but I see this getting switched to the caller thread sometimes. Kotlin 1.3.72 and coroutines 1.3.5
@louiscad might have some ideas
l
@v0ldem0rt I have no idea, because I'm not using `asCoroutineDispatcher` for `Executor` since `Dispatchers.IO` exists for my use cases. That said, if you can make a reproducer, simulating the high load you're referring to, I think that'd be a great path towards a fix after reporting it on GitHub in the kotlinx.coroutines project.
v
Let me do a quick test bench
BTW is there a way to limit the number of threads for `Dispatchers.IO`?
o
* The number of threads used by this dispatcher is limited by the value of
* "`kotlinx.coroutines.io.parallelism`" ([IO_PARALLELISM_PROPERTY_NAME]) system property.
that said, I know what's up
the `ExecutorCoroutineDispatcherImpl` puts tasks on the default executor when the executor rejects the task (due to your SynchronousQueue)
I do think this is odd behavior, seems counter-intuitive
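The rejection itself can be seen without coroutines at all. The following is a minimal sketch using only `java.util.concurrent`; the function name `secondTaskIsRejected` is made up for illustration. It assumes the default `AbortPolicy`, which is what `ThreadPoolExecutor` uses when no handler is passed, as in the snippets in this thread.

```kotlin
import java.util.concurrent.CountDownLatch
import java.util.concurrent.RejectedExecutionException
import java.util.concurrent.SynchronousQueue
import java.util.concurrent.ThreadPoolExecutor
import java.util.concurrent.TimeUnit

// Hypothetical helper: returns true if a second task is rejected
// while the pool's only worker thread is still busy.
fun secondTaskIsRejected(): Boolean {
    val pool = ThreadPoolExecutor(1, 1, 0L, TimeUnit.SECONDS, SynchronousQueue<Runnable>())
    val release = CountDownLatch(1)
    try {
        pool.execute { release.await() } // occupies the single worker thread
        return try {
            pool.execute { }             // no idle worker and SynchronousQueue holds nothing
            false
        } catch (e: RejectedExecutionException) {
            true                         // default AbortPolicy throws
        }
    } finally {
        release.countDown()              // let the first task finish
        pool.shutdown()
    }
}

fun main() {
    println("second task rejected: ${secondTaskIsRejected()}")
}
```

A `SynchronousQueue` has no capacity, so once all threads are busy every further `execute` call is rejected, and that is the exception the dispatcher reacts to.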
v
I have repro!!!
object Application {

    @JvmStatic
    val IO = ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS, SynchronousQueue()) { runnable ->
        Thread(runnable).also {
            it.name = "io-pool-${it.id}"
            it.isDaemon = true
            it.priority = Thread.NORM_PRIORITY
        }
    }

    @JvmStatic
    fun main(args: Array<String>) {
        val dispatcher = IO.asCoroutineDispatcher()
        runBlocking(Dispatchers.Default) {
            println("Running on ${Thread.currentThread().name}")
            (1..10).map {
                CoroutineScope(dispatcher).async {
                    println("Thread using ${Thread.currentThread().name}")
                    Thread.sleep(1000)
                }
            }.awaitAll()
        }
    }
}
o
smaller repro:
val executor = ThreadPoolExecutor(1, 1, 60, TimeUnit.SECONDS, SynchronousQueue()) { runnable ->
    Thread(runnable).also {
        it.name = "io-pool-${it.id}"
        it.isDaemon = true
        it.priority = Thread.NORM_PRIORITY
    }
}

fun main() {
    runBlocking(executor.asCoroutineDispatcher()) {
        for (i in 1..10) {
            launch {
                println(Thread.currentThread().name)
                Thread.sleep(1000)
            }
        }
    }
}
might be a good idea to file an issue with this, I don't think this behavior is mentioned in the docs
v
@octylFractal can you point me to the code in `ExecutorCoroutineDispatcherImpl` that does so?
I am gonna open up an issue
o
it's actually in `ExecutorCoroutineDispatcherBase`:
override fun dispatch(context: CoroutineContext, block: Runnable) {
    try {
        executor.execute(wrapTask(block))
    } catch (e: RejectedExecutionException) {
        unTrackTask()
        DefaultExecutor.enqueue(block)
    }
}
v
ah dang
so I risk thread explosion if my service is receiving shit ton of requests and the thread pool below is all choked up
o
potentially, yes. I'm not sure what else could happen here, maybe cancelling the block?
v
yep
o
the other option is to use an unbounded task queue
v
well both cases can risk running out of memory
o
that does risk OOM though, if the load is high enough
ah, yea, because the default dispatcher does that too
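For comparison, here is a rough sketch of the unbounded-queue option, again with plain `java.util.concurrent`. The function name and the `"io-pool"` thread name are assumptions made for the example. Nothing is rejected, so every task stays on the pool thread, at the cost of a queue that can grow without limit.

```kotlin
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.ThreadFactory
import java.util.concurrent.ThreadPoolExecutor
import java.util.concurrent.TimeUnit

// Hypothetical helper: submits `tasks` jobs to a one-thread pool backed by an
// unbounded queue and returns the names of the threads that ran them.
fun threadsUsedWithUnboundedQueue(tasks: Int): Set<String> {
    val pool = ThreadPoolExecutor(
        1, 1, 0L, TimeUnit.SECONDS,
        LinkedBlockingQueue<Runnable>(),            // unbounded: tasks wait instead of being rejected
        ThreadFactory { r -> Thread(r, "io-pool") }
    )
    val names = ConcurrentHashMap.newKeySet<String>()
    repeat(tasks) {
        pool.execute { names.add(Thread.currentThread().name) }
    }
    pool.shutdown()                                  // already queued tasks still run
    pool.awaitTermination(5, TimeUnit.SECONDS)
    return names
}

fun main() {
    println(threadsUsedWithUnboundedQueue(10))
}
```

Because the queue never fills up, the rejection path in the dispatcher is never taken, which is exactly why memory becomes the limiting factor under sustained load.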
v
just like thread pools reject tasks once the queue is full, coroutines should do that too
or at least have an option to do so
l
In systems I've designed running on calling thread is usually right thing to do because that should cause back pressure
d
Limiting threads in `Dispatchers.IO`? Use a `Semaphore`!
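A minimal sketch of the semaphore idea, assuming `kotlinx.coroutines.sync.Semaphore` and `withPermit` are available in the coroutines version in use. The function name and the counters are invented for the example. Note that this caps how many coroutines are inside the guarded section at once; it does not change the size of the `Dispatchers.IO` pool itself.

```kotlin
import java.util.concurrent.atomic.AtomicInteger
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.sync.Semaphore
import kotlinx.coroutines.sync.withPermit

// Hypothetical helper: runs `tasks` blocking jobs on Dispatchers.IO behind a
// semaphore and returns the highest number that were running at the same time.
fun maxConcurrentWithSemaphore(permits: Int, tasks: Int): Int = runBlocking {
    val gate = Semaphore(permits)
    val running = AtomicInteger(0)
    val peak = AtomicInteger(0)
    (1..tasks).map {
        async(Dispatchers.IO) {
            gate.withPermit {                        // suspends, does not block, while waiting
                val now = running.incrementAndGet()
                peak.updateAndGet { prev -> maxOf(prev, now) }
                Thread.sleep(50)                     // stand-in for blocking IO
                running.decrementAndGet()
            }
        }
    }.awaitAll()
    peak.get()
}

fun main() {
    println("peak concurrency: ${maxConcurrentWithSemaphore(2, 10)}")
}
```

Waiting coroutines are suspended rather than rejected, so nothing gets moved to another dispatcher.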
l
i.e. runs on servlet thread it can't accept more connections
o
that's not what coroutines do here though, they switch to the default dispatcher
l
that depends on the executor rejection policy
it can be set to run on caller
or throw exception
But coroutines should respect rejection policy
Fallback should be opted in for.
It's a big loophole on the assumptions/illusion `withContext` gives you
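To illustrate what a rejection policy does at the executor level, here is a small sketch with `ThreadPoolExecutor.CallerRunsPolicy`. The function name is made up; this shows plain executor behaviour only and says nothing about how the coroutine dispatcher wraps it.

```kotlin
import java.util.concurrent.CountDownLatch
import java.util.concurrent.SynchronousQueue
import java.util.concurrent.ThreadPoolExecutor
import java.util.concurrent.TimeUnit

// Hypothetical helper: returns the name of the thread that ran a task the
// saturated pool could not accept.
fun threadOfOverflowTask(): String {
    val pool = ThreadPoolExecutor(
        1, 1, 0L, TimeUnit.SECONDS,
        SynchronousQueue<Runnable>(),
        ThreadPoolExecutor.CallerRunsPolicy()        // run rejected tasks on the submitting thread
    )
    val release = CountDownLatch(1)
    var ranOn = ""
    try {
        pool.execute { release.await() }             // keep the only worker busy
        pool.execute { ranOn = Thread.currentThread().name } // executed inline by the caller
        return ranOn
    } finally {
        release.countDown()
        pool.shutdown()
    }
}

fun main() {
    println("overflow task ran on: ${threadOfOverflowTask()}")
}
```

With this policy `execute` never throws while the pool is running, so the submitting thread slows down under load, which is the back pressure described above.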
e
How do you suggest to respect rejection?
v
Just like cancellation, have a specific exception. Or allow some configuration to propagate `RejectedExecutionException`
If you want me to add proposal to how it should work I would be more than happy to do so
I can add it to Github ticket
e
Consider this code:
launch {
    val file = openFile()
    try { .... delay(1000L) ... } 
    finally { file.close() }
}
What if the `delay` continuation gets rejected when it resumes? We cannot simply abandon this coroutine by throwing `RejectedExecutionException` somewhere. We also cannot just throw `RejectedExecutionException` out of `delay`, because `delay` is not supposed to throw it. The right thing to do, though, is to cancel the coroutine using `RejectedExecutionException` as the cancellation cause.
It will still have to be scheduled to a default executor for completion, but as an already cancelled coroutine that is supposed to terminate quickly.
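A rough sketch of that idea as a custom dispatcher follows. `CancelOnRejectDispatcher` and `cancelledFlags` are hypothetical names, not part of kotlinx.coroutines, and the choice of `Dispatchers.Default` for the fallback is an assumption. On rejection it cancels the job with the rejection as the cause and still schedules the block elsewhere so the coroutine can complete.

```kotlin
import java.util.concurrent.Executor
import java.util.concurrent.RejectedExecutionException
import java.util.concurrent.SynchronousQueue
import java.util.concurrent.ThreadPoolExecutor
import java.util.concurrent.TimeUnit
import kotlin.coroutines.CoroutineContext
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.joinAll
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical dispatcher: cancels the coroutine when the executor rejects it.
class CancelOnRejectDispatcher(private val executor: Executor) : CoroutineDispatcher() {
    override fun dispatch(context: CoroutineContext, block: Runnable) {
        try {
            executor.execute(block)
        } catch (e: RejectedExecutionException) {
            // Mark the coroutine as cancelled, keeping the rejection as the cause.
            context[Job]?.cancel(CancellationException("Rejected by executor", e))
            // The block must still run somewhere so finally blocks and completion happen.
            Dispatchers.Default.dispatch(context, block)
        }
    }
}

// Launches three coroutines on a saturated one-thread pool and reports which were cancelled.
fun cancelledFlags(): List<Boolean> {
    val pool = ThreadPoolExecutor(1, 1, 0L, TimeUnit.SECONDS, SynchronousQueue<Runnable>())
    val dispatcher = CancelOnRejectDispatcher(pool)
    try {
        return runBlocking {
            val jobs = (1..3).map { launch(dispatcher) { Thread.sleep(200) } }
            jobs.joinAll()
            jobs.map { it.isCancelled }
        }
    } finally {
        pool.shutdown()
    }
}

fun main() {
    println(cancelledFlags())
}
```

Confining this to a dispatcher implementation keeps the default behaviour unchanged for everyone who does not opt in.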
v
hmmm the more I think about it the more I am getting confused on what should happen to finally state machine; somebody could be putting or interacting with something on UI thread for example assuming his try finally is within the UI launch block.
You are right about how we can provide a cleaner finally story; and hence I am thinking some kind of `AbstractCoroutineContextElement` to disable the fallback behaviour.
that would by default keep the behavior same
Or maybe some kind of FallbackCoroutineContext that can control whether we should throw or dispatch on the `Default` dispatcher.
d
I don't think UI thread does rejection.
Also, I think all the logic for this should be confined to the dispatcher. For different behaviour, provide different dispatcher impl.
v
Hmmm so would it make sense to have a Dispatcher that doesn't fall back so that people can explicitly opt into that behavior ?
I wonder how existing libraries like Reactor and RxJava provide similar mechanism when threadpool rejects