rocketraman
11/01/2018, 5:03 PM
elizarov
11/01/2018, 6:22 PM
rocketraman
11/02/2018, 6:18 AM
suspend fun Flowable<ByteBuffer>.toByteReadChannel(scope: CoroutineScope) = scope.writer(scope.coroutineContext, autoFlush = true) {
    try {
        this@toByteReadChannel.consumeEach {
            channel.writeFully(it)
        }
    } catch (e: Throwable) {
        channel.close(e)
    }
}.channel
Note here I have explicitly passed scope, since it's already an extension of Flowable<ByteBuffer>. Is there a better approach?
toFlowable function, implemented like this:
suspend fun DataChannelProvider.toFlowable(scope: CoroutineScope): Flowable<ByteBuffer> = scope.rxFlowable(Dispatchers.Default) {
    this@toFlowable().lookAheadSuspend {
        consumeEachRemaining {
            // we have to make a copy as the ByteBuffer here comes from a reused ring buffer
            this@rxFlowable.send(it.copy())
            true
        }
    }
}
Zach Klippenstein (he/him) [MOD]
11/02/2018, 10:10 PM
In a suspend function, you can use coroutineContext to get the scope from the current coroutine.
suspend fun Flowable<ByteBuffer>.toByteReadChannel() = coroutineContext {
    writer(…
rocketraman
11/03/2018, 2:03 PM
coroutineScope { ... } but otherwise, ah, thank you @Zach Klippenstein (he/him) [MOD]
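
For readers following the thread, here is a minimal sketch of the coroutineScope { ... } builder mentioned above. It shows a suspend function getting a CoroutineScope without taking one as a parameter. It assumes only kotlinx.coroutines on the classpath; loadBoth is a hypothetical helper, not something from this conversation.

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking

// Hypothetical helper: no scope parameter is needed, because coroutineScope
// creates a scope that inherits the caller's coroutine context.
suspend fun loadBoth(): Pair<Int, Int> = coroutineScope {
    // `this` is a CoroutineScope here, so builders such as async are available.
    val a = async { delay(10); 1 }
    val b = async { delay(10); 2 }
    a.await() to b.await()
}

fun main() = runBlocking {
    println(loadBoth())
}
```

One caveat: coroutineScope does not return until every coroutine started inside it has completed. A function that starts a long-running producer in the block and returns its channel may therefore suspend until the producer finishes, so passing the scope explicitly (or declaring the function as an extension on CoroutineScope) can still be the better fit for that case.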