# coroutines
m
Hi folks, I wonder, how to emit the answer from a Flow B, from another Flow A? For example, I have a method returning a filepath using a Flow. If the file exists, the path will be calculated and generated. Otherwise, a network call will be performed, then the file will be stored & the path will be sent. For the latter, I'd like to not use a blocking call, so I'm creating a callbackFlow, but collecting that flow & emitting the result through the top level one does not work. What am I missing?
j
Why is the method returning a flow if it's a single file path? Why not just make it a suspend function?
m
because I need to call it from the UI thread, and I don't want to block it.
j
A suspend function also achieves that
m
through runBlocking, which I don't want to do
I want to expose computations as Flows instead of using other observer-like tools, or callbacks, and, if I need to do other potentially slow computations inside those, I'd like to consume them as Flows as well. Otherwise, I would just stick with callbacks
j
I think you misunderstood suspend functions: they are exactly the thing that replaces callback-based code for one-shot callbacks, and flows are the equivalent for multi-shot callbacks
Using `runBlocking` is obviously wrong in your case, this is the thing that blocks, not suspend functions. You don't have to use `runBlocking` to call suspend functions, you can use other coroutine builders
When you use flows, someone somewhere has to collect the flow when you actually want to do stuff, and that will require a coroutine too.
If you share some code I can show you what I mean, and how you can probably simplify a lot
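To illustrate the point about coroutine builders, here is a minimal sketch of calling a suspend function from UI code with `launch` instead of `runBlocking`. The names `loadPath`, `showImage`, `uiScope` and `render` are hypothetical and not from the thread; on Android, `uiScope` would typically be a lifecycle-bound scope.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.withContext

// Hypothetical slow one-shot operation, standing in for "compute or download a file path".
suspend fun loadPath(url: String): String = withContext(Dispatchers.IO) {
    delay(100) // simulates slow work without blocking a thread
    "/cache/" + url.hashCode()
}

// Hypothetical UI-side caller: launch starts a coroutine and returns immediately.
fun showImage(uiScope: CoroutineScope, url: String, render: (String) -> Unit): Job =
    uiScope.launch {
        val path = loadPath(url) // suspends here; the calling thread stays free
        render(path)             // resumes in uiScope's context once the path is ready
    }
```

The caller gets a `Job` back right away, so nothing blocks; cancelling the scope (or the job) also cancels the pending work.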
m
Thank you
this is the current block I'm trying to build:
fun getMemeImage(url : String) : Flow<String> {

    val flow = flow {
        if(!isImageCached(url)){
            val request = Request.Builder().url(url).build()
            callbackFlow<String> {
                client.newCall(request).enqueue(object : Callback{
                    override fun onFailure(call: Call, e: IOException) {
                        cancel("request failed", e)
                    }
                    override fun onResponse(call: Call, response: Response) {
                        response.body?.let {
                            var fileName = calculateImageName(url)
                            var file = File(fileName)
                            it.byteStream().copyTo(file.outputStream(), 1024*1000)
                            trySend(fileName)
                        }
                    }
                })
            }.flowOn(Scopes.getIODispatcher()).collect {
                this@flow.emit(it)
            }
        } else {
            this.emit(calculateImageName(url))
        }
    }.flowOn(Dispatchers.IO)
    return flow
}
I managed to make it work, looks like the problem was in the UI side. Thank you!
j
`callbackFlow` is a utility to convert a multi-shot callback API into a flow. The equivalent to turn a one-shot callback (like an HTTP request) into a suspend function is `suspendCoroutine` or `suspendCancellableCoroutine`. That being said, for OkHttp, people have already written those coroutine adapters: https://github.com/gildor/kotlin-coroutines-okhttp So, after adding this library (which is extremely tiny), you can replace the `enqueue()` call with just an `await()`. With OkHttp 5, this will even be built-in, and the suspend function will be `executeAsync()`: https://github.com/square/okhttp/tree/master/okhttp-coroutines
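For reference, this is roughly what such an adapter looks like when written by hand with `suspendCancellableCoroutine`. It is only a sketch: `FakeCall` and `ResultCallback` are hypothetical stand-ins loosely shaped like OkHttp's `Call` and `Callback`, so the example runs without a network, and it is not the code of the linked library.

```kotlin
import kotlinx.coroutines.suspendCancellableCoroutine
import kotlin.concurrent.thread
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException

// Hypothetical one-shot callback API (exactly one of the two methods is called).
interface ResultCallback {
    fun onSuccess(value: String)
    fun onFailure(error: Exception)
}

// Hypothetical call object: a null value simulates a failed request.
class FakeCall(private val value: String?) {
    @Volatile private var cancelled = false

    fun enqueue(callback: ResultCallback) {
        thread { // the callback fires on a background thread, like a real HTTP client
            if (cancelled) return@thread
            if (value != null) callback.onSuccess(value)
            else callback.onFailure(IllegalStateException("request failed"))
        }
    }

    fun cancel() { cancelled = true }
}

// Bridges the one-shot callback into a suspend function.
suspend fun FakeCall.await(): String = suspendCancellableCoroutine { cont ->
    enqueue(object : ResultCallback {
        override fun onSuccess(value: String) = cont.resume(value)
        override fun onFailure(error: Exception) = cont.resumeWithException(error)
    })
    // If the awaiting coroutine is cancelled, cancel the underlying call too.
    cont.invokeOnCancellation { cancel() }
}
```

Failures surface as ordinary exceptions at the `await()` call site, so the caller can use a plain `try`/`catch` instead of a separate error callback.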
Now you have a couple of problems here. For instance, you use `flowOn(Dispatchers.IO)`, but that has almost no effect because nothing happens in your callback flow. Using a `callbackFlow` allows you to run concurrent stuff in different coroutines or threads. In your case, all the IO happens in the callback of OkHttp, which is very likely running on OkHttp's own thread pool, and almost nothing runs in the actual flow's body, so there is no need for a context switch. Also, there is no `awaitClose()` in your `callbackFlow`, so the block might even finish before the HTTP request completes. In any case, there is no point in fixing this, because you can just replace all of it with `await()`.
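For contrast, here is a sketch of the case `callbackFlow` is meant for: a multi-shot listener, with `awaitClose` keeping the flow open and unregistering the listener afterwards. `ProgressSource`, `updates` and `collectTwo` are hypothetical names made up for this example.

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.callbackFlow
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.yield
import java.util.concurrent.CopyOnWriteArrayList

// Hypothetical multi-shot source: every registered listener is invoked on each publish.
class ProgressSource {
    private val listeners = CopyOnWriteArrayList<(Int) -> Unit>()
    fun addListener(l: (Int) -> Unit) { listeners.add(l) }
    fun removeListener(l: (Int) -> Unit) { listeners.remove(l) }
    fun publish(value: Int) = listeners.forEach { it(value) }
    val listenerCount: Int get() = listeners.size
}

fun ProgressSource.updates(): Flow<Int> = callbackFlow {
    val listener: (Int) -> Unit = { value -> trySend(value) }
    addListener(listener)
    // Suspends until the flow is closed or the collector is cancelled, then cleans up.
    awaitClose { removeListener(listener) }
}

// Usage sketch: collect two updates, then stop collecting.
suspend fun collectTwo(source: ProgressSource): List<Int> = coroutineScope {
    val result = async { source.updates().take(2).toList() }
    while (source.listenerCount == 0) yield() // wait until the listener is registered
    source.publish(1)
    source.publish(2)
    result.await()
}
```

Once `take(2)` has its two values, the collection is cancelled and the `awaitClose` block removes the listener, which is the cleanup step the flow in the question is missing.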
Also, you use `response.body?.let { ... }` to emit the (transformed) response from OkHttp. This means that in case `body` is null, nothing happens and your flow will emit 0 elements, which will make consumers hang forever
Another issue is that neither of the two IO streams used here is ever closed.
So here is the version I would recommend instead:
suspend fun getMemeImage(url : String) : String {
    val fileName = calculateImageName(url)
    if (isImageCached(url)) {
        return fileName
    }
    val request = Request.Builder().url(url).build()
    val response = client.newCall(request).await()
    val body = response.body ?: TODO("decide what to do if the body is null")
    body.writeToFile(fileName, bufferSize = 1024 * 1000)
    return fileName
}

private suspend fun ResponseBody.writeToFile(fileName: String, bufferSize: Int) {
    withContext(Dispatchers.IO) {
        byteStream().use { input ->
            File(fileName).outputStream().use { output ->
                input.copyTo(output, bufferSize)
            }
        }
    }
}