Gilles Barbier
01/10/2024, 11:46 PMconcurrency
64 but reaches a dead lock for = 64??? (on my MacBook Pro)
private val consumingScope = CoroutineScope(Dispatchers.IO)
private val producingScope = CoroutineScope(SupervisorJob() + Dispatchers.IO)
private var counter = 0
fun main(): Unit = startConsumer(concurrency = 64)
private fun processMessage(
pulsarMessage: String,
workerIndex: Int
) = producingScope.future {
println("Worker $workerIndex: start($pulsarMessage)")
delay(Random.nextLong(1000))
println("Worker $workerIndex: end ($pulsarMessage)")
}.join()
private fun startConsumer(concurrency: Int) = consumingScope.future {
val channel = Channel<String>()
// start processing coroutines
repeat(concurrency) {
launch {
for (message in channel) {
processMessage(message, it)
}
}
}
// loop sending messages to the channel
while (isActive) {
counter++
channel.send("$counter")
}
}.join()
Gilles Barbier
01/11/2024, 12:09 AMSam
01/11/2024, 8:19 AMprocessMessage
, you're calling CompletableFuture.join()
. This will block the thread until the future is complete. The future is running in the producingScope
, so it needs a thread from the IO dispatcher in order to complete its work. But processMessage
, called from the consumingScope
, is also running on the IO dispatcher. So the thread it blocks comes from the same thread pool that the future is trying to run on.Sam
01/11/2024, 8:20 AMSam
01/11/2024, 8:24 AMGilles Barbier
01/11/2024, 9:01 AMGilles Barbier
01/11/2024, 9:02 AMGilles Barbier
01/11/2024, 9:03 AMSam
01/11/2024, 9:12 AMSam
01/11/2024, 9:12 AMGilles Barbier
01/11/2024, 9:18 AMSam
01/11/2024, 9:25 AMGilles Barbier
01/11/2024, 9:36 AMprocessMessage
and was using coroutines for parallel processing. Also I was under the impression that coroutines was giving me a better framework for error management.Sam
01/11/2024, 9:45 AMprocessMessage
, which currently contains the random delay, may eventually contain multiple coroutines that run concurrently?Gilles Barbier
01/11/2024, 9:46 AMSam
01/11/2024, 9:55 AMsuspend fun pullMessages() = withContext(<http://Dispatchers.IO|Dispatchers.IO>) {
// run some blocking code to pull messages
}
suspend fun runSomeJavaCode() = withContext(myCustomThreadPoolDispatcher) {
// run some user-supplied Java code
}
That way, you can write your framework code using suspending functions on the default dispatcher.Gilles Barbier
01/11/2024, 9:58 AM