Hello! I found that after creating a `Source` via ...
# ktor
z
Hello! I found that after creating a `Source` via `ByteReadChannel.toInputStream().asSource().buffered()`, calls to `readAtMostTo` and `request` block. This does not seem to be expected; they should be non-blocking and return the number of bytes processed immediately. For this reason, I had to create a new `RawSource` class to wrap `ByteReadChannel`. But I ran into a problem: how do I notify the Ktor `ByteReadChannel` to receive bytes from the data source and cache them in its internal buffer without blocking? (Just like the kotlinx.io `Source.request(n)`.)
a
The KDoc for `ByteReadChannel.toInputStream` states the following:
Create blocking java.io.InputStream for this channel that does block every time the channel suspends at read. Similar to do reading in runBlocking however you can pass it to regular blocking API
So, the blocking is expected.
Can you tell me why you need to convert the `ByteReadChannel` to an `InputStream`?
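To illustrate the point about `toInputStream`, here is a minimal JVM-only sketch that does not use Ktor at all. It swaps in a `PipedInputStream` as a stand-in for any blocking `InputStream` and measures how long `request(1)` on the wrapping kotlinx-io `Source` waits for a delayed writer. The helper name `blockedMillis` and the 300 ms delay are made up for the example.

```kotlin
import kotlinx.io.asSource
import kotlinx.io.buffered
import java.io.PipedInputStream
import java.io.PipedOutputStream
import kotlin.concurrent.thread
import kotlin.system.measureTimeMillis

// Hypothetical helper: returns how long request(1) waited for the first byte.
fun blockedMillis(delayMillis: Long): Long {
    val out = PipedOutputStream()
    val input = PipedInputStream(out) // stand-in for a blocking InputStream
    val source = input.asSource().buffered()

    // The writer produces data only after the given delay.
    val writer = thread {
        Thread.sleep(delayMillis)
        out.write("hi".toByteArray())
        out.flush() // wakes up the waiting reader
    }

    // request(1) reads from the InputStream, so the calling thread is parked
    // until the writer has produced at least one byte.
    val elapsed = measureTimeMillis { source.request(1) }

    writer.join()
    source.close()
    return elapsed
}

fun main() {
    println("request(1) blocked for about ${blockedMillis(300)} ms")
}
```

The reported time should be close to the writer's delay, which matches what the KDoc describes for the stream returned by `toInputStream()`.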
z
I mean it doesn't seem to work as expected as a `kotlinx-io` `Source`. When reading a file using a `Source`'s `readAtMostTo`, it doesn't block. I need to get bytes from a `ByteReadChannel` promptly, and blocking causes trouble. I have to poll `availableForRead` frequently to avoid blocking:
```kotlin
val socket: Socket = ...
val readChannel = socket.openReadChannel()
val source = readChannel.toInputStream().asSource().buffered()

val buffer = Buffer()
val mutex = Mutex()
launch(Dispatchers.IO) {
    while (true) {
        val availableForRead = readChannel.availableForRead
        if (availableForRead > 0) {
            mutex.withLock {
                source.readAtMostTo(buffer, availableForRead.toLong())
            }
        }
        readChannel.awaitContent(1)
    }
}
launch {
    while (true) {
        if (buffer.size > 0) {
            val bytes = mutex.withLock {
                buffer.readByteArray()
            }
            println(bytes.decodeToString())
        }
        delay(2500)
    }
}
```
This is the wrapper class I created, but I don't know how to notify the `ByteReadChannel` to receive new data, because any related functions will cause blocking:
```kotlin
@JvmInline
value class ByteReadChannelSource(private val channel: ByteReadChannel) : RawSource {
    override fun readAtMostTo(sink: Buffer, byteCount: Long): Long {
        if (channel.isClosedForRead) throw channel.closedCause ?: IllegalStateException()
        require(byteCount in 0..Int.MAX_VALUE) { "byteCount: $byteCount" }

        val availableForRead = channel.availableForRead
        if (availableForRead == 0) return -1

        val byteReadCount = minOf(byteCount.toInt(), availableForRead)
        val buffer = runBlocking {
            channel.readBuffer(byteReadCount)
        }
        return sink.transferFrom(buffer)
    }

    override fun close() {
        channel.cancel()
    }
}
```
a
Reading from a `Source` created from an `InputStream` is a blocking operation because it implies reading from the `InputStream`, which blocks the current thread.
Why do you need a conversion to `InputStream`?
z
Because I need a non-blocking `readAtMostTo`. I didn't find it in `ByteReadChannel`. Then I tried to find it in `InputStream`/`Source`, but I didn't find it there either.
b
`ByteChannel` has internal properties that can be used for this kind of work:
```kotlin
channel.awaitContent() // populate the buffer
channel.readBuffer.readAtMostTo(sink)
```
👍🏻 1
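A minimal sketch of how those two calls might be combined into a suspending helper, assuming Ktor 3.x, where `readBuffer` is marked `@InternalAPI` and may change without notice. The name `readAvailableTo` is hypothetical and not part of Ktor. `ByteReadChannel("hello")` only serves as an in-memory stand-in for a socket channel.

```kotlin
import io.ktor.utils.io.*
import kotlinx.coroutines.runBlocking
import kotlinx.io.Buffer
import kotlinx.io.readString

// Hypothetical helper (not a Ktor API): suspends until the channel has at
// least one byte, then moves what is already buffered into `sink`.
// Returns the number of bytes moved, or -1 if no content became available.
@OptIn(InternalAPI::class)
suspend fun ByteReadChannel.readAvailableTo(
    sink: Buffer,
    byteCount: Long = Long.MAX_VALUE,
): Long {
    // Suspends the coroutine instead of blocking the thread.
    if (!awaitContent()) return -1L
    // readBuffer is internal API; this only drains bytes that are already there.
    return readBuffer.readAtMostTo(sink, byteCount)
}

fun main() = runBlocking {
    val channel = ByteReadChannel("hello") // in-memory stand-in for a socket
    val sink = Buffer()
    val moved = channel.readAvailableTo(sink)
    println("$moved bytes: ${sink.readString()}")
}
```

Because the wait happens in `awaitContent()`, the calling coroutine suspends rather than blocking a thread, and the read itself never waits.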
z
By reading the source code, I found that the getter of `ByteChannel.readBuffer` calls `moveFlushToReadBuffer`, which can get data from downstream immediately without blocking. So the following wrapper can then be used to create a non-blocking `kotlinx-io` `Source`:
```kotlin
@JvmInline
value class ByteReadChannelSource(private val channel: ByteReadChannel) : RawSource {
    @OptIn(InternalAPI::class)
    override fun readAtMostTo(sink: Buffer, byteCount: Long): Long {
        // The getter of readBuffer will get data from the downstream immediately
        return channel.readBuffer.readAtMostTo(sink, byteCount)
    }

    override fun close() {
        channel.cancel()
    }
}

fun ByteReadChannel.asSource() = ByteReadChannelSource(this)

// main
val socket: Socket = ...
val readChannel = socket.openReadChannel()
val source = readChannel.asSource().buffered()

// Print the latest data every 100ms without causing any blocking or suspend
launch(Dispatchers.IO) {
    while (true) {
        val bytes = source.readByteArray()
        if (bytes.isNotEmpty()) {
            println(bytes.decodeToString())
        }
        delay(100)
    }
}
```
👍 1
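The polling loop in the wrapper above appears to rely on a kotlinx-io detail that can be shown without Ktor: a buffered `Source` keeps pulling from its `RawSource` until `readAtMostTo` returns -1, and `Buffer.readAtMostTo` returns -1 when the buffer is empty. The sketch below uses a plain `Buffer` as a stand-in for the channel's read buffer. `PollingSource` is a hypothetical name.

```kotlin
import kotlinx.io.Buffer
import kotlinx.io.RawSource
import kotlinx.io.buffered
import kotlinx.io.readByteArray
import kotlinx.io.writeString

// Hypothetical stand-in for ByteReadChannelSource: it hands over whatever is
// in `incoming` right now and reports -1 when there is nothing to hand over.
class PollingSource(private val incoming: Buffer) : RawSource {
    override fun readAtMostTo(sink: Buffer, byteCount: Long): Long =
        incoming.readAtMostTo(sink, byteCount) // -1 when `incoming` is empty

    override fun close() = Unit
}

fun main() {
    val incoming = Buffer() // plays the role of the channel's read buffer
    val source = PollingSource(incoming).buffered()

    println(source.readByteArray().size)             // prints 0: nothing yet
    incoming.writeString("ping")                     // "network" delivers data
    println(source.readByteArray().decodeToString()) // prints ping
    println(source.readByteArray().size)             // prints 0: drained again
}
```

Note that -1 normally signals that a `RawSource` is exhausted, so this pattern bends that contract: code that treats -1 as end of stream would stop reading early.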