rocketraman
06/27/2018, 3:23 PM
ByteReadChannel that I need to adapt to an rx2 API that uses Flowable<ByteBuffer>. Here are the methods I've come up with, with some guidance from @Deactivated User and @cy on #ktor, to adapt these in both directions -- these seem to work but opinions on correctness, efficiency, and style are welcome:
Adapting Flowable<ByteBuffer> to ByteReadChannel:
suspend fun Flowable<ByteBuffer>.toByteReadChannel(parent: Job = Job()) = writer(Unconfined, parent = parent, autoFlush = true) {
    try {
        this@toByteReadChannel.consumeEach {
            channel.writeFully(it)
        }
    } catch (e: Throwable) {
        channel.close(e)
    }
}.channel
and in the other direction:
suspend fun ByteReadChannel.toFlowable(): Flowable<ByteBuffer> {
    return rxFlowable {
        this@toFlowable.lookAheadSuspend {
            consumeEachRemaining {
                // we have to make a copy as the ByteBuffer here comes from a reused ring buffer
                this@rxFlowable.channel.send(it.copy())
                true
            }
            this@rxFlowable.close()
        }
    }
}
Note the copy() in the second code snippet. I don't like this but don't know how to get rid of it.
this@rxFlowable.close() was occasionally causing the library consuming the Flowable to throw an error. I've removed it, but is that ok to do in this case?
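On the copy() note: here is a minimal JDK-only sketch of why a buffer handed out from reused storage generally has to be copied before it is sent downstream. It does not use ktor or rx2. The single `reused` buffer is only a stand-in for the ring buffer mentioned in the code comment, and `copyRemaining` and `retainChunks` are hypothetical names made up for this illustration (`copyRemaining` is not the ktor `copy()` extension).

```kotlin
import java.nio.ByteBuffer

// Hypothetical helper: copies the remaining bytes of `src` into a fresh heap
// buffer without moving the position of `src`.
fun copyRemaining(src: ByteBuffer): ByteBuffer {
    val copy = ByteBuffer.allocate(src.remaining())
    copy.put(src.duplicate()) // duplicate() shares content but has its own position/limit
    copy.flip()
    return copy
}

private fun text(b: ByteBuffer): String =
    Charsets.US_ASCII.decode(b.duplicate()).toString()

// Writes each chunk into one reused buffer (stand-in for the ring buffer) and
// retains both a shared view and an independent copy of every chunk.
// Assumes ASCII chunks of at most 16 bytes.
fun retainChunks(chunks: List<String>): Pair<List<String>, List<String>> {
    val reused = ByteBuffer.allocate(16)
    val views = mutableListOf<ByteBuffer>()
    val copies = mutableListOf<ByteBuffer>()
    for (chunk in chunks) {
        reused.clear()
        reused.put(chunk.toByteArray(Charsets.US_ASCII))
        reused.flip()
        views += reused.duplicate()     // shares the backing storage
        copies += copyRemaining(reused) // independent snapshot
    }
    return Pair(views.map { text(it) }, copies.map { text(it) })
}

fun main() {
    val (views, copies) = retainChunks(listOf("abcd", "wxyz"))
    println(views)  // [wxyz, wxyz]: the earlier view was overwritten by the later write
    println(copies) // [abcd, wxyz]
}
```

A Flowable subscriber may hold on to an emitted ByteBuffer after the producer has moved on, which is the situation the retained views model here. Avoiding the copy would presumably require the consumer to finish with each buffer before the next read, and the Flowable contract by itself does not promise that.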