I would need some `ktor-io` help. I encounter some...
# ktor
d
I would need some
ktor-io
help. I encounter some difficulties because
ReadByteChannel
and
ByteReadPacket
don't share a common super-interface for all the
read***
operations. I have to read a possibly large amount X of bytes (up to 2^63-1) from a
ReadByteChannel
. The precise number X of bytes is read first then I wish to delegate the read of the content of those X bytes to a sub-method which have to read (in this amount of bytes) an unknown number of packets of different sizes. My problem is that
readPacket(size: Int)
takes an
Int
and not a
Long
(and also that may be crazy to load a possibly very large number of bytes in memory). Ideally, I would like to create as "sub-channel" with a limit (of X bytes). Any hint on how I should proceed ? (Please tell me if I am not clear enough.)
a
Could you please share some code to clarify your problem?
d
Well I ended up, counting manually how much bytes I had to read left. Bellow is an extract of the code (the project is still private, I'll open-source it soon). The
readCommand
method reads a "command". It first reads a byte that contains flags, one of which specifies whether the command content size is encoded as an unsigned byte or as an unsigned long. It then reads the command size (in
readCommandContent
) followed by the command name. Depending on the name it reads the command data. The problematic case here is the
CommandName.READY
case: I have to read an arbitrary number of "properties" of arbitrary size. However, I can't know if I read all the available properties without counting the remaining bytes manually. What I am looking for is some APIs to not have to manually count the remaining bytes (that could be up to 2^63-1 bytes).
Copy code
internal suspend fun ByteReadChannel.readCommand(): Command {
    val flags = readByte().toUByte()
    if (flags and FLAG_COMMAND == NULL) invalidFrame("Expected command")
    return readCommandContent(flags)
}

private suspend fun ByteReadChannel.readCommandContent(flags: UByte): Command {
    val size = if (flags and FLAG_LONG_SIZE == NULL) {
        readByte().toUByte().toLong()
    } else {
        readLong().toULong().toLong()
    }

    val commandNameString = readShortString().decodeToString()
    val commandName = CommandName.find(commandNameString)
        ?: invalidFrame("Invalid command name: $commandNameString")

    val remaining = size - readShortString().size + 1

    return when (commandName) {
        CommandName.READY -> ReadyCommand(readProperties(remaining))
        CommandName.ERROR -> ErrorCommand(readShortString().decodeToString())
        else -> TODO()
    }
}


private suspend fun ByteReadChannel.readProperties(dataSize: Long): Map<PropertyName, ByteArray> {
    var remaining = dataSize
    val properties = mutableMapOf<PropertyName, ByteArray>()
    while (remaining > 0) {
        val (propertyName, value) = readProperty()
        properties[propertyName] = value
        remaining -= 1 + propertyName.bytes.size + 4 + value.size
    }
    return properties
}

private suspend fun ByteReadChannel.readProperty(): Pair<PropertyName, ByteArray> {
    val propertyName = PropertyName.find(readShortString().decodeToString())
        ?: invalidFrame("Can't read property")
    val valueSize = readInt()
    val valueBytes = ByteArray(valueSize)
    readFully(valueBytes)
    return propertyName to valueBytes
}

// ...
@Aleksei Tirman [JB] Is my explanation more understandable ?
a
So do you want to have a sub-channel to read properties for every command? If I understand correctly the whole purpose is to not read incidentally more bytes than a command contains, right?
d
Absolutely, you understood correctly! :D
Maybe there is an API, or a way to do this, that I missed?
@e5l Would you mind giving your thoughts on this? Shouldn't there be a method such as the following defined on `ReadByteChannel`:
Copy code
fun <T> readPacket(size: Long, reader: BytePacketReader.() -> T): T
where
BytePacketReader
would extend
Input
and have an additional
remaining
property?