Rob Elliot
03/02/2023, 2:16 PM
InputStream) if I decide not to process the whole array. So basically I want one function to return `things: LazyCollection<T>` where `LazyCollection<T> : AutoCloseable`, and another to do `things.use { }` to ensure it's cleaned up...
It feels like this must be a pretty common pattern? My googling is letting me down...
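A minimal sketch of the shape described above, with hypothetical names (LazyCollection, parseNext and demo are not from any library mentioned in the thread): a lazily parsed collection that also owns the InputStream, so a caller can rely on `.use { }`.

import java.io.InputStream

// Hypothetical sketch: a lazily-parsed collection that owns the underlying InputStream.
class LazyCollection<T : Any>(
    private val source: InputStream,
    private val parseNext: (InputStream) -> T?   // return null once the underlying array is exhausted
) : Sequence<T>, AutoCloseable {
    override fun iterator() = generateSequence { parseNext(source) }.iterator()
    override fun close() = source.close()
}

// Caller side: use { } closes the stream even if only a few elements are consumed.
fun demo(input: InputStream) {
    LazyCollection(input) { ins -> ins.read().takeIf { it >= 0 } }   // stand-in "parser": one byte per element
        .use { things -> things.take(10).forEach(::println) }
}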
Rob Elliot
03/02/2023, 2:25 PM
`generateSequence`, but `Sequence` doesn't implement `Closeable`/`AutoCloseable`, which makes it hard to hand it off to another object and let that object clean up, so I switched to this:
java.util.stream.Stream
    .generate { /* work to parse out a single object from the inputStream */ }
    .takeWhile { it != null }
    .onClose { inputStream.close() }
However, I now can't use `stream.parallel().forEach`, because it ends up trying to process the actual JSON tokens in parallel rather than the resulting objects. So my theory is that I need to read the objects in, in batches of, I don't know, 16 or so, and then process those 16 in parallel, read in the next batch, and so on.
I'd like the client using it to still see it as a single `Stream<Thing>` though - it shouldn't need to care about the underlying implementation...
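One way to express that batching theory, as a sketch only: the chunk size, the names, and the use of parallelStream per batch are illustrative, and wrapping the result back into a single Stream<Thing> for the client is left out.

import java.util.stream.Collectors

// Sequential parsing, parallel processing per batch of ~16, then on to the next batch.
fun <T : Any, R> processInBatches(
    readNext: () -> T?,              // pulls one parsed object; must stay single-threaded
    batchSize: Int = 16,
    transform: (T) -> R
): Sequence<R> =
    generateSequence(readNext)       // sequential: only one thread touches the parser
        .chunked(batchSize)
        .flatMap { batch ->
            batch.parallelStream()   // fan the batch out
                .map { transform(it) }
                .collect(Collectors.toList())
        }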
Rob Elliot
03/02/2023, 2:26 PM
August Lilleaas
03/02/2023, 2:27 PM
August Lilleaas
03/02/2023, 2:29 PM
`awaitAll` to wait for a list of async operations to complete
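A hedged sketch of that suggestion using kotlinx.coroutines; parseNext and process are placeholders for the parser and the per-object work, and the batch size of 16 is arbitrary.

import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.coroutineScope

// Parse sequentially, then process each batch concurrently and awaitAll before reading more.
suspend fun <T : Any> processAll(parseNext: () -> T?, process: suspend (T) -> Unit) = coroutineScope {
    while (true) {
        val batch = generateSequence(parseNext).take(16).toList()           // sequential read
        if (batch.isEmpty()) break
        batch.map { async(Dispatchers.Default) { process(it) } }.awaitAll() // parallel work
        if (batch.size < 16) break                                          // parser exhausted
    }
}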
mkrussel
03/02/2023, 2:39 PM
Rob Elliot
03/02/2023, 2:45 PM
ephemient
03/02/2023, 7:11 PM
ephemient
03/02/2023, 7:13 PM
`sequential()` method
ephemient
03/02/2023, 7:15 PM
ephemient
03/02/2023, 7:16 PM
`.forEachOrdered`
Rob Elliot
03/02/2023, 9:06 PM
`.parallel().forEachOrdered` still blows up on the JSON parsing; not sure how it could not, really? The block passed to `generate` is actively consuming the InputStream in a JSON parser, so parallel calls to it will fail. Without some kind of sequential chunking / batching I can't see how I can process the resulting parsed objects in parallel.
Rob Elliot
03/02/2023, 9:07 PM
`Flow`, see if it might have some lib methods to save me trying to do it myself
ephemient
03/02/2023, 10:45 PM
`Stream.generate` itself is not parallel
ephemient
03/02/2023, 10:51 PM
flow {
    file.bufferedReader().use { while (true) emit(it.readLine() ?: break) }
}
and the reader will get closed whether the flow is fully consumed or the collector is cancelled
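Following on from that, a sketch of how such a flow could also feed the parallel step, assuming kotlinx.coroutines; process(), the file parameter, and the concurrency of 16 are placeholders. The flow builder stays sequential and closes the reader, while flatMapMerge bounds how many elements are processed concurrently.

import java.io.File
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

@OptIn(FlowPreview::class)
fun demo(file: File) = runBlocking {
    // Sequential source: one reader, closed when the flow completes or the collector is cancelled.
    val lines = flow {
        file.bufferedReader().use { while (true) emit(it.readLine() ?: break) }
    }
    lines
        .flatMapMerge(concurrency = 16) { line ->
            flow { emit(process(line)) }   // each element processed in its own coroutine
        }
        .flowOn(Dispatchers.Default)
        .collect { println(it) }
}

suspend fun process(line: String): String = line.uppercase()   // stand-in for the real per-object work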