I am sending response content from ktor where the ...
# ktor
r
I am sending response content from ktor where the content comes from an
InputStream
. Oddly, this doesn't appear to be a built-in function? I've created this class which I'm passing to
call.respond
, but can someone comment on whether I'm missing something here?
Copy code
class InputStreamContent(
  private val stream: InputStream,
  etag: String?,
  override val contentLength: Long?,
  override val contentType: ContentType): OutgoingContent.ReadChannelContent() {

  init {
    if (etag != null) versions += EntityTagVersion(etag)
  }

  override fun readFrom(): ByteReadChannel = stream.toByteReadChannel()
}
📝 1
c
Looks good, it should work but remember that
InputStream
is blocking so it may cause problems once the
stream
get stuck
r
Is there a more coroutine friendly construct I can use?
This stream is sourced over a network so it could definitely block.
Also note: the docs are getting way better but I found no docs related to implementing this.
Ok, so if I'm understanding this correctly, I need my stream source to use an Async mechanism instead, like
AsynchronousSocketChannel
or something.
c
Yes, exactly. If your source stream is transferred over http then you can try ktor client that is async and you can get content as a byte channel
@Deactivated User ^^^
d
@rocketraman related to implementing a custom OutgoingContent? Or to using channels?
r
I'm going to try and integrate https://github.com/Azure/azure-storage-java/tree/New-Storage-SDK-V10-Preview, which uses ReactiveX (rx 2). Any samples I can look at that uses ktor + the rx2 coroutine extensions?
@Deactivated User For docs, yes
c
I don't see any async API...
Ah I see, they just didn't update javadocs,
but I still can't see download yet, only upload
r
@cy Yeah, this is a beta of a new release of their driver so they likely haven't got much in the way of docs yet.
At the end of the day though, I guess I will have some type of
Single
or
Observable
to deal with.
d
You can use
writer
from kotlinx.coroutines-io to custom generate a
ByteReadChannel
. If you have a callback function receiving ByteBuffer chunks, you can create a channel and pass to that writer job
Flowable<ByteBuffer>
Maybe you can create an extension function to convert a Flowable<ByteBuffer> into a ByteReadChannel and then send a content with that ByteReadChannel. Also if the API allows you to get an endpoint + headers you can use the ktor’s HttpClient and pipe the client body to the server response
r
Thanks guys, let me play with this a bit. I've done lots of async programming, but coroutines and rxjava are both mostly new to me, so this should be interesting.
👍 1
c
implementing backpressure with RX is the most difficult part so be careful 😉
r
Ok, so really interesting 🙂
@cy @Deactivated User I believe this turned out to be quite simple -- I did as @Deactivated User suggested and converted a
Flowable<ByteBuffer>
to a
ByteReadChannel
via `writer`:
Copy code
suspend fun Flowable<ByteBuffer>.toByteReadChannel(parent: Job = Job()) = writer(Unconfined, parent = parent, autoFlush = true) {
  try {
    this@toByteReadChannel.consumeEach {
      channel.writeFully(it)
    }
  } catch (e: Throwable) {
    channel.close(e)
  }
}.channel
I haven't fully explored whether this is doing all the right things under the hood, but it seems to work. If I put a breakpoint at the
writeFully
line I see the client getting bytes a chunk at a time, as each breakpoint is hit and run.
I think this handles back-pressure properly because the source is a
Flowable
that is being consumed by a suspendable.
c
this@toByteReadChannel.consumeEach
what does it do? Does Flowable have
consumeEach
? Where it is implemented?
r
@cy NOTE: I had a bit of trouble with the opposite during upload: converting from a
ByteReadChannel
to a
Flowable<ByteBuffer>
. I ended up with this:
Copy code
suspend fun ByteReadChannel.toFlowable(): Flowable<ByteBuffer> {
  return rxFlowable {
    this@toFlowable.lookAheadSuspend {
      consumeEachRemaining {
        // we have to make a copy otherwise the ByteBuffer gets recycled by ktor and the Flowable items get corrupted
        this@rxFlowable.channel.send(it.copy())
        true
      }
      this@rxFlowable.close()
    }
  }
}
Note the
copy()
in there on the
ByteBuffer
from ktor. Without that, the upload contained garbage. I'm pretty sure its due to concurrent ByteBuffer recycling by ktor, which corrupts the Flowable. Is there a better approach to this?
@cy This is a utility extension that comes from the rx2 kotlin coroutines library. https://github.com/Kotlin/kotlinx.coroutines/tree/master/reactive/kotlinx-coroutines-rx2
c
Well, the buffer inside of lookAheadSuspend is always the same: it is a ring buffer
r
Ah that explains why I was having issues. Is there a better
ByteReadChannel
API I should use to split it up into `ByteBuffer`'s I can send to a
Flowable
? Btw,
rxFlowable
here also comes from the same rx2 coroutines adapter lib.
Or is this question better asked on #coroutines ?
c
as f ar as understand, Flowable doesn't provide any way to recycle sent elements, right ?
so copying is the only way I see...
r
Yes, it seems so. Perhaps I will post on #coroutines just to get opinion from the experts there.