Hey folks, I’m trying to create a socket in my KMP...
# ktor
a
Hey folks, I’m trying to create a socket in my KMP project using ktor but it seems like it doesn’t support JS target. Is that true or am I missing something? For reference, this is the dependency I’m trying to use:
io.ktor:ktor-network
e
You must use Node's Socket. No Ktor support.
I had to deal with the same stuff, but wrapping it was fairly straightforward. Just forget all the nice utilities, you'll have to reimplement even the most basic functionalities.
Example to which I'll refer back tomorrow. I need to read bytes from the socket until a \r\n is encountered. This would be straightforward with Ktor, just use readUntilDelimiter. However, in Node's case I need to:
1. read n bytes
2. check if the \r\n token is present in those bytes
3. if it isn't, go to 1. again
4. if it's present but it's not the last two bytes, split and push back the remaining bytes with unshift or push
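Steps 2 and 4 above can be sketched in plain Kotlin, independent of Node or Ktor. This is only an illustration under assumptions: indexOfSequence and splitAtDelimiter are hypothetical names, not part of either library, and the leftover bytes returned here are what would be handed back to the socket with unshift.

```kotlin
// Hypothetical helper (not a Ktor or Node API): returns the index of the first
// full occurrence of `delimiter` inside `buffer`, or -1 if it is not present.
fun indexOfSequence(buffer: ByteArray, delimiter: ByteArray): Int {
    if (delimiter.isEmpty() || buffer.size < delimiter.size) return -1
    for (start in 0..buffer.size - delimiter.size) {
        var matched = true
        for (offset in delimiter.indices) {
            if (buffer[start + offset] != delimiter[offset]) {
                matched = false
                break
            }
        }
        if (matched) return start
    }
    return -1
}

// Hypothetical helper: splits `buffer` into (line, leftover) at the first delimiter.
// The delimiter itself is discarded. Returns null when more bytes must be read first.
fun splitAtDelimiter(buffer: ByteArray, delimiter: ByteArray): Pair<ByteArray, ByteArray>? {
    val index = indexOfSequence(buffer, delimiter)
    if (index < 0) return null
    val line = buffer.copyOfRange(0, index)
    val leftover = buffer.copyOfRange(index + delimiter.size, buffer.size)
    return line to leftover
}
```

For instance, splitting the bytes of "PING\r\nPO" on "\r\n" yields the line "PING" and the leftover "PO", while splitting "PING" alone returns null, which corresponds to going back to step 1.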
"This would be straightforward with Ktor, just use readUntilDelimiter"
Well, not so much. I can't know if the delimiter was found or not when the destination ByteBuffer is full
a
Thanks for the info Edoardo 🙏
m
@Edoardo Luppi would you happen to have this code OSS somewhere? I might have to do the same thing and I'd fancy some inspiration
e
@mbonnin the entire socket wrapper or the line reading part?
m
More the wrapper. Planning to adapt that to okio for line reading
But only if you have something at hand, otherwise I can dig around myself
e
@mbonnin how the reading is done. NodeBuffer is a type alias for Node's Buffer. ZByteArray is a type alias for Uint8Array.
override suspend fun readBytes(byteCount: Int): NodeBuffer {
  require(byteCount > 0) {
    "The byte count must be greater than zero"
  }

  // Fast path
  val fastChunk = socket.read(byteCount).unsafeCast<NodeBuffer?>()

  if (fastChunk != null) {
    return fastChunk
  }

  return suspendCancellableCoroutineWithTimeout(30.seconds) { continuation ->
    var removeListeners = {}
    val readableHandler = {
      val chunk = socket.read(byteCount).unsafeCast<NodeBuffer?>()

      if (chunk != null) {
        removeListeners()
        continuation.resume(chunk)
      }
    }

    val closeHandler = {
      removeListeners()
      continuation.resumeWithException(IllegalStateException("Unexpected socket 'close' event"))
    }

    val endHandler = {
      removeListeners()
      continuation.resumeWithException(IllegalStateException("Unexpected socket 'end' event"))
    }

    val errorHandler: (Error) -> Unit = {
      removeListeners()
      continuation.resumeWithException(it)
    }

    removeListeners = {
      socket.removeListener(Event.CLOSE, closeHandler)
      socket.removeListener(Event.ERROR, errorHandler)
      socket.removeListener(Event.END, endHandler)
      socket.removeListener(Event.READABLE, readableHandler)
    }

    socket.once(Event.CLOSE, closeHandler)
    socket.once(Event.END, endHandler)
    socket.once(Event.ERROR, errorHandler)
    socket.on(Event.READABLE, readableHandler)

    readableHandler()
  }
}
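The listener bookkeeping used in readBytes (register several handlers, then remove all of them as soon as any one fires) can be tried without Node. The sketch below is an assumption-laden stand-in: FakeEmitter and awaitData are hypothetical names, the emitter is an in-memory toy rather than a Node binding, and it uses suspendCoroutine from the Kotlin standard library instead of the suspendCancellableCoroutineWithTimeout helper from the snippet above.

```kotlin
import kotlin.coroutines.*

// Hypothetical in-memory emitter used only for illustration; not a Node binding.
class FakeEmitter {
    private val listeners = mutableMapOf<String, MutableList<(Any?) -> Unit>>()

    fun on(event: String, listener: (Any?) -> Unit) {
        listeners.getOrPut(event) { mutableListOf() }.add(listener)
    }

    fun removeListener(event: String, listener: (Any?) -> Unit) {
        listeners[event]?.remove(listener)
    }

    fun emit(event: String, payload: Any? = null) {
        // Iterate over a copy, because listeners remove themselves while running.
        listeners[event].orEmpty().toList().forEach { it(payload) }
    }

    fun listenerCount(): Int = listeners.values.sumOf { it.size }
}

// Suspends until the first "data" or "error" event, then detaches both listeners
// before resuming, so that nothing stays registered on the emitter.
suspend fun awaitData(emitter: FakeEmitter): String = suspendCoroutine { continuation ->
    var removeListeners = {}

    val dataHandler: (Any?) -> Unit = {
        removeListeners()
        continuation.resume(it as String)
    }
    val errorHandler: (Any?) -> Unit = {
        removeListeners()
        continuation.resumeWithException(it as Throwable)
    }

    removeListeners = {
        emitter.removeListener("data", dataHandler)
        emitter.removeListener("error", errorHandler)
    }

    emitter.on("data", dataHandler)
    emitter.on("error", errorHandler)
}
```

With kotlinx.coroutines' suspendCancellableCoroutine, the same cleanup lambda would typically also be passed to invokeOnCancellation so the listeners are detached when the caller is cancelled. Whether the timeout helper used above already covers that is not visible from the snippet.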
How writing is done.
private suspend fun write(bytes: ZByteArray) =
  suspendCancellableCoroutineWithTimeout(30.seconds) { continuation ->
    socket.write(bytes, "binary".unsafeCast<BufferEncoding>()) {
      if (it == null) {
        continuation.resume(Unit)
      } else {
        continuation.resumeWithException(it)
      }
    }
  }
Btw, if you notice code that can be improved, I'd be happy to take in the edits 😂
Reading a line.
override suspend fun readLine(lineBreak: LineBreak, transcoder: CharsetTranscoder?): String {
  val tempTranscoder = transcoder ?: this.transcoder
  val delimiterBytes = tempTranscoder.encodeString(lineBreak.seq)
  val lineBuffer = createOutputStream(tempTranscoder, 1024)

  while (true) {
    val toRead = maxOf(1, availableForRead())
    val bytes = readBytes(toRead)
    lineBuffer.writeBytes(bytes)

    val index = bytes.indexOf(delimiterBytes[0])

    if (index < 0) {
      // No line break token in this byte sequence, keep reading
      continue
    }

    // The index in the line buffer where the line break token might have been found
    val endIndex = lineBuffer.getBytesWritten() - (bytes.size - index)

    // Match! We got a '\n' or a '\r'.
    // If the line break token we are looking for is '\r\n', we need some more processing
    if (lineBreak == LineBreak.CRLF) {
      // If we already have the necessary byte for the check we do not need network calls
      if (bytes.length > index + 1) {
        if (bytes[index + 1] != delimiterBytes[1]) {
          // Not a '\n', keep reading
          continue
        }
      } else {
        // We do not have enough data to understand if the next byte is a '\n'.
        // Thus we request one more byte from the network
        val additionalByte = readBytes(1)
        lineBuffer.writeBytes(additionalByte)

        if (additionalByte[0] != delimiterBytes[1]) {
          // Not a '\n', keep reading
          continue
        }
      }
    }

    val allBytes = lineBuffer.getBytes()

    // Put the remaining bytes back into the socket read buffer.
    // In the process discard the line break token
    val remainingBytes = allBytes.slice(endIndex + delimiterBytes.size)
    socket.unshift(remainingBytes, "binary".unsafeCast<BufferEncoding>())

    val lineBytes = allBytes.slice(0, endIndex)
    return tempTranscoder.decodeBytes(lineBytes)
  }
}
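One case the loop above appears not to cover: it checks only the first '\r' in each chunk, so a chunk such as "a\rb\r\n" would skip the real line break that follows a lone '\r'. A possible alternative, sketched here under assumptions, keeps the leftover bytes in its own buffer instead of calling unshift and searches for the whole delimiter sequence. LineAccumulator is a hypothetical name, and decoding is assumed to be UTF-8 rather than going through the transcoder abstraction.

```kotlin
// Hypothetical class for illustration; not taken from the wrapper above.
class LineAccumulator(private val delimiter: ByteArray) {
    // Bytes received so far that do not yet form a complete line.
    private var pending = ByteArray(0)

    // Appends a chunk and returns every complete line that is now available.
    // Delimiters split across two chunks are handled because the search always
    // runs over the accumulated bytes, not just the newest chunk.
    fun feed(chunk: ByteArray): List<String> {
        pending += chunk
        val lines = mutableListOf<String>()
        while (true) {
            val index = indexOfDelimiter()
            if (index < 0) break
            // Assumption: lines are UTF-8 encoded.
            lines += pending.copyOfRange(0, index).decodeToString()
            pending = pending.copyOfRange(index + delimiter.size, pending.size)
        }
        return lines
    }

    // Index of the first full delimiter occurrence in `pending`, or -1.
    private fun indexOfDelimiter(): Int {
        if (delimiter.isEmpty()) return -1
        for (start in 0..pending.size - delimiter.size) {
            if (delimiter.indices.all { pending[start + it] == delimiter[it] }) return start
        }
        return -1
    }
}
```

Feeding the chunks "HEL", "LO\r" and "\nWORLD\r\nrest" to an accumulator created with "\r\n" returns an empty list twice and then [HELLO, WORLD], with "rest" kept pending for the next call. The trade-off is that the accumulator rescans pending bytes on every call, which is fine for short protocol lines but may need an offset cache for long ones.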
There are parts of the code related to my own abstraction, but it should be easy to read and adapt.
m
That's what I ended up doing if you're curious: https://github.com/apollographql/apollo-kotlin/pull/5316
e
Thanks! Looks good, only left a comment for the imports
❤️ 1