McEna
05/01/2023, 9:39 PM
Joffrey
05/01/2023, 9:43 PM
McEna
05/02/2023, 5:36 AM
Joffrey
05/02/2023, 5:39 AM
McEna
05/02/2023, 6:38 AM
Joffrey
05/02/2023, 7:05 AM
`runBlocking` is obviously wrong in your case: this is the thing that blocks, not suspend functions. You don't have to use `runBlocking` to call suspend functions; you can use other coroutine builders.
McEna
05/02/2023, 7:33 AM
fun getMemeImage(url : String) : Flow<String> {
    val flow = flow {
        if(!isImageCached(url)){
            val request = Request.Builder().url(url).build()
            callbackFlow<String> {
                client.newCall(request).enqueue(object : Callback{
                    override fun onFailure(call: Call, e: IOException) {
                        cancel("request failed", e)
                    }
                    override fun onResponse(call: Call, response: Response) {
                        response.body?.let {
                            var fileName = calculateImageName(url)
                            var file = File(fileName)
                            it.byteStream().copyTo(file.outputStream(), 1024*1000)
                            trySend(fileName)
                        }
                    }
                })
            }.flowOn(Scopes.getIODispatcher()).collect {
                this@flow.emit(it)
            }
        } else {
            this.emit(calculateImageName(url))
        }
    }.flowOn(Dispatchers.IO)
    return flow
}
Joffrey
05/02/2023, 8:09 AM
`callbackFlow` is a utility to convert a multi-shot callback API into a flow. The equivalent to turn a one-shot callback (like an HTTP request) into a suspend function is `suspendCoroutine` or `suspendCancellableCoroutine`. That being said, for OkHttp, people have already written those coroutine adapters:
https://github.com/gildor/kotlin-coroutines-okhttp
So, after adding this library (which is excessively tiny), you can replace the `enqueue()` call with just an `await()`.
With OkHttp 5, this will even be built-in, and the suspend function will be `executeAsync()`:
https://github.com/square/okhttp/tree/master/okhttp-coroutines
You are using `flowOn(Dispatchers.IO)`, but that has almost no effect because nothing happens in your callback flow. Using a `callbackFlow` allows you to run concurrent work in different coroutines or threads. In your case, all the IO happens in the OkHttp callback, which is very likely running on OkHttp's own thread pool, and almost nothing runs in the actual flow's body, so there is no need for a context switch. Also, there is no `awaitClose()` in your `callbackFlow`, so the block might even finish before the HTTP request is made.
In any case, there is no point in fixing this, because you can just replace all of it with `await()`.
You are using `response.body?.let { ... }` to emit the (transformed) response from OkHttp. This means that if `body` is null, nothing happens and your flow will emit 0 elements, which will make consumers hang forever.
suspend fun getMemeImage(url : String) : String {
    val fileName = calculateImageName(url)
    if (isImageCached(url)) {
        return fileName
    }
    val request = Request.Builder().url(url).build()
    val response = client.newCall(request).await()
    val body = response.body ?: TODO("decide what to do if the body is null")
    body.writeToFile(fileName, bufferSize = 1024 * 1000)
    return fileName
}

private suspend fun ResponseBody.writeToFile(fileName: String, bufferSize: Int) {
    withContext(Dispatchers.IO) {
        byteStream().use { input ->
            File(fileName).outputStream().use { output ->
                input.copyTo(output, bufferSize)
            }
        }
    }
}
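
For reference, here is a rough sketch of the `suspendCoroutine` idea mentioned earlier in the thread: wrapping a one-shot callback into a suspend function. It uses only the Kotlin standard library so it can run on its own. `FakeCall`, `FakeCallback`, and `runToCompletion` are hypothetical stand-ins invented for this illustration; they are not OkHttp or kotlinx.coroutines APIs, and the stub calls back immediately instead of doing real network IO.

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException
import kotlin.coroutines.startCoroutine
import kotlin.coroutines.suspendCoroutine

// Hypothetical one-shot callback API standing in for a real HTTP client's enqueue().
interface FakeCallback {
    fun onResponse(body: String?)
    fun onFailure(e: Exception)
}

class FakeCall(private val url: String) {
    fun enqueue(callback: FakeCallback) {
        // A real client would call back later from its own thread pool;
        // this stub calls back immediately to keep the sketch self-contained.
        when {
            !url.startsWith("https://") ->
                callback.onFailure(IllegalArgumentException("unsupported url: $url"))
            url.endsWith("/empty") -> callback.onResponse(null)
            else -> callback.onResponse("body of $url")
        }
    }
}

// One-shot callback -> suspend function: every callback path resumes the
// continuation exactly once. Skipping the null-body case would leave the
// caller suspended forever.
suspend fun FakeCall.await(): String = suspendCoroutine { cont ->
    enqueue(object : FakeCallback {
        override fun onResponse(body: String?) {
            if (body != null) cont.resume(body)
            else cont.resumeWithException(IllegalStateException("empty body"))
        }

        override fun onFailure(e: Exception) = cont.resumeWithException(e)
    })
}

// Minimal stdlib-only runner (an assumption of this sketch) for coroutines
// that complete without a real asynchronous wait.
fun <T> runToCompletion(block: suspend () -> T): T {
    var completed = false
    var value: T? = null
    var failure: Throwable? = null
    block.startCoroutine(Continuation(EmptyCoroutineContext) { result ->
        value = result.getOrNull()
        failure = result.exceptionOrNull()
        completed = true
    })
    check(completed) { "coroutine suspended without completing" }
    failure?.let { throw it }
    @Suppress("UNCHECKED_CAST")
    return value as T
}

fun main() {
    println(runToCompletion { FakeCall("https://example.com/meme.png").await() })
    val failed = runCatching { runToCompletion { FakeCall("https://example.com/empty").await() } }
    println(failed.exceptionOrNull()?.message)
}
```

With a real OkHttp `Call`, `suspendCancellableCoroutine` from kotlinx.coroutines is usually the better fit, because it lets you cancel the underlying request when the coroutine is cancelled.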