dave08
04/23/2018, 12:12 PM
ApplicationCall context, what's the better practice, to use suspending functions, withContext(CommonPool), or async { } (since some processing can be done in parallel..)?
cy
04/23/2018, 12:38 PM
cy
04/23/2018, 12:39 PM
async you can launch multiple tasks
cy
04/23/2018, 12:40 PM
async could be called in non-suspend function
dave08
04/23/2018, 12:42 PM
CommonPool for this? Also, does each request have its own coroutine running in a thread pool, so that if one request is held up by a blocking process, the other one would be handled?
cy
04/23/2018, 12:51 PM
dave08
04/23/2018, 12:55 PM
suspend fun doing a single blocking process in a request, the other requests won't be blocked (or are they all running on the same thread... and I will need to do withContext(CommonPool) for it)?
cy
04/23/2018, 1:04 PM
withContext launches a new child coroutine on a specified context (separate thread pool) while the original one is suspended until the child completes or crashes
cy
04/23/2018, 1:05 PM
dave08
04/23/2018, 1:09 PM
suspend fun w/o withContext, then no other requests are handled meanwhile? I haven't found anything in the docs about these points (in Vert.x they write it a few times: DON'T BLOCK THE EVENT LOOP... same in Ktor?)
cy
04/23/2018, 1:17 PM
cy
04/23/2018, 1:18 PM
dave08
04/23/2018, 1:19 PM
cy
04/23/2018, 1:20 PM
Deactivated User
04/23/2018, 1:40 PM
dave08
04/23/2018, 1:51 PM
dave08
04/23/2018, 1:52 PM
cy
04/23/2018, 1:59 PM
FileItem is always ByteArrayInputStream or FileInputStream and never from the socket
dave08
04/23/2018, 2:00 PM
cy
04/23/2018, 2:05 PM
cy
04/23/2018, 2:06 PM
AsynchronousFileChannel
Deactivated User
04/23/2018, 2:10 PM
dave08
04/23/2018, 2:11 PM
cy
04/23/2018, 2:20 PM
Deactivated User
04/23/2018, 2:20 PM
async(ioCoroutineDispatcher) {
    part.streamProvider().use { its -> file.outputStream().buffered().use { its.copyTo(it) } }
}.await()
Maybe it can be reworked to multiplex reading instead of reading the whole file in that thread at once.
Deactivated User
04/23/2018, 2:22 PM
Deactivated User
04/23/2018, 2:34 PM
//part.streamProvider().use { its -> file.outputStream().buffered().use { its.copyToSuspend(it) } }
suspend fun InputStream.copyToSuspend(out: OutputStream, bufferSize: Int = DEFAULT_BUFFER_SIZE, dispatcher: CoroutineDispatcher = ioCoroutineDispatcher): Long {
    var bytesCopied: Long = 0
    val buffer = ByteArray(bufferSize)
    while (true) {
        // Each blocking read and write hops to the dispatcher and back.
        val bytes = withContext(dispatcher) { read(buffer) }
        if (bytes < 0) break // end of stream
        withContext(dispatcher) { out.write(buffer, 0, bytes) }
        bytesCopied += bytes
    }
    return bytesCopied
}
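For the question that opened the thread, here is a minimal sketch contrasting the two patterns seen in the snippets above: withContext for a single blocking step, and async when several independent steps can run in parallel. It assumes a current kotlinx.coroutines release, where Dispatchers.IO stands in for the 2018-era CommonPool and ioCoroutineDispatcher. The blockingLookup, loadOne and loadMany names are hypothetical and exist only for illustration.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext

// Hypothetical stand-in for a blocking call (database query, file read, ...).
fun blockingLookup(id: Int): String {
    Thread.sleep(50)
    return "item-$id"
}

// One blocking step: withContext runs it on Dispatchers.IO and suspends
// the caller until the result is ready.
suspend fun loadOne(id: Int): String =
    withContext(Dispatchers.IO) { blockingLookup(id) }

// Several independent steps: async starts them concurrently, and awaitAll
// returns the results in the same order as the input list.
suspend fun loadMany(ids: List<Int>): List<String> = coroutineScope {
    ids.map { id -> async(Dispatchers.IO) { blockingLookup(id) } }.awaitAll()
}

fun main() = runBlocking {
    println(loadOne(1))
    println(loadMany(listOf(1, 2, 3)))
}
```

With a single step, async { }.await() and withContext { } give the same result; async only pays off when more than one task is started before the first await.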
dave08
04/23/2018, 2:38 PM
withContext() {} instead of async { }.await(), since you're in a suspend fun? Since you await on each loop iteration anyway...
cy
04/23/2018, 2:38 PM
dave08
04/23/2018, 2:39 PM
produce { } might be better?
cy
04/23/2018, 2:40 PM
cy
04/23/2018, 2:40 PM
val channel = writer(CommonPool) {
    val buffer = ByteArray(4096)
    // use { } closes the file stream when the loop ends or fails
    file.inputStream().use { stream ->
        while (true) {
            val rc = stream.read(buffer)
            if (rc == -1) break // end of file
            channel.writeFully(buffer, 0, rc)
        }
    }
}
// here we have a channel that is asynchronous
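The same read loop can be sketched with produce from kotlinx.coroutines, the general-purpose builder dave08 asked about. This is an illustration under assumptions, not Ktor's writer API: it sends whole ByteArray chunks over an ordinary channel, uses Dispatchers.IO in place of CommonPool, and the chunks function and its chunkSize parameter are hypothetical names.

```kotlin
import java.io.ByteArrayInputStream
import java.io.InputStream
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.produce
import kotlinx.coroutines.runBlocking

// Reads `source` on Dispatchers.IO and sends each chunk as its own ByteArray.
// The stream is closed when the producer finishes or fails.
@OptIn(ExperimentalCoroutinesApi::class)
fun CoroutineScope.chunks(source: InputStream, chunkSize: Int = 4096): ReceiveChannel<ByteArray> =
    produce(Dispatchers.IO) {
        source.use { stream ->
            val buffer = ByteArray(chunkSize)
            while (true) {
                val rc = stream.read(buffer)
                if (rc == -1) break
                send(buffer.copyOf(rc)) // copy, because the buffer is reused
            }
        }
    }

fun main() = runBlocking {
    val data = ByteArray(10_000) { (it % 251).toByte() }
    var total = 0
    for (chunk in chunks(ByteArrayInputStream(data))) {
        total += chunk.size
    }
    println("received $total bytes")
}
```

The consumer suspends while waiting for the next chunk, so the blocking read only ever occupies a thread of the producer's dispatcher.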
dave08
04/23/2018, 2:42 PM
withContext doesn't start a new coroutine, it just switches the coroutine context... so it could be used to surround the whole function, if you don't need to switch back and forth...
cy
04/23/2018, 2:44 PM
writer is similar to produce but for a byte channel
Deactivated User
04/23/2018, 2:45 PM
cy
04/23/2018, 2:46 PM
cy
04/23/2018, 2:48 PM
dave08
04/23/2018, 2:54 PM
copyToSuspend part)?
dave08
04/23/2018, 2:57 PM
withContext(CommonPool) until this is fixed?
dave08
04/23/2018, 2:58 PM
ioCoroutineDispatcher?
dave08
04/23/2018, 2:59 PM
Deactivated User
04/23/2018, 2:59 PM
dave08
04/23/2018, 3:01 PM
dave08
04/23/2018, 3:01 PM
cy
04/23/2018, 4:44 PM
ioCoroutineDispatcher! Unlike blocking in a request handler coroutine, blocking on ioCoroutineDispatcher could cause an infinite deadlock
dave08
04/23/2018, 4:47 PM
cy
04/23/2018, 4:48 PM
cy
04/23/2018, 4:49 PM
newFixedThreadPoolContext and ExecutorService.asCoroutineDispatcher()
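A minimal sketch of the second option named above, ExecutorService.asCoroutineDispatcher(), used to keep blocking work on a dedicated pool. The rest of cy's message did not survive in this log, so reading it as a recommendation is an assumption. The pool size, the "blocking-pool" thread name and the blockingWork and runOnDedicatedPool functions are hypothetical.

```kotlin
import java.util.concurrent.Executors
import kotlinx.coroutines.asCoroutineDispatcher
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext

// Hypothetical blocking operation; returns the name of the thread it ran on.
fun blockingWork(): String {
    Thread.sleep(20)
    return Thread.currentThread().name
}

// Runs blockingWork on a dedicated fixed pool. The dispatcher returned by
// asCoroutineDispatcher() is Closeable; closing it shuts the executor down.
suspend fun runOnDedicatedPool(threads: Int = 4): String {
    val executor = Executors.newFixedThreadPool(threads) { r ->
        Thread(r, "blocking-pool").apply { isDaemon = true }
    }
    return executor.asCoroutineDispatcher().use { pool ->
        withContext(pool) { blockingWork() }
    }
}

fun main() = runBlocking {
    println("ran on ${runOnDedicatedPool()}")
}
```

The sketch creates and closes the pool per call only to stay self-contained; a server would more likely create the dispatcher once at startup and reuse it for every request.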
dave08
04/23/2018, 4:50 PM
CommonPool...
Deactivated User
04/23/2018, 4:52 PM
//part.streamProvider().use { its -> file.outputStream().buffered().use { its.copyToSuspend(it) } }
suspend fun InputStream.copyToSuspend(
    out: OutputStream,
    bufferSize: Int = DEFAULT_BUFFER_SIZE,
    yieldSize: Int = 4 * 1024 * 1024,
    dispatcher: CoroutineDispatcher = ioCoroutineDispatcher
): Long {
    // Switch to the dispatcher once for the whole copy instead of once per buffer.
    return withContext(dispatcher) {
        val buffer = ByteArray(bufferSize)
        var bytesCopied = 0L
        var bytesAfterYield = 0L
        while (true) {
            val bytes = read(buffer).takeIf { it >= 0 } ?: break
            out.write(buffer, 0, bytes)
            // Give other coroutines a turn roughly every yieldSize bytes.
            if (bytesAfterYield >= yieldSize) {
                yield()
                bytesAfterYield %= yieldSize
            }
            bytesCopied += bytes
            bytesAfterYield += bytes
        }
        return@withContext bytesCopied
    }
}
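To try the function above outside Ktor, the sketch below restates it with Dispatchers.IO as the default dispatcher (an assumption, since ioCoroutineDispatcher came from the Ktor version of the time) and copies between in-memory streams. The counters are updated before the yield check, a small reordering of the original; the data size and yieldSize in main are arbitrary.

```kotlin
import java.io.ByteArrayInputStream
import java.io.ByteArrayOutputStream
import java.io.InputStream
import java.io.OutputStream
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext
import kotlinx.coroutines.yield

suspend fun InputStream.copyToSuspend(
    out: OutputStream,
    bufferSize: Int = DEFAULT_BUFFER_SIZE,
    yieldSize: Int = 4 * 1024 * 1024,
    dispatcher: CoroutineDispatcher = Dispatchers.IO // assumption: replaces ioCoroutineDispatcher
): Long = withContext(dispatcher) {
    val buffer = ByteArray(bufferSize)
    var bytesCopied = 0L
    var bytesAfterYield = 0L
    while (true) {
        val bytes = read(buffer)
        if (bytes < 0) break // end of stream
        out.write(buffer, 0, bytes)
        bytesCopied += bytes
        bytesAfterYield += bytes
        if (bytesAfterYield >= yieldSize) {
            yield() // let other coroutines on this dispatcher run
            bytesAfterYield %= yieldSize
        }
    }
    bytesCopied
}

fun main() = runBlocking {
    val data = ByteArray(100_000) { (it % 127).toByte() }
    val sink = ByteArrayOutputStream()
    // A small yieldSize so the yield branch is exercised on this input.
    val copied = ByteArrayInputStream(data).copyToSuspend(sink, yieldSize = 16 * 1024)
    println("copied $copied bytes, identical = ${sink.toByteArray().contentEquals(data)}")
}
```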