Gilles Barbier
01/10/2024, 11:46 PM
concurrency 64 but reaches a deadlock for = 64??? (on my MacBook Pro)
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.delay
import kotlinx.coroutines.future.future
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import kotlin.random.Random

private val consumingScope = CoroutineScope(Dispatchers.IO)
private val producingScope = CoroutineScope(SupervisorJob() + Dispatchers.IO)
private var counter = 0

fun main(): Unit = startConsumer(concurrency = 64)

private fun processMessage(
    pulsarMessage: String,
    workerIndex: Int
) = producingScope.future {
    println("Worker $workerIndex: start($pulsarMessage)")
    delay(Random.nextLong(1000))
    println("Worker $workerIndex: end ($pulsarMessage)")
}.join() // blocks the calling thread until the future completes

private fun startConsumer(concurrency: Int) = consumingScope.future {
    val channel = Channel<String>()
    // start processing coroutines
    repeat(concurrency) {
        launch {
            for (message in channel) {
                processMessage(message, it)
            }
        }
    }
    // loop sending messages to the channel
    while (isActive) {
        counter++
        channel.send("$counter")
    }
}.join()
Gilles Barbier
01/11/2024, 12:09 AM
Sam
01/11/2024, 8:19 AM
In processMessage, you're calling CompletableFuture.join(). This will block the thread until the future is complete. The future is running in the producingScope, so it needs a thread from the IO dispatcher in order to complete its work. But processMessage, called from the consumingScope, is also running on the IO dispatcher. So the thread it blocks comes from the same thread pool that the future is trying to run on.
Sam
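The starvation described in the 8:19 AM message can be reproduced on a small scale. The sketch below is not code from the thread: it swaps in a plain JDK fixed thread pool for the IO dispatcher (which by default is limited to 64 threads, or the number of cores if that is larger), and the names `countStarvedTasks`, `poolSize` and `outerTasks` are made up for illustration. Each outer task holds a pool thread and then blocks waiting for an inner task submitted to the same pool, much like `join()` does in `processMessage`.

```kotlin
import java.util.concurrent.Callable
import java.util.concurrent.CountDownLatch
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
import java.util.concurrent.TimeoutException

// Hypothetical helper: returns how many outer tasks timed out while
// blocking on an inner task submitted to the same pool.
fun countStarvedTasks(poolSize: Int, outerTasks: Int): Int {
    require(outerTasks <= poolSize) { "every outer task needs its own thread" }
    val pool = Executors.newFixedThreadPool(poolSize)
    // Ensures every outer task holds a thread before any inner task is submitted.
    val allStarted = CountDownLatch(outerTasks)
    try {
        val outer = List(outerTasks) { index ->
            pool.submit(Callable<Boolean> {
                allStarted.countDown()
                allStarted.await()
                val inner = pool.submit(Callable<String> { "done $index" })
                try {
                    // Blocking wait on the same pool, analogous to join().
                    inner.get(300, TimeUnit.MILLISECONDS)
                    false
                } catch (e: TimeoutException) {
                    true // no free thread was left to run the inner task
                }
            })
        }
        return outer.count { it.get() }
    } finally {
        pool.shutdownNow()
    }
}

fun main() {
    // All threads blocked: the inner tasks cannot start until the waits time out.
    println(countStarvedTasks(poolSize = 4, outerTasks = 4))
    // One spare thread: the inner tasks get a chance to run.
    println(countStarvedTasks(poolSize = 5, outerTasks = 4))
}
```

With as many blocked tasks as threads, every wait should time out; with one spare thread, the inner tasks should complete. The original code has no timeout, so the equivalent situation there would hang instead.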
01/11/2024, 8:20 AM
Sam
01/11/2024, 8:24 AM
Gilles Barbier
01/11/2024, 9:01 AM
Gilles Barbier
01/11/2024, 9:02 AM
Gilles Barbier
01/11/2024, 9:03 AM
Sam
01/11/2024, 9:12 AM
Sam
01/11/2024, 9:12 AM
Gilles Barbier
01/11/2024, 9:18 AM
Sam
01/11/2024, 9:25 AM
Gilles Barbier
01/11/2024, 9:36 AM
processMessage and was using coroutines for parallel processing. Also, I was under the impression that coroutines were giving me a better framework for error management.
Sam
01/11/2024, 9:45 AM
processMessage, which currently contains the random delay, may eventually contain multiple coroutines that run concurrently?
Gilles Barbier
01/11/2024, 9:46 AM
Sam
01/11/2024, 9:55 AM
suspend fun pullMessages() = withContext(Dispatchers.IO) {
    // run some blocking code to pull messages
}

suspend fun runSomeJavaCode() = withContext(myCustomThreadPoolDispatcher) {
    // run some user-supplied Java code
}
That way, you can write your framework code using suspending functions on the default dispatcher.
Gilles Barbier
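The `withContext` pattern from the 9:55 AM message could be fleshed out roughly as follows. This is a sketch under assumptions, not the thread's actual code: `userCodeDispatcher`, its pool size of 4, and `blockingUserCode` are hypothetical stand-ins for the custom thread pool dispatcher and the user-supplied Java code.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.asCoroutineDispatcher
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext
import java.util.concurrent.Executors

// Hypothetical dedicated pool for user-supplied blocking code; the size 4 is arbitrary.
val userCodeDispatcher = Executors.newFixedThreadPool(4).asCoroutineDispatcher()

// Stand-in for user-supplied blocking Java code.
fun blockingUserCode(message: String): String {
    Thread.sleep(50)
    return "processed $message"
}

// Suspends the caller while the blocking call runs on the dedicated pool,
// so the caller's own thread is free to do other work in the meantime.
suspend fun processMessage(message: String): String =
    withContext(userCodeDispatcher) { blockingUserCode(message) }

fun main(): Unit = runBlocking {
    // 64 concurrent callers on the default dispatcher; they suspend rather than block.
    val results = withContext(Dispatchers.Default) {
        (1..64).map { async { processMessage("$it") } }.awaitAll()
    }
    println(results.size)
    // Shut the dedicated pool down so the JVM can exit.
    userCodeDispatcher.close()
}
```

Because the callers suspend instead of calling `join()`, the number of concurrent callers is no longer tied to the size of any thread pool; only the blocking calls themselves are limited by the dedicated pool.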
01/11/2024, 9:58 AM