# coroutines
m
I’m writing a suspend function that takes a suspend block as an argument and executes it asynchronously, meanwhile monitoring the length of a file and periodically notifying a callback of the current file length. At the moment I have this, but I’m sure there’s a much neater way, for example one that doesn’t use GlobalScope.
```kotlin
suspend fun File.trackLength(fileLengthCallback: (Long) -> Unit, refreshInterval: Long = 100L, block: suspend () -> Unit) {
    val deferred = GlobalScope.async {
        block()
    }
    while (deferred.isActive) {
        fileLengthCallback(length())
        delay(refreshInterval)
    }
    fileLengthCallback(length())
}
```
g
Yes, you shouldn’t use `GlobalScope`. Instead, use the `coroutineScope` function to create a new scope that is attached to this suspend function, and start `async` inside it.
l
Why are you using `async` if you don't also call `await` on it?
m
I guess I just need the Job?
l
Then you don't need `async`: a `Job` from `launch` is enough.
And use `coroutineScope { … }` in place of `GlobalScope`, as Andrey said, so you don't break structured concurrency and don't leak the child coroutine.
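To make the structured-concurrency point concrete, here is a minimal sketch, assuming kotlinx.coroutines 1.5 or later on the JVM (the function name `childSurvivesCallerCancellation` is hypothetical). It starts a never-ending child either inside the caller's own scope or in `GlobalScope`, cancels the caller, and reports whether the child is still running afterwards.

```kotlin
import kotlinx.coroutines.*

// Starts a long-running child either in the caller's own scope (structured)
// or in GlobalScope (unstructured), cancels the caller, and reports whether
// the child is still running afterwards.
@OptIn(DelicateCoroutinesApi::class)
suspend fun childSurvivesCallerCancellation(useGlobalScope: Boolean): Boolean = coroutineScope {
    lateinit var child: Job
    // UNDISPATCHED runs the caller's body immediately, up to its first
    // suspension point, so `child` is assigned before we cancel the caller.
    val caller = launch(start = CoroutineStart.UNDISPATCHED) {
        if (useGlobalScope) {
            child = GlobalScope.launch { delay(Long.MAX_VALUE) }
            child.join()
        } else {
            coroutineScope {
                child = launch { delay(Long.MAX_VALUE) }
            }
        }
    }
    caller.cancelAndJoin()
    val survived = child.isActive
    child.cancel() // clean up the leaked child in the GlobalScope case
    survived
}
```

Called from `runBlocking`, this should return `true` for the `GlobalScope` variant (the child leaks) and `false` for the `coroutineScope` variant, because cancelling the caller also cancels every child of its scope.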
m
```kotlin
suspend fun File.trackLength(fileLengthCallback: (Long) -> Unit, refreshInterval: Long = 100L, block: suspend () -> Unit) {
    coroutineScope {
        val job = launch {
            block()
        }
        while (job.isActive) {
            fileLengthCallback(length())
            delay(refreshInterval)
        }
    }
    fileLengthCallback(length())
}
```
Better?
l
Yes, although I'm not sure what the point of `block` is.
g
Wouldn’t it be better to call this function from `block`?
It looks like a strange injection of one suspend function’s lifecycle into another.
m
Just to encapsulate how long to monitor the file for. The block is doing something to the file (think: building SQLite indexes).
This way the block does not need to bother with `fileLengthCallback`.
g
I mean just have:
```kotlin
suspend fun File.trackLength(fileLengthCallback: (Long) -> Unit, refreshInterval: Long = 100L) {
    while (true) {
        fileLengthCallback(length())
        delay(refreshInterval)
    }
    fileLengthCallback(length())
}
```
m
How is it cancelled?
How is the final callback ever invoked?
g
If the parent coroutine is cancelled.
m
But the final line is still never invoked?
g
Yes, you’re right, you should also add `try`/`finally`.
But it’s hard to give advice without knowing the use case better.
In general your approach will work, it just looks a bit strange.
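A minimal sketch of the `try`/`finally` variant described here, assuming kotlinx.coroutines on the JVM. `trackLengthUntilCancelled` and the caller `appendLinesWhileTracking` are hypothetical names: the tracker has no `block` parameter and simply runs until the coroutine that launched it is cancelled, and the `finally` block delivers the final length.

```kotlin
import java.io.File
import kotlinx.coroutines.*

// No block parameter: the tracker runs until whoever launched it cancels it.
// try/finally makes the final callback fire even though the loop never exits normally.
suspend fun File.trackLengthUntilCancelled(
    fileLengthCallback: (Long) -> Unit,
    refreshInterval: Long = 100L,
) {
    try {
        while (true) {
            fileLengthCallback(length())
            delay(refreshInterval) // throws CancellationException once cancelled
        }
    } finally {
        // Nothing here suspends, so it runs even though the coroutine is cancelled.
        fileLengthCallback(length())
    }
}

// Hypothetical caller: the tracker lives exactly as long as the work does.
suspend fun File.appendLinesWhileTracking(lines: Int, onLength: (Long) -> Unit) = coroutineScope {
    val tracker = launch { trackLengthUntilCancelled(onLength, refreshInterval = 10L) }
    repeat(lines) { i ->
        appendText("line $i\n")
        delay(5L)
    }
    tracker.cancel() // coroutineScope still waits for the tracker's finally block
}
```

Because `coroutineScope` waits for the cancelled tracker to finish, the final callback has fired by the time `appendLinesWhileTracking` returns. The `finally` body only calls non-suspending code; a suspending call there would need `withContext(NonCancellable)`.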
m
Think of the block executing a bunch of SQL commands (that change the DB size) and the file being the underlying DB file.
If I don’t put the `launch` in the `trackLength` function, then I have to put it in the function calling the block, which seems messy to me.
Does this approach make more sense?
```kotlin
suspend fun <T> (suspend (File) -> T).invokeAndTrackFileLength(file: File, refreshInterval: Long = 200L, callback: (Long) -> Unit) = withContext(Dispatchers.Main) {
    return@withContext coroutineScope {
        val deferred = async {
            invoke(file)
        }
        while (deferred.isActive) {
            callback(file.length())
            delay(refreshInterval)
        }
        callback(file.length())
        deferred.await()
    }
}
```
i.e. an extension function on an existing suspending function, providing an alternative to `invoke()`. Also, I’m back to using `async` so that we can return the result of the `invoke`.
g
Using `coroutineScope` and `withContext` together is redundant: `withContext` also creates a scope.
In general, it makes sense to me.
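A small sketch of this point, assuming kotlinx.coroutines on the JVM (`countFinishedChildren` is a hypothetical name): `withContext` already gives its lambda a `CoroutineScope`, so children can be launched directly inside it, and like `coroutineScope` it returns only after all of them complete.

```kotlin
import java.util.concurrent.atomic.AtomicInteger
import kotlinx.coroutines.*

// withContext already provides a CoroutineScope, so launch/async can be called
// directly inside it, and it does not return until every child has completed.
suspend fun countFinishedChildren(): Int {
    val finished = AtomicInteger(0)
    withContext(Dispatchers.Default) {
        repeat(3) { i ->
            launch {
                delay(10L * (i + 1))
                finished.incrementAndGet()
            }
        }
        // No join() here: the children are still running when this lambda ends.
    }
    return finished.get() // all three children have finished by now
}
```

`runBlocking { countFinishedChildren() }` should return 3 even though nothing joins the children explicitly.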
m
Thanks Andrey, that makes sense. Is there also a problem with the `await`? I vaguely remember something about `async` not being invoked until `await` is called?
g
No, by default `async` is eager, so it starts immediately (unless you explicitly set the lazy start mode, `CoroutineStart.LAZY`).
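A sketch illustrating eager versus lazy `async`, assuming kotlinx.coroutines and that it is called from `runBlocking`, so everything runs on one thread in a deterministic order (`asyncRanBeforeAwait` is a hypothetical name).

```kotlin
import kotlinx.coroutines.*

// Reports whether an async body had already run before await() was called.
// Meant to be called from runBlocking, whose single thread makes the order deterministic.
suspend fun asyncRanBeforeAwait(start: CoroutineStart): Boolean = coroutineScope {
    var ran = false
    val deferred = async(start = start) { ran = true }
    yield() // let any already-scheduled coroutine run
    val ranBeforeAwait = ran
    // A LAZY async starts here at the latest; without this await (or start()),
    // coroutineScope would wait forever for the never-started child.
    deferred.await()
    ranBeforeAwait
}
```

With `CoroutineStart.DEFAULT` the body has already run before `await()`; with `CoroutineStart.LAZY` it has not.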
m
Ok, I guess I was confusing it with something else
```kotlin
suspend fun <R> (suspend (File) -> R).invokeAndTrackFileLength(file: File, refreshInterval: Long = 200L, callback: (Long) -> Unit): R = withContext(Dispatchers.Main) {
    val deferred = async {
        invoke(file)
    }
    while (deferred.isActive) {
        callback(file.length())
        delay(refreshInterval)
    }
    callback(file.length())
    deferred.await()
}
```
g
Careful with `Dispatchers.Main`:
`file.length()` reads from disk and will block your main thread.
If you need Main for your callback, just wrap the callback call in `withContext(Dispatchers.Main)` and use IO by default.
m
ah yes, good point
```kotlin
suspend fun <R> (suspend (File) -> R).invokeAndTrackFileLength(file: File, refreshInterval: Long = 200L, callback: (Long) -> Unit): R = withContext(Dispatchers.IO) {
    val notifyCallbackFun = suspend {
        val length = file.length()
        withContext(Dispatchers.Main) {
            callback(length)
        }
    }
    val deferred = async {
        invoke(file)
    }
    while (deferred.isActive) {
        notifyCallbackFun()
        delay(refreshInterval)
    }
    notifyCallbackFun()
    deferred.await()
}
```
g
should work
e
`deferred.await()` at the end is redundant. The `await` is implicit at the end of the scope.
g
But `deferred.await()` returns the result `R`, so it should be the last statement of `withContext`.
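Both points can be seen side by side: a scope waits for its `async` children, but its value is the lambda's last expression. A sketch assuming kotlinx.coroutines on the JVM; the function names are hypothetical, and returning a `Deferred` out of a scope like this is only for illustration.

```kotlin
import kotlinx.coroutines.*

// The scope waits for its async child either way, but its value is the
// lambda's last expression: here that is the Deferred itself, not an Int.
suspend fun lastExpressionIsDeferred(): Deferred<Int> = withContext(Dispatchers.Default) {
    async { delay(10L); 42 }
}

// With await() as the last statement, withContext returns the child's result.
suspend fun lastExpressionIsAwait(): Int = withContext(Dispatchers.Default) {
    val deferred = async { delay(10L); 42 }
    deferred.await()
}
```

The `Deferred` returned by the first function is already completed when the caller receives it, which is the "implicit" waiting; getting the `Int` out still takes an explicit `await()`.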
e
Good point.
Also:
```kotlin
val notifyCallbackFun = suspend {
```
-->
```kotlin
suspend fun notifyCallbackFun() {
```
(just use a local function!)
However, I personally don’t like the whole idea of introducing a function here. I’d write it like this:
```kotlin
while (true) {
    notifyCallbackFun()
    if (!deferred.isActive) break
    delay(refreshInterval)
}
```
And now you can just inline `notifyCallbackFun` into this code.
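A sketch of this loop shape with the helper inlined, assuming kotlinx.coroutines. To keep it testable without a real file, `runAndReport` (a hypothetical name) takes a `readLength` function as a stand-in for `file.length()`.

```kotlin
import kotlinx.coroutines.*

// Loop-and-a-half: notify, then check whether the work has finished, then delay.
// The last notification happens inside the loop, so no separate call (or helper
// function) is needed after it.
suspend fun <R> runAndReport(
    work: suspend () -> R,
    readLength: () -> Long, // hypothetical stand-in for file.length()
    refreshInterval: Long,
    callback: (Long) -> Unit,
): R = coroutineScope {
    val deferred = async { work() }
    while (true) {
        callback(readLength())
        if (!deferred.isActive) break
        delay(refreshInterval)
    }
    deferred.await()
}
```

Because the callback runs before the `isActive` check, the last value reported is always read after the work has finished.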
m
Ah yes, good point, I forgot about those! In the end I just got rid of it:
```kotlin
suspend fun <R> (suspend (File) -> R).invokeAndTrackFileLength(file: File, refreshInterval: Long = 200L, callback: (Long) -> Unit): R {
    return suspend { invoke(file) }.invokeAndTrackFileLength(file, refreshInterval, callback)
}

suspend fun <R> (suspend () -> R).invokeAndTrackFileLength(file: File, refreshInterval: Long = 200L, callback: (Long) -> Unit): R = withContext(Dispatchers.IO) {
    if (refreshInterval <= 0L) {
        throw IllegalArgumentException("invalid refreshInterval: $refreshInterval")
    }
    val deferred = async {
        invoke()
    }
    var oldLength = -1L
    while (deferred.isActive) {
        file.length().takeIf { it != oldLength }?.let { len ->
            withContext(Dispatchers.Main) {
                if (deferred.isActive) {
                    callback(len)
                }
            }
            //logd("notifying length: $it")
            oldLength = len
        }
        delay(refreshInterval)
    }
    deferred.await().also {
        val len = file.length()
        withContext(Dispatchers.Main) {
            callback(len)
        }
    }
}
```
Hmm, isn’t there a small problem here? I really want the `delay()` to be cancelled as soon as `block.invoke()` finishes. How can I do this?
g
To do that, you should create one more coroutine on each loop iteration and cancel it from an `invokeOnCompletion` handler registered on `deferred`.
m
Okay, though why not launch a new coroutine for the whole loop and cancel that the same way?
g
Oh yeah, you’re right, one coroutine for the whole loop is fine in this case.
m
```kotlin
suspend fun <R> (suspend () -> R).invokeAndTrackFileLength(file: File, refreshInterval: Long = 200L, callback: (Long) -> Unit): R = withContext(Dispatchers.IO) {
    val deferred = async {
        invoke()
    }
    val trackJob = launch {
        var oldLength = -1L
        while (deferred.isActive) {
            file.length().takeIf { it != oldLength }?.let { len ->
                withContext(Dispatchers.Main) {
                    if (deferred.isActive) {
                        callback(len)
                    }
                }
                oldLength = len
            }
            delay(refreshInterval)
        }
    }
    deferred.invokeOnCompletion {
        trackJob.cancel()
    }
    deferred.await().also {
        val len = file.length()
        withContext(Dispatchers.Main) {
            callback(len)
        }
    }
}
```
Now it looks like those `deferred.isActive` checks are no longer necessary?
g
Yes, you’re right.
m
So why still use `async`? Could we just do:
```kotlin
try {
    invoke()
} finally {
    trackJob.cancel()
    val len = file.length()
    withContext(Dispatchers.Main) {
        callback(len)
    }
}
```
g
Not sure which part of the code you want to replace.
m
```kotlin
suspend fun <R> (suspend () -> R).invokeAndTrackFileLength(file: File, refreshInterval: Long = 200L, callback: (Long) -> Unit): R = withContext(Dispatchers.IO) {
    val trackJob = launch {
        var oldLength = -1L
        while (true) {
            val len = file.length()
            if (len != oldLength) {
                withContext(Dispatchers.Main) {
                    callback(len)
                }
                oldLength = len
            }
            delay(refreshInterval)
        }
    }

    try {
        invoke()
    } finally {
        trackJob.cancel()
        val len = file.length()
        withContext(Dispatchers.Main) {
            callback(len)
        }
    }
}
```
g
Yeah, this makes sense, and it’s much simpler.
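Two details of this version may be worth a defensive tweak; what follows is a sketch under stated assumptions, not something settled in the thread. `trackJob.cancel()` does not wait for the tracker, so a tracker callback could still be in flight when the final one fires; `cancelAndJoin()` avoids that. And if the caller itself is cancelled, the suspending `withContext` in `finally` throws `CancellationException` instead of delivering the final length, unless it runs in `NonCancellable`. The `callbackDispatcher` parameter is hypothetical, added so the sketch runs on a plain JVM without a Main dispatcher (pass `Dispatchers.Main` on Android).

```kotlin
import java.io.File
import kotlinx.coroutines.*

// Variant of the simplified version with cancelAndJoin() and NonCancellable.
// callbackDispatcher is a hypothetical parameter, not part of the original code.
suspend fun <R> (suspend () -> R).invokeAndTrackFileLength(
    file: File,
    refreshInterval: Long = 200L,
    callbackDispatcher: CoroutineDispatcher = Dispatchers.Main,
    callback: (Long) -> Unit,
): R = withContext(Dispatchers.IO) {
    val trackJob = launch {
        var oldLength = -1L
        while (true) {
            val len = file.length()
            if (len != oldLength) {
                withContext(callbackDispatcher) { callback(len) }
                oldLength = len
            }
            delay(refreshInterval)
        }
    }
    try {
        invoke()
    } finally {
        // NonCancellable lets these suspending calls run even if we were cancelled.
        withContext(NonCancellable) {
            trackJob.cancelAndJoin() // no tracker callback can overlap the final one
            val len = file.length()
            withContext(callbackDispatcher) { callback(len) }
        }
    }
}
```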
m
The final thing that bothers me is the `File.length()` call, so I made a suspending function to ensure it runs on the IO dispatcher. Now I can make the method use `withContext(Dispatchers.Main)`. However, I worry that this means there will be too much context switching, since `file.length()` is called much more often than `callback(len)`.
```kotlin
suspend fun <R> (suspend () -> R).invokeAndTrackFileLength(file: File, refreshInterval: Long = 200L, callback: (Long) -> Unit): R = withContext(Dispatchers.Main) {
    val trackJob = launch {
        var oldLength = -1L
        while (true) {
            file.lengthIO().takeIf { it != oldLength }?.let { len ->
                callback(len)
                oldLength = len
            }
            delay(refreshInterval)
        }
    }

    try {
        invoke()
    } finally {
        trackJob.cancel()
        callback(file.lengthIO())
    }
}

suspend fun File.lengthIO() = withContext(Dispatchers.IO) {
    length()
}
```
Should I wrap the body of the `launch { ... }` in a `withContext(Dispatchers.IO)` to reduce the amount of context switching, or maybe it doesn’t matter?
g
Why would you do this? `launch` already uses IO, because it inherits the dispatcher of the parent scope (the `withContext` scope).
m
But the parent scope is now using Dispatchers.Main
g
Ah, I see, you changed the default dispatcher.
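The rule behind this exchange is that a `launch` without a dispatcher argument inherits the dispatcher of the scope it is started in, while an explicit one overrides it. A small sketch, assuming kotlinx.coroutines on the JVM; `Dispatchers.Default` stands in for `Dispatchers.Main` so it runs without Android, and `childDispatchers` is a hypothetical name.

```kotlin
import kotlin.coroutines.ContinuationInterceptor
import kotlinx.coroutines.*

// Returns the dispatcher seen by a child launched without a dispatcher argument,
// and by one launched with Dispatchers.IO, inside a withContext scope.
suspend fun childDispatchers(): Pair<ContinuationInterceptor?, ContinuationInterceptor?> =
    withContext(Dispatchers.Default) { // stands in for Dispatchers.Main on a plain JVM
        var inherited: ContinuationInterceptor? = null
        var explicit: ContinuationInterceptor? = null
        launch { inherited = coroutineContext[ContinuationInterceptor] }.join()
        launch(Dispatchers.IO) { explicit = coroutineContext[ContinuationInterceptor] }.join()
        inherited to explicit
    }
```

The first child reports the enclosing `Dispatchers.Default`, the second `Dispatchers.IO`: once the outer scope is switched to Main, every plain `launch` inside it runs on Main too.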
m
Yeah, so consequently there are now fewer `withContext()` calls.
g
You will have a context switch in any case.
m
I ask because the `len != oldLength` check is often false, so if the `launch` dispatcher is IO, there will be less switching. What I don’t know is whether this is something to worry about.
g
Yeah, maybe you can optimize it a bit.
Also, you can call `length()` directly, so there are fewer suspension points.
m
Hmm, I thought the call would just be passed through since it’s the same dispatcher, no?
g
Yes, the dispatcher is the same, but it still creates a suspension point, a cancellation check, a new context object, etc.
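The cancellation part is easy to observe. In this sketch (assuming kotlinx.coroutines on the JVM; `cancelledCoroutineProbe` is a hypothetical name), a coroutine cancels itself and then calls `withContext` with the dispatcher it is already running on: ordinary code keeps running, but `withContext` checks for cancellation on entry and throws `CancellationException` before its block runs.

```kotlin
import kotlinx.coroutines.*

// Returns (ordinary code ran after cancel(), same-dispatcher withContext block ran).
suspend fun cancelledCoroutineProbe(): Pair<Boolean, Boolean> {
    var plainCodeRan = false
    var withContextBlockRan = false
    coroutineScope {
        launch(Dispatchers.Default) {
            cancel()            // mark this coroutine as cancelled; it has not suspended yet
            plainCodeRan = true // ordinary code keeps running after cancel()
            withContext(Dispatchers.Default) { // same dispatcher as the caller
                withContextBlockRan = true     // not reached: withContext checks cancellation first
            }
        }
    }
    return plainCodeRan to withContextBlockRan
}
```

A direct `file.length()` call has none of this machinery, which is what makes it cheaper than `lengthIO()` when the caller is already on IO.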
m
Oh okay, thanks. Good to know!
So, I ditched the default dispatcher (back to using `coroutineScope`) and now use the appropriate dispatcher inline. I also added a local function 😉
```kotlin
suspend fun <R> (suspend () -> R).invokeAndTrackFileLength(file: File, refreshInterval: Long = 200L, callback: (Long) -> Unit): R = coroutineScope {
    var oldLength = -1L
    suspend fun notifyCallback(length: Long) {
        if (length != oldLength) {
            withContext(Dispatchers.Main) {
                callback(length)
            }
            oldLength = length
        }
    }
    val trackJob = launch(Dispatchers.IO) {
        while (true) {
            notifyCallback(file.length())
            delay(refreshInterval)
        }
    }

    try {
        invoke()
    } finally {
        trackJob.cancel()
        notifyCallback(file.lengthIO())
    }
}

suspend fun File.lengthIO() = withContext(Dispatchers.IO) {
    length()
}
```
z
I/O is going to be an order of magnitude slower than any coroutine context switching anyway; I wouldn’t worry about optimizing context switches unless you’re doing lots of CPU-bound work.
g
Not all I/O is that slow (methods like `length()` or `exists()` are fairly cheap). It also depends heavily on how often you execute this code: something like `repeat(100)` actually makes sense to optimize.
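If you want numbers for your own case rather than a rule of thumb, a crude sketch like this one (assuming kotlinx.coroutines on the JVM; `timeLengthCalls` is a hypothetical name) compares direct `length()` calls with calls each wrapped in a same-dispatcher `withContext`, as `lengthIO()` does. It has no warm-up and a single run proves little; a harness such as JMH gives more trustworthy numbers.

```kotlin
import java.io.File
import kotlin.system.measureNanoTime
import kotlinx.coroutines.*

// Rough comparison only (no warm-up; JIT and OS file caching dominate): n direct
// length() calls vs n calls each wrapped in a same-dispatcher withContext,
// both already running on Dispatchers.IO. Returns nanoseconds (direct, wrapped).
suspend fun timeLengthCalls(file: File, n: Int = 100): Pair<Long, Long> =
    withContext(Dispatchers.IO) {
        val direct = measureNanoTime {
            repeat(n) { file.length() }
        }
        val wrapped = measureNanoTime {
            repeat(n) { withContext(Dispatchers.IO) { file.length() } }
        }
        direct to wrapped
    }
```

`measureNanoTime` and `repeat` are inline, which is why the suspending `withContext` call is allowed inside their lambdas.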