Use <https://kotlinlang.org/api/kotlinx.coroutines...
# coroutines
u
hmm why does that work? does okhttp check for java thread interrupts implicitly?
j
yes
u
does that by any chance add some overhead versus
while (bytes >= 0 && coroutineContext.isActive) {
? (I’m measuring download speed)
j
Seems unlikely. If you're going for speed you shouldn't be using
byteStream()
but should instead be using Okio for the whole way to the file.
u
how would that help? I only read an 8K buffer each time and throw it away
val start = System.currentTimeMillis()
body.use { it.byteStream().use { iss -> iss.justIterateUntilEnd() } }
val end = System.currentTimeMillis()
would okio’s stream implementation be faster?
j
Well, maybe. You're not really doing anything with the bytes there. So calling
it.skip(Long.MAX_VALUE)
would be even faster, but then you're measuring even less.
Are you trying to measure the speed of the network?
Or the speed of copying bytes?
u
speed of the network via downloading some bigger static file
I figured stream reading would be negligible but yes less overhead is preferable
let me revise that, I do count the bytes read to calculate the speed
j
What you have is probably fine, then. It would be slightly faster to operate on the
BufferedSource
and call
skip()
since it avoids copying the bytes one extra time.
u
so this is what I have
val totalBytesRead = runInterruptible {
    body.source().use {
        val buffer = ByteArray(DEFAULT_BUFFER_SIZE)
        var bytesRead = 0L
        while (!it.exhausted()) {
            bytesRead += it.read(buffer)
        }
        bytesRead
    }
}
how would I factor the
skip
into it?
.skip(buffer.size)
?
j
skip BUFFER_LENGTH at a time, delete all the byte array stuff
u
right, but what about the last “buffer”? skip doesn't return anything
j
do you have a content length?
u
most likely no
j
otherwise you'd need to do request+skip
u
I still don’t get how I will get the total bytes
var bytesRead = 0L
// request() returns true while at least bufferSize bytes are available
while (it.request(bufferSize)) {
    it.skip(bufferSize)
    bytesRead += bufferSize
}
I mean the last buffer length
j
if
request
returns false the count of bytes is available at
it.buffer.size()
which ideally you should still skip so that the internal segment becomes empty and is returned to the pool
u
body.source().use {
    var bytesRead = 0L
    while (true) {
        if (!it.request(BUFFER_SIZE)) {
            val lastBufferSize = it.buffer.size
            bytesRead += lastBufferSize
            it.skip(lastBufferSize)
            break
        }
        it.skip(BUFFER_SIZE)
        bytesRead += BUFFER_SIZE
    }
    bytesRead
}
like this?
j
do you only want the total byte count?
or are you reporting timings after each buffer read?
u
just the total download size, to figure out speed
j
i see. sorry, we never really talked about exactly what you were doing. you can do that with a one-liner like:
val bytesRead = it.source().use { it.readAll(blackholeSink()) }
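For comparison, here is a minimal sketch of the two counting approaches from this thread, run against an in-memory `okio.Buffer` instead of a real `ResponseBody` (an assumption, so that it runs without a network). The helper names `countBySkipping` and `countByReadAll` are hypothetical, and the sketch assumes Okio 2 or later, where `blackholeSink()` is a top-level function and `buffer` is a property.

```kotlin
import okio.Buffer
import okio.BufferedSource
import okio.blackholeSink

// Hypothetical helper: count bytes by requesting and skipping fixed-size chunks.
fun countBySkipping(source: BufferedSource, chunkSize: Long = 8 * 1024L): Long {
    var total = 0L
    // request() returns true only while at least chunkSize bytes can be buffered.
    while (source.request(chunkSize)) {
        source.skip(chunkSize)
        total += chunkSize
    }
    // Fewer than chunkSize bytes remain and they all sit in the internal buffer.
    val tail = source.buffer.size
    // Skip the tail too, so the last segment is emptied and returned to the pool.
    source.skip(tail)
    return total + tail
}

// Hypothetical helper wrapping the one-liner: drain everything into a sink that discards it.
fun countByReadAll(source: BufferedSource): Long = source.readAll(blackholeSink())

fun main() {
    val payload = ByteArray(20_000) // stand-in for a downloaded body
    val viaSkip = countBySkipping(Buffer().write(payload))
    val viaReadAll = countByReadAll(Buffer().write(payload))
    println("skip loop: $viaSkip bytes, readAll: $viaReadAll bytes")
}
```

Both helpers should report the same total; `readAll` simply does the chunk bookkeeping internally.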
u
oh, my bad, sweet!
and this is all more efficient, because okhttp is okio buffers underneath anyways, and using
byteStream()
makes it transform back to inputstreams + byteArray allocations?
j
that's right. we're avoiding intermediate copies between byte arrays.
u
got it, thank you so much!
btw if I use the
runInterruptible
and cancel the job then I get
java.io.InterruptedIOException: interrupted
which is then reported as a regular error, should I just filter it away? It seems that I cannot treat it as
CancellationException
and rethrow it
j
Hmm I would have thought that
runInterruptible
would handle that, but looks like it only handles
InterruptedException
.
u
private fun <T> runInterruptibleInExpectedContext(coroutineContext: CoroutineContext, block: () -> T): T {
    try {
        val threadState = ThreadState(coroutineContext.job)
        threadState.setup()
        try {
            return block()
        } finally {
            threadState.clearInterrupt()
        }
    } catch (e: InterruptedException) {
        throw CancellationException("Blocking call was interrupted due to parent cancellation").initCause(e)
    }
}
yes exactly, not the IO one
j
Probably worth a bug to discuss on the coroutines repo.
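The mismatch comes down to the exception hierarchy: `java.io.InterruptedIOException` is an `IOException`, not a subclass of `InterruptedException`, so a `catch (e: InterruptedException)` never sees it. A minimal sketch using only the JDK (the helper name `describeCatch` is hypothetical and merely mirrors the shape of the catch quoted above):

```kotlin
import java.io.IOException
import java.io.InterruptedIOException

// Only InterruptedException is intercepted by the first catch;
// an InterruptedIOException falls through to the IOException branch.
fun describeCatch(block: () -> Unit): String =
    try {
        block()
        "completed"
    } catch (e: InterruptedException) {
        "caught as InterruptedException"
    } catch (e: IOException) {
        "fell through to IOException: ${e.javaClass.simpleName}"
    }

fun main() {
    println(describeCatch { throw InterruptedException() })
    println(describeCatch { throw InterruptedIOException("interrupted") })
}
```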
u
but in the meantime I should be able to just
try {
    runInterruptible { ... }
} catch (ex: InterruptedIOException) {
    throw CancellationException().initCause(ex)
}
right?
j
yes
u
btw it does behave a bit oddly if I wrap it with
withTimeoutOrNull
private suspend fun readTotalBytes(body: ResponseBody): Long {
    return body.use { b ->
        withTimeoutOrNull(timeMillis = 1000) {
            runIoInterruptible {
                simulateWork(5000)
                44L
            }
        } ?: -1L
    }
}

fun simulateWork(delayMillis: Long) {
    val start = System.currentTimeMillis()
    @Suppress("ControlFlowWithEmptyBody")
    while (true) {
        if (System.currentTimeMillis() - start >= delayMillis) {
            break
        }
        if (Thread.interrupted()) {
            LOG.d("INTERRUPTED")
            break
        }
    }
}
this works as expected, the timeout kicks in, and
-1L
is returned
if I use what we said to use
private suspend fun readTotalBytes(body: ResponseBody): Long {
    return body.use { b ->
        withTimeoutOrNull(timeMillis = 1000) {
            runIoInterruptible {
                b.source().use {
                    it.readAll(blackholeSink())
                }
            }
        } ?: -1L
    }
}

suspend fun <T> runIoInterruptible(block: () -> T): T {
    return try {
        runInterruptible(block = block)
    } catch (ex: InterruptedIOException) {
        throw CancellationException().initCause(ex)
    }
}
then, I see the InterruptedIO get caught, and cancellation thrown, however, it messes up the
withTimeoutOrNull
, it never returns, as if it threw
and I’m not sure why it doesn’t work, when
runInterruptible
uses the same mechanism of throwing CancellationException
only this seems to work as expected
runInterruptible {
    try {
        b.source().use {
            it.readAll(blackholeSink())
        }
    } catch (ex: InterruptedIOException) { // <-- translate inside the block
        throw InterruptedException().initCause(ex)
    }
}
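Putting the pieces together, a self-contained sketch of the variant reported to work might look like the following. It takes a `BufferedSource` rather than a `ResponseBody` and is exercised with an in-memory `okio.Buffer`, both assumptions made so it runs without OkHttp; the name `drainWithTimeout` is hypothetical. It only restates the approach from the thread and does not explain why the outer-catch variant misbehaves.

```kotlin
import java.io.InterruptedIOException
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.runInterruptible
import kotlinx.coroutines.withTimeoutOrNull
import okio.Buffer
import okio.BufferedSource
import okio.blackholeSink

// Hypothetical helper: returns the number of bytes drained, or -1 if the timeout fired first.
suspend fun drainWithTimeout(source: BufferedSource, timeoutMillis: Long): Long =
    withTimeoutOrNull(timeoutMillis) {
        runInterruptible {
            try {
                source.use { it.readAll(blackholeSink()) }
            } catch (ex: InterruptedIOException) {
                // Translate inside the block, so runInterruptible sees the
                // InterruptedException it already knows how to handle.
                throw InterruptedException().initCause(ex)
            }
        }
    } ?: -1L

fun main() = runBlocking {
    // In-memory stand-in for a response body; it drains immediately.
    val bytes = drainWithTimeout(Buffer().write(ByteArray(4096)), timeoutMillis = 1_000)
    println("drained $bytes bytes")
}
```

One caveat worth keeping in mind: `java.net.SocketTimeoutException` is a subclass of `InterruptedIOException`, so this catch would also translate socket timeouts. Whether that is acceptable depends on how the caller wants to report them.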