Maybe a n00b question, but is there a way to treat...
# getting-started
d
Maybe a n00b question, but is there a way to treat XMLStreamReader as a Sequence? I want to process an XML feed with constant memory and use things like .windowed, but I'm not sure if this is the way to go. Pretty new to Kotlin and the JVM 😬
n
first step is to try to get it as an Iterable/Iterator, but I'm not seeing anything like that
I don't think SAX-type parsers really lend themselves to that, but I could be wrong
d
yeah I tried something like this:
override fun iterator() = object : Iterator<RawProduct> {
    override fun hasNext() = reader.hasNext()
    override fun next(): RawProduct {
        var item: RawProduct? = null
        while (item == null) {
            if (reader.next() == XMLStreamReader.START_ELEMENT && reader.name.toString() == "item")
                item = xmlMapper.streamValue<Map<String, Any>>(reader).filter { it.value is String } as RawProduct
        }
        return item
    }
}
But I can't really figure out how to close the stream if iteration ends prematurely
n
oh, you'll probably find it easier to use sequence {} with yield calls inside
you'll probably need to close it outside the sequence
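A minimal sketch of that suggestion, using only the JDK's StAX API so it runs without Jackson. The helper name itemTexts and the sample feed are made up for illustration, and it yields plain element text rather than a RawProduct. Note that XMLStreamReader.close() does not close the underlying input source, so a real InputStream would need its own use block.

```kotlin
import java.io.StringReader
import javax.xml.stream.XMLInputFactory
import javax.xml.stream.XMLStreamConstants
import javax.xml.stream.XMLStreamReader

// Hypothetical helper: lazily yields the text of every <item> element.
// The caller owns the reader and is responsible for closing it.
fun itemTexts(reader: XMLStreamReader): Sequence<String> = sequence {
    while (reader.hasNext()) {
        if (reader.next() == XMLStreamConstants.START_ELEMENT && reader.localName == "item") {
            // elementText reads the text content and leaves the cursor on the END_ELEMENT
            yield(reader.elementText)
        }
    }
}

fun main() {
    val xml = "<feed><item>a</item><item>b</item><item>c</item></feed>"
    val reader = XMLInputFactory.newFactory().createXMLStreamReader(StringReader(xml))
    try {
        // Only one window is held in memory at a time
        itemTexts(reader).windowed(2).forEach { println(it) }
    } finally {
        reader.close() // closed outside the sequence, as suggested above
    }
}
```

Because the loop lives inside the sequence builder, there is no separate hasNext() to keep in sync with next().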
t
@nanodeath That would be a violation of Iterable semantics
n
huh?
d
@Tomasz Krakowiak do you think it's possible? Or is Sequence the wrong abstraction for doing this?
t
@Dennis Tel Could you post the FQN/package of xmlMapper.streamValue? Looks like Jackson, but Google cannot find anything.
d
yup, it's Jackson:
private inline fun <reified T> XmlMapper.streamValue(reader: XMLStreamReader): T =
    this.readValue(reader, jacksonTypeRef<T>())

private val xmlMapper = XmlMapper(
    JacksonXmlModule().apply {
        setDefaultUseWrapper(false)
    }).registerKotlinModule()
    .configure(MapperFeature.ACCEPT_CASE_INSENSITIVE_PROPERTIES, true)
    .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false) as XmlMapper
n
can you share the snippet where you're actually using the iterator?
d
Untitled
n
so I'd probably still say
myInputStream.use { stream ->
    XMLSeq.from(stream).windowed().forEach { ... }
}
t
@Dennis Tel In order to conform to Sequence/Iterable/Iterator semantics, you would need to read all the data first and then release the resources. Java Streams are a more appropriate abstraction, as they support releasing underlying resources.
n
I do occasionally wish Sequences also had an onClose handler...
d
Unfortunate, but I know where to look next. Thanks y'all!
n
don't forget that if you do use a Stream + onClose, you need to actually close the stream itself I believe, see https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/nio/file/Files.html#walk(java.nio.file.Path,java.nio.file.FileVisitOption...) for an example (Tomasz, correct me if I'm wrong)
(see API Note)
t
@nanodeath You're correct. Streams need to be closed, unless specified otherwise (like streams backed by collections).
d
ok i’m looking into wrapping the input stream in an iterator which works with the Java Stream
n
this is a bit roundabout but I'd still probably implement it internally as a sequence and then convert that to a stream using Sequence#asStream(). for example in your code above hasNext() isn't actually consistent with next(), and with sequences you don't need to worry about that
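A rough sketch of that shape: build the items with sequence {}, convert with asStream() from kotlin.streams, and attach the cleanup with onClose. FakeSource and openStream are hypothetical stand-ins for the XML reader, so the example runs on its own. It assumes a Kotlin version where use is available on AutoCloseable.

```kotlin
import java.util.stream.Stream
import kotlin.streams.asStream

// Hypothetical stand-in for the XML-backed resource.
class FakeSource(private val items: List<String>) : AutoCloseable {
    var closed = false
        private set

    // Items are produced lazily, one per yield
    fun read(): Sequence<String> = sequence {
        for (item in items) yield(item)
    }

    override fun close() {
        closed = true
    }
}

// The sequence does the iteration; the stream carries the close handler.
fun openStream(source: FakeSource): Stream<String> =
    source.read().asStream().onClose { source.close() }

fun main() {
    val source = FakeSource(listOf("a", "b", "c"))
    // use closes the stream even though only part of it is consumed
    openStream(source).use { stream ->
        stream.limit(2).forEach { println(it) }
    }
    println(source.closed) // true
}
```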
t
This also provides a semantically correct way to produce an iterator:
try (Stream<X> stream = createStream()) {
    Iterator<X> iterator = stream.iterator();
    consumeIterator(iterator);
} // The stream is closed automatically because it was created in try-with-resources
d
Ok I think i’m close but think I’m lost now 😄
fun <T> processXMLStream(stream: InputStream): Stream<T> {
    val reader = XMLInputFactory.newFactory().createXMLStreamReader(stream)

    val iterator = object : Iterator<RawProduct> {
        override fun next(): RawProduct {
            try {
                return if (reader.next() == XMLStreamReader.START_ELEMENT && reader.name.toString() == "item") {
                    xmlMapper.streamValue<Map<String, Any>>(reader).filter { it.value is String } as RawProduct
                } else next()
            } catch (e: XMLStreamException) {
                throw RuntimeException(e);
            }
        }

        override fun hasNext(): Boolean {
            return try {
                val hasNext = reader.hasNext()
                if (!hasNext) reader.close(); // close the stream here

                hasNext;
            } catch (e: XMLStreamException) {
                false;
            }
        }
    };
    
    return ???
}
Not sure what to return here (don’t have experience with Java Streams)
n
if (!hasNext) reader.close(); // close the stream here
also I'd leave this out of the iterator and tack it onto the stream instead
t
val spliterator = Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED)
return StreamSupport.stream(spliterator, false).onClose { reader.close() }
Something like that : )
➕ 1
d
ok so this is probably it 😄
fun xmlSequence(stream: InputStream): Sequence<RawProduct> {
    val reader = XMLInputFactory.newFactory().createXMLStreamReader(stream)

    val iterator = object : Iterator<RawProduct> {
        override fun next(): RawProduct = try {
            if (reader.next() == XMLStreamReader.START_ELEMENT && reader.name.toString() == "item") {
                xmlMapper.streamValue<Map<String, Any>>(reader).filter { it.value is String } as RawProduct
            } else next()
        } catch (e: XMLStreamException) {
            throw RuntimeException(e);
        }

        override fun hasNext(): Boolean = try {
            reader.hasNext()
        } catch (e: XMLStreamException) {
            false;
        }
    };

    return StreamSupport.stream(
        Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED),
        false
    ).onClose { reader.close() }.asSequence()
}
Or is it still a bad idea to convert it to a sequence?
t
asSequence does not close the stream.
d
Wouldn't that make asSequence always a poor choice to use with streams?
t
The asSequence method is invalid, as the Sequence KDoc clearly states "Sequences can be iterated multiple times", while this is not possible with an iterator obtained from a stream, which is what the current implementation does. I think it's a semantic bug in the Kotlin stdlib.
@SinceKotlin("1.2")
public fun <T> Stream<T>.asSequence(): Sequence<T> = Sequence { iterator() }
d
Sequences can be iterated multiple times, however some sequence implementations might constrain themselves to be iterated only once. That is mentioned specifically in their documentation (e.g. generateSequence overload). The latter sequences throw an exception on an attempt to iterate them the second time.
This doesn't really say sequences have to be iterable multiple times, right?
t
@Dennis Tel Ah, sorry. You're right. Still, the Stream.asSequence KDoc should state that it returns a Sequence which can be iterated only once, and it doesn't.
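The once-only behaviour is easy to check with a small sketch. The function name secondPassFails is made up for illustration. Since asSequence wraps stream.iterator(), a second pass asks an already-consumed stream for another iterator, and the JDK rejects that with an IllegalStateException.

```kotlin
import java.util.stream.Stream
import kotlin.streams.asSequence

// Returns true if iterating the stream-backed sequence a second time fails.
fun secondPassFails(): Boolean {
    val seq = Stream.of("a", "b", "c").asSequence()
    seq.toList() // first pass consumes the underlying stream
    // second pass calls stream.iterator() again on a consumed stream
    return runCatching { seq.toList() }.exceptionOrNull() is IllegalStateException
}

fun main() {
    println(secondPassFails()) // true
}
```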
d
I'll give this a go and see how it turns out. There is a bunch of code that expects a sequence, but I'll probably wrap something around it that makes sure the stream is closed
🤘 1
Thanks for all the help and information, learned a lot in a short time 😄
Major props! 😄
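One possible shape for such a wrapper, as a sketch only: useAsSequence is a hypothetical name, not something from the Kotlin stdlib. It hands the sequence to a block and closes the stream afterwards, so the block has to consume the sequence eagerly (for example with toList() or forEach) rather than return it.

```kotlin
import java.util.stream.Stream
import kotlin.streams.asSequence

// Hypothetical helper: exposes the stream as a Sequence only inside `block`,
// then closes the stream (running its onClose handlers) even if `block` throws.
fun <T, R> Stream<T>.useAsSequence(block: (Sequence<T>) -> R): R =
    use { stream -> block(stream.asSequence()) }

fun main() {
    var closed = false
    val stream = Stream.of(1, 2, 3, 4).onClose { closed = true }

    // The sequence is fully consumed before the stream is closed
    val windows = stream.useAsSequence { seq -> seq.windowed(2).toList() }

    println(windows) // [[1, 2], [2, 3], [3, 4]]
    println(closed)  // true
}
```

Existing code that expects a Sequence can then be called inside the block, while the caller keeps control over when the reader is released.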
t
Also, welcome to the Kotlin and JVM community. You have the potential to be a good developer, as you read the docs and understood the possible memory leak issue : ) Unless you come from the Rust community, then - meh ; p
d
Haha nope, i’m mainly a designer/frontend developer but have also done a bunch of stuff with Erlang and Elixir 😄
but thank you for the compliment 🙂
n
Unless you come from Rust
they're all good languages Bront :)
t
@nanodeath Yes, Rust is splendidly paradigmatic about resource management - this is what I meant : ) They would all be like - "Compiler! What the heck are you allowing me to do!"