rocketraman
06/26/2018, 2:08 PMInputStream. Oddly, this doesn't appear to be a built-in function? I've created this class which I'm passing to call.respond, but can someone comment on whether I'm missing something here?
class InputStreamContent(
  private val stream: InputStream,
  etag: String?,
  override val contentLength: Long?,
  override val contentType: ContentType): OutgoingContent.ReadChannelContent() {
  init {
    if (etag != null) versions += EntityTagVersion(etag)
  }
  override fun readFrom(): ByteReadChannel = stream.toByteReadChannel()
}cy
06/26/2018, 2:34 PMInputStream is blocking so it may cause problems once the stream get stuckrocketraman
06/26/2018, 2:39 PMrocketraman
06/26/2018, 2:40 PMrocketraman
06/26/2018, 2:41 PMrocketraman
06/26/2018, 2:48 PMAsynchronousSocketChannel or something.cy
06/26/2018, 2:51 PMcy
06/26/2018, 2:51 PMDeactivated User
06/26/2018, 2:52 PMrocketraman
06/26/2018, 2:53 PMrocketraman
06/26/2018, 2:53 PMcy
06/26/2018, 2:59 PMcy
06/26/2018, 3:04 PMcy
06/26/2018, 3:04 PMrocketraman
06/26/2018, 3:08 PMrocketraman
06/26/2018, 3:14 PMSingle or Observable to deal with.rocketraman
06/26/2018, 3:19 PMDeactivated User
06/26/2018, 3:20 PMwriter from kotlinx.coroutines-io to custom generate a ByteReadChannel. If you have a callback function receiving ByteBuffer chunks, you can create a channel and pass to that writer jobDeactivated User
06/26/2018, 3:20 PMDeactivated User
06/26/2018, 3:22 PMrocketraman
06/26/2018, 3:23 PMcy
06/26/2018, 4:15 PMrocketraman
06/26/2018, 4:23 PMrocketraman
06/26/2018, 9:29 PMFlowable<ByteBuffer> to a ByteReadChannel via `writer`:
suspend fun Flowable<ByteBuffer>.toByteReadChannel(parent: Job = Job()) = writer(Unconfined, parent = parent, autoFlush = true) {
  try {
    this@toByteReadChannel.consumeEach {
      channel.writeFully(it)
    }
  } catch (e: Throwable) {
    channel.close(e)
  }
}.channel
I haven't fully explored whether this is doing all the right things under the hood, but it seems to work. If I put a breakpoint at the writeFully line I see the client getting bytes a chunk at a time, as each breakpoint is hit and run.rocketraman
06/26/2018, 10:30 PMFlowable that is being consumed by a suspendable.cy
06/27/2018, 8:19 AMthis@toByteReadChannel.consumeEach what does it do? Does Flowable have consumeEach? Where it is implemented?rocketraman
06/27/2018, 2:14 PMByteReadChannel to a Flowable<ByteBuffer>. I ended up with this:
suspend fun ByteReadChannel.toFlowable(): Flowable<ByteBuffer> {
  return rxFlowable {
    this@toFlowable.lookAheadSuspend {
      consumeEachRemaining {
        // we have to make a copy otherwise the ByteBuffer gets recycled by ktor and the Flowable items get corrupted
        this@rxFlowable.channel.send(it.copy())
        true
      }
      this@rxFlowable.close()
    }
  }
}
Note the copy() in there on the ByteBuffer from ktor. Without that, the upload contained garbage. I'm pretty sure its due to concurrent ByteBuffer recycling by ktor, which corrupts the Flowable. Is there a better approach to this?rocketraman
06/27/2018, 2:14 PMcy
06/27/2018, 2:17 PMrocketraman
06/27/2018, 2:20 PMByteReadChannel API I should use to split it up into `ByteBuffer`'s I can send to a Flowable? Btw, rxFlowable here also comes from the same rx2 coroutines adapter lib.rocketraman
06/27/2018, 2:21 PMcy
06/27/2018, 3:08 PMcy
06/27/2018, 3:08 PMrocketraman
06/27/2018, 3:18 PM