rocketraman

06/27/2018, 3:23 PM
I have a coroutines-based API (ktor) that uses `ByteReadChannel`, which I need to adapt to an rx2 API that uses `Flowable<ByteBuffer>`. Here are the methods I've come up with, with some guidance from @Deactivated User and @cy on #ktor, to adapt these in both directions. They seem to work, but opinions on correctness, efficiency, and style are welcome.

Adapting `Flowable<ByteBuffer>` to `ByteReadChannel`:
```kotlin
suspend fun Flowable<ByteBuffer>.toByteReadChannel(parent: Job = Job()) = writer(Unconfined, parent = parent, autoFlush = true) {
  try {
    this@toByteReadChannel.consumeEach {
      channel.writeFully(it)
    }
  } catch (e: Throwable) {
    channel.close(e)
  }
}.channel
```
And in the other direction:
```kotlin
suspend fun ByteReadChannel.toFlowable(): Flowable<ByteBuffer> {
  return rxFlowable {
    this@toFlowable.lookAheadSuspend {
      consumeEachRemaining {
        // we have to make a copy as the ByteBuffer here comes from a reused ring buffer
        this@rxFlowable.channel.send(it.copy())
        true
      }
      this@rxFlowable.close()
    }
  }
}
```
Note the `copy()` in the second code snippet. I don't like this but don't know how to get rid of it.
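To illustrate why the copy matters, here is a minimal sketch using only `java.nio` of what happens when a buffer view outlives the reuse of its backing storage. `deepCopy` is a hypothetical helper written for this illustration, not the `copy()` used above, and the 4-byte `ring` buffer is only a stand-in for the channel's internal ring buffer.

```kotlin
import java.nio.ByteBuffer

// Hypothetical helper (not a ktor API): copies the remaining bytes into a fresh buffer.
fun ByteBuffer.deepCopy(): ByteBuffer {
    val copy = ByteBuffer.allocate(remaining())
    copy.put(duplicate()) // read through a duplicate so the source position is left untouched
    copy.flip()
    return copy
}

fun main() {
    // Stand-in for a reused ring buffer (assumption for illustration only).
    val ring = ByteBuffer.allocate(4)
    ring.put(byteArrayOf(1, 2, 3, 4))
    ring.flip()

    val shared = ring.duplicate() // shares the same backing storage
    val safe = ring.deepCopy()    // owns its own storage

    // The producer reuses the buffer for the next chunk.
    ring.clear()
    ring.put(byteArrayOf(9, 9, 9, 9))
    ring.flip()

    println(shared.get(0)) // prints 9: the view sees the overwritten data
    println(safe.get(0))   // prints 1: the copy still holds the original chunk
}
```

If a downstream subscriber processes each `ByteBuffer` asynchronously, it would be in the position of `shared` here unless each emitted buffer is copied first.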
One update to this: the explicit call to `this@rxFlowable.close()` was occasionally causing the library consuming the `Flowable` to throw an error. I've removed it, but is that OK to do in this case?