# announcements
n
Hey folks, I have a small issue with sequences. I have a CSV file parser which parses rows in batches. For each batch, it updates an input set object which contains information about the batching progress (failed rows, completed rows) and stores the updated input set and the parsed objects in our database. Here's the current code:
val inputSet = createInputSet(path, month, year, currentTimestamp)

val parser = BatchingDeclineRateFileParser(batchingConfiguration.batchSize, path)

return generateSequence { parser.parseBatch() }
        .map { batch ->
            val updatedInputSet = updateBatchedInputSet(inputSet, batch)

            transactionManager.runInTransaction {
                rateInputRepository.createAll(batch.batchItems, month, year, updatedInputSet.id)
                inputSetRepository.updateBatchStep(updatedInputSet)
            }
            updatedInputSet
        }
        .find { it.isProcessingDone } ?: throw JobException("Batch did not complete for some reason")
This works in general, but because I'm using `map`, every batch is applied to the original input set instead of the previously updated one. Is there a function on `Sequence` that works like ReactiveX's `scan`, where I can fold the incoming values into an accumulator? http://reactivex.io/documentation/operators/scan.html
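To make the `map` problem concrete, here is a minimal sketch with hypothetical stand-ins (`InputSet`, `Batch` and `updateInputSet` are illustrative, not the project's real classes). With `map`, each batch is applied to the original set; threading the state through a captured `var` is one possible workaround while no scan operator is available:

```kotlin
// Hypothetical stand-in for the real input set: it only tracks row counts.
data class InputSet(val completedRows: Int, val failedRows: Int)

// Hypothetical batch: how many rows succeeded and failed in it.
data class Batch(val completed: Int, val failed: Int)

fun updateInputSet(set: InputSet, batch: Batch): InputSet =
    InputSet(set.completedRows + batch.completed, set.failedRows + batch.failed)

val batches = sequenceOf(Batch(10, 0), Batch(8, 2), Batch(5, 1))
val original = InputSet(0, 0)

// What `map` does: every batch is applied to the *original* set, so nothing accumulates.
val withMap = batches.map { updateInputSet(original, it) }.toList()

// Workaround: keep the latest state in a var captured by the lambda.
fun runningInputSets(batches: Sequence<Batch>, initial: InputSet): Sequence<InputSet> {
    var current = initial
    return batches.map { batch ->
        current = updateInputSet(current, batch)
        current
    }
}

val withVar = runningInputSets(batches, original).toList()
```

One caveat with the captured `var`: the state belongs to the returned sequence, not to a single iteration, so iterating the same sequence a second time continues from the last accumulated value instead of starting over.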
g
You mentioned `fold`, have you tried using it?
n
`reduce` and `fold` are both terminal operations, which is a problem here since `parseBatch` never ends the sequence by itself.
So folding would consume an infinite sequence and never return.
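For context, `generateSequence { ... }` only stops when its lambda returns `null`, so a `parseBatch` that always returns a non-null result produces an infinite sequence. A minimal sketch, assuming hypothetical `ParseResult` and `FakeParser` types that mimic the sealed class from this thread, of ending the sequence right after the `Complete` result so that a terminal operation like `fold` can return:

```kotlin
// Hypothetical result type mirroring the three cases described in the thread.
sealed class ParseResult {
    abstract val rows: List<String>
    data class Batch(override val rows: List<String>) : ParseResult()
    data class Partial(override val rows: List<String>) : ParseResult()
    data class Complete(override val rows: List<String>) : ParseResult()
}

// Hypothetical parser: returns one chunk per call and reports Complete with the last chunk.
// Like the parser in the thread, it never returns null (after the end it keeps returning Complete).
class FakeParser(private val chunks: List<List<String>>) {
    private var index = 0

    fun parseBatch(): ParseResult {
        val rows = chunks.getOrElse(index) { emptyList() }
        index++
        return if (index >= chunks.size) ParseResult.Complete(rows) else ParseResult.Batch(rows)
    }
}

// Emits results up to and including the first Complete, then returns null to end the sequence.
fun batchesUntilComplete(parser: FakeParser): Sequence<ParseResult> {
    var done = false
    return generateSequence {
        if (done) null
        else parser.parseBatch().also { done = it is ParseResult.Complete }
    }
}
```

With the sequence finite, `batchesUntilComplete(parser).fold(0) { acc, r -> acc + r.rows.size }` returns normally. It still only yields the final value, though, which is why a scan-like operator fits the original goal better.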
`parseBatch` returns an instance of a sealed class with three implementations: `Batch`, `Partial` and `Complete`. I first thought about using `takeWhile` to check for `Complete`, but then I'd miss the last batch, since the completing batch wouldn't be passed to `fold`.
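The `takeWhile` issue can be seen in isolation: it drops the first element that fails the predicate, which here is exactly the `Complete` batch. A sketch of a hypothetical `takeWhileInclusive` helper (not part of the stdlib) that also emits that element before stopping:

```kotlin
// Hypothetical helper: like takeWhile, but also emits the first element that
// fails the predicate, then stops pulling from the source.
fun <T> Sequence<T>.takeWhileInclusive(predicate: (T) -> Boolean): Sequence<T> = sequence {
    for (element in this@takeWhileInclusive) {
        yield(element)
        if (!predicate(element)) break
    }
}
```

Like `takeWhile`, it is lazy, so it also terminates on an infinite source such as `generateSequence`: `sequenceOf("Batch", "Batch", "Complete", "Batch").takeWhileInclusive { it != "Complete" }` keeps the `"Complete"` element, while plain `takeWhile` stops before it.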
m
So if I understand correctly, you want a custom sequence operator that behaves like this?
fun <T, R> Sequence<T>.customScan(initial: R, operation: (acc: R, T) -> R): Sequence<R> {
    return sequence result@ {
        val iterator = iterator()
        if (!iterator.hasNext()) return@result
        var accumulator = initial
        while (iterator.hasNext()) {
            val current = iterator.next()
            accumulator = operation(accumulator, current)
            yield(accumulator)
        }
    }
}
It’s based on the implementations of `Sequence<T>.zipWithNext` and `Sequence<T>.fold`. Note that I haven’t tested it, it’s just a quick example. I don’t think you can get the specific behavior you want by combining stdlib operations.
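The same operator can be written a bit more compactly by looping over the receiver directly; the empty-sequence check isn't needed because the loop body simply never runs. A sketch with a small usage example; note that it only emits accumulated values, never `initial` itself:

```kotlin
// Same behavior as customScan, written with a for loop over the receiver.
fun <T, R> Sequence<T>.customScan(initial: R, operation: (acc: R, T) -> R): Sequence<R> = sequence {
    var accumulator = initial
    for (element in this@customScan) {
        accumulator = operation(accumulator, element)
        yield(accumulator)
    }
}

// Running totals: 1, 1 + 2, 1 + 2 + 3
val runningTotals = sequenceOf(1, 2, 3).customScan(0) { acc, n -> acc + n }.toList() // [1, 3, 6]
```

Because it is built on the `sequence` builder, it is lazy: on an infinite source, `first { ... }` or `find { ... }` stops as soon as an accumulated value matches, which is what the original `find { it.isProcessingDone }` needs.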
n
Went ahead and implemented it myself:
public fun <T, Acc> Sequence<T>.scan(initial: Acc, transform: (Acc, T) -> Acc): Sequence<Acc> {
    return ScanningSequence(this, initial, transform)
}

internal class ScanningSequence<T, Acc>(
    private val sequence: Sequence<T>,
    private val initial: Acc,
    private val transformer: (Acc, T) -> Acc
) : Sequence<Acc> {

    // Each call to iterator() creates a fresh accumulator, so iterating again starts over from `initial`.
    override fun iterator(): Iterator<Acc> = object : Iterator<Acc> {
        var current = initial

        val iterator = sequence.iterator()

        // Fold the next source element into the accumulator and emit the new value.
        override fun next(): Acc {
            current = transformer(current, iterator.next())
            return current
        }

        override fun hasNext(): Boolean {
            return iterator.hasNext()
        }
    }
}
@Matthieu Esnault Looks like we had similar ideas 😄 I based mine off of `map`'s implementation
k
Yeah, this would be useful to have in the standard library; I keep copying over my own ad-hoc implementation as well.
n
Looks like it's coming in 1.4: https://youtrack.jetbrains.com/issue/KT-7657
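With Kotlin 1.4 or later, the stdlib's `runningFold` (also available as `scan`) covers this. Unlike the custom versions in this thread, it emits `initial` as its first element, hence the `drop(1)` in the sketch. A minimal sketch of the original pipeline's shape, assuming a hypothetical `InputSet` and plain batch sizes in place of the real parser and database calls:

```kotlin
// Hypothetical stand-in for the thread's input set; not the real project class.
data class InputSet(val completedRows: Int, val totalRows: Int) {
    val isProcessingDone: Boolean get() = completedRows >= totalRows
}

// Requires Kotlin 1.4+, where runningFold is available on Sequence.
fun firstFinishedInputSet(initial: InputSet, batchSizes: Sequence<Int>): InputSet? =
    batchSizes
        // Each step sees the previously updated set, not the original one.
        .runningFold(initial) { set, size -> set.copy(completedRows = set.completedRows + size) }
        .drop(1) // runningFold emits `initial` first; skip it so only updated sets remain
        .find { it.isProcessingDone } // lazy, so this stops even on an infinite source
```

In the real job, the persistence calls from the original snippet could go inside the `runningFold` lambda, and `?: throw JobException(...)` would handle the `null` result as before.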