# ktor
a
Hi, I have a custom HTTP engine implementation and I’m trying to pass the send and receive channels back to the caller. However, when I access the channels outside of this context, they appear to be closed. I can’t find where the channels are closed in the Ktor code. Is there any way to pass these channels to a different context without issues? I need them for HTTP hijacking, specifically to attach to containers using the Docker API. I appreciate any help!
a
Can you share your code as text and highlight the part where the channels are passed back to the caller?
a
yes,
@InternalAPI
    override suspend fun execute(data: HttpRequestData): HttpResponseData {
        val socket = when (data.url.protocol.name) {
            "tcp", "http" -> {
                aSocket(ActorSelectorManager(dispatcher)).tcp().connect(data.url.host, data.url.port)
            }

            "unix" -> {
                aSocket(ActorSelectorManager(dispatcher)).tcp()
                    .connect(UnixSocketAddress(data.attributes[(AttributeKey("address"))]))
            }

            else -> {
                throw IllegalArgumentException("Unsupported protocol ${data.url.protocol}")
            }
        }

        val receiveChannel = socket.openReadChannel()
        val sendChannel = socket.openWriteChannel(autoFlush = true)

        val callContext = callContext()

        val input = this@UnixDSCIOEngine.mapEngineExceptions(receiveChannel, data)
        val originOutput = this@UnixDSCIOEngine.mapEngineExceptions(sendChannel, data)

        if (data.attributes.getOrNull(AttributeKey("hijack")) == true) {
            writeRequest(data, output, callContext, false)
            timeoutJob.cancel()
            return hijack(sendChannel, input)
        }
private suspend fun hijack(
        sendChannel: ByteWriteChannel,
        input: ByteReadChannel
    ): HttpResponseData {
        val rawResponse = parseResponse(input) ?: throw EOFException("Failed to parse HTTP response: unexpected EOF")

        println(rawResponse.headers)
        val headers = HeadersImpl(rawResponse.headers.toMap())
        val contentType = headers[HttpHeaders.ContentType]

        if (contentType == "application/vnd.docker.multiplexed-stream") { //TODO: add support
            throw UnsupportedOperationException(
                "Multiplexed streams are not supported. Streams must be attached to a TTY."
            )
        }

        val dkStream = DockerStream(sendChannel, input)
        val requestTime = GMTDate()
        rawResponse.use {
            val status = HttpStatusCode(rawResponse.status, rawResponse.statusText.toString())
            val version = HttpProtocolVersion.parse(rawResponse.version)
            val body = dkStream
            return HttpResponseData(status, requestTime, headers, version, body, callContext())
        }
    }
They are passed in the body of the HttpResponseData, in the hijack function.
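The `TODO` in `hijack` refers to Docker's `application/vnd.docker.multiplexed-stream` format, which is used when the container has no TTY. As documented for the attach endpoint, each frame starts with an 8-byte header: one byte for the stream type (0 stdin, 1 stdout, 2 stderr), three zero bytes, and a big-endian 32-bit payload size. A minimal sketch of splitting such a stream into frames is below. It uses blocking `java.io` streams rather than Ktor channels to stay self-contained, and `Frame` and `readFrames` are hypothetical names, not part of the code in this thread.

```kotlin
import java.io.ByteArrayInputStream
import java.io.DataInputStream
import java.io.InputStream

// Hypothetical holder for one demultiplexed frame.
class Frame(val streamType: Int, val payload: ByteArray)

// Reads frames until the source ends cleanly at a frame boundary.
// A truncated header or payload makes readFully/readInt throw EOFException.
fun readFrames(source: InputStream): List<Frame> {
    val input = DataInputStream(source)
    val frames = mutableListOf<Frame>()
    while (true) {
        val streamType = input.read()      // 0 = stdin, 1 = stdout, 2 = stderr
        if (streamType == -1) break        // end of stream
        input.readFully(ByteArray(3))      // three padding bytes
        val size = input.readInt()         // big-endian payload length
        require(size >= 0) { "Frame larger than 2 GiB is not handled in this sketch" }
        val payload = ByteArray(size)
        input.readFully(payload)
        frames += Frame(streamType, payload)
    }
    return frames
}

fun main() {
    // Two hand-built frames: "out" on stdout, "err" on stderr.
    val bytes = byteArrayOf(1, 0, 0, 0, 0, 0, 0, 3) + "out".toByteArray() +
        byteArrayOf(2, 0, 0, 0, 0, 0, 0, 3) + "err".toByteArray()
    for (frame in readFrames(ByteArrayInputStream(bytes))) {
        println("stream ${frame.streamType}: ${String(frame.payload)}")
    }
}
```

The same header parsing could presumably be done on the `ByteReadChannel` directly; the blocking version here only illustrates the framing.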
a
I think that's because the response body is canceled after it's saved (this line). Can you try streaming the response body like in the example?
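For readers without the linked example at hand, here is a minimal sketch of streaming a response body with the Ktor client. It assumes the stock CIO engine, and the URL and temp file are placeholders, not values from this thread. The point it illustrates is that the body is consumed inside the `execute` block, because the response holds its connection only until that block completes.

```kotlin
import io.ktor.client.HttpClient
import io.ktor.client.engine.cio.CIO
import io.ktor.client.request.prepareGet
import io.ktor.client.statement.bodyAsChannel
import io.ktor.utils.io.*
import kotlinx.coroutines.runBlocking
import java.io.File

fun main() = runBlocking {
    val client = HttpClient(CIO)
    // Placeholder destination; any writable file works.
    val file = File.createTempFile("download", ".bin")

    // prepareGet builds the statement without running it; execute runs it
    // and keeps the connection open while the lambda is executing.
    client.prepareGet("https://ktor.io/").execute { response ->
        val channel = response.bodyAsChannel()
        val buffer = ByteArray(8192)
        file.outputStream().use { out ->
            while (true) {
                // Suspends until data arrives; returns -1 once the channel is closed.
                val read = channel.readAvailable(buffer)
                if (read == -1) break
                out.write(buffer, 0, read)
            }
        }
    }

    println("Saved ${file.length()} bytes to ${file.path}")
    client.close()
}
```

Anything that needs the channels, including launching reader and writer coroutines, would have to finish before the lambda passed to `execute` returns.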
a
That’s very interesting, thank you so much! I will try this method.
statement.execute { res ->
        val dockerStream = res.body<DockerStream>()
        val c = dockerStream.sendChannel
        val channel = dockerStream.receiveChannel
        while (!channel.isClosedForRead) {
            val packet = channel.readRemaining(DEFAULT_BUFFER_SIZE.toLong())
            while (!packet.isEmpty) {
                val bytes = packet.readBytes()
                file.appendBytes(bytes)
                println("Received ${file.length()} bytes from ${res.contentLength()}")
            }
        }
        c.close()
    }
I tried following the example, but in the debugger the channels still appear to be terminated. I’ve verified that the cleanup does not run before streaming, so something else must be closing the channels.
Please ignore the message above; that was my mistake. So far the channels appear to be open.
When I try something like this, input works fine but the send channel always terminates. I’ve verified that it works inside the HttpEngine context, but here it does not.
.execute { res ->
        val channels = res.body<Pair<ByteWriteChannel, ByteReadChannel>>()
        val sendChannel = channels.first
        val input = channels.second

        coroutineScope {
            val writeJob = launch(Dispatchers.IO) {
                do {
                    val r = input.readUTF8Line()
                    if (r != null)
                        println(r)
                } while (true)
            }

            val readJob = launch(Dispatchers.IO) {
                do {
                    val v = readln()
                    sendChannel.writeStringUtf8("$v\n")
                } while (v != "exit")
                writeJob.cancel()
                sendChannel.close()
                input.cancel()
            }

            writeJob.join()
            readJob.join()
        }
    }