https://kotlinlang.org logo
#coroutines
Title
# coroutines
m

myanmarking

02/17/2020, 1:13 PM
unless you change the dispatcher of emmit
d

dave08

02/17/2020, 1:14 PM
What do you mean by this?
m

myanmarking

02/17/2020, 1:15 PM
with flow, you cannot emit events in another dispatcher. So if you do that, it will crash
Copy code
fun doSomething() = callbackFlow{
    withContext(<http://Dispatchers.IO|Dispatchers.IO>){
        offer(Unit)
    }
}
you cannot do this
d

dave08

02/17/2020, 1:17 PM
No, I don't do that (I use channelFlow for that...), but I still have a `okio.SocketAsyncTimeout.newTimeoutException (SocketAsyncTimeout.java:159)`that's crashing the app ...
m

myanmarking

02/17/2020, 1:17 PM
without seeing the code its hard to guess
d

dave08

02/17/2020, 1:17 PM
I thought callback flow is for that...
yup, these things are a bit hard to pinpoint, I admit. It's crashing on a timeout here:
Copy code
private fun Call.downloadAndSaveTo(
			output: Sink,
			bufferSize: Long = DEFAULT_BUFFER_SIZE.toLong() * 10,
			flushSize: Long = bufferSize * 100
	)Flow<Long> = callbackFlow<Long> {
		enqueue(object : Callback {
			override fun onFailure(call: Call?, e: IOException) {
				this@callbackFlow.close(e)
			}

			override fun onResponse(call: Call?, response: Response) {
				if (!response.isSuccessful) {
					this@callbackFlow.close(IllegalStateException("Unexpected HTTP code: ${response.code()}"))
				}

				try {
					response.body()?.use { body ->
						val contentLength = body.contentLength()
						val buffer = Buffer()
						var finished = false
						var totalLength = 0L

						var lastFlush = 0L

						output.buffer().use { out ->
							body.source().use { source ->
								while (isActive) {
									val read = source.read(buffer, bufferSize)
									if (read == -1L) {
										out.flush()

										finished = true
										break
									}

									out.write(buffer, read)
									totalLength += read

									if ((totalLength - lastFlush) / flushSize > 0) {
										out.flush()
										lastFlush = totalLength
									} else
										out.emit()

									offer(read)
								}
							}
						}
					}

					channel.close()
				} catch (e: Exception) {
					this@callbackFlow.close(e)
				}
			}
		})

		awaitClose {
			// Cancel the call
			this@downloadAndSaveTo.cancel()

			// Close other resources..
		}
	}.flowOn(<http://Dispatchers.IO|Dispatchers.IO>)
m

myanmarking

02/17/2020, 1:20 PM
ok give me a sec
d

dave08

02/17/2020, 1:21 PM
I think on `
Copy code
val read = source.read(buffer, bufferSize)
m

myanmarking

02/17/2020, 1:21 PM
hm you dont need the try catch i think
the only place i see a problem, is awaitClose
d

dave08

02/17/2020, 1:26 PM
What do you see wrong with the
awaitClose
?
m

myanmarking

02/17/2020, 1:28 PM
wait i deleted the comment lol
you read it right 😛 ?
if there is a crash inside awaitClose, im not sure it wont be thrown
d

dave08

02/17/2020, 1:46 PM
Right, could be, but all I have is
close
there... 🤔
5 Views