Abdulahi
08/27/2023, 8:38 PMio.ktor:ktor-network
Edoardo Luppi
08/27/2023, 9:17 PMEdoardo Luppi
08/27/2023, 9:18 PMEdoardo Luppi
08/27/2023, 9:51 PM\r\n
is encountered.
This would be straightforward with Ktor: just use `readUntilDelimiter`.
However in Node's case I need to:
1. read `n` bytes
2. check if the `\r\n` token is present in those bytes
3. if it isn't, go to 1. again
4. if it's present but it's not the last two bytes, split and push back the remaining bytes with unshift
push
Edoardo Luppi
08/27/2023, 11:43 PM
> This would be straightforward with Ktor, just use `readUntilDelimiter`

Well, not so much. I can't know if the delimiter was found or not when the destination `ByteBuffer` is full.
Abdulahi
08/29/2023, 11:58 AMmbonnin
10/16/2023, 8:13 PMEdoardo Luppi
10/17/2023, 8:02 AMmbonnin
10/17/2023, 8:29 AMmbonnin
10/17/2023, 8:29 AMEdoardo Luppi
10/17/2023, 9:27 AM
`NodeBuffer` is a type alias for Node's `Buffer`.
`ZByteArray` is a type alias for `Uint8Array`.
/**
 * Requests [byteCount] bytes from the socket, suspending until a chunk is returned.
 *
 * A synchronous `socket.read(byteCount)` is attempted first. If it yields `null`,
 * the call waits (bounded by the 30 second duration handed to
 * `suspendCancellableCoroutineWithTimeout`) and retries the read on every
 * `readable` event until a chunk is returned.
 *
 * The socket listeners installed while waiting are always detached when this call
 * finishes, whether it finishes with a chunk, with an exception, or because the
 * suspension was abandoned (presumably on timeout or cancellation; confirm against
 * `suspendCancellableCoroutineWithTimeout`). Without that, a stale `readable`
 * listener could later pull bytes off the socket for a caller that is gone.
 *
 * @param byteCount the number of bytes to request; must be greater than zero
 * @return the chunk returned by `socket.read(byteCount)`
 * @throws IllegalArgumentException if [byteCount] is not greater than zero
 * @throws IllegalStateException if the socket emits `close` or `end` while waiting
 */
override suspend fun readBytes(byteCount: Int): NodeBuffer {
    require(byteCount > 0) {
        "The byte count must be greater than zero"
    }

    // Fast path: the requested bytes can already be read, no need to suspend
    val fastChunk = socket.read(byteCount).unsafeCast<NodeBuffer?>()

    if (fastChunk != null) {
        return fastChunk
    }

    // Declared outside the suspension so the `finally` block can detach the
    // listeners no matter how the suspension ends
    var removeListeners = {}

    try {
        return suspendCancellableCoroutineWithTimeout(30.seconds) { continuation ->
            val readableHandler = {
                // `read` may still return null here; in that case keep waiting
                val chunk = socket.read(byteCount).unsafeCast<NodeBuffer?>()

                if (chunk != null) {
                    removeListeners()
                    continuation.resume(chunk)
                }
            }

            val closeHandler = {
                removeListeners()
                continuation.resumeWithException(IllegalStateException("Unexpected socket 'close' event"))
            }

            val endHandler = {
                removeListeners()
                continuation.resumeWithException(IllegalStateException("Unexpected socket 'end' event"))
            }

            val errorHandler: (Error) -> Unit = {
                removeListeners()
                continuation.resumeWithException(it)
            }

            removeListeners = {
                socket.removeListener(Event.CLOSE, closeHandler)
                socket.removeListener(Event.ERROR, errorHandler)
                socket.removeListener(Event.END, endHandler)
                socket.removeListener(Event.READABLE, readableHandler)
            }

            socket.once(Event.CLOSE, closeHandler)
            socket.once(Event.END, endHandler)
            socket.once(Event.ERROR, errorHandler)
            socket.on(Event.READABLE, readableHandler)

            // Try once right away in case data arrived between the fast path
            // and the listener registration
            readableHandler()
        }
    } finally {
        // Removing listeners that are already detached is harmless, so this is
        // safe to run after a handler has already cleaned up
        removeListeners()
    }
}
How writing is done.
/**
 * Writes [bytes] to the socket and suspends until the write callback is invoked.
 *
 * The call is bounded by the 30 second duration handed to
 * `suspendCancellableCoroutineWithTimeout`. If the callback receives an error,
 * that error is rethrown to the caller.
 *
 * @param bytes the bytes to send
 */
private suspend fun write(bytes: ZByteArray) =
    suspendCancellableCoroutineWithTimeout(30.seconds) { continuation ->
        val encoding = "binary".unsafeCast<BufferEncoding>()

        socket.write(bytes, encoding) { failure ->
            if (failure != null) {
                continuation.resumeWithException(failure)
            } else {
                continuation.resume(Unit)
            }
        }
    }
Edoardo Luppi
10/17/2023, 9:31 AMEdoardo Luppi
10/17/2023, 9:33 AM
/**
 * Reads bytes from the socket until the [lineBreak] sequence is found, and returns
 * the decoded line without the line break.
 *
 * Bytes read past the line break are pushed back into the socket read buffer with
 * `socket.unshift`, so a following read sees them again.
 *
 * Every occurrence of the line break's first byte inside a chunk is examined, not
 * only the first one. For [LineBreak.CRLF], a `'\r'` that ends one chunk is
 * remembered and matched against the first byte of the next chunk.
 *
 * NOTE(review): like the original, this assumes the encoded line break is a single
 * byte, or exactly two bytes for [LineBreak.CRLF]. Confirm for multi-byte charsets.
 *
 * @param lineBreak the line break sequence that terminates the line
 * @param transcoder the transcoder used to encode the line break and decode the
 * line; when `null`, this instance's own transcoder is used
 * @return the decoded line, excluding the line break
 */
override suspend fun readLine(lineBreak: LineBreak, transcoder: CharsetTranscoder?): String {
    val tempTranscoder = transcoder ?: this.transcoder
    val delimiterBytes = tempTranscoder.encodeString(lineBreak.seq)
    val lineBuffer = createOutputStream(tempTranscoder, 1024)
    val isCrlf = lineBreak == LineBreak.CRLF

    // True when the last byte written so far is a '\r' still waiting for its '\n'.
    // Only ever set when looking for '\r\n'.
    var pendingCr = false

    while (true) {
        val toRead = maxOf(1, availableForRead())
        val bytes = readBytes(toRead)
        lineBuffer.writeBytes(bytes)

        // Offset of the line break relative to the start of this chunk:
        //   -1   -> it starts on the last byte of the previous chunk
        //   null -> this chunk does not complete a line break
        val index: Int? = when {
            pendingCr && bytes.length > 0 && bytes[0] == delimiterBytes[1] -> -1
            else -> (0 until bytes.length).firstOrNull { offset ->
                bytes[offset] == delimiterBytes[0] &&
                    (!isCrlf || (offset + 1 < bytes.length && bytes[offset + 1] == delimiterBytes[1]))
            }
        }

        if (index == null) {
            // No complete line break yet. Remember a trailing '\r' so that a '\n'
            // opening the next chunk is still recognised.
            if (bytes.length > 0) {
                pendingCr = isCrlf && bytes[bytes.length - 1] == delimiterBytes[0]
            }

            continue
        }

        // The index in the line buffer where the line break token starts
        val endIndex = lineBuffer.getBytesWritten() - (bytes.size - index)
        val allBytes = lineBuffer.getBytes()

        // Put the remaining bytes back into the socket read buffer.
        // In the process discard the line break token
        val remainingBytes = allBytes.slice(endIndex + delimiterBytes.size)
        socket.unshift(remainingBytes, "binary".unsafeCast<BufferEncoding>())

        val lineBytes = allBytes.slice(0, endIndex)
        return tempTranscoder.decodeBytes(lineBytes)
    }
}
Edoardo Luppi
10/17/2023, 9:37 AMmbonnin
10/20/2023, 4:45 PMEdoardo Luppi
10/20/2023, 4:59 PM