https://kotlinlang.org logo
Title
r

rocketraman

11/20/2018, 2:03 PM
With kotlinx-coroutines-io-jvm 0.1.0-beta1 I used to have code that did
ByteReadChannel.lookAheadSuspend
in which I was able to call
consumeEachRemaining
, like this:
fun CoroutineScope.flowableOf(byteReadChannel: ByteReadChannel): Flowable<ByteBuffer> = rxFlowable {
  byteReadChannel.lookAheadSuspend {
    consumeEachRemaining {
      send(it.copy())
      true
    }
  }
}
In kotlinx-coroutines-io 0.1.0, this same code compiles but does not work -- it does not read the entire channel. Neither does the corresponding non-suspending
lookAhead
method work.
o

orangy

11/20/2018, 2:35 PM
@cy
r

rocketraman

11/20/2018, 2:41 PM
Trying it now with reverting to 0.1.0-beta-1, but staying on ktor 1.0.0-rc
Nope, doesn't look like those two are compatible
c

cy

11/20/2018, 2:43 PM
Let me look at it
It doesn't do
awaitAtLeast
You can always write a loop instead
r

rocketraman

11/20/2018, 2:51 PM
What doesn't do
awaitAtLeast
? The 0.1.0 library code?
c

cy

11/20/2018, 2:52 PM
consumeEachRemaining
there are no suspension points at all in it's loop
r

rocketraman

11/20/2018, 2:53 PM
But its the same code as 0.1.0-beta-1 isn't it? AFAICT the library implementation is the same.
c

cy

11/20/2018, 2:56 PM
Then it worked accidentally
In ktor it is used to write directly to NIO socket in CIO engine
r

rocketraman

11/20/2018, 2:57 PM
Ok, is there a simpler way to consume all of a
ByteReadChannel
? I wouldn't expect this to be very difficult.
c

cy

11/20/2018, 3:00 PM
Most likely
consumeEachRemaining
need to be suspend function. For now you need a loop
r

rocketraman

11/20/2018, 3:00 PM
Ok, let me try to replicate the logic in ktor
Now I got too many bytes, lol
c

cy

11/20/2018, 3:04 PM
This is relatively dangerous API, but in your case I think it is acceptable to use it. For regular users I wouldn't recommend to use it at all
If you have too many bytes, perhaps you forgot
consumed(N)
Or have it specified wrong
r

rocketraman

11/20/2018, 3:05 PM
Yup, I forgot
consumed
Hmm:
java.lang.IllegalStateException: Unable to consume 4088 bytes: not enough available bytes
lookAheadSuspend {
    // consumeEachRemaining does not work for now, use the loop logic from ktor
    while (true) {
      val buffer = request(0, 1)
      if (buffer == null) {
        if (!awaitAtLeast(1)) break
        continue
      }

      while (buffer.hasRemaining()) {
        val count = buffer.remaining()
        send(buffer.copy())
        consumed(count)
      }
    }
  }
Exception thrown from
consumed(count)
c

cy

11/20/2018, 3:14 PM
You actually don't need the second loop
What exception?
r

rocketraman

11/20/2018, 3:15 PM
java.lang.IllegalStateException: Unable to consume 4088 bytes: not enough available bytes
c

cy

11/20/2018, 3:16 PM
Does
copy
modify buffer's position?
r

rocketraman

11/20/2018, 3:16 PM
Yes it does seem to, its
copy
from Ktor
c

cy

11/20/2018, 3:17 PM
Well, firs of all, it's ktor internal function, you shouldn't use it anymore
The second thing is that it keeps position untouched so you should have infinite loop here
It's not infinite just because consumed fails as you are trying to consume the same bytes twice
In fact you don't need the second loop at all
r

rocketraman

11/20/2018, 3:19 PM
Fair enough... ok, this is what I have now, which seems to work in a single test case:
while (true) {
      val buffer = request(0, 1)
      if (buffer == null) {
        if (!awaitAtLeast(1)) break
        continue
      } else {
        val count = buffer.remaining()
        send(buffer.duplicate())
        consumed(count)
      }
    }
duplicate
is just the bog-standard JDK ByteBuffer method
c

cy

11/20/2018, 3:20 PM
This will lead to serious damage
duplicate doesn't copy buffer content but creates a view
r

rocketraman

11/20/2018, 3:20 PM
Oh, right, it shares content
c

cy

11/20/2018, 3:21 PM
You need your own copy function instead
r

rocketraman

11/20/2018, 3:33 PM
Ok, this is what I have now:
lookAheadSuspend {
    // consumeEachRemaining does not work for now, use the loop logic from ktor
    while (true) {
      val buffer = request(0, 1)
      if (buffer == null) {
        if (!awaitAtLeast(1)) break
        continue
      } else {
        val count = buffer.remaining()
        send(buffer.copy())
        consumed(count)
      }
    }
  }
and
fun ByteBuffer.copy(): ByteBuffer {
  val readBuffer = asReadOnlyBuffer()
  return ByteBuffer.allocate(readBuffer.remaining()).apply {
    put(readBuffer)
    flip()
  }
}
Seems to work but any comments?
c

cy

11/20/2018, 3:34 PM
continue
is redundant since you have
else
. However it should work
👍 1
r

rocketraman

11/20/2018, 3:42 PM
Hang on, above you said ktor internal copy leaves buffer position untouched. My version does the same. I think if we want to actually consumer the input channel (which in this case, I think it should be), then my copy method should be:
fun ByteBuffer.copy(): ByteBuffer {
  return ByteBuffer.allocate(remaining()).apply {
    put(this@copy)
    flip()
  }
}
... and yes, this works too.
c

cy

11/20/2018, 4:25 PM
there is no difference since you don't have a loop anymore
r

rocketraman

11/20/2018, 4:26 PM
Ah, the
buffer.hasRemaining
-- right
So there is no problem with moving the position on these ktor-provided buffers right?
And also, do you want me to create an issue for
consumeEachRemaining
?
c

cy

11/20/2018, 5:12 PM
Yes, please
And yes, the problem was in the loop condition so now there is no problem with moving buffer's position
The only requirement is to not share a buffer returned from
request
👍 1
it is even not required to consume all bytes from the buffer but you can't consume more bytes than you actually have
r

rocketraman

11/20/2018, 5:19 PM