# ktor
m
Hey, I am researching different Network Clients in terms of the number of threads they use. If I understand it right, Ktor Client with CIO should not block threads under the hood, but when I check active threads, it seems that it starts a lot of threads from the common pool. Here is a complete experiment:
import io.ktor.client.*
import io.ktor.client.engine.cio.*
import io.ktor.client.request.*
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlin.time.measureTime


val client = HttpClient(CIO) {
    engine {
        endpoint.connectTimeout = 100_000
    }
}

suspend fun getDelayedResponse(seconds: Int, a: Int) {
    client.get("https://api.kt.academy/delay") {
        parameter("delay", seconds)
        parameter("a", a)
    }
}

suspend fun main() = measureTime {
    runBlocking {
        repeat(1000) {
            launch {
                getDelayedResponse(2000, it)
            }
        }
        delay(1000)
        Thread.getAllStackTraces().keys.filter { it.isAlive }.map { it.name }.let {
            println("Active threads: ${it.size}")
            println(it)
        }
    }
}.let { println("Took ${it.inWholeSeconds}") }
It shows that there are 89 active threads, of which 81 are DefaultDispatcher-worker-X, and the whole process takes 23 seconds (which is consistent with about 81 concurrently active requests). Does this mean that CIO also requires a thread per active request?
a
Can you tell how you measure the number of active requests?
m
I don't; it is just a guess based on the number of threads used by the dispatcher. As you can see in the code, I send 1000 requests, each takes 2000 ms, and the whole process takes 23 seconds. 1000 / 81 * 2 s = 24.69 s, so the average number of concurrent requests should be a bit more than 81.
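The estimate above can be written out as a small sketch. The function names are made up for illustration, and the model is a simplification: it assumes every request takes exactly the same time and that the concurrency level stays constant.

```kotlin
import java.util.Locale

// Estimated wall-clock time when `requests` calls, each lasting
// `secondsPerRequest`, run with at most `concurrency` of them in flight.
fun estimatedTotalSeconds(requests: Int, concurrency: Int, secondsPerRequest: Double): Double =
    requests.toDouble() / concurrency * secondsPerRequest

// Inverse: average number of requests in flight implied by an observed total time.
fun impliedConcurrency(requests: Int, secondsPerRequest: Double, totalSeconds: Double): Double =
    requests * secondsPerRequest / totalSeconds

fun main() {
    // 1000 requests, 81 at a time, 2 s each
    println("%.2f".format(Locale.US, estimatedTotalSeconds(1000, 81, 2.0))) // 24.69
    // 1000 requests of 2 s each finishing in 23 s
    println("%.2f".format(Locale.US, impliedConcurrency(1000, 2.0, 23.0)))  // 86.96
}
```

With the measured 23 seconds, the implied average concurrency is about 87, which is the "a bit more than 81" mentioned above.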
e
Hey @marcinmoskala, sorry for the delay. We don't explicitly start a thread for every request, but we can launch a coroutine on the IO dispatcher. That may lead to starting a thread if no threads are available. Let me log an issue to check this.
m
Aren't you worried that all IO threads could get blocked by another part of the application? If the client starts those threads but quickly stops using them, I should be able to confirm that by trying to use the same threads for something else. I will think of some code.
e
The IO dispatcher should be able to start new threads in case of blocking, so it shouldn't be an issue. Please check the KDocs for more details: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-dispatchers/-i-o.html
m
That only applies if you use limitedParallelism. Otherwise it is limited, and there is one limit for the whole app.
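A minimal sketch of that difference, assuming kotlinx-coroutines-core is on the classpath and the default IO limit of max(64, number of cores) has not been changed through a system property. The helper name peakBlockingTasks is hypothetical, and limitedParallelism may require an opt-in on older kotlinx.coroutines versions. The peak values depend on scheduling, so no expected output is given.

```kotlin
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import java.util.concurrent.atomic.AtomicInteger

// Runs `tasks` blocking jobs on `dispatcher` and returns the highest number
// of them that were observed running at the same time.
suspend fun peakBlockingTasks(dispatcher: CoroutineDispatcher, tasks: Int): Int {
    val running = AtomicInteger(0)
    val peak = AtomicInteger(0)
    coroutineScope {
        repeat(tasks) {
            launch(dispatcher) {
                val now = running.incrementAndGet()
                peak.updateAndGet { current -> maxOf(current, now) }
                Thread.sleep(200) // deliberately blocks the worker thread
                running.decrementAndGet()
            }
        }
    }
    return peak.get()
}

fun main(): Unit = runBlocking {
    // Plain Dispatchers.IO: one shared limit for the whole application.
    println("Dispatchers.IO peak: ${peakBlockingTasks(Dispatchers.IO, 200)}")

    // A view created with limitedParallelism has its own, separate limit.
    val ownLimit = Dispatchers.IO.limitedParallelism(100)
    println("limitedParallelism(100) peak: ${peakBlockingTasks(ownLimit, 200)}")
}
```

On a machine with fewer than 64 cores, the first peak should not exceed 64, while the second may go up to 100.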
e
Yep, and we offload blocking operations to a separate dispatcher.
m
Ok, I can see you are not really keeping those threads blocked, as I can use them for something else, but do you block them in the first place?
import io.ktor.client.*
import io.ktor.client.engine.cio.*
import io.ktor.client.request.*
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import java.util.concurrent.atomic.AtomicInteger
import kotlin.time.measureTime

val client = HttpClient(CIO) {
    engine {
        maxConnectionsCount = 1000
        endpoint.connectTimeout = 100_000
        requestTimeout = 100_000
    }
}

suspend fun getDelayedResponse(seconds: Int, a: Int) {
    client.get("https://api.kt.academy/delay") {
        parameter("delay", seconds)
        parameter("a", a)
    }
}

suspend fun main() = measureTime {
    runBlocking {
        val finished = AtomicInteger(0)
        repeat(1000) {
            launch {
                getDelayedResponse(3000, it)
                finished.incrementAndGet()
            }
        }
        delay(1000)
        printActiveThreads()
        repeat(50) {
            launch(Dispatchers.IO) {
                Thread.sleep(1000)
            }
        }
        delay(100)
        printActiveThreads()
        println(finished.get())
    }
}.let { println("Took ${it.inWholeSeconds}") }

fun printActiveThreads() {
    Thread.getAllStackTraces().keys.filter { it.isAlive }.map { it.name }.let {
        println("Active threads: ${it.size}")
        println(it)
    }
}

/*
Active threads: 93
[DefaultDispatcher-worker-16, DefaultDispatcher-worker-81, DefaultDispatcher-worker-38, DefaultDispatcher-worker-35, DefaultDispatcher-worker-58, DefaultDispatcher-worker-85, DefaultDispatcher-worker-13, DefaultDispatcher-worker-4, DefaultDispatcher-worker-28, DefaultDispatcher-worker-56, DefaultDispatcher-worker-57, DefaultDispatcher-worker-77, DefaultDispatcher-worker-73, DefaultDispatcher-worker-75, DefaultDispatcher-worker-61, DefaultDispatcher-worker-18, DefaultDispatcher-worker-41, Signal Dispatcher, DefaultDispatcher-worker-45, DefaultDispatcher-worker-36, DefaultDispatcher-worker-49, Monitor Ctrl-Break, DefaultDispatcher-worker-47, DefaultDispatcher-worker-62, DefaultDispatcher-worker-6, DefaultDispatcher-worker-29, DefaultDispatcher-worker-30, Reference Handler, DefaultDispatcher-worker-50, DefaultDispatcher-worker-68, DefaultDispatcher-worker-15, DefaultDispatcher-worker-20, DefaultDispatcher-worker-43, DefaultDispatcher-worker-65, DefaultDispatcher-worker-54, DefaultDispatcher-worker-37, DefaultDispatcher-worker-19, Common-Cleaner, DefaultDispatcher-worker-52, DefaultDispatcher-worker-10, DefaultDispatcher-worker-51, DefaultDispatcher-worker-70, DefaultDispatcher-worker-1, DefaultDispatcher-worker-25, DefaultDispatcher-worker-33, DefaultDispatcher-worker-14, DefaultDispatcher-worker-78, DefaultDispatcher-worker-44, DefaultDispatcher-worker-2, DefaultDispatcher-worker-69, DefaultDispatcher-worker-8, DefaultDispatcher-worker-40, DefaultDispatcher-worker-60, DefaultDispatcher-worker-53, DefaultDispatcher-worker-42, DefaultDispatcher-worker-74, DefaultDispatcher-worker-83, Finalizer, DefaultDispatcher-worker-48, DefaultDispatcher-worker-24, DefaultDispatcher-worker-21, DefaultDispatcher-worker-9, DefaultDispatcher-worker-17, DefaultDispatcher-worker-64, DefaultDispatcher-worker-72, DefaultDispatcher-worker-23, DefaultDispatcher-worker-3, DefaultDispatcher-worker-84, DefaultDispatcher-worker-71, DefaultDispatcher-worker-31, DefaultDispatcher-worker-63, 
DefaultDispatcher-worker-79, DefaultDispatcher-worker-7, DefaultDispatcher-worker-26, DefaultDispatcher-worker-46, DefaultDispatcher-worker-22, main, DefaultDispatcher-worker-34, DefaultDispatcher-worker-80, kotlinx.coroutines.DefaultExecutor, DefaultDispatcher-worker-55, DefaultDispatcher-worker-39, DefaultDispatcher-worker-5, DefaultDispatcher-worker-66, DefaultDispatcher-worker-11, DefaultDispatcher-worker-59, DefaultDispatcher-worker-76, DefaultDispatcher-worker-82, DefaultDispatcher-worker-32, Notification Thread, DefaultDispatcher-worker-27, DefaultDispatcher-worker-12, DefaultDispatcher-worker-67]
Active threads: 93
[DefaultDispatcher-worker-16, DefaultDispatcher-worker-81, DefaultDispatcher-worker-38, DefaultDispatcher-worker-35, DefaultDispatcher-worker-58, DefaultDispatcher-worker-85, DefaultDispatcher-worker-13, DefaultDispatcher-worker-4, DefaultDispatcher-worker-28, DefaultDispatcher-worker-56, DefaultDispatcher-worker-57, DefaultDispatcher-worker-77, DefaultDispatcher-worker-73, DefaultDispatcher-worker-75, DefaultDispatcher-worker-61, DefaultDispatcher-worker-18, DefaultDispatcher-worker-41, Signal Dispatcher, DefaultDispatcher-worker-45, DefaultDispatcher-worker-36, DefaultDispatcher-worker-49, Monitor Ctrl-Break, DefaultDispatcher-worker-47, DefaultDispatcher-worker-62, DefaultDispatcher-worker-6, DefaultDispatcher-worker-29, DefaultDispatcher-worker-30, Reference Handler, DefaultDispatcher-worker-50, DefaultDispatcher-worker-68, DefaultDispatcher-worker-15, DefaultDispatcher-worker-20, DefaultDispatcher-worker-43, DefaultDispatcher-worker-65, DefaultDispatcher-worker-54, DefaultDispatcher-worker-37, DefaultDispatcher-worker-19, Common-Cleaner, DefaultDispatcher-worker-52, DefaultDispatcher-worker-10, DefaultDispatcher-worker-51, DefaultDispatcher-worker-70, DefaultDispatcher-worker-1, DefaultDispatcher-worker-25, DefaultDispatcher-worker-33, DefaultDispatcher-worker-14, DefaultDispatcher-worker-78, DefaultDispatcher-worker-44, DefaultDispatcher-worker-2, DefaultDispatcher-worker-69, DefaultDispatcher-worker-8, DefaultDispatcher-worker-40, DefaultDispatcher-worker-60, DefaultDispatcher-worker-53, DefaultDispatcher-worker-42, DefaultDispatcher-worker-74, DefaultDispatcher-worker-83, Finalizer, DefaultDispatcher-worker-48, DefaultDispatcher-worker-24, DefaultDispatcher-worker-21, DefaultDispatcher-worker-9, DefaultDispatcher-worker-17, DefaultDispatcher-worker-64, DefaultDispatcher-worker-72, DefaultDispatcher-worker-23, DefaultDispatcher-worker-3, DefaultDispatcher-worker-84, DefaultDispatcher-worker-71, DefaultDispatcher-worker-31, DefaultDispatcher-worker-63, 
DefaultDispatcher-worker-79, DefaultDispatcher-worker-7, DefaultDispatcher-worker-26, DefaultDispatcher-worker-46, DefaultDispatcher-worker-22, main, DefaultDispatcher-worker-34, DefaultDispatcher-worker-80, kotlinx.coroutines.DefaultExecutor, DefaultDispatcher-worker-55, DefaultDispatcher-worker-39, DefaultDispatcher-worker-5, DefaultDispatcher-worker-66, DefaultDispatcher-worker-11, DefaultDispatcher-worker-59, DefaultDispatcher-worker-76, DefaultDispatcher-worker-82, DefaultDispatcher-worker-32, Notification Thread, DefaultDispatcher-worker-27, DefaultDispatcher-worker-12, DefaultDispatcher-worker-67]
0
*/
e
In CIO we have a dedicated thread for the blocking select operation, so that shouldn't be the case. Could you log an issue with a snippet? We will take a look.
m
Do you mean output? Here it is:
Active threads: 87
[DefaultDispatcher-worker-16, DefaultDispatcher-worker-37, DefaultDispatcher-worker-35, DefaultDispatcher-worker-59, DefaultDispatcher-worker-13, DefaultDispatcher-worker-4, DefaultDispatcher-worker-28, DefaultDispatcher-worker-53, DefaultDispatcher-worker-57, DefaultDispatcher-worker-78, DefaultDispatcher-worker-73, DefaultDispatcher-worker-75, DefaultDispatcher-worker-61, DefaultDispatcher-worker-18, DefaultDispatcher-worker-42, Signal Dispatcher, DefaultDispatcher-worker-45, DefaultDispatcher-worker-36, DefaultDispatcher-worker-49, Monitor Ctrl-Break, DefaultDispatcher-worker-47, DefaultDispatcher-worker-62, DefaultDispatcher-worker-6, DefaultDispatcher-worker-29, DefaultDispatcher-worker-30, Reference Handler, DefaultDispatcher-worker-50, DefaultDispatcher-worker-69, DefaultDispatcher-worker-15, DefaultDispatcher-worker-20, DefaultDispatcher-worker-43, DefaultDispatcher-worker-65, DefaultDispatcher-worker-56, DefaultDispatcher-worker-38, DefaultDispatcher-worker-19, Common-Cleaner, DefaultDispatcher-worker-55, DefaultDispatcher-worker-10, DefaultDispatcher-worker-52, DefaultDispatcher-worker-70, DefaultDispatcher-worker-1, DefaultDispatcher-worker-25, DefaultDispatcher-worker-33, DefaultDispatcher-worker-14, DefaultDispatcher-worker-79, DefaultDispatcher-worker-44, DefaultDispatcher-worker-2, DefaultDispatcher-worker-68, DefaultDispatcher-worker-8, DefaultDispatcher-worker-39, DefaultDispatcher-worker-60, DefaultDispatcher-worker-51, DefaultDispatcher-worker-41, DefaultDispatcher-worker-74, Finalizer, DefaultDispatcher-worker-48, DefaultDispatcher-worker-24, DefaultDispatcher-worker-22, DefaultDispatcher-worker-9, DefaultDispatcher-worker-17, DefaultDispatcher-worker-64, DefaultDispatcher-worker-77, DefaultDispatcher-worker-23, DefaultDispatcher-worker-3, DefaultDispatcher-worker-71, DefaultDispatcher-worker-31, DefaultDispatcher-worker-63, DefaultDispatcher-worker-72, DefaultDispatcher-worker-7, DefaultDispatcher-worker-26, DefaultDispatcher-worker-46, 
kotlinx.coroutines.DefaultExecutor, main, DefaultDispatcher-worker-34, DefaultDispatcher-worker-21, DefaultDispatcher-worker-54, DefaultDispatcher-worker-40, DefaultDispatcher-worker-5, DefaultDispatcher-worker-66, DefaultDispatcher-worker-11, DefaultDispatcher-worker-58, DefaultDispatcher-worker-76, DefaultDispatcher-worker-32, Notification Thread, DefaultDispatcher-worker-27, DefaultDispatcher-worker-12, DefaultDispatcher-worker-67]
Active threads: 87
[DefaultDispatcher-worker-16, DefaultDispatcher-worker-37, DefaultDispatcher-worker-35, DefaultDispatcher-worker-59, DefaultDispatcher-worker-13, DefaultDispatcher-worker-4, DefaultDispatcher-worker-28, DefaultDispatcher-worker-53, DefaultDispatcher-worker-57, DefaultDispatcher-worker-78, DefaultDispatcher-worker-73, DefaultDispatcher-worker-75, DefaultDispatcher-worker-61, DefaultDispatcher-worker-18, DefaultDispatcher-worker-42, Signal Dispatcher, DefaultDispatcher-worker-45, DefaultDispatcher-worker-36, DefaultDispatcher-worker-49, Monitor Ctrl-Break, DefaultDispatcher-worker-47, DefaultDispatcher-worker-62, DefaultDispatcher-worker-6, DefaultDispatcher-worker-29, DefaultDispatcher-worker-30, Reference Handler, DefaultDispatcher-worker-50, DefaultDispatcher-worker-69, DefaultDispatcher-worker-15, DefaultDispatcher-worker-20, DefaultDispatcher-worker-43, DefaultDispatcher-worker-65, DefaultDispatcher-worker-56, DefaultDispatcher-worker-38, DefaultDispatcher-worker-19, Common-Cleaner, DefaultDispatcher-worker-55, DefaultDispatcher-worker-10, DefaultDispatcher-worker-52, DefaultDispatcher-worker-70, DefaultDispatcher-worker-1, DefaultDispatcher-worker-25, DefaultDispatcher-worker-33, DefaultDispatcher-worker-14, DefaultDispatcher-worker-79, DefaultDispatcher-worker-44, DefaultDispatcher-worker-2, DefaultDispatcher-worker-68, DefaultDispatcher-worker-8, DefaultDispatcher-worker-39, DefaultDispatcher-worker-60, DefaultDispatcher-worker-51, DefaultDispatcher-worker-41, DefaultDispatcher-worker-74, Finalizer, DefaultDispatcher-worker-48, DefaultDispatcher-worker-24, DefaultDispatcher-worker-22, DefaultDispatcher-worker-9, DefaultDispatcher-worker-17, DefaultDispatcher-worker-64, DefaultDispatcher-worker-77, DefaultDispatcher-worker-23, DefaultDispatcher-worker-3, DefaultDispatcher-worker-71, DefaultDispatcher-worker-31, DefaultDispatcher-worker-63, DefaultDispatcher-worker-72, DefaultDispatcher-worker-7, DefaultDispatcher-worker-26, DefaultDispatcher-worker-46, 
kotlinx.coroutines.DefaultExecutor, main, DefaultDispatcher-worker-34, DefaultDispatcher-worker-21, DefaultDispatcher-worker-54, DefaultDispatcher-worker-40, DefaultDispatcher-worker-5, DefaultDispatcher-worker-66, DefaultDispatcher-worker-11, DefaultDispatcher-worker-58, DefaultDispatcher-worker-76, DefaultDispatcher-worker-32, Notification Thread, DefaultDispatcher-worker-27, DefaultDispatcher-worker-12, DefaultDispatcher-worker-67]
e
Could you please capture a jstack dump of these threads?
m
The number of threads it starts varies between runs (there is a race condition), but after I start more work on IO, this number never changes. So it looks like every request starts with some rather short blocking operation and does not need blocking after that.
One moment
I will just check one more thing
e
let me know if you need any help
m
There is something to this hypothesis. When I introduce some delay between starting consecutive network requests, fewer threads are started:
import io.ktor.client.*
import io.ktor.client.engine.cio.*
import io.ktor.client.request.*
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import java.util.concurrent.atomic.AtomicInteger
import kotlin.time.measureTime

val client = HttpClient(CIO) {
    engine {
        maxConnectionsCount = 1000
        endpoint.connectTimeout = 100_000
        requestTimeout = 100_000
    }
}

suspend fun getDelayedResponse(seconds: Int, a: Int) {
    client.get("https://api.kt.academy/delay") {
        parameter("delay", seconds)
        parameter("a", a)
    }
}

suspend fun main() = measureTime {
    runBlocking {
        val finished = AtomicInteger(0)
        repeat(1000) {
            delay(10)
            launch {
                getDelayedResponse(30000, it)
                finished.incrementAndGet()
            }
        }
        delay(10100)
        printActiveThreads() // 50
        repeat(50) {
            launch(Dispatchers.IO) {
                Thread.sleep(1000)
            }
        }
        delay(100)
        printActiveThreads()
        println(finished.get())
    }
}.let { println("Took ${it.inWholeSeconds}") }

fun printActiveThreads() {
    Thread.getAllStackTraces().keys.filter { it.isAlive }.map { it.name }.let {
        println("Active threads: ${it.size}")
        println(it)
    }
}

/*
Active threads: 68
[Signal Dispatcher, DefaultDispatcher-worker-25, DefaultDispatcher-worker-16, DefaultDispatcher-worker-39, main, DefaultDispatcher-worker-9, DefaultDispatcher-worker-37, DefaultDispatcher-worker-38, DefaultDispatcher-worker-49, DefaultDispatcher-worker-54, DefaultDispatcher-worker-57, DefaultDispatcher-worker-18, Monitor Ctrl-Break, DefaultDispatcher-worker-17, DefaultDispatcher-worker-26, DefaultDispatcher-worker-22, DefaultDispatcher-worker-30, DefaultDispatcher-worker-28, DefaultDispatcher-worker-43, DefaultDispatcher-worker-10, DefaultDispatcher-worker-11, DefaultDispatcher-worker-31, DefaultDispatcher-worker-56, Finalizer, DefaultDispatcher-worker-1, DefaultDispatcher-worker-24, DefaultDispatcher-worker-46, DefaultDispatcher-worker-36, DefaultDispatcher-worker-19, Notification Thread, DefaultDispatcher-worker-34, DefaultDispatcher-worker-32, DefaultDispatcher-worker-51, DefaultDispatcher-worker-6, DefaultDispatcher-worker-12, Reference Handler, DefaultDispatcher-worker-59, DefaultDispatcher-worker-42, DefaultDispatcher-worker-50, DefaultDispatcher-worker-20, DefaultDispatcher-worker-41, DefaultDispatcher-worker-33, DefaultDispatcher-worker-23, DefaultDispatcher-worker-55, DefaultDispatcher-worker-29, kotlinx.coroutines.DefaultExecutor, DefaultDispatcher-worker-3, Common-Cleaner, DefaultDispatcher-worker-45, DefaultDispatcher-worker-53, DefaultDispatcher-worker-5, DefaultDispatcher-worker-52, DefaultDispatcher-worker-14, DefaultDispatcher-worker-44, DefaultDispatcher-worker-60, DefaultDispatcher-worker-7, DefaultDispatcher-worker-27, DefaultDispatcher-worker-4, DefaultDispatcher-worker-15, DefaultDispatcher-worker-2, DefaultDispatcher-worker-35, DefaultDispatcher-worker-21, DefaultDispatcher-worker-47, DefaultDispatcher-worker-40, DefaultDispatcher-worker-58, DefaultDispatcher-worker-13, DefaultDispatcher-worker-8, DefaultDispatcher-worker-48]
Active threads: 69
[Signal Dispatcher, DefaultDispatcher-worker-61, DefaultDispatcher-worker-25, DefaultDispatcher-worker-16, DefaultDispatcher-worker-39, main, DefaultDispatcher-worker-9, DefaultDispatcher-worker-37, DefaultDispatcher-worker-38, DefaultDispatcher-worker-49, DefaultDispatcher-worker-54, DefaultDispatcher-worker-57, DefaultDispatcher-worker-18, Monitor Ctrl-Break, DefaultDispatcher-worker-17, DefaultDispatcher-worker-26, DefaultDispatcher-worker-22, DefaultDispatcher-worker-30, DefaultDispatcher-worker-28, DefaultDispatcher-worker-43, DefaultDispatcher-worker-10, DefaultDispatcher-worker-11, DefaultDispatcher-worker-31, DefaultDispatcher-worker-56, Finalizer, DefaultDispatcher-worker-1, DefaultDispatcher-worker-24, DefaultDispatcher-worker-46, DefaultDispatcher-worker-36, DefaultDispatcher-worker-19, Notification Thread, DefaultDispatcher-worker-34, DefaultDispatcher-worker-32, DefaultDispatcher-worker-51, DefaultDispatcher-worker-6, DefaultDispatcher-worker-12, Reference Handler, DefaultDispatcher-worker-59, DefaultDispatcher-worker-42, DefaultDispatcher-worker-50, DefaultDispatcher-worker-20, DefaultDispatcher-worker-41, DefaultDispatcher-worker-33, DefaultDispatcher-worker-23, DefaultDispatcher-worker-55, DefaultDispatcher-worker-29, kotlinx.coroutines.DefaultExecutor, DefaultDispatcher-worker-3, Common-Cleaner, DefaultDispatcher-worker-45, DefaultDispatcher-worker-53, DefaultDispatcher-worker-5, DefaultDispatcher-worker-52, DefaultDispatcher-worker-14, DefaultDispatcher-worker-44, DefaultDispatcher-worker-60, DefaultDispatcher-worker-7, DefaultDispatcher-worker-27, DefaultDispatcher-worker-4, DefaultDispatcher-worker-15, DefaultDispatcher-worker-2, DefaultDispatcher-worker-35, DefaultDispatcher-worker-21, DefaultDispatcher-worker-47, DefaultDispatcher-worker-40, DefaultDispatcher-worker-58, DefaultDispatcher-worker-13, DefaultDispatcher-worker-8, DefaultDispatcher-worker-48]
0
 */
Screenshot 2024-03-04 at 10.28.36.png
@e5l
e
Could you share the stack trace for the largest group of threads?
m
"DefaultDispatcher-worker-2@1333" daemon prio=5 tid=0x13 nid=NA waiting
  java.lang.Thread.State: WAITING
	  at jdk.internal.misc.Unsafe.park(Unsafe.java:-1)
	  at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:376)
	  at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.park(CoroutineScheduler.kt:847)
	  at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.tryPark(CoroutineScheduler.kt:792)
	  at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.runWorker(CoroutineScheduler.kt:740)
	  at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.run(CoroutineScheduler.kt:693)
image.png
e
It looks like all threads are parked in the dispatcher.
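The same check can be done from inside the process without a debugger. This is a rough sketch using only the JDK; the helper name threadStatesByPrefix is made up for illustration. Parked coroutine workers would be expected to show up as WAITING or TIMED_WAITING, while a thread stuck in a blocking call such as Thread.sleep is also TIMED_WAITING, so the stack trace is still needed to tell the two apart.

```kotlin
// Counts live threads whose name starts with `prefix`, grouped by JVM thread state.
fun threadStatesByPrefix(prefix: String): Map<Thread.State, Int> =
    Thread.getAllStackTraces().keys
        .filter { it.isAlive && it.name.startsWith(prefix) }
        .groupingBy { it.state }
        .eachCount()

fun main() {
    // In the experiments above this would summarize the dispatcher workers;
    // in this standalone program there are none, so the map is empty.
    println(threadStatesByPrefix("DefaultDispatcher-worker"))
    println(threadStatesByPrefix("main"))
}
```

Calling it right after printActiveThreads() in the experiments would show how many workers are parked at that moment.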
m
It looks so, but what I am wondering is why they were all started in the first place.
OK, so this is the current state of my research: https://kt.academy/article/network_client_threads But I need to run some tests on a production application to make sure it is right.