With kotlinx-coroutines-io-jvm 0.1.0-beta1 I used ...
# coroutines
r
With kotlinx-coroutines-io-jvm 0.1.0-beta1 I used to have code that did
ByteReadChannel.lookAheadSuspend
in which I was able to call
consumeEachRemaining
, like this:
Copy code
fun CoroutineScope.flowableOf(byteReadChannel: ByteReadChannel): Flowable<ByteBuffer> = rxFlowable {
  byteReadChannel.lookAheadSuspend {
    consumeEachRemaining {
      send(it.copy())
      true
    }
  }
}
In kotlinx-coroutines-io 0.1.0, this same code compiles but does not work -- it does not read the entire channel. Neither does the corresponding non-suspending
lookAhead
method work.
o
@cy
r
Trying it now with reverting to 0.1.0-beta-1, but staying on ktor 1.0.0-rc
Nope, doesn't look like those two are compatible
c
Let me look at it
It doesn't do
awaitAtLeast
You can always write a loop instead
r
What doesn't do
awaitAtLeast
? The 0.1.0 library code?
c
consumeEachRemaining
there are no suspension points at all in it's loop
r
But its the same code as 0.1.0-beta-1 isn't it? AFAICT the library implementation is the same.
c
Then it worked accidentally
In ktor it is used to write directly to NIO socket in CIO engine
r
Ok, is there a simpler way to consume all of a
ByteReadChannel
? I wouldn't expect this to be very difficult.
c
Most likely
consumeEachRemaining
need to be suspend function. For now you need a loop
r
Ok, let me try to replicate the logic in ktor
Now I got too many bytes, lol
c
This is relatively dangerous API, but in your case I think it is acceptable to use it. For regular users I wouldn't recommend to use it at all
If you have too many bytes, perhaps you forgot
consumed(N)
Or have it specified wrong
r
Yup, I forgot
consumed
Hmm:
java.lang.IllegalStateException: Unable to consume 4088 bytes: not enough available bytes
Copy code
lookAheadSuspend {
    // consumeEachRemaining does not work for now, use the loop logic from ktor
    while (true) {
      val buffer = request(0, 1)
      if (buffer == null) {
        if (!awaitAtLeast(1)) break
        continue
      }

      while (buffer.hasRemaining()) {
        val count = buffer.remaining()
        send(buffer.copy())
        consumed(count)
      }
    }
  }
Exception thrown from
consumed(count)
c
You actually don't need the second loop
What exception?
r
java.lang.IllegalStateException: Unable to consume 4088 bytes: not enough available bytes
c
Does
copy
modify buffer's position?
r
Yes it does seem to, its
copy
from Ktor
c
Well, firs of all, it's ktor internal function, you shouldn't use it anymore
The second thing is that it keeps position untouched so you should have infinite loop here
It's not infinite just because consumed fails as you are trying to consume the same bytes twice
In fact you don't need the second loop at all
r
Fair enough... ok, this is what I have now, which seems to work in a single test case:
Copy code
while (true) {
      val buffer = request(0, 1)
      if (buffer == null) {
        if (!awaitAtLeast(1)) break
        continue
      } else {
        val count = buffer.remaining()
        send(buffer.duplicate())
        consumed(count)
      }
    }
duplicate
is just the bog-standard JDK ByteBuffer method
c
This will lead to serious damage
duplicate doesn't copy buffer content but creates a view
r
Oh, right, it shares content
c
You need your own copy function instead
r
Ok, this is what I have now:
Copy code
lookAheadSuspend {
    // consumeEachRemaining does not work for now, use the loop logic from ktor
    while (true) {
      val buffer = request(0, 1)
      if (buffer == null) {
        if (!awaitAtLeast(1)) break
        continue
      } else {
        val count = buffer.remaining()
        send(buffer.copy())
        consumed(count)
      }
    }
  }
and
Copy code
fun ByteBuffer.copy(): ByteBuffer {
  val readBuffer = asReadOnlyBuffer()
  return ByteBuffer.allocate(readBuffer.remaining()).apply {
    put(readBuffer)
    flip()
  }
}
Seems to work but any comments?
c
continue
is redundant since you have
else
. However it should work
👍 1
r
Hang on, above you said ktor internal copy leaves buffer position untouched. My version does the same. I think if we want to actually consumer the input channel (which in this case, I think it should be), then my copy method should be:
Copy code
fun ByteBuffer.copy(): ByteBuffer {
  return ByteBuffer.allocate(remaining()).apply {
    put(this@copy)
    flip()
  }
}
... and yes, this works too.
c
there is no difference since you don't have a loop anymore
r
Ah, the
buffer.hasRemaining
-- right
So there is no problem with moving the position on these ktor-provided buffers right?
And also, do you want me to create an issue for
consumeEachRemaining
?
c
Yes, please
And yes, the problem was in the loop condition so now there is no problem with moving buffer's position
The only requirement is to not share a buffer returned from
request
👍 1
it is even not required to consume all bytes from the buffer but you can't consume more bytes than you actually have
r