rocketraman
11/20/2018, 2:03 PM
ByteReadChannel.lookAheadSuspend in which I was able to call consumeEachRemaining, like this:
fun CoroutineScope.flowableOf(byteReadChannel: ByteReadChannel): Flowable<ByteBuffer> = rxFlowable {
    byteReadChannel.lookAheadSuspend {
        consumeEachRemaining {
            send(it.copy())
            true
        }
    }
}
In kotlinx-coroutines-io 0.1.0, this same code compiles but does not work: it does not read the entire channel. The corresponding non-suspending lookAhead method does not work either.
orangy
11/20/2018, 2:35 PM
rocketraman
11/20/2018, 2:41 PM
cy
11/20/2018, 2:43 PM
awaitAtLeast
rocketraman
11/20/2018, 2:51 PM
awaitAtLeast? The 0.1.0 library code?
cy
11/20/2018, 2:52 PM
consumeEachRemaining
rocketraman
11/20/2018, 2:53 PM
cy
11/20/2018, 2:56 PM
rocketraman
11/20/2018, 2:57 PM
ByteReadChannel? I wouldn't expect this to be very difficult.
cy
11/20/2018, 3:00 PM
consumeEachRemaining need to be suspend function. For now you need a loop
rocketraman
11/20/2018, 3:00 PM
cy
11/20/2018, 3:04 PM
consumed(N)
rocketraman
11/20/2018, 3:05 PM
consumed
java.lang.IllegalStateException: Unable to consume 4088 bytes: not enough available bytes
lookAheadSuspend {
    // consumeEachRemaining does not work for now, use the loop logic from ktor
    while (true) {
        val buffer = request(0, 1)
        if (buffer == null) {
            if (!awaitAtLeast(1)) break
            continue
        }
        while (buffer.hasRemaining()) {
            val count = buffer.remaining()
            send(buffer.copy())
            consumed(count)
        }
    }
}
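One plausible reading of the exception, sketched with plain JDK buffers: if `copy()` leaves the source buffer's position untouched, `buffer.hasRemaining()` stays true after the first pass, so the inner loop would call `consumed(count)` a second time for bytes that were already consumed. The helper below is the `copy()` posted later in this thread; that the copy in use at this point behaved the same way is an assumption, and `innerLoopIterations` with its `cap` parameter is a hypothetical name used only for illustration.

```kotlin
import java.nio.ByteBuffer

// copy() as posted later in the thread: it reads through a read-only view,
// so the source buffer's own position is not advanced.
fun ByteBuffer.copy(): ByteBuffer {
    val readBuffer = asReadOnlyBuffer()
    return ByteBuffer.allocate(readBuffer.remaining()).apply {
        put(readBuffer)
        flip()
    }
}

// Hypothetical stand-in for the inner loop: counts how many times the body
// would run, capped so that the demonstration terminates.
fun innerLoopIterations(buffer: ByteBuffer, cap: Int): Int {
    var iterations = 0
    while (buffer.hasRemaining() && iterations < cap) {
        buffer.copy()   // leaves buffer.position() where it was
        iterations++    // the real loop would call consumed(count) here
    }
    return iterations
}

fun main() {
    val buffer = ByteBuffer.wrap(ByteArray(4088))
    // The loop only stops because of the cap, not because the buffer drained.
    println(innerLoopIterations(buffer, cap = 3)) // prints 3
}
```

Replacing the inner `while` with a single pass per `request` call avoids the repeated `consumed(count)`.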
consumed(count)
cy
11/20/2018, 3:14 PM
rocketraman
11/20/2018, 3:15 PM
java.lang.IllegalStateException: Unable to consume 4088 bytes: not enough available bytes
cy
11/20/2018, 3:16 PM
copy modify buffer's position?
rocketraman
11/20/2018, 3:16 PM
copy from Ktor
cy
11/20/2018, 3:17 PM
rocketraman
11/20/2018, 3:19 PM
while (true) {
    val buffer = request(0, 1)
    if (buffer == null) {
        if (!awaitAtLeast(1)) break
        continue
    } else {
        val count = buffer.remaining()
        send(buffer.duplicate())
        consumed(count)
    }
}
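A small JDK-only sketch of what `duplicate()` does and does not copy: the duplicate gets its own position and limit, but it shares the underlying bytes with the source. Whether the channel reuses that memory after `consumed(count)` is an assumption here, not something this thread confirms; if it does, a receiver holding the duplicate could see the data change.

```kotlin
import java.nio.ByteBuffer

fun main() {
    val source = ByteBuffer.wrap(byteArrayOf(1, 2, 3, 4))
    val view = source.duplicate()

    // Position and limit are independent between the two buffers.
    view.get()
    println(source.position()) // prints 0
    println(view.position())   // prints 1

    // The content is shared: a write through one is visible through the other.
    source.put(0, 42.toByte())
    println(view.get(0))       // prints 42
}
```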
duplicate is just the bog-standard JDK ByteBuffer method
cy
11/20/2018, 3:20 PM
rocketraman
11/20/2018, 3:20 PM
cy
11/20/2018, 3:21 PM
rocketraman
11/20/2018, 3:33 PM
lookAheadSuspend {
    // consumeEachRemaining does not work for now, use the loop logic from ktor
    while (true) {
        val buffer = request(0, 1)
        if (buffer == null) {
            if (!awaitAtLeast(1)) break
            continue
        } else {
            val count = buffer.remaining()
            send(buffer.copy())
            consumed(count)
        }
    }
}
and
fun ByteBuffer.copy(): ByteBuffer {
    val readBuffer = asReadOnlyBuffer()
    return ByteBuffer.allocate(readBuffer.remaining()).apply {
        put(readBuffer)
        flip()
    }
}
cy
11/20/2018, 3:34 PM
continue is redundant since you have else. However it should work
rocketraman
11/20/2018, 3:42 PM
fun ByteBuffer.copy(): ByteBuffer {
    return ByteBuffer.allocate(remaining()).apply {
        put(this@copy)
        flip()
    }
}
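The two `copy()` versions in this thread differ in one side effect, which the sketch below illustrates with plain JDK buffers: `ByteBuffer.put(ByteBuffer)` advances the position of the buffer it reads from, so the simplified version drains the source, while the version going through `asReadOnlyBuffer()` leaves it untouched. The names `copyViaReadOnlyView` and `copyDirect` are hypothetical, chosen only so both versions can sit side by side.

```kotlin
import java.nio.ByteBuffer

// First version from the thread, renamed for comparison.
fun ByteBuffer.copyViaReadOnlyView(): ByteBuffer {
    val readBuffer = asReadOnlyBuffer()
    return ByteBuffer.allocate(readBuffer.remaining()).apply {
        put(readBuffer)
        flip()
    }
}

// Simplified version from the thread, renamed for comparison.
fun ByteBuffer.copyDirect(): ByteBuffer {
    return ByteBuffer.allocate(remaining()).apply {
        put(this@copyDirect) // advances the source buffer's position
        flip()
    }
}

fun main() {
    val a = ByteBuffer.wrap(byteArrayOf(1, 2, 3, 4))
    val b = ByteBuffer.wrap(byteArrayOf(1, 2, 3, 4))

    val copyA = a.copyViaReadOnlyView()
    val copyB = b.copyDirect()

    println("${copyA.remaining()} ${a.remaining()}") // prints 4 4
    println("${copyB.remaining()} ${b.remaining()}") // prints 4 0
}
```

With the simplified version, `count` has to be read before `copy()` is called, which is the order the loop above already uses.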
... and yes, this works too.
cy
11/20/2018, 4:25 PM
rocketraman
11/20/2018, 4:26 PM
buffer.hasRemaining -- right
consumeEachRemaining?
cy
11/20/2018, 5:12 PM
request
rocketraman
11/20/2018, 5:19 PM