#announcements

Nimelrian

11/25/2019, 10:11 AM
Hey folks, I have a small issue with sequences. I have a CSV file parser which parses rows in batches. For each batch, it updates an input set object which contains information about the batching progress (failed rows, completed rows) and stores the updated input set and the parsed objects in our database. Here's the current code:
val inputSet = createInputSet(path, month, year, currentTimestamp)

val parser = BatchingDeclineRateFileParser(batchingConfiguration.batchSize, path)

return generateSequence { parser.parseBatch() }
        .map { batch ->
            val updatedInputSet = updateBatchedInputSet(inputSet, batch)

            transactionManager.runInTransaction {
                rateInputRepository.createAll(batch.batchItems, month, year, updatedInputSet.id)
                inputSetRepository.updateBatchStep(updatedInputSet)
            }
            updatedInputSet
        }
        .find { it.isProcessingDone } ?: throw JobException("Batch did not complete for some reason")
This works in general, but due to using `map` I only update the original input set each time. Is there a function on `Sequence` which works like e.g. ReactiveX's `scan`, where I can fold the incoming values into an accumulator? http://reactivex.io/documentation/operators/scan.html

gian

11/25/2019, 10:17 AM
You mentioned `fold`, have you tried using it?

Nimelrian

11/25/2019, 10:23 AM
`reduce` and `fold` are both terminal operations, which is bad since `parseBatch` never stops producing values by itself
So they'd try to consume an infinite sequence and never return
`parseBatch` returns an instance of a sealed class which has the three implementations `Batch`, `Partial` and `Complete`. I first thought about using `takeWhile` to check for `Complete`, but then I'd miss the last batch, since the completing batch wouldn't be passed to `fold`
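To illustrate the `takeWhile` problem described above: the first element that fails the predicate is dropped, so the completing batch never reaches downstream operators. A minimal sketch, assuming a hypothetical `ParseResult` sealed class that only mimics the three cases named above, and a hypothetical `takeUntilInclusive` helper (not part of the stdlib):

```kotlin
// Hypothetical stand-in for the parser's result type; the real classes are not shown in the thread.
sealed class ParseResult {
    data class Batch(val rows: Int) : ParseResult()
    data class Partial(val rows: Int) : ParseResult()
    data class Complete(val rows: Int) : ParseResult()
}

// Hypothetical helper: like takeWhile, but also emits the first element that matches the stop condition.
fun <T> Sequence<T>.takeUntilInclusive(stop: (T) -> Boolean): Sequence<T> = sequence {
    for (item in this@takeUntilInclusive) {
        yield(item)
        if (stop(item)) break
    }
}

fun main() {
    val results = sequenceOf(
        ParseResult.Batch(100),
        ParseResult.Batch(100),
        ParseResult.Complete(42)
    )

    // takeWhile drops the Complete element, so only the two Batch items remain.
    val withTakeWhile = results.takeWhile { it !is ParseResult.Complete }.toList()
    println(withTakeWhile.size) // 2

    // The inclusive variant keeps the completing element as well.
    val inclusive = results.takeUntilInclusive { it is ParseResult.Complete }.toList()
    println(inclusive.size) // 3
}
```

This only works around the cut-off; it still does not thread an accumulator through the sequence, which is what the rest of the thread addresses.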

Matthieu Esnault

11/25/2019, 11:21 AM
So if I understand correctly, you want a custom sequence operator that behaves like this?
fun <T, R> Sequence<T>.customScan(initial: R, operation: (acc: R, T) -> R): Sequence<R> {
    return sequence result@ {
        val iterator = iterator()
        if (!iterator.hasNext()) return@result
        var accumulator = initial
        while (iterator.hasNext()) {
            val current = iterator.next()
            accumulator = operation(accumulator, current)
            yield(accumulator)
        }
    }
}
It’s based on the implementations of `Sequence<T>.zipWithNext` and `Sequence<T>.fold`. Note that I haven’t tested it, it’s just a quick example. I don’t think you can get the specific behavior you want by combining stdlib operations.
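A small usage sketch of the `customScan` idea above, with a condensed copy of the operator so the block is self-contained. The running-sum example and the threshold of 10 are made up for illustration; the point is that the operator stays lazy, so a short-circuiting terminal operation such as `first` can stop an otherwise infinite source:

```kotlin
// Condensed copy of the operator from the message above: emits each intermediate
// accumulator value, but not the initial one.
fun <T, R> Sequence<T>.customScan(initial: R, operation: (acc: R, T) -> R): Sequence<R> = sequence {
    var accumulator = initial
    for (item in this@customScan) {
        accumulator = operation(accumulator, item)
        yield(accumulator)
    }
}

fun main() {
    // Infinite source: 1, 2, 3, ...
    val naturals = generateSequence(1) { it + 1 }

    // Running sums are 1, 3, 6, 10, 15, ...; first() stops pulling once the predicate matches.
    val firstOverTen = naturals.customScan(0) { acc, n -> acc + n }.first { it > 10 }
    println(firstOverTen) // 15
}
```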

Nimelrian

11/25/2019, 11:21 AM
Went ahead and implemented it myself:
public fun <T, Acc> Sequence<T>.scan(initial: Acc, transform: (Acc, T) -> Acc): Sequence<Acc> {
    return ScanningSequence(this, initial, transform)
}

internal class ScanningSequence<T, Acc>
constructor(
        private val sequence: Sequence<T>,
        private val initial: Acc,
        private val transformer: (Acc, T) -> Acc
) : Sequence<Acc> {

    override fun iterator(): Iterator<Acc> = object : Iterator<Acc> {
        var current = initial

        val iterator = sequence.iterator()
        override fun next(): Acc {
            current = transformer(current, iterator.next())
            return current
        }

        override fun hasNext(): Boolean {
            return iterator.hasNext()
        }
    }
}
@Matthieu Esnault Looks like we had similar ideas 😄 I based mine off of `map`'s implementation
👌 1

karelpeeters

11/25/2019, 11:24 AM
Yeah this would be useful to have in the standard library, I just keep copying over my own ad-hoc implementation as well.

Nimelrian

11/25/2019, 11:25 AM
Looks like it's coming in 1.4: https://youtrack.jetbrains.com/issue/KT-7657
👍 2
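For readers on Kotlin 1.4 or later, the stdlib's `runningFold` (with `scan` as a synonym) covers this use case. One difference from the hand-written versions in this thread: the stdlib operators emit the initial value as the first element. A minimal sketch of the batching scenario under that assumption; `InputSet`, `ParsedBatch` and `updateInputSet` are hypothetical, simplified stand-ins, not the original poster's actual types:

```kotlin
// Hypothetical, simplified stand-ins for the thread's input set and batch types.
data class InputSet(val completedRows: Int = 0, val isProcessingDone: Boolean = false)
data class ParsedBatch(val rows: Int, val isLast: Boolean)

// Hypothetical stand-in for updateBatchedInputSet: folds one batch into the running input set.
fun updateInputSet(current: InputSet, batch: ParsedBatch): InputSet =
    InputSet(current.completedRows + batch.rows, batch.isLast)

fun main() {
    val batches = listOf(
        ParsedBatch(100, false),
        ParsedBatch(100, false),
        ParsedBatch(42, true)
    ).iterator()

    // Plays the role of parser.parseBatch(): yields the next batch on each call (null once exhausted).
    val result = generateSequence { if (batches.hasNext()) batches.next() else null }
        .runningFold(InputSet()) { acc, batch -> updateInputSet(acc, batch) }
        .drop(1) // runningFold emits the initial value first; skip it
        .first { it.isProcessingDone }

    println(result.completedRows) // 242
}
```

Each step now receives the previously updated input set instead of the original one, and `first` stops pulling batches as soon as processing is marked done.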