Hugo Costa
03/26/2025, 5:42 PM
ByteStream
(custom class) that we can translate into a Flow<ByteArray>
, which we can then convert to String, parse with Jackson and consume.
I'm currently not sure how best to handle this concurrently. From
response.body?.toFlow()
what is the fastest way (concurrently if possible) to go through it?
I believe the safest implementation is to go through each ByteArray, decode it to a String, and keep an accumulator variable to ensure that a JSON object split between ByteArrays still gets parsed. But I don't believe we're able to run this concurrently, so it's taking quite a while. Any ideas?
uli
03/26/2025, 6:19 PM
byteArrayFlow.map {
    async { parseJson(it) }
}
.collect {
    val json = it.await()
    ...
}
jw
03/26/2025, 6:25 PM
jw
03/26/2025, 6:25 PM
Hugo Costa
03/26/2025, 6:27 PM
ByteStream
into Flow<ByteArray>
and we can customise the chunk size, which then calls S3 to get it from there
uli
03/26/2025, 6:27 PM
jw
03/26/2025, 6:28 PM
jw
03/26/2025, 6:29 PM
Hugo Costa
03/26/2025, 6:34 PM
jw
03/26/2025, 6:36 PM
ByteArray
size is based on network reads).
Hugo Costa
03/26/2025, 6:37 PM
jw
03/26/2025, 6:37 PM
Source
from `ByteArray`s that you donate. Then you can do indexOf('\n'.code.toByte())
to get the next newline, and readString(thatIndex)
to perform the UTF-8 decoding. This will handle split code points and do only a single UTF-8 decode of the entire JSON object, which you can then hand to Jackson.
jw
03/26/2025, 6:38 PM
Hugo Costa
03/26/2025, 6:41 PM
ByteArray("abc")
ByteArray("def\n")
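A minimal sketch of the approach jw describes above, using the two-chunk example: keep the raw bytes, scan for the newline byte, and decode UTF-8 only once a whole line is available. `LineAccumulator` and its methods are hypothetical names, and a `ByteArrayOutputStream` stands in for the Okio-style buffer so the snippet runs with the standard library alone; it is not the SDK's API.

```kotlin
import java.io.ByteArrayOutputStream

/**
 * Hypothetical helper (not part of any SDK): buffers raw chunks and only
 * decodes UTF-8 once a full newline-terminated line has been collected.
 */
class LineAccumulator {
    private val pending = ByteArrayOutputStream()

    /** Feeds one chunk and returns every line that this chunk completes. */
    fun feed(chunk: ByteArray): List<String> {
        val lines = mutableListOf<String>()
        var start = 0
        for (i in chunk.indices) {
            if (chunk[i] == '\n'.code.toByte()) {
                pending.write(chunk, start, i - start)
                // Single decode over the whole line, so split code points are intact.
                lines += pending.toByteArray().decodeToString()
                pending.reset()
                start = i + 1
            }
        }
        // Keep the unterminated tail as raw bytes for the next chunk.
        pending.write(chunk, start, chunk.size - start)
        return lines
    }

    /** Returns the final line if the input did not end with a newline. */
    fun finish(): String? =
        if (pending.size() == 0) null
        else pending.toByteArray().decodeToString().also { pending.reset() }
}

fun main() {
    val acc = LineAccumulator()
    println(acc.feed("abc".encodeToByteArray()))    // []
    println(acc.feed("def\n".encodeToByteArray()))  // [abcdef]

    // A 3-byte character split across two chunks is still decoded as one line.
    val heart = "\u2665\n".encodeToByteArray()
    acc.feed(heart.copyOfRange(0, 2))
    println(acc.feed(heart.copyOfRange(2, heart.size)).single() == "\u2665")
}
```

Scanning for the newline on raw bytes is safe in UTF-8 because every byte of a multi-byte sequence has its high bit set, so 0x0A can never be part of another character.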
jw
03/26/2025, 6:41 PM
jw
03/26/2025, 6:41 PM
jw
03/26/2025, 6:43 PM
jw
03/26/2025, 6:46 PM
♥
is, for example, 0xE299A5. If you got 0xE299 at the end of one byte array and 0xA5 at the start of the second one, you'd get two replacement characters because decoded separately neither is a valid UTF-8 byte sequence.
Hugo Costa
03/26/2025, 6:48 PM
Hugo Costa
03/26/2025, 6:53 PM
ByteStream
- https://github.com/smithy-lang/smithy-kotlin/blob/0a29a00a17964b7126974744127765c5[…]core/common/src/aws/smithy/kotlin/runtime/content/ByteStream.kt
Hugo Costa
03/26/2025, 6:55 PM
jw
03/26/2025, 6:59 PM
ByteStream
abstraction is already doing you harm and causing copies to be performed. I would do a when
on it to get access to the underlying stream which seems like it can already do everything you need.
jw
03/26/2025, 6:59 PM
SourceStream
, whose implementation is a copy of Okio vendored into the SDK with the copyright washed off.
Hugo Costa
03/26/2025, 7:03 PM
SourceStream
. When you mention getting a when
statement on it, you mean so that I can access the file directly?
jw
03/26/2025, 7:07 PM
jw
03/26/2025, 7:09 PM
SourceStream
it looks like you can call readFrom()
to get their SdkSource
(nee Okio's Source
) and then call buffer()
on it to get a SdkBufferedSource
(nee Okio's BufferedSource
).
Hugo Costa
03/26/2025, 7:10 PM
jw
03/26/2025, 7:10 PM
indexOf
API from their copy, which makes finding the newlines on their SdkBufferedSource
harder than it needs to be (but not imposisble)jw
03/26/2025, 7:10 PMreadUtf8(thatIndex)
to get a full string of a single JSON objectHugo Costa
03/26/2025, 7:15 PM
fun ByteStream.toJsonLines(): Flow<String> = flow {
    when (this@toJsonLines) {
        is ByteStream.Buffer -> TODO()
        is ByteStream.ChannelStream -> TODO()
        is ByteStream.SourceStream -> {
            val bufferedSource = readFrom().buffer()
            while (!bufferedSource.exhausted()) {
                var lineLength = 0L
                // Look ahead for newline
                val peek = bufferedSource.peek()
                while (!peek.exhausted()) {
                    if (peek.readByte() == '\n'.code.toByte()) {
                        break
                    }
                    lineLength++
                }
                // Read the line
                val line = bufferedSource.readUtf8(lineLength)
                emit(line)
                // Skip the newline character
                if (!bufferedSource.exhausted()) {
                    bufferedSource.skip(1)
                }
            }
        }
    }
}
something along these lines? It's currently performing slower
Hugo Costa
03/26/2025, 7:15 PM
jw
03/26/2025, 7:15 PM
jw
03/26/2025, 7:16 PM
readUtf8Line()
that would do all this for you. It's a shame they gutted it so severely.
Hugo Costa
03/26/2025, 7:17 PM
Hugo Costa
03/26/2025, 7:17 PM
Alexandru Caraus
03/27/2025, 6:33 AM
Hugo Costa
03/27/2025, 1:52 PM
Hugo Costa
03/27/2025, 1:52 PM
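For comparison, a sketch of the line-at-a-time reading that jw attributes to Okio's readUtf8Line(), written against the JDK instead: a BufferedReader over an InputStream decodes across read boundaries and splits on newlines. It assumes the body can be exposed as a java.io.InputStream, which the thread does not show. `jsonLines` and `DribbleStream` are hypothetical names; the latter only simulates small network reads.

```kotlin
import java.io.InputStream

/** Hypothetical helper: lazily yields one non-empty UTF-8 line per JSON object. */
fun InputStream.jsonLines(): Sequence<String> =
    bufferedReader(Charsets.UTF_8).lineSequence().filter { it.isNotEmpty() }

/** Test double that hands out at most [max] bytes per read, like small network reads. */
class DribbleStream(private val data: ByteArray, private val max: Int) : InputStream() {
    private var pos = 0

    override fun read(): Int =
        if (pos < data.size) data[pos++].toInt() and 0xFF else -1

    override fun read(b: ByteArray, off: Int, len: Int): Int {
        if (len == 0) return 0
        if (pos >= data.size) return -1
        val n = minOf(len, max, data.size - pos)
        data.copyInto(b, off, pos, pos + n)
        pos += n
        return n
    }
}

fun main() {
    // Two JSON lines; with 2-byte reads the 3-byte heart is split across reads.
    val body = "{\"a\":1}\n{\"b\":\"\u2665\"}\n".encodeToByteArray()
    DribbleStream(body, max = 2).jsonLines().forEach(::println)
}
```

The reader keeps undecoded trailing bytes between reads, so each emitted line is a complete string that can be handed to the JSON parser.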