ursus
12/12/2022, 7:22 PM
jw
12/12/2022, 7:22 PM
ursus
12/12/2022, 7:32 PM
while (bytes >= 0 && coroutineContext.isActive) {
?
(I’m measuring download speed)
jw
12/12/2022, 7:35 PM
byteStream()
but should instead be using Okio for the whole way to the file.
ursus
12/12/2022, 7:38 PM
val start = System.currentTimeMillis()
body.use { it.byteStream().use { iss -> iss.justIterateUntilEnd() } }
val end = System.currentTimeMillis()
would Okio’s stream implementation be faster?
jw
12/12/2022, 7:42 PM
it.skip(Long.MAX_VALUE)
would be even faster, but then you're measuring even less.
jw
12/12/2022, 7:43 PM
jw
12/12/2022, 7:43 PM
ursus
12/12/2022, 7:43 PM
ursus
12/12/2022, 7:44 PM
ursus
12/12/2022, 7:46 PM
jw
12/12/2022, 7:47 PM
BufferedSource
and call skip()
since it avoids copying the bytes one extra time.
ursus
12/12/2022, 7:59 PM
val totalBytesRead = runInterruptible {
    body.source().use {
        val buffer = ByteArray(DEFAULT_BUFFER_SIZE)
        var bytesRead = 0L
        while (!it.exhausted()) {
            bytesRead += it.read(buffer)
        }
        bytesRead
    }
}
how would I factor the skip into it?
ursus
12/12/2022, 7:59 PM
.skip(buffer.size)
?
jw
12/12/2022, 7:59 PM
ursus
12/12/2022, 8:00 PM
jw
12/12/2022, 8:00 PM
ursus
12/12/2022, 8:01 PM
jw
12/12/2022, 8:01 PM
ursus
12/12/2022, 8:08 PM
var bytesRead = 0
while (!it.request(bufferSize)) {
    it.skip(bufferSize)
    bytesRead += bufferSize
}
ursus
12/12/2022, 8:08 PM
jw
12/12/2022, 8:09 PM
When request returns false, the count of bytes is available at it.buffer.size()
jw
12/12/2022, 8:10 PM
ursus
12/12/2022, 8:13 PM
body.source().use {
    var bytesRead = 0L
    while (true) {
        if (!it.request(BUFFER_SIZE)) {
            val lastBufferSize = it.buffer.size
            bytesRead += lastBufferSize
            it.skip(lastBufferSize)
            break
        }
        it.skip(BUFFER_SIZE)
        bytesRead += BUFFER_SIZE
    }
    bytesRead
}
like this?
jw
12/12/2022, 8:14 PM
jw
12/12/2022, 8:14 PM
ursus
12/12/2022, 8:15 PM
jw
12/12/2022, 8:15 PM
val bytesRead = it.source().use { it.readAll(blackholeSink()) }
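For reference, a minimal sketch of how the readAll(blackholeSink()) one-liner could be wrapped to time a drain. It assumes Okio 2.x or later on the classpath; the helper name drainAndMeasure is hypothetical, and any BufferedSource (for example body.source()) can stand in for the argument.

```kotlin
import okio.Buffer
import okio.BufferedSource
import okio.blackholeSink

// Hypothetical helper (not part of Okio or OkHttp): drains the source into a
// sink that discards everything, and returns (total bytes read, elapsed nanos).
fun drainAndMeasure(source: BufferedSource): Pair<Long, Long> {
    val startNanos = System.nanoTime()
    // readAll() moves segments to the sink until the source is exhausted and
    // returns the number of bytes consumed; use {} closes the source afterwards.
    val totalBytes = source.use { it.readAll(blackholeSink()) }
    val elapsedNanos = System.nanoTime() - startNanos
    return totalBytes to elapsedNanos
}
```

An in-memory okio.Buffer also implements BufferedSource, so the helper can be tried without a network call, e.g. drainAndMeasure(Buffer().writeUtf8("hello world")).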
ursus
12/12/2022, 8:18 PM
ursus
12/12/2022, 8:20 PM
byteStream()
makes it transform back to InputStreams + ByteArray allocations?
jw
12/12/2022, 8:21 PM
ursus
12/12/2022, 8:21 PM
ursus
12/12/2022, 8:47 PM
runInterruptible
and cancel the job then I get java.io.InterruptedIOException: interrupted
which is then reported as a regular error, should I just filter it away?
It seems that I cannot treat it as CancellationException and rethrow it
jw
12/12/2022, 8:53 PM
runCatching
would handle that, but looks like it only handles InterruptedException.
ursus
12/12/2022, 8:54 PM
private fun <T> runInterruptibleInExpectedContext(coroutineContext: CoroutineContext, block: () -> T): T {
    try {
        val threadState = ThreadState(coroutineContext.job)
        threadState.setup()
        try {
            return block()
        } finally {
            threadState.clearInterrupt()
        }
    } catch (e: InterruptedException) {
        throw CancellationException("Blocking call was interrupted due to parent cancellation").initCause(e)
    }
}
yes exactly, not the IO one
jw
12/12/2022, 8:54 PM
ursus
12/12/2022, 8:57 PM
try {
    runInterruptible { ... }
} catch (ex: InterruptedIOException) {
    throw CancellationException().initCause(ex)
}
right?
jw
12/12/2022, 8:57 PM
ursus
12/12/2022, 9:03 PM
ursus
12/12/2022, 9:44 PM
withTimeoutOrNull
ursus
12/12/2022, 9:45 PM
private suspend fun readTotalBytes(body: ResponseBody): Long {
    return body.use { b ->
        withTimeoutOrNull(timeMillis = 1000) {
            runIoInterruptible {
                simulateWork(5000)
                44L
            }
        } ?: -1L
    }
}
fun simulateWork(delayMillis: Long) {
    val start = System.currentTimeMillis()
    @Suppress("ControlFlowWithEmptyBody")
    while (true) {
        if (System.currentTimeMillis() - start >= delayMillis) {
            break
        }
        if (Thread.interrupted()) {
            LOG.d("INTERRUPTED")
            break
        }
    }
}
this works as expected, timeout kicks in, and -1L is returned
ursus
12/12/2022, 9:46 PM
private suspend fun readTotalBytes(body: ResponseBody): Long {
    return body.use { b ->
        withTimeoutOrNull(timeMillis = 1000) {
            runIoInterruptible {
                b.source().use {
                    it.readAll(blackholeSink())
                }
            }
        } ?: -1L
    }
}
suspend fun <T> runIoInterruptible(block: () -> T): T {
    return try {
        runInterruptible(block = block)
    } catch (ex: InterruptedIOException) {
        throw CancellationException().initCause(ex)
    }
}
ursus
12/12/2022, 9:47 PM
withTimeoutOrNull, it never returns, as if it threw
ursus
12/12/2022, 9:48 PM
runInterruptible
uses the same mechanism of throwing CancellationException
ursus
12/12/2022, 9:54 PM
runInterruptible {
    try {
        b.source().use {
            it.readAll(blackholeSink())
        }
    } catch (ex: InterruptedIOException) { // <-- converted inside the runInterruptible block
        throw InterruptedException().initCause(ex)
    }
}
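
The last snippet converts an InterruptedIOException into an InterruptedException inside the runInterruptible block, which is the exception type the quoted runInterruptibleInExpectedContext source catches. A minimal sketch of that conversion as a reusable helper, using only JDK types; the name rethrowInterruptedIo is hypothetical and is not part of Okio or kotlinx.coroutines:

```kotlin
import java.io.InterruptedIOException

// Hypothetical helper: runs the block and, if it fails with an
// InterruptedIOException (an IOException subtype), rethrows it as an
// InterruptedException that keeps the original exception as its cause.
inline fun <T> rethrowInterruptedIo(block: () -> T): T {
    try {
        return block()
    } catch (ex: InterruptedIOException) {
        val interrupted = InterruptedException(ex.message)
        interrupted.initCause(ex)
        throw interrupted
    }
}
```

Assuming the same b as in the snippet above, it would be used as runInterruptible { rethrowInterruptedIo { b.source().use { it.readAll(blackholeSink()) } } }.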