#coroutines

Mark

05/29/2019, 6:31 AM
I’m writing a suspend function that takes a suspend block as an arg, executes it asynchronously, meanwhile monitoring the length of a file and periodically notifying a callback of the current file length. At the moment I have this, but I’m sure there’s a much neater way, for example, not using GlobalScope.
suspend fun File.trackLength(fileLengthCallback: (Long) -> Unit, refreshInterval: Long = 100L, block: suspend () -> Unit) {
    val deferred = GlobalScope.async {
        block()
    }
    while (deferred.isActive) {
        fileLengthCallback(length())
        delay(refreshInterval)
    }
    fileLengthCallback(length())
}

gildor

05/29/2019, 6:34 AM
Yes, you shouldn’t use GlobalScope. Instead, use the coroutineScope function to create a new scope that is attached to this suspend function, and start async inside it.

louiscad

05/29/2019, 6:36 AM
Why are you using async if you never call await on it?

Mark

05/29/2019, 6:36 AM
I guess I just need the Job?
👌 1

louiscad

05/29/2019, 6:37 AM
You don't need it. A Job from launch is enough.
And use coroutineScope { … } in place of GlobalScope, as Andrey said, so you don't break structured concurrency and don't leak the child coroutine.
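For illustration only (not from the thread): a minimal sketch of what "leaking the child coroutine" can look like. The function name `leakDemo` and the timings are made up, and it assumes a recent kotlinx.coroutines release where `GlobalScope` carries the `DelicateCoroutinesApi` opt-in. Cancelling the caller stops the child started inside `coroutineScope`, while the `GlobalScope` child keeps running.

```kotlin
import kotlinx.coroutines.*

// Returns (globalChildStillActive, scopedChildStillActive) after the caller is cancelled.
@OptIn(DelicateCoroutinesApi::class)
fun leakDemo(): Pair<Boolean, Boolean> = runBlocking {
    var globalChild: Job? = null
    var scopedChild: Job? = null

    val caller = launch {
        // Not a child of `caller`: cancellation does not reach it.
        globalChild = GlobalScope.launch { delay(10_000) }
        coroutineScope {
            // Child of this scope, and therefore of `caller`.
            scopedChild = launch { delay(10_000) }
        }
    }

    delay(50)                // give both children time to start
    caller.cancelAndJoin()   // waits until all structured children have finished

    val result = Pair(globalChild!!.isActive, scopedChild!!.isActive)
    globalChild!!.cancel()   // clean up the leaked coroutine by hand
    result
}
```

Calling `leakDemo()` from a `main` function should show the first flag still `true` and the second `false`.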

Mark

05/29/2019, 6:50 AM
suspend fun File.trackLength(fileLengthCallback: (Long) -> Unit, refreshInterval: Long = 100L, block: suspend () -> Unit) {
    coroutineScope {
        val job = launch {
            block()
        }
        while (job.isActive) {
            fileLengthCallback(length())
            delay(refreshInterval)
        }
    }
    fileLengthCallback(length())
}
Better?
👌 1

louiscad

05/29/2019, 6:51 AM
Yes, although I'm not sure what the point of block is.

gildor

05/29/2019, 6:53 AM
wouldn’t it be better to call this function from block?
It looks like a strange injection of the lifecycle of one suspend function into another

Mark

05/29/2019, 6:53 AM
Just to encapsulate how long to monitor the file for. The block is doing something on the file (think: building sqlite indexes)
This way the block does not need to bother about fileLengthCallback

gildor

05/29/2019, 6:54 AM
I mean just have:
suspend fun File.trackLength(fileLengthCallback: (Long) -> Unit, refreshInterval: Long = 100L) {
    while (true) {
        fileLengthCallback(length())
        delay(refreshInterval)
    }
    fileLengthCallback(length())
}

Mark

05/29/2019, 6:55 AM
How is it cancelled?
How is the final callback ever invoked?

gildor

05/29/2019, 6:56 AM
when the parent coroutine is cancelled

Mark

05/29/2019, 6:56 AM
But still the final line is never invoked?

gildor

05/29/2019, 6:56 AM
yes, you're right, you should also add try/finally
but it’s hard to advise without knowing the use case better
in general your approach will work, it just looks a bit strange
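A rough sketch of the try/finally idea, under a few assumptions: `trackWhileWriting` is a hypothetical caller, and `writeText` plus a short delay stand in for the real work. The loop runs until the calling coroutine is cancelled, and the `finally` block delivers the last length on the way out. This works here because the callback is a plain (non-suspending) function.

```kotlin
import java.io.File
import kotlinx.coroutines.*

// Polls the file length until the calling coroutine is cancelled.
suspend fun File.trackLength(fileLengthCallback: (Long) -> Unit, refreshInterval: Long = 100L) {
    try {
        while (true) {
            fileLengthCallback(length())
            delay(refreshInterval) // throws CancellationException once cancelled
        }
    } finally {
        // Also runs on cancellation, so the final length is always reported.
        fileLengthCallback(length())
    }
}

// Hypothetical caller: the code doing the work owns the tracker's lifetime.
fun trackWhileWriting(file: File): List<Long> = runBlocking {
    val seen = mutableListOf<Long>()
    val tracker = launch { file.trackLength({ seen.add(it) }, 10L) }
    file.writeText("hello")  // stands in for the real work on the file
    delay(50)
    tracker.cancelAndJoin()  // triggers the finally block in trackLength
    seen
}
```

The trade-off compared with passing a block into `trackLength` is that the caller has to launch and cancel the tracker itself.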

Mark

05/29/2019, 6:57 AM
Think of the block executing a bunch of sql commands (that modify db size) and the file being the underlying db file
If I don’t put the launch in the trackLength func then I have to put it in the func calling the block, which seems messy to me
Does this approach make more sense?
suspend fun <T> (suspend (File) -> T).invokeAndTrackFileLength(file: File, refreshInterval: Long = 200L, callback: (Long) -> Unit) = withContext(Dispatchers.Main) {
    return@withContext coroutineScope {
        val deferred = async {
            invoke(file)
        }
        while (deferred.isActive) {
            callback(file.length())
            delay(refreshInterval)
        }
        callback(file.length())
        deferred.await()
    }
}
i.e. an extension function on an existing suspending function, providing an alternative to invoke(). Also back to using async so that we can return the result of the invoke

gildor

05/29/2019, 7:39 AM
Using coroutineScope and withContext together is redundant, withContext also creates a scope
in general it makes sense to me

Mark

05/29/2019, 7:42 AM
Thanks Andrey, that makes sense. Is there also a problem with the await? I vaguely remember something about the async not being invoked until await is called???

gildor

05/29/2019, 7:43 AM
no, by default async is eager, so it starts executing immediately (unless you explicitly set the lazy start mode)
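A small sketch of the difference, not taken from the thread: `startModes` is a made-up name, and the log only records the order in which things happen inside a single-threaded `runBlocking`. With the default start mode the body runs as soon as the dispatcher gets a chance; with `CoroutineStart.LAZY` it runs only once `await()` (or `start()`) is called.

```kotlin
import kotlinx.coroutines.*

fun startModes(): List<String> = runBlocking {
    val log = mutableListOf<String>()

    val eager = async { log.add("eager ran"); 1 }
    val lazy = async(start = CoroutineStart.LAZY) { log.add("lazy ran"); 2 }

    yield()                  // let already-scheduled coroutines run
    log.add("after yield")   // only the eager one has run at this point

    val sum = eager.await() + lazy.await() // await() starts the lazy coroutine
    log.add("sum=$sum")
    log
}
```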

Mark

05/29/2019, 7:43 AM
Ok, I guess I was confusing it with something else
suspend fun <R> (suspend (File) -> R).invokeAndTrackFileLength(file: File, refreshInterval: Long = 200L, callback: (Long) -> Unit): R = withContext(Dispatchers.Main) {
    val deferred = async {
        invoke(file)
    }
    while (deferred.isActive) {
        callback(file.length())
        delay(refreshInterval)
    }
    callback(file.length())
    deferred.await()
}

gildor

05/29/2019, 7:46 AM
Careful with Dispatchers.Main
file.length() reads from disk and will block your main thread
if you need Main for your callback, just wrap the callback in Main and use IO by default

Mark

05/29/2019, 7:47 AM
ah yes, good point
suspend fun <R> (suspend (File) -> R).invokeAndTrackFileLength(file: File, refreshInterval: Long = 200L, callback: (Long) -> Unit): R = withContext(Dispatchers.IO) {
    val notifyCallbackFun = suspend {
        val length = file.length()
        withContext(Dispatchers.Main) {
            callback(length)
        }
    }
    val deferred = async {
        invoke(file)
    }
    while (deferred.isActive) {
        notifyCallbackFun()
        delay(refreshInterval)
    }
    notifyCallbackFun()
    deferred.await()
}

gildor

05/29/2019, 8:32 AM
should work
👍 1

elizarov

05/29/2019, 8:40 AM
deferred.await() at the end is redundant. The await is implicit at the end of the scope.

gildor

05/29/2019, 8:42 AM
but deferred.await() returns the result R
👍 1
so it should be the last statement of withContext

elizarov

05/29/2019, 8:42 AM
Good point.
Also:
val notifyCallbackFun = suspend {
-->
suspend fun notifyCallbackFun() {
(just use a local function!)
👍 1
However, I personally don’t like the whole idea of introducing a function here. I’d write it like this:
while (true) {
    notifyCallbackFun()
    if (!deferred.isActive) break
    delay(refreshInterval)
}
And now you can just inline notifyCallbackFun into this code.
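One way the inlined loop might look, as a sketch rather than the code anyone in the thread ended up with. `invokeAndTrack` is a hypothetical plain function (not the extension on a function type used in the thread), and the `callbackContext` parameter is an assumption added so the sketch can run where no main dispatcher is installed. Because the report happens before the `isActive` check, the last iteration doubles as the final notification.

```kotlin
import java.io.File
import kotlin.coroutines.CoroutineContext
import kotlinx.coroutines.*

suspend fun <R> invokeAndTrack(
    file: File,
    refreshInterval: Long = 200L,
    callbackContext: CoroutineContext = Dispatchers.Main, // assumption: injectable for tests
    callback: (Long) -> Unit,
    block: suspend (File) -> R
): R = withContext(Dispatchers.IO) {
    val deferred = async { block(file) }
    while (true) {
        val length = file.length()                   // read on the IO dispatcher
        withContext(callbackContext) { callback(length) }
        if (!deferred.isActive) break                // the final length was just reported
        delay(refreshInterval)
    }
    deferred.await()                                 // last expression: the block's result
}
```

The loop still waits out the current `delay` after the block finishes, which is the problem raised further down in the thread.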

Mark

05/29/2019, 8:44 AM
Ah yes, good point, I forgot about those! In the end I just got rid of it:
suspend fun <R> (suspend (File) -> R).invokeAndTrackFileLength(file: File, refreshInterval: Long = 200L, callback: (Long) -> Unit): R {
    return suspend { invoke(file) }.invokeAndTrackFileLength(file, refreshInterval, callback)
}

suspend fun <R> (suspend () -> R).invokeAndTrackFileLength(file: File, refreshInterval: Long = 200L, callback: (Long) -> Unit): R = withContext(Dispatchers.IO) {
    if (refreshInterval <= 0L) {
        throw IllegalArgumentException("invalid refreshInterval: $refreshInterval")
    }
    val deferred = async {
        invoke()
    }
    var oldLength = -1L
    while (deferred.isActive) {
        file.length().takeIf { it != oldLength }?.let { len ->
            withContext(Dispatchers.Main) {
                if (deferred.isActive) {
                    callback(len)
                }
            }
            //logd("notifying length: $it")
            oldLength = len
        }
        delay(refreshInterval)
    }
    deferred.await().also {
        val len = file.length()
        withContext(Dispatchers.Main) {
            callback(len)
        }
    }
}
Hmm, isn’t there a small problem here? I really want the delay() to be cancelled as soon as block.invoke() finishes. How can I do this?

gildor

05/29/2019, 9:03 AM
to do that you should create one more coroutine on each loop cycle and cancel it using invokeOnCompletion registered on deferred

Mark

05/29/2019, 9:04 AM
Okay, though why not launch a new coroutine for the whole loop and cancel that the same way?

gildor

05/29/2019, 9:05 AM
oh yeah, you're right, one coroutine for the whole loop is fine in this case

Mark

05/29/2019, 9:07 AM
suspend fun <R> (suspend () -> R).invokeAndTrackFileLength(file: File, refreshInterval: Long = 200L, callback: (Long) -> Unit): R = withContext(Dispatchers.IO) {
    val deferred = async {
        invoke()
    }
    val trackJob = launch {
        var oldLength = -1L
        while (deferred.isActive) {
            file.length().takeIf { it != oldLength }?.let { len ->
                withContext(Dispatchers.Main) {
                    if (deferred.isActive) {
                        callback(len)
                    }
                }
                oldLength = len
            }
            delay(refreshInterval)
        }
    }
    deferred.invokeOnCompletion {
        trackJob.cancel()
    }
    deferred.await().also {
        val len = file.length()
        withContext(Dispatchers.Main) {
            callback(len)
        }
    }
}
Now it looks like those deferred.isActive checks are no longer necessary?

gildor

05/29/2019, 9:11 AM
yes, you're right

Mark

05/29/2019, 9:18 AM
So why still use the async? Could we just do:
try {
    invoke()
} finally {
    trackJob.cancel()
    val len = file.length()
    withContext(Dispatchers.Main) {
        callback(len)
    }
}

gildor

05/29/2019, 9:22 AM
Not sure which part of the code you want to replace

Mark

05/29/2019, 9:22 AM
suspend fun <R> (suspend () -> R).invokeAndTrackFileLength(file: File, refreshInterval: Long = 200L, callback: (Long) -> Unit): R = withContext(Dispatchers.IO) {
    val trackJob = launch {
        var oldLength = -1L
        while (true) {
            val len = file.length()
            if (len != oldLength) {
                withContext(Dispatchers.Main) {
                    callback(len)
                }
                oldLength = len
            }
            delay(refreshInterval)
        }
    }

    try {
        invoke()
    } finally {
        trackJob.cancel()
        val len = file.length()
        withContext(Dispatchers.Main) {
            callback(len)
        }
    }
}

gildor

05/29/2019, 9:31 AM
yeah, this makes sense, much simpler
👍 1
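One caveat with the try/finally version, assuming the calling coroutine can be cancelled: `withContext` checks for cancellation on entry, so a `withContext(Dispatchers.Main)` inside `finally` would throw straight away in a cancelled coroutine and the final callback would be skipped. The documented way around this is to wrap suspending cleanup in `withContext(NonCancellable)`. The sketch below isolates that behaviour; `finallyAfterCancel` and the log strings are made up, and `delay` stands in for `invoke()`.

```kotlin
import kotlin.coroutines.EmptyCoroutineContext
import kotlinx.coroutines.*

// Returns what the finally block managed to record after the job was cancelled.
fun finallyAfterCancel(useNonCancellable: Boolean): List<String> = runBlocking {
    val log = mutableListOf<String>()
    val job = launch {
        try {
            delay(10_000) // stands in for invoke()
        } finally {
            try {
                val ctx = if (useNonCancellable) NonCancellable else EmptyCoroutineContext
                // In a cancelled coroutine this only runs when ctx is NonCancellable.
                withContext(ctx) { log.add("final callback") }
            } catch (e: CancellationException) {
                log.add("final callback skipped")
            }
        }
    }
    yield()              // let the job start and suspend in delay()
    job.cancelAndJoin()
    log
}
```

In the thread's function this would presumably mean `withContext(NonCancellable + Dispatchers.Main) { callback(len) }` in the `finally` block, if a final callback after cancellation is wanted at all.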

Mark

05/29/2019, 9:55 AM
The final thing that bothers me is the File.length() call, so I made a suspending function to ensure it runs on the IO dispatcher. So now I can make the method use withContext(Dispatchers.Main). However, I worry that this means there will be too much context switching, since the file.length() call is made much more often than the callback(len) call.
suspend fun <R> (suspend () -> R).invokeAndTrackFileLength(file: File, refreshInterval: Long = 200L, callback: (Long) -> Unit): R = withContext(Dispatchers.Main) {
    val trackJob = launch {
        var oldLength = -1L
        while (true) {
            file.lengthIO().takeIf { it != oldLength }?.let { len ->
                callback(len)
                oldLength = len
            }
            delay(refreshInterval)
        }
    }

    try {
        invoke()
    } finally {
        trackJob.cancel()
        callback(file.lengthIO())
    }
}

suspend fun File.lengthIO() = withContext(Dispatchers.IO) {
    length()
}
Should I wrap the body of the launch { ...body... } in a withContext(Dispatchers.IO) to reduce the amount of context switching, or maybe it doesn’t matter?

gildor

05/29/2019, 9:58 AM
why would you do this? launch already uses IO, because it inherits the dispatcher of the parent scope (the withContext scope)

Mark

05/29/2019, 10:01 AM
But the parent scope is now using Dispatchers.Main

gildor

05/29/2019, 10:01 AM
ah, I see, you changed the default dispatcher

Mark

05/29/2019, 10:02 AM
Yeah, so consequently there is now less withContext()

gildor

05/29/2019, 10:02 AM
you will have a context switch in any case

Mark

05/29/2019, 10:03 AM
I ask because the len != oldLength check is often false
so if the launch dispatcher is IO, then there will be less switching. What I don’t know is whether this is something to worry about.

gildor

05/29/2019, 10:04 AM
yeah, maybe you can optimize it a bit
also you can use length() directly, so fewer suspend points

Mark

05/29/2019, 10:06 AM
Hmm, I thought the call would just be passed through since same dispatcher, no?

gildor

05/29/2019, 10:07 AM
yes, the dispatcher is the same, but it will still generate a suspend point, a cancellation check, a new context object, etc.

Mark

05/29/2019, 10:07 AM
Oh okay, thanks. Good to know!
So, I ditched the default dispatcher (back to using coroutineScope) and now use the appropriate dispatcher inline. Also added a local function 😉
suspend fun <R> (suspend () -> R).invokeAndTrackFileLength(file: File, refreshInterval: Long = 200L, callback: (Long) -> Unit): R = coroutineScope {
    var oldLength = -1L
    suspend fun notifyCallback(length: Long) {
        if (length != oldLength) {
            withContext(Dispatchers.Main) {
                callback(length)
            }
            oldLength = length
        }
    }
    val trackJob = launch(Dispatchers.IO) {
        while (true) {
            notifyCallback(file.length())
            delay(refreshInterval)
        }
    }

    try {
        invoke()
    } finally {
        trackJob.cancel()
        notifyCallback(file.lengthIO())
    }
}
suspend fun File.lengthIO() = withContext(Dispatchers.IO) {
    length()
}

Zach Klippenstein (he/him) [MOD]

05/29/2019, 5:09 PM
I/O is going to be an order of magnitude slower than any coroutine context switching anyway, I wouldn’t worry about optimizing context switches unless you’re doing lots of CPU-bound work.

gildor

05/29/2019, 11:22 PM
Not every I/O call is that slow (methods like length() or exists() are fairly cheap). It also depends heavily on how often you execute this code; in an example with repeat(100) it actually makes sense to optimize