Zhang Zihan
02/07/2025, 11:08 AMSource
via ByteReadChannel.toInputStream().asSource().buffered()
, calls to readAtMostTo
and request
will be blocked. This does not seem to be expected, it should be non-blocking and return the byte count processed immediately.
For this reason, I had to create a new RawSouce class to wrap ByteReadChannel
. But I encountered a problem:
How to notify Ktor ByteReadChannel
to receive bytes from the data source and cache them in the internal Buffer without blocking? (Just like <http://kotlinx.io|kotlinx.io> Source.request(n)
).Aleksei Tirman [JB]
02/07/2025, 11:13 AMByteReadChannel.toInputStream
states the following:
Create blocking java.io.InputStream for this channel that does block every time the channel suspends at read. Similar to do reading in runBlocking however you can pass it to regular blocking APISo, the blocking is expected.
Aleksei Tirman [JB]
02/07/2025, 11:14 AMByteReadChannel
to InputStream
?Zhang Zihan
02/07/2025, 11:20 AMkotlinx-io
Source
.
When reading a file using Source
readAtMostTo
it doesn't block.
I need to get bytes from a ByteReadChannel
in time, and blocking will cause some trouble. I have to frequently get availableForRead
to prevent blocking:
val socket: Socket = ...
val readChannel = socket.openReadChannel()
val source = readChannel.toInputStream().asSource().buffered()
val buffer = Buffer()
val mutex = Mutex()
launch(Dispatchers.IO) {
while (true) {
val availableForRead = readChannel.availableForRead
if (availableForRead > 0) {
mutex.withLock {
source.readAtMostTo(buffer, availableForRead.toLong())
}
}
readChannel.awaitContent(1)
}
}
launch {
while (true) {
if (buffer.size > 0) {
val bytes = mutex.withLock {
buffer.readByteArray()
}
println(bytes.decodeToString())
}
delay(2500)
}
}
This is the wrapper class I created, but I don't know how to notify the ByteReadChannel
to receive new data, because any related functions will cause blocking:
@JvmInline
value class ByteReadChannelSource(private val channel: ByteReadChannel) : RawSource {
override fun readAtMostTo(sink: Buffer, byteCount: Long): Long {
if (channel.isClosedForRead) throw channel.closedCause ?: IllegalStateException()
require(byteCount in 0..Int.MAX_VALUE) { "byteCount: $byteCount" }
val availableForRead = channel.availableForRead
if (availableForRead == 0) return -1
val byteReadCount = minOf(byteCount.toInt(), availableForRead)
val buffer = runBlocking {
channel.readBuffer(byteReadCount)
}
return sink.transferFrom(buffer)
}
override fun close() {
channel.cancel()
}
}
Aleksei Tirman [JB]
02/07/2025, 12:42 PMSource
created from InputStream
is a blocking operation because it implies reading from the InputStream
which blocks the current thread.Aleksei Tirman [JB]
02/07/2025, 12:43 PMInputStream
?Zhang Zihan
02/07/2025, 3:09 PMreadAtMostTo
. I didn't find it in ByteReadChannel
. Then I tried to find it in `InputStream`/`Source`, but I didn't find it either.Bruce Hamilton
02/07/2025, 4:06 PMByteChannel
has internal properties that can be used for this kind of work
channel.awaitContent() // populate the buffer
channel.readBuffer.readAtMostTo(sink)
Zhang Zihan
02/08/2025, 1:03 AMgetter
of ByteChannel.readBuffer
calls moveFlushToReadBuffer
, which can get data from downstream immediately without blocking.
So the following wrapper can then be used to create a non-blocking kotlinx-io
`Source`:
@JvmInline
value class ByteReadChannelSource(private val channel: ByteReadChannel) : RawSource {
@OptIn(InternalAPI::class)
override fun readAtMostTo(sink: Buffer, byteCount: Long): Long {
// The getter of readBuffer will get data from the downstream immediately
return channel.readBuffer.readAtMostTo(sink, byteCount)
}
override fun close() {
channel.cancel()
}
}
fun ByteReadChannel.asSource() = ByteReadChannelSource(this)
// main
val socket: Source = ...
val readChannel = socket.openReadChannel()
val source = readChannel.asSource().buffered()
// Print the latest data every 100ms without causing any blocking or suspend
launch(Dispatchers.IO) {
while (true) {
val bytes = source.readByteArray()
if (bytes.isNotEmpty()) {
println(bytes.decodeToString())
}
delay(100)
}
}