Ali Kay
07/10/2024, 7:59 AMAleksei Tirman [JB]
07/10/2024, 8:01 AMAli Kay
07/10/2024, 8:05 AM
@InternalAPI
// Engine entry point: opens a socket for the request and, when the "hijack" attribute is set,
// writes the request and returns the response produced by hijack().
// NOTE(review): this snippet is truncated — the non-hijack path and the function's closing brace
// are not shown here.
override suspend fun execute(data: HttpRequestData): HttpResponseData {
// Choose how to connect from the URL's protocol name.
val socket = when (data.url.protocol.name) {
"tcp", "http" -> {
// Connect to the host and port taken from the request URL.
aSocket(ActorSelectorManager(dispatcher)).tcp().connect(data.url.host, data.url.port)
}
"unix" -> {
// Connect to a Unix socket address built from the request attribute keyed "address".
aSocket(ActorSelectorManager(dispatcher)).tcp()
.connect(UnixSocketAddress(data.attributes[(AttributeKey("address"))]))
}
else -> {
// Any other protocol name is rejected.
throw IllegalArgumentException("Unsupported protocol ${data.url.protocol}")
}
}
val receiveChannel = socket.openReadChannel()
val sendChannel = socket.openWriteChannel(autoFlush = true)
val callContext = callContext()
// Both channels are passed through mapEngineExceptions together with the request data.
val input = this@UnixDSCIOEngine.mapEngineExceptions(receiveChannel, data)
val originOutput = this@UnixDSCIOEngine.mapEngineExceptions(sendChannel, data)
// `== true` because getOrNull may return null; presumably a missing attribute skips this branch.
if (data.attributes.getOrNull(AttributeKey("hijack")) == true) {
// NOTE(review): `output` is not declared in the visible snippet (only `originOutput` is) — confirm
// where it comes from.
writeRequest(data, output, callContext, false)
// NOTE(review): `timeoutJob` is not declared in the visible snippet — confirm where it comes from.
timeoutJob.cancel()
// NOTE(review): hijack() receives the raw sendChannel rather than the wrapped originOutput — confirm
// this is intended.
return hijack(sendChannel, input)
}
/**
 * Parses the HTTP response head from [input] and returns a response whose body is a
 * [DockerStream] built from the two connection channels.
 *
 * @param sendChannel write side of the connection, handed to the [DockerStream] body.
 * @param input read side of the connection; the response head is parsed from it and it is then
 *   handed to the [DockerStream] body.
 * @return response data with the parsed status, headers and protocol version, and the
 *   [DockerStream] as body.
 * @throws EOFException if no response head could be parsed from [input].
 * @throws UnsupportedOperationException if the response content type is
 *   `application/vnd.docker.multiplexed-stream`.
 */
private suspend fun hijack(
    sendChannel: ByteWriteChannel,
    input: ByteReadChannel
): HttpResponseData {
    val rawResponse = parseResponse(input)
        ?: throw EOFException("Failed to parse HTTP response: unexpected EOF")

    // Everything that reads rawResponse runs inside use { } so the parsed response is closed on
    // the failure paths as well (previously the multiplexed-stream check threw before use { }).
    return rawResponse.use { response ->
        // Headers are copied out with toMap() before the response is closed.
        val headers = HeadersImpl(response.headers.toMap())

        if (headers[HttpHeaders.ContentType] == "application/vnd.docker.multiplexed-stream") { // TODO: add support
            throw UnsupportedOperationException(
                "Multiplexed streams are not supported. Streams must be attached to a TTY."
            )
        }

        val requestTime = GMTDate()
        val status = HttpStatusCode(response.status, response.statusText.toString())
        val version = HttpProtocolVersion.parse(response.version)
        val body = DockerStream(sendChannel, input)

        HttpResponseData(status, requestTime, headers, version, body, callContext())
    }
}
Ali Kay
07/10/2024, 8:05 AMAleksei Tirman [JB]
07/10/2024, 8:13 AMAli Kay
07/10/2024, 8:15 AMAli Kay
07/10/2024, 8:43 AM
statement.execute { res ->
// Take the DockerStream out of the response body and grab both of its channels.
val stream = res.body<DockerStream>()
val outbound = stream.sendChannel
val inbound = stream.receiveChannel

// Drain the read side into the file, at most DEFAULT_BUFFER_SIZE bytes per read.
while (!inbound.isClosedForRead) {
    val chunk = inbound.readRemaining(DEFAULT_BUFFER_SIZE.toLong())
    while (!chunk.isEmpty) {
        file.appendBytes(chunk.readBytes())
        println("Received ${file.length()} bytes from ${res.contentLength()}")
    }
}

// Reading is finished; close the write side.
outbound.close()
}
I attempted to do it like the example, but when debugging, the channels still appear to be terminated. I’ve verified that the cleanup does not occur before streaming, so something else must be closing the channels.
Ali Kay
07/10/2024, 8:53 AMAli Kay
07/10/2024, 11:35 AM
.execute { res ->
// The response body is a (write channel, read channel) pair.
val (sendChannel, input) = res.body<Pair<ByteWriteChannel, ByteReadChannel>>()

coroutineScope {
    // Remote -> console: print every line read from the channel.
    val printJob = launch(Dispatchers.IO) {
        while (true) {
            // A null line means nothing more can be read; stop instead of looping on null forever.
            val line = input.readUTF8Line() ?: break
            println(line)
        }
    }

    // Console -> remote: forward each typed line. "exit" is still forwarded and then ends the
    // session; end of stdin ends it as well instead of throwing from readln().
    launch(Dispatchers.IO) {
        try {
            while (true) {
                val line = readlnOrNull() ?: break
                sendChannel.writeStringUtf8("$line\n")
                if (line == "exit") break
            }
        } finally {
            // Runs on failure too, so the printer job and both channels are always shut down.
            printJob.cancel()
            sendChannel.close()
            input.cancel()
        }
    }

    // coroutineScope suspends until both children finish, so no explicit join() is needed.
}
}