rocketraman
11/20/2018, 2:03 PM
ByteReadChannel.lookAheadSuspend in which I was able to call consumeEachRemaining, like this:
fun CoroutineScope.flowableOf(byteReadChannel: ByteReadChannel): Flowable<ByteBuffer> = rxFlowable {
    byteReadChannel.lookAheadSuspend {
        consumeEachRemaining {
            send(it.copy())
            true
        }
    }
}
In kotlinx-coroutines-io 0.1.0, this same code compiles but does not work -- it does not read the entire channel. Neither does the corresponding non-suspending lookAhead method work.
orangy
rocketraman
11/20/2018, 2:41 PM
rocketraman
11/20/2018, 2:42 PM
cy
11/20/2018, 2:43 PM
cy
11/20/2018, 2:49 PM
awaitAtLeast
cy
11/20/2018, 2:49 PM
rocketraman
11/20/2018, 2:51 PM
awaitAtLeast? The 0.1.0 library code?
cy
11/20/2018, 2:52 PM
consumeEachRemaining
cy
11/20/2018, 2:52 PM
rocketraman
11/20/2018, 2:53 PM
cy
11/20/2018, 2:56 PM
cy
11/20/2018, 2:56 PM
cy
11/20/2018, 2:57 PM
rocketraman
11/20/2018, 2:57 PM
ByteReadChannel? I wouldn't expect this to be very difficult.
cy
11/20/2018, 3:00 PM
consumeEachRemaining needs to be a suspend function. For now you need a loop.
rocketraman
11/20/2018, 3:00 PM
rocketraman
11/20/2018, 3:02 PM
cy
11/20/2018, 3:04 PM
cy
11/20/2018, 3:05 PM
consumed(N)
cy
11/20/2018, 3:05 PM
rocketraman
11/20/2018, 3:05 PM
consumed
rocketraman
11/20/2018, 3:10 PM
java.lang.IllegalStateException: Unable to consume 4088 bytes: not enough available bytes
rocketraman
11/20/2018, 3:10 PM
lookAheadSuspend {
    // consumeEachRemaining does not work for now, use the loop logic from ktor
    while (true) {
        val buffer = request(0, 1)
        if (buffer == null) {
            if (!awaitAtLeast(1)) break
            continue
        }
        while (buffer.hasRemaining()) {
            val count = buffer.remaining()
            send(buffer.copy())
            consumed(count)
        }
    }
}
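One plausible reading of the IllegalStateException quoted in this thread, sketched with plain JDK buffers only: if the copy helper leaves the source buffer's position where it was, buffer.hasRemaining() stays true, so the inner loop runs again and calls consumed(count) for bytes that were already consumed. FakeLookAhead and copyKeepingPosition are hypothetical stand-ins written for this illustration. They are not kotlinx-coroutines-io or Ktor APIs, and the real cause may differ.

```kotlin
import java.nio.ByteBuffer

// Hypothetical stand-in for the look-ahead session: it only tracks how many
// bytes are still available and rejects over-consumption.
class FakeLookAhead(private var available: Int) {
    fun consumed(n: Int) {
        check(n <= available) { "Unable to consume $n bytes: not enough available bytes" }
        available -= n
    }
}

// Hypothetical copy helper that reads through a read-only view, so the
// source buffer's position is left untouched.
fun ByteBuffer.copyKeepingPosition(): ByteBuffer {
    val view = asReadOnlyBuffer()
    return ByteBuffer.allocate(view.remaining()).apply {
        put(view)
        flip()
    }
}

// Runs the same inner-loop shape as the snippet above and returns the
// error message, or null if the loop finishes without an error.
fun runInnerLoop(size: Int): String? {
    val session = FakeLookAhead(size)
    val buffer = ByteBuffer.wrap(ByteArray(size))
    return try {
        while (buffer.hasRemaining()) {
            val count = buffer.remaining()
            buffer.copyKeepingPosition()
            session.consumed(count) // the second pass has nothing left to consume
        }
        null
    } catch (e: IllegalStateException) {
        e.message
    }
}

fun main() {
    println(runInnerLoop(4088))
}
```

Under these assumptions, replacing the inner while with a single pass per request removes the second consumed call.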
rocketraman
11/20/2018, 3:14 PM
consumed(count)
cy
11/20/2018, 3:14 PM
cy
11/20/2018, 3:15 PM
rocketraman
11/20/2018, 3:15 PM
java.lang.IllegalStateException: Unable to consume 4088 bytes: not enough available bytes
cy
11/20/2018, 3:16 PM
Does copy modify buffer's position?
rocketraman
11/20/2018, 3:16 PM
copy from Ktor
cy
11/20/2018, 3:17 PM
cy
11/20/2018, 3:17 PM
cy
11/20/2018, 3:18 PM
cy
11/20/2018, 3:18 PM
rocketraman
11/20/2018, 3:19 PM
while (true) {
    val buffer = request(0, 1)
    if (buffer == null) {
        if (!awaitAtLeast(1)) break
        continue
    } else {
        val count = buffer.remaining()
        send(buffer.duplicate())
        consumed(count)
    }
}
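A small sketch of what the JDK's ByteBuffer.duplicate() does and does not copy, since the snippet above sends a duplicate downstream and then marks the bytes as consumed. A duplicate has its own position and limit but shares the underlying bytes with the source. Whether the channel reuses that memory after consumed(count) is an assumption here, not something this thread confirms. The function names are hypothetical.

```kotlin
import java.nio.ByteBuffer

// A duplicate keeps its own position: reading from it does not move the source.
fun duplicateHasOwnPosition(): Boolean {
    val source = ByteBuffer.wrap(byteArrayOf(1, 2, 3, 4))
    val dup = source.duplicate()
    dup.get() // advances only dup
    return source.position() == 0 && dup.position() == 1
}

// A duplicate shares content: a later write to the source is visible through it.
fun duplicateSharesBytes(): Boolean {
    val source = ByteBuffer.wrap(byteArrayOf(1, 2, 3, 4))
    val dup = source.duplicate()
    source.put(0, 42.toByte()) // stands in for the producer reusing its buffer
    return dup.get(0) == 42.toByte()
}

fun main() {
    println(duplicateHasOwnPosition())
    println(duplicateSharesBytes())
}
```

If the producer may overwrite its buffer once the bytes are consumed, a consumer holding a duplicate could observe changed data, which is one reason to prefer a real copy.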
rocketraman
11/20/2018, 3:19 PM
duplicate is just the bog-standard JDK ByteBuffer method
cy
11/20/2018, 3:20 PM
cy
11/20/2018, 3:20 PM
rocketraman
11/20/2018, 3:20 PM
cy
11/20/2018, 3:21 PM
rocketraman
11/20/2018, 3:33 PM
rocketraman
11/20/2018, 3:33 PM
lookAheadSuspend {
    // consumeEachRemaining does not work for now, use the loop logic from ktor
    while (true) {
        val buffer = request(0, 1)
        if (buffer == null) {
            if (!awaitAtLeast(1)) break
            continue
        } else {
            val count = buffer.remaining()
            send(buffer.copy())
            consumed(count)
        }
    }
}
and
fun ByteBuffer.copy(): ByteBuffer {
    val readBuffer = asReadOnlyBuffer()
    return ByteBuffer.allocate(readBuffer.remaining()).apply {
        put(readBuffer)
        flip()
    }
}
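A quick check of the copy() helper above, using only JDK buffers: it should produce a buffer with its own storage and leave the source's position alone, because the read goes through a read-only view. The copyIsIndependent function and the byte values are made up for this illustration.

```kotlin
import java.nio.ByteBuffer

// Same helper as in the message above: copy the remaining bytes into a
// freshly allocated buffer without moving the source's position.
fun ByteBuffer.copy(): ByteBuffer {
    val readBuffer = asReadOnlyBuffer()
    return ByteBuffer.allocate(readBuffer.remaining()).apply {
        put(readBuffer)
        flip()
    }
}

// Hypothetical check: the copy keeps the old bytes after the source changes.
fun copyIsIndependent(): Boolean {
    val source = ByteBuffer.wrap(byteArrayOf(1, 2, 3, 4))
    val copied = source.copy()
    source.put(0, 99.toByte()) // stands in for the producer reusing its buffer
    return copied.get(0) == 1.toByte() &&
        copied.remaining() == 4 &&
        source.position() == 0
}

fun main() {
    println(copyIsIndependent())
}
```

Because the copy owns its bytes, it stays valid after consumed(count) hands the original region back to the channel.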
rocketraman
11/20/2018, 3:34 PM
cy
11/20/2018, 3:34 PM
continue is redundant since you have else. However it should work
rocketraman
11/20/2018, 3:42 PM
fun ByteBuffer.copy(): ByteBuffer {
    return ByteBuffer.allocate(remaining()).apply {
        put(this@copy)
        flip()
    }
}
... and yes, this works too.
cy
11/20/2018, 4:25 PM
rocketraman
11/20/2018, 4:26 PM
buffer.hasRemaining -- right
rocketraman
11/20/2018, 4:27 PM
rocketraman
11/20/2018, 4:31 PM
consumeEachRemaining?
cy
11/20/2018, 5:12 PM
cy
11/20/2018, 5:13 PM
cy
11/20/2018, 5:13 PM
request
cy
11/20/2018, 5:14 PM
rocketraman
11/20/2018, 5:19 PM