myanmarking
02/17/2020, 1:13 PMdave08
02/17/2020, 1:14 PMmyanmarking
02/17/2020, 1:15 PM
/**
 * Builds a [callbackFlow] whose block switches to [Dispatchers.IO] and offers a single [Unit]
 * into the flow's channel.
 *
 * NOTE(review): the block has no `awaitClose { }`, so it returns right after the offer.
 * The kotlinx.coroutines documentation says a `callbackFlow` block should end with
 * `awaitClose`; confirm how the library version in use treats its absence.
 */
fun doSomething() = callbackFlow {
    withContext(Dispatchers.IO) {
        // offer() is non-suspending; its Boolean result (accepted or not) is ignored here.
        offer(Unit)
    }
}
dave08
02/17/2020, 1:17 PMmyanmarking
02/17/2020, 1:17 PMdave08
02/17/2020, 1:17 PM
/**
 * Enqueues this [Call] and streams the response body into [output], exposing progress as a flow.
 *
 * Each emitted value is the number of bytes read by one `source.read` call (a delta, not a
 * running total). Values are published with `offer`, whose result is ignored.
 * NOTE(review): a value may therefore be dropped when the channel buffer is full; confirm
 * whether collectors rely on the sum of the deltas.
 *
 * Completion and failure:
 * - Transport failure (`onFailure`): the flow is closed with the reported [IOException].
 * - Non-successful HTTP status: the response body is closed unread and the flow is closed with
 *   an [IllegalStateException]; nothing is written to [output].
 * - Successful response: the body is copied to [output]; the buffered wrapper around [output],
 *   the body and its source are closed via `use`, then the flow completes normally.
 * - Any exception thrown while copying closes the flow with that exception.
 *
 * When the flow is closed or its collector is cancelled, the underlying call is cancelled.
 *
 * @param output sink that receives the response body bytes.
 * @param bufferSize maximum number of bytes requested from the source per read.
 * @param flushSize number of bytes written since the last flush after which the buffered sink
 * is flushed; below that threshold the sink is only asked to `emit()` its complete segments.
 * @return a flow of per-read byte counts, with upstream work moved to [Dispatchers.IO].
 */
private fun Call.downloadAndSaveTo(
    output: Sink,
    bufferSize: Long = DEFAULT_BUFFER_SIZE.toLong() * 10,
    flushSize: Long = bufferSize * 100
): Flow<Long> = callbackFlow<Long> {
    enqueue(object : Callback {
        override fun onFailure(call: Call?, e: IOException) {
            this@callbackFlow.close(e)
        }

        override fun onResponse(call: Call?, response: Response) {
            if (!response.isSuccessful) {
                val failure = IllegalStateException("Unexpected HTTP code: ${response.code()}")
                // Release the connection without reading the error body.
                response.body()?.close()
                this@callbackFlow.close(failure)
                // Stop here so the error body is never written into `output`.
                return
            }
            try {
                response.body()?.use { body ->
                    val buffer = Buffer()
                    var totalLength = 0L
                    var lastFlush = 0L
                    output.buffer().use { out ->
                        body.source().use { source ->
                            // Stop copying as soon as the flow's scope is no longer active.
                            while (isActive) {
                                val read = source.read(buffer, bufferSize)
                                if (read == -1L) {
                                    // End of stream: push whatever is still buffered.
                                    out.flush()
                                    break
                                }
                                out.write(buffer, read)
                                totalLength += read
                                if ((totalLength - lastFlush) / flushSize > 0) {
                                    // At least `flushSize` bytes were written since the last flush.
                                    out.flush()
                                    lastFlush = totalLength
                                } else {
                                    out.emit()
                                }
                                // Report the size of this chunk, whichever branch ran above.
                                offer(read)
                            }
                        }
                    }
                }
                channel.close()
            } catch (e: Exception) {
                this@callbackFlow.close(e)
            }
        }
    })
    awaitClose {
        // Cancel the call
        this@downloadAndSaveTo.cancel()
        // Close other resources..
    }
}.flowOn(Dispatchers.IO)
myanmarking
02/17/2020, 1:20 PMdave08
02/17/2020, 1:21 PMval read = source.read(buffer, bufferSize)
myanmarking
02/17/2020, 1:21 PMdave08
02/17/2020, 1:25 PMchannelFlow
that something happened. That's why I catch it and call close
there...awaitClose
?myanmarking
02/17/2020, 1:28 PMdave08
02/17/2020, 1:46 PMclose
there... 🤔