McEna

05/01/2023, 9:39 PM
Hi folks, I wonder, how to emit the answer from a Flow B, from another Flow A? For example, I have a method returning a filepath using a Flow. If the file exists, the path will be calculated and generated. Otherwise, a network call will be performed, then the file will be stored & the path will be sent. For the latter, I'd like to not use a blocking call, so I'm creating a callbackFlow, but collecting that flow & emitting the result through the top level one does not work. What am I missing?

Joffrey

05/01/2023, 9:43 PM
Why is the method returning a flow if it's a single file path? Why not just make it a suspend function?

McEna

05/02/2023, 5:36 AM
because I need to call it from the UI thread, and I don't want to block it.

Joffrey

05/02/2023, 5:39 AM
A suspend function also achieves that

McEna

05/02/2023, 6:38 AM
through runBlocking, which I don't want to do
I want to expose computations as Flows instead of using other observer-like tools, or callbacks, and, if I need to do other potentially slow computations inside those, I'd like to consume them as Flows as well. Otherwise, I would just stick with callbacks

Joffrey

05/02/2023, 7:05 AM
I think you misunderstood suspend functions: they are exactly the thing that replaces callback-based code for one-shot callbacks, and flows are the equivalent for multi-shot callbacks.
Using `runBlocking` is obviously wrong in your case: it is the thing that blocks, not suspend functions. You don't have to use `runBlocking` to call suspend functions; you can use other coroutine builders.
When you use flows, someone somewhere has to collect the flow when you actually want to do stuff, and that will require a coroutine too.
If you share some code I can show you what I mean, and how you can probably simplify a lot
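As an illustration of the point about coroutine builders, here is a minimal sketch using `launch` from kotlinx.coroutines. `loadPath` and the example URL are hypothetical stand-ins, not code from this thread, and `suspend fun main` with `coroutineScope` only stands in for whatever scope the UI framework provides.

```kotlin
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch

// Hypothetical stand-in for a slow one-shot operation (e.g. a download).
suspend fun loadPath(url: String): String {
    delay(100) // suspends the coroutine without blocking the thread
    return "/cache/" + url.substringAfterLast('/')
}

suspend fun main() = coroutineScope {
    // launch starts a coroutine and returns a Job right away;
    // the caller is not blocked while loadPath is suspended.
    val job = launch {
        val path = loadPath("https://example.com/meme.png")
        println("loaded $path")
    }
    println("launch returned, caller keeps going")
    job.join() // only here so the demo waits before exiting
}
```

In a UI application the `launch` call would typically be made on a scope tied to the screen's lifecycle, so no `runBlocking` is involved anywhere.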

McEna

05/02/2023, 7:33 AM
Thank you
this is the current block I'm trying to build:
fun getMemeImage(url : String) : Flow<String> {

    val flow = flow {
        if(!isImageCached(url)){
            val request = Request.Builder().url(url).build()
            callbackFlow<String> {
                client.newCall(request).enqueue(object : Callback{
                    override fun onFailure(call: Call, e: IOException) {
                        cancel("request failed", e)
                    }
                    override fun onResponse(call: Call, response: Response) {
                        response.body?.let {
                            var fileName = calculateImageName(url)
                            var file = File(fileName)
                            it.byteStream().copyTo(file.outputStream(), 1024*1000)
                            trySend(fileName)
                        }
                    }
                })
            }.flowOn(Scopes.getIODispatcher()).collect {
                this@flow.emit(it)
            }
        } else {
            this.emit(calculateImageName(url))
        }
    }.flowOn(Dispatchers.IO)
    return flow
}
I managed to make it work, looks like the problem was in the UI side. Thank you!

Joffrey

05/02/2023, 8:09 AM
`callbackFlow` is a utility to convert a multi-shot callback API into a flow. The equivalent for turning a one-shot callback (like an HTTP request) into a suspend function is `suspendCoroutine` or `suspendCancellableCoroutine`. That being said, for OkHttp, people have already written those coroutine adapters: https://github.com/gildor/kotlin-coroutines-okhttp So, after adding this library (which is extremely tiny), you can replace the `enqueue()` call with just an `await()`. With OkHttp 5, this will even be built-in, and the suspend function will be `executeAsync()`: https://github.com/square/okhttp/tree/master/okhttp-coroutines
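To make the one-shot case concrete, here is a rough sketch of wrapping a callback API with `suspendCancellableCoroutine`. `DownloadCallback`, `FakeDownloader`, and `awaitDownload` are hypothetical names invented for this example; this is not how the linked OkHttp adapters are actually implemented, only the general shape of the technique.

```kotlin
import kotlin.concurrent.thread
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException
import kotlinx.coroutines.suspendCancellableCoroutine

// Hypothetical one-shot callback API: exactly one of the two methods is called.
interface DownloadCallback {
    fun onSuccess(bytes: ByteArray)
    fun onError(error: Exception)
}

// Hypothetical client that does its work on its own thread.
class FakeDownloader {
    fun download(url: String, callback: DownloadCallback): Thread = thread {
        try {
            Thread.sleep(50) // pretend to do network IO
            if (url.isBlank()) callback.onError(IllegalArgumentException("blank URL"))
            else callback.onSuccess(url.toByteArray())
        } catch (e: InterruptedException) {
            // interrupted because the caller was cancelled: report nothing
        }
    }
}

// The adapter: suspends until the callback fires, then resumes with its result.
suspend fun FakeDownloader.awaitDownload(url: String): ByteArray =
    suspendCancellableCoroutine { cont ->
        val worker = download(url, object : DownloadCallback {
            override fun onSuccess(bytes: ByteArray) = cont.resume(bytes)
            override fun onError(error: Exception) = cont.resumeWithException(error)
        })
        // If the waiting coroutine is cancelled, stop the underlying work too.
        cont.invokeOnCancellation { worker.interrupt() }
    }

suspend fun main() {
    val bytes = FakeDownloader().awaitDownload("https://example.com/meme.png")
    println("received ${bytes.size} bytes")
}
```

The callback still runs on the downloader's own thread; the suspend function only hands the result back to whichever coroutine was waiting.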
Now you have a couple of problems here. For instance, you use `flowOn(Dispatchers.IO)`, but that has almost no effect because nothing happens in your callback flow. Using a `callbackFlow` lets you run concurrent stuff in different coroutines or threads. In your case, all the IO happens in the OkHttp callback, which is very likely running on OkHttp's own thread pool, and almost nothing runs in the actual flow's body, so there is no need for a context switch. Also, there is no `awaitClose()` in your `callbackFlow`, so the block might even finish before the HTTP request is even made. In any case, there is no point in fixing this, because you can just replace all of it with `await()`.
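For contrast, this is roughly what `callbackFlow` looks like when it fits, that is, for a multi-shot listener, with `awaitClose()` keeping the block alive until the collector is done. `Ticker` and `ticks()` are hypothetical names made up for this sketch.

```kotlin
import java.util.concurrent.CopyOnWriteArrayList
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.callbackFlow
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.launch

// Hypothetical multi-shot API: listeners are notified on every tick.
class Ticker {
    private val listeners = CopyOnWriteArrayList<(Int) -> Unit>()
    val listenerCount: Int get() = listeners.size
    fun addListener(listener: (Int) -> Unit) { listeners.add(listener) }
    fun removeListener(listener: (Int) -> Unit) { listeners.remove(listener) }
    fun tick(value: Int) { listeners.forEach { it(value) } }
}

fun Ticker.ticks(): Flow<Int> = callbackFlow {
    val listener: (Int) -> Unit = { value -> trySend(value) }
    addListener(listener)
    // Suspends until the collector stops, then unregisters the listener.
    // Without this call the block would return right after registering.
    awaitClose { removeListener(listener) }
}

suspend fun main() = coroutineScope {
    val ticker = Ticker()
    val collector = launch {
        ticker.ticks().take(2).collect { println("tick $it") }
    }
    while (ticker.listenerCount == 0) delay(10) // wait for registration
    ticker.tick(1)
    ticker.tick(2)
    collector.join()
}
```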
Also, you use `response.body?.let { ... }` to emit the (transformed) response from OkHttp. This means that if `body` is null, nothing happens and your flow will emit 0 elements, which will make consumers hang forever.
Another issue is that both IO streams used here are never closed.
So here is the version I would recommend instead:
suspend fun getMemeImage(url : String) : String {
    val fileName = calculateImageName(url)
    if (isImageCached(url)) {
        return fileName
    }
    val request = Request.Builder().url(url).build()
    val response = client.newCall(request).await()
    val body = response.body ?: TODO("decide what to do if the body is null")
    body.writeToFile(fileName, bufferSize = 1024 * 1000)
    return fileName
}

private suspend fun ResponseBody.writeToFile(fileName: String, bufferSize: Int) {
    withContext(Dispatchers.IO) {
        byteStream().use { input ->
            File(fileName).outputStream().use { output ->
                input.copyTo(output, bufferSize)
            }
        }
    }
}
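
If a `Flow` is still wanted at the call site, a suspend function can be wrapped in a one-element flow with the `flow { }` builder. This is only a sketch: `fetchImagePath` is a hypothetical stand-in for the suspend `getMemeImage` above, and `imagePathFlow` is a name invented for the example.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow

// Hypothetical stand-in for a suspend function returning a file path.
suspend fun fetchImagePath(url: String): String {
    delay(50) // pretend to check the cache or download
    return "/cache/" + url.substringAfterLast('/')
}

// Cold flow: nothing runs until someone collects it,
// and it emits exactly one value before completing.
fun imagePathFlow(url: String): Flow<String> = flow {
    emit(fetchImagePath(url))
}

suspend fun main() {
    imagePathFlow("https://example.com/meme.png").collect { path ->
        println(path)
    }
}
```

Collecting still requires a coroutine, so this adds a layer over the suspend function rather than removing the need for one.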