# getting-started
r
Looking for a bit of advice... I have an endpoint that basically returns an enormous JSON array. I'd like to avoid reading the whole thing into memory. I'd also like to process objects in the array in parallel. And I'd like to clean up (close the underlying `InputStream`) if I decide not to process the whole array. So basically I want one function to return `things: LazyCollection<T>` where `LazyCollection<T> : AutoCloseable`, and another to do `things.use { }` to ensure it's cleaned up... It feels like this must be a pretty common pattern? My googling is letting me down...
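Roughly the shape I mean (names here are hypothetical; the stdlib `use { }` extension works on anything `AutoCloseable` on the JVM):
```kotlin
class Thing // placeholder for the parsed element type

// Hypothetical shape matching the description above: something lazy to iterate
// that also owns the underlying InputStream, so closing it closes the stream.
interface LazyCollection<T> : Sequence<T>, AutoCloseable

fun fetchThings(): LazyCollection<Thing> = TODO("wrap the endpoint's InputStream in a lazy parser")

fun consumeSome() {
    // use { } closes the collection even if we stop early or something throws
    fetchThings().use { things ->
        things.take(10).forEach { println(it) }
    }
}
```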
This next bit is NOT KOTLIN (or at least it's only Kotlin in that I couldn't do it using `Sequence`!) I started with `generateSequence`, but `Sequence` doesn't implement `Closeable`/`AutoCloseable`, which makes it hard to hand it off to another object and let that object clean up, so I switched to this:
```kotlin
java.util.stream.Stream
  .generate { /* work to parse out a single object from the inputStream */ }
  .takeWhile { it != null }
  .onClose { inputStream.close() }
```
However, I now can't use `stream.parallel().forEach`, because it ends up trying to process the actual JSON tokens in parallel rather than the resulting objects. So my theory is that I need to read the objects in batches of, I don't know, 16 or so, then process those 16 in parallel, read in the next batch, and so on. I'd like the client using it to still see it as a single `Stream<Thing>` though - it shouldn't need to care about the underlying implementation...
Is there something better I don't know about? Perhaps in the coroutines libs?
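Roughly what I have in mind (a hypothetical `parseNext()` stands in for "parse one object off the stream", and the thread pool is just for illustration):
```kotlin
import java.util.concurrent.Executors

// Sketch of the batching idea: only one thread ever touches the parser,
// and only the already-parsed objects are fanned out for parallel processing.
fun <T : Any> processInBatches(parseNext: () -> T?, batchSize: Int = 16, process: (T) -> Unit) {
    val pool = Executors.newFixedThreadPool(batchSize)
    try {
        while (true) {
            // Read the next batch sequentially; parseNext() returns null at the end of the array.
            val batch = generateSequence(parseNext).take(batchSize).toList()
            if (batch.isEmpty()) break
            // Process this batch in parallel, then go back for the next one.
            batch.map { item -> pool.submit { process(item) } }.forEach { it.get() }
            if (batch.size < batchSize) break // array exhausted
        }
    } finally {
        pool.shutdown()
    }
}
```
The bit I haven't worked out is wrapping that back up so the caller still just sees a `Stream<Thing>`.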
a
sounds like you’re on to something - sequentially parsing a JSON in parallel isn’t going to be easy 😄
for coroutines, there’s `awaitAll` to wait for a list of async operations to complete
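e.g. (a sketch - `process` is a stand-in for whatever you do per object):
```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.coroutineScope

// Parse a batch sequentially elsewhere, then process its objects concurrently
// and wait for all of them with awaitAll.
suspend fun <T, R> processBatch(batch: List<T>, process: suspend (T) -> R): List<R> =
    coroutineScope {
        batch.map { item -> async { process(item) } }.awaitAll()
    }
```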
m
If the order of the processing doesn't matter, a channel with a buffer might be a good solution. Have one producer and many consumers. The producer will suspend while the buffer is full, and the consumers take things as they are able to.
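something like this, roughly (names and sizes are just for illustration, and real code would probably want the parsing on `Dispatchers.IO`):
```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.launch

// Sketch: one producer parses the JSON sequentially and sends objects into a
// buffered channel; several consumers pull from it and process them in parallel.
// parseNext() and process() are placeholders; processing order is not preserved.
suspend fun <T : Any> parseAndProcess(parseNext: () -> T?, consumers: Int = 16, process: suspend (T) -> Unit) =
    coroutineScope {
        val channel = Channel<T>(capacity = 16)
        launch {
            try {
                while (true) channel.send(parseNext() ?: break) // suspends while the buffer is full
            } finally {
                channel.close() // lets the consumers' for-loops complete
            }
        }
        repeat(consumers) {
            launch { for (item in channel) process(item) }
        }
    }
```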
r
Thanks, I'll look into channels
e
Flow (rather than Channel) is the abstraction that allows for cleanup
but with Java Streams, there is a `sequential()` method
oh wait, `generate` is sequential by default
I think your issue is with the consumer; you meant to use `.forEachOrdered`
r
`.parallel().forEachOrdered` still blows up on the JSON parsing; not sure how it could not, really? The block passed to `generate` is actively consuming the `InputStream` in a JSON parser, so parallel calls to it will fail. Without some kind of sequential chunking / batching I can't see how I can process the resulting parsed objects in parallel.
I'll have a look at `Flow`, see if it might have some lib methods to save me trying to do it myself
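e.g. `flatMapMerge` looks like the kind of thing - a rough sketch, with `parseNext()`/`process()` as placeholders (results come back out of order):
```kotlin
import kotlinx.coroutines.flow.*

// Sketch: the outer flow parses sequentially (so only one coroutine touches the
// parser), while flatMapMerge runs up to `concurrency` process() calls at once.
// parseNext() and process() are placeholders; flatMapMerge may need an opt-in
// annotation depending on the kotlinx.coroutines version.
suspend fun <T : Any, R> parseThenFanOut(parseNext: () -> T?, process: suspend (T) -> R): List<R> =
    flow { while (true) emit(parseNext() ?: break) }
        .flatMapMerge(concurrency = 16) { obj -> flow { emit(process(obj)) } }
        .toList()
```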
e
I'm actually not sure what you're running into. A `Stream.generate` itself is not parallel
but with flow, you can definitely
```kotlin
flow {
    // readLine() returns null at EOF, which breaks the loop; use { } then closes the reader
    file.bufferedReader().use { while (true) emit(it.readLine() ?: break) }
}
```
and the reader will get closed whether the flow is fully consumed or the collector is cancelled
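e.g. wrapping that up (hypothetical names, just to show the cleanup behaviour):
```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking
import java.io.File

// Hypothetical wrapper around the snippet above.
fun lines(file: File) = flow {
    file.bufferedReader().use { while (true) emit(it.readLine() ?: break) }
}

fun main() = runBlocking {
    // take(5) cancels the collector after five lines; the cancellation surfaces
    // inside the flow builder at emit(), and use { } still closes the reader.
    lines(File("huge.json")).take(5).collect { println(it) }
}
```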