# coroutines
h
Hello, I am currently working on parsing a 15GB file with JSON objects separated by a new line. This is stored in S3, which I am accessing via the AWS Kotlin SDK, which implements its get operation with a ByteStream (custom class) that we can translate into a Flow<ByteArray>, which we can then convert to String, parse with Jackson and consume. I'm currently not sure how best to handle this concurrently. From
response.body?.toFlow()
what is the fastest way (concurrently if possible) to go through it? I believe the safest implementation is to go through the ByteArray, decode to a String and then have an accumulative variable to ensure that if a JSON object is split between ByteArrays it still gets parsed, but I don't believe we're able to run this concurrently, so it's taking quite a while. Any ideas?
u
How about reading the ByteStream into a ByteArray until you find two consecutive new lines? Then emit into a flow. Then continue reading the rest, emitting at every "\n\n". The flow could then be collected on a different thread, so splitting and reading on the one hand and parsing on the other happen concurrently. If you want the parsing to be concurrent as well, you can make it async:
coroutineScope {
    byteArrayFlow
        .map { bytes -> async { parseJson(bytes) } }
        .buffer() // let several parse jobs run ahead while earlier results are awaited
        .collect {
            val json = it.await()
            // ...
        }
}
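For illustration, here is a rough sketch of the splitting side of that idea, assuming the input is the Flow<ByteArray> from toFlow() and that objects are separated by a single '\n' (as in the original file); the name splitLines and the carry-over buffer are just for this sketch:

import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow

// Turn a Flow<ByteArray> of arbitrary chunks into a Flow<ByteArray> of complete
// lines, carrying the partial tail of each chunk over to the next one.
fun Flow<ByteArray>.splitLines(): Flow<ByteArray> = flow {
    var carry = ByteArray(0)
    collect { chunk ->
        var buf = carry + chunk                   // copies; fine for a sketch
        while (true) {
            val nl = buf.indexOf('\n'.code.toByte())
            if (nl == -1) break
            emit(buf.copyOfRange(0, nl))          // one complete JSON object, still as bytes
            buf = buf.copyOfRange(nl + 1, buf.size)
        }
        carry = buf                               // incomplete object; wait for the next chunk
    }
    if (carry.isNotEmpty()) emit(carry)           // trailing object without a final '\n'
}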
j
There's no point in making it concurrent if you have all of the data already buffered into memory. You will only make it go slower. Parallelism is still an obvious choice, though, assuming you can spend those resources here.
Unless we're saying you want the byte array chunking and JSON parsing to be concurrent (i.e., find a single chunk, then JSON parse it, then next chunk, then JSON parse that, etc.)
h
The object is not all stored in memory; we transform ByteStream into Flow<ByteArray> and we can customise the chunk size, which then calls S3 to get it from there
u
With multiple cores, even parsing the already chunked data could be concurrent, with benefits. But depending on the network connection, it is probably most important to start parsing while the data is still loading: read the ByteStream into a ByteArray until you find two consecutive new lines, then emit into a flow, then continue reading the rest, emitting at every "\n\n".
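One hedged way to get that overlap with plain flow operators (splitLines is the hypothetical splitter sketched earlier; the dispatcher and buffer capacity are only illustrative):

import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.buffer
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.flow.map

fun jsonLines(chunks: Flow<ByteArray>): Flow<String> =
    chunks
        .splitLines()                 // hypothetical splitter from the sketch above
        .flowOn(Dispatchers.IO)       // network reads and splitting run on IO threads
        .buffer(capacity = 64)        // let the producer run ahead of the consumer
        .map { it.decodeToString() }  // each element is a complete line, so per-element decoding is safe

Collecting this flow and calling Jackson inside collect keeps the parsing of each object synchronous, while the reads continue in the background.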
j
Right, that's parallelism though, not concurrency.
🙂 1
I'm saying once you find a chunk the JSON parsing of that chunk should be entirely synchronous, whether on that same core or on another.
👌 2
h
With some LLM magic, I think it captures it well. Has to run sequentially though (which is probably ok here?)
j
Mmm no, I would not use that. There are lots of needless very large copies in here. It also doesn't handle a codepoint split across two byte arrays, so you'll wind up with a pair of replacement characters at random spots non-deterministically (since presumably the AWS SDK's ByteArray size is based on network reads).
h
ByteArray here is already the Kotlin one (from the stdlib)
j
You might consider using kotlinx.io, which has unsafe operations that allow you to build a Source from `ByteArray`s that you donate. Then you can do indexOf('\n'.code.toByte()) to get the next newline, and readString(thatIndex) to perform the UTF-8 decoding. This will handle split codepoints while doing only a single UTF-8 decode of the entire JSON object, which you can then hand to Jackson.
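For illustration, a minimal sketch of that idea with kotlinx-io (assuming a reasonably recent version; this uses a plain write(), which copies each chunk into the buffer, whereas the unsafe buffer operations mentioned above would let you donate the arrays instead):

import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.io.*

fun Flow<ByteArray>.toJsonObjects(): Flow<String> = flow {
    val buffer = Buffer()
    collect { chunk ->
        buffer.write(chunk)                     // copies; the unsafe API could avoid this
        while (true) {
            val nl = buffer.indexOf('\n'.code.toByte())
            if (nl == -1L) break
            emit(buffer.readString(nl))         // single UTF-8 decode of one whole JSON object
            buffer.skip(1L)                     // drop the '\n'
        }
    }
    if (buffer.size > 0L) emit(buffer.readString())  // trailing object without a final '\n'
}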
Yes, but you are not the one creating it. If the underlying Socket returns two bytes of an eight-byte emoji at the end of one and the remaining six bytes at the start of the next one, you will get two replacement characters instead of one emoji.
h
Why would that be the case? Wouldn't that be
ByteArray("abc")
ByteArray("def\n")
j
those are single-byte codepoints
you need a multi-byte codepoint to demonstrate the problem
anything non-ascii, basically
So ♥ is, for example, 0xE299A5. If you got 0xE299 at the end of one byte array and 0xA5 at the start of the second one, you'd get two replacement characters, because decoded separately neither is a valid UTF-8 byte sequence.
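A tiny stdlib-only demonstration of that failure mode (decodeToString() substitutes U+FFFD for malformed input by default):

fun main() {
    val heart = "♥".encodeToByteArray()        // 0xE2 0x99 0xA5
    val head = heart.copyOfRange(0, 2)         // 0xE2 0x99 — truncated sequence
    val tail = heart.copyOfRange(2, 3)         // 0xA5      — orphaned continuation byte
    println(head.decodeToString() + tail.decodeToString())  // replacement characters, not ♥
    println(heart.decodeToString())                         // ♥ when decoded together
}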
h
Oh, I see your point, if we end up having more complex characters, the characters themselves might be split between ByteArrays
That is currently working for now, but I can definitely see I am not pushing my CPU at all. I'll come back to this tomorrow, it's already a bit late here. Thank you both for your inputs, much appreciated!
j
That ByteStream abstraction is already doing you harm and causing copies to be performed. I would do a when on it to get access to the underlying stream, which seems like it can already do everything you need.
Ideally the underlying type is a SourceStream, whose implementation is a copy of Okio vendored into the SDK with the copyright washed off.
🌶️ 1
h
the one that appears at runtime is SourceStream. When you mention getting a when statement on it, you mean so that I can access the file directly?
j
Well you're accessing the underlying stream rather than an abstraction of it
Well, with a SourceStream it looks like you can call readFrom() to get their SdkSource (née Okio's Source) and then call buffer() on it to get an SdkBufferedSource (née Okio's BufferedSource).
h
Oh clear, so instead of doing ByteStream -> ByteArray -> String, go directly to the String
j
However, they annoyingly have removed Okio's indexOf API from their copy, which makes finding the newlines on their SdkBufferedSource harder than it needs to be (but not impossible).
Once you do find them, it's just a matter of calling readUtf8(thatIndex) to get a full string of a single JSON object
h
fun ByteStream.toJsonLines(): Flow<String> = flow {
    when (this@toJsonLines) {
        is ByteStream.Buffer -> TODO()
        is ByteStream.ChannelStream -> TODO()
        is ByteStream.SourceStream -> {
            val bufferedSource = readFrom().buffer()

            while (!bufferedSource.exhausted()) {
                var lineLength = 0L

                // Look ahead for newline
                val peek = bufferedSource.peek()
                while (!peek.exhausted()) {
                    if (peek.readByte() == '\n'.code.toByte()) {
                        break
                    }
                    lineLength++
                }

                // Read the line
                val line = bufferedSource.readUtf8(lineLength)
                emit(line)

                // Skip the newline character
                if (!bufferedSource.exhausted()) {
                    bufferedSource.skip(1)
                }
            }
        }
    }
}
something along these lines? It's currently performing slower
I need to revisit it tomorrow
j
Yeah that peek and loop is not doing you any favors for performance
The real Okio has a readUtf8Line() that would do all this for you. It's a shame they gutted it so severely.
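For reference, with the real Okio the loop becomes roughly this (a sketch; it assumes you can obtain a plain InputStream for the body on the JVM, which isn't shown here):

import okio.buffer
import okio.source
import java.io.InputStream

fun parseNdjson(input: InputStream, onJsonLine: (String) -> Unit) {
    input.source().buffer().use { source ->
        while (true) {
            // readUtf8Line() handles the buffering, newline search and UTF-8 decoding,
            // and returns null at the end of the stream.
            val line = source.readUtf8Line() ?: break
            if (line.isNotBlank()) onJsonLine(line)
        }
    }
}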
h
I'll probably look up that implementation and copy it to my package then
Thanks!
a
You could read in chunks till you find an element, emit that element and then pass the rest of the chunk back to read further (I would do it like that)
h
Spending a bit more time than I should on this. Started to deploy this on an AWS Lambda to see the performance without being constrained by my local network. With a maxed-out Lambda (6 vCPUs), I'm able to go through the whole file in close to 15 minutes, which obviously sucks. Another thing that makes Lambda suck is that you can only see the memory usage, so I'm not even sure if this is IO- or CPU-bound (although I eventually understood it when I scaled the RAM, which leads to more CPU). I guess I'll just have to live with this taking a long while. Best to spin up an ECS task for this. And yes, I'll be discussing with the people exporting this to just put it into a SQL table, especially as I'm just interested in less than 10% of the info there 🙃
Thank you for the support on this, was a fun ride!