rocketraman
06/26/2018, 2:08 PM
`InputStream`. Oddly, this doesn't appear to be a built-in function? I've created this class, which I'm passing to `call.respond`, but can someone comment on whether I'm missing something here?
class InputStreamContent(
    private val stream: InputStream,
    etag: String?,
    override val contentLength: Long?,
    override val contentType: ContentType
) : OutgoingContent.ReadChannelContent() {
    init {
        // Attach the ETag, if one was provided, as a version of this content.
        if (etag != null) versions += EntityTagVersion(etag)
    }

    // Adapt the blocking stream to a ByteReadChannel for the response.
    override fun readFrom(): ByteReadChannel = stream.toByteReadChannel()
}
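With a wrapper like this, it helps to remember that `InputStream.read()` parks the calling thread until data arrives. The following is a minimal JDK-only sketch of that behaviour, not ktor code: `demoBlockingRead`, `idleStream`, and `blockingPool` are hypothetical names, and a `PipedInputStream` stands in for whatever slow source the real stream wraps.

```kotlin
import java.io.PipedInputStream
import java.io.PipedOutputStream
import java.util.concurrent.Callable
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
import java.util.concurrent.TimeoutException

/**
 * Reads one byte from a pipe on a dedicated thread.
 * Returns (read was still blocked after 200 ms, byte read once data arrived).
 */
fun demoBlockingRead(): Pair<Boolean, Int> {
    val producer = PipedOutputStream()
    // Nothing has been written yet, so read() on this stream has to wait.
    val idleStream = PipedInputStream(producer)

    val blockingPool = Executors.newSingleThreadExecutor()
    try {
        // The blocking call runs on its own thread, so only that thread is parked.
        val pending = blockingPool.submit(Callable { idleStream.read() })

        val stillBlocked = try {
            pending.get(200, TimeUnit.MILLISECONDS)
            false
        } catch (e: TimeoutException) {
            true
        }

        producer.write(42)
        producer.flush() // wakes the waiting reader
        val byte = pending.get(5, TimeUnit.SECONDS)
        return stillBlocked to byte
    } finally {
        producer.close()
        blockingPool.shutdown()
    }
}

fun main() {
    val (stillBlocked, byte) = demoBlockingRead()
    println("read still blocked after 200 ms: $stillBlocked")
    println("byte read after the producer wrote: $byte")
}
```

Keeping such reads on a dedicated pool means a stalled source ties up one of those threads rather than a thread shared with other work. Whether `toByteReadChannel()` arranges this internally is not something this sketch establishes.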
cy
06/26/2018, 2:34 PM
`InputStream` is blocking, so it may cause problems once the stream gets stuck.
rocketraman
06/26/2018, 2:39 PM
rocketraman
06/26/2018, 2:40 PM
rocketraman
06/26/2018, 2:41 PM
rocketraman
06/26/2018, 2:48 PM
`AsynchronousSocketChannel` or something.
cy
06/26/2018, 2:51 PM
cy
06/26/2018, 2:51 PM
Deactivated User
06/26/2018, 2:52 PM
rocketraman
06/26/2018, 2:53 PM
rocketraman
06/26/2018, 2:53 PM
cy
06/26/2018, 2:59 PM
cy
06/26/2018, 3:04 PM
cy
06/26/2018, 3:04 PM
rocketraman
06/26/2018, 3:08 PM
rocketraman
06/26/2018, 3:14 PM
`Single` or `Observable` to deal with.
rocketraman
06/26/2018, 3:19 PM
Deactivated User
06/26/2018, 3:20 PM
`writer` from kotlinx.coroutines-io to generate a custom `ByteReadChannel`. If you have a callback function receiving `ByteBuffer` chunks, you can create a channel and pass it to that writer job.
Deactivated User
06/26/2018, 3:20 PM
Deactivated User
06/26/2018, 3:22 PM
rocketraman
06/26/2018, 3:23 PM
cy
06/26/2018, 4:15 PM
rocketraman
06/26/2018, 4:23 PM
rocketraman
06/26/2018, 9:29 PM
`Flowable<ByteBuffer>` to a `ByteReadChannel` via `writer`:
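suspend fun Flowable<ByteBuffer>.toByteReadChannel(parent: Job = Job()) =
    writer(Unconfined, parent = parent, autoFlush = true) {
        try {
            // Each emitted ByteBuffer is written in full before the next one is consumed.
            this@toByteReadChannel.consumeEach {
                channel.writeFully(it)
            }
        } catch (e: Throwable) {
            // Surface upstream failures to the reader by closing the channel with the cause.
            channel.close(e)
        }
    }.channel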
I haven't fully explored whether this is doing all the right things under the hood, but it seems to work. If I put a breakpoint at the `writeFully` line, I see the client getting bytes a chunk at a time, as each breakpoint is hit and run.
rocketraman
06/26/2018, 10:30 PM
`Flowable` that is being consumed by a suspendable.
cy
06/27/2018, 8:19 AM
What does `this@toByteReadChannel.consumeEach` do? Does `Flowable` have `consumeEach`? Where is it implemented?
rocketraman
06/27/2018, 2:14 PM
`ByteReadChannel` to a `Flowable<ByteBuffer>`. I ended up with this:
suspend fun ByteReadChannel.toFlowable(): Flowable<ByteBuffer> {
    return rxFlowable {
        this@toFlowable.lookAheadSuspend {
            consumeEachRemaining {
                // we have to make a copy, otherwise the ByteBuffer gets recycled by ktor
                // and the Flowable items get corrupted
                this@rxFlowable.channel.send(it.copy())
                true
            }
            this@rxFlowable.close()
        }
    }
}
Note the `copy()` in there on the `ByteBuffer` from ktor. Without that, the upload contained garbage. I'm pretty sure it's due to concurrent `ByteBuffer` recycling by ktor, which corrupts the `Flowable`. Is there a better approach to this?
rocketraman
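The corruption described above is consistent with holding on to references to a buffer that its producer later reuses. Here is a minimal JDK-only sketch of that effect. It does not use ktor: `copyOf`, `textOf`, and `collectChunks` are hypothetical helpers, and a single reused `ByteBuffer` stands in for ktor's recycled buffers, which is an assumption rather than confirmed ktor behaviour.

```kotlin
import java.nio.ByteBuffer

/** Copies the remaining bytes of [source] into a fresh buffer without moving its position. */
fun copyOf(source: ByteBuffer): ByteBuffer {
    val copy = ByteBuffer.allocate(source.remaining())
    copy.put(source.duplicate()) // duplicate() shares content but has its own position
    copy.flip()
    return copy
}

/** Decodes the remaining bytes as text without moving the buffer's position. */
fun textOf(buffer: ByteBuffer): String =
    Charsets.UTF_8.decode(buffer.duplicate()).toString()

/**
 * Simulates a producer that reuses one buffer for every chunk and hands each
 * chunk to [retain], which decides what is kept for later.
 */
fun collectChunks(chunks: List<String>, retain: (ByteBuffer) -> ByteBuffer): List<String> {
    val shared = ByteBuffer.allocate(16) // stand-in for a pooled, recycled buffer
    val kept = mutableListOf<ByteBuffer>()
    for (chunk in chunks) {
        shared.clear()
        shared.put(chunk.toByteArray(Charsets.UTF_8))
        shared.flip()
        kept += retain(shared)
    }
    // The retained buffers are read only after the producer has moved on.
    return kept.map(::textOf)
}

fun main() {
    val chunks = listOf("aaaa", "bbbb", "cccc")
    println(collectChunks(chunks) { it })         // [cccc, cccc, cccc]
    println(collectChunks(chunks) { copyOf(it) }) // [aaaa, bbbb, cccc]
}
```

When references are retained, every item ends up showing whatever the producer wrote last, while independent copies keep each chunk intact. That matches the need for `copy()` whenever the consumer may read an item after the producer has refilled the buffer.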
06/27/2018, 2:14 PM
cy
06/27/2018, 2:17 PM
rocketraman
06/27/2018, 2:20 PM
`ByteReadChannel` API I should use to split it up into `ByteBuffer`s I can send to a `Flowable`? Btw, `rxFlowable` here also comes from the same rx2 coroutines adapter lib.
rocketraman
06/27/2018, 2:21 PM
cy
06/27/2018, 3:08 PM
cy
06/27/2018, 3:08 PM
rocketraman
06/27/2018, 3:18 PM