Nimelrian
11/25/2019, 10:11 AM
val inputSet = createInputSet(path, month, year, currentTimestamp)
val parser = BatchingDeclineRateFileParser(batchingConfiguration.batchSize, path)
return generateSequence { parser.parseBatch() }
    .map { batch ->
        val updatedInputSet = updateBatchedInputSet(inputSet, batch)
        transactionManager.runInTransaction {
            rateInputRepository.createAll(batch.batchItems, month, year, updatedInputSet.id)
            inputSetRepository.updateBatchStep(updatedInputSet)
        }
        updatedInputSet
    }
    .find { it.isProcessingDone } ?: throw JobException("Batch did not complete for some reason")
This works in general, but because I'm using map, every batch is applied to the original input set rather than to the result of the previous step.
Is there a function on Sequence that works like ReactiveX's scan, where I can fold the incoming values into an accumulator? http://reactivex.io/documentation/operators/scan.html
gian
11/25/2019, 10:17 AM
fold, have you tried using it?
Matthieu Esnault
11/25/2019, 10:18 AM
reduce or fold?
https://kotlinlang.org/api/latest/jvm/stdlib/kotlin.sequences/reduce.html
https://kotlinlang.org/api/latest/jvm/stdlib/kotlin.sequences/fold.html
Nimelrian
11/25/2019, 10:23 AM
Nimelrian
11/25/2019, 10:24 AM
Nimelrian
11/25/2019, 10:39 AM
parseBatch returns an instance of a sealed class which has the three implementations Batch, Partial and Complete. I first thought about using takeWhile to check for Complete, but then I'd miss the last batch, since the completing batch wouldn't be passed to fold.
Matthieu Esnault
11/25/2019, 11:21 AM
fun <T, R> Sequence<T>.customScan(initial: R, operation: (acc: R, T) -> R): Sequence<R> {
    return sequence result@ {
        val iterator = iterator()
        if (!iterator.hasNext()) return@result
        var accumulator = initial
        while (iterator.hasNext()) {
            val current = iterator.next()
            accumulator = operation(accumulator, current)
            yield(accumulator)
        }
    }
}
It’s based on the implementations of Sequence<T>.zipWithNext and Sequence<T>.fold. Note that I haven’t tested it; it’s just a quick example.
I don’t think you can get the specific behavior you want by combining stdlib operations.
Nimelrian
11/25/2019, 11:21 AM
public fun <T, Acc> Sequence<T>.scan(initial: Acc, transform: (Acc, T) -> Acc): Sequence<Acc> {
    return ScanningSequence(this, initial, transform)
}

internal class ScanningSequence<T, Acc>
constructor(
    private val sequence: Sequence<T>,
    private val initial: Acc,
    private val transformer: (Acc, T) -> Acc
) : Sequence<Acc> {
    override fun iterator(): Iterator<Acc> = object : Iterator<Acc> {
        var current = initial
        val iterator = sequence.iterator()

        override fun next(): Acc {
            current = transformer(current, iterator.next())
            return current
        }

        override fun hasNext(): Boolean {
            return iterator.hasNext()
        }
    }
}
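A rough sketch of how a scan like the two above might replace map in the original pipeline. ParseResult, its items property, InputSet and step are hypothetical stand-ins invented for illustration; only the names Batch, Partial, Complete and isProcessingDone come from the thread. It also shows why takeWhile loses the completing batch while first on the scanned sequence keeps it.

```kotlin
// Hypothetical stand-ins for the sealed class described in the chat; the item type is assumed.
sealed class ParseResult {
    abstract val items: List<String>
    data class Batch(override val items: List<String>) : ParseResult()
    data class Partial(override val items: List<String>) : ParseResult()
    data class Complete(override val items: List<String>) : ParseResult()
}

// Hypothetical accumulator standing in for the real input set.
data class InputSet(val processed: Int = 0, val isProcessingDone: Boolean = false)

// Same idea as the customScan posted in the thread: emit the accumulator after every element.
fun <T, R> Sequence<T>.customScan(initial: R, operation: (acc: R, T) -> R): Sequence<R> = sequence {
    var accumulator = initial
    for (element in this@customScan) {
        accumulator = operation(accumulator, element)
        yield(accumulator)
    }
}

// Hypothetical fold step: count the items and mark completion on a Complete result.
fun step(acc: InputSet, result: ParseResult): InputSet =
    InputSet(acc.processed + result.items.size, result is ParseResult.Complete)

fun main() {
    val parsed: List<ParseResult> = listOf(
        ParseResult.Batch(listOf("a", "b")),
        ParseResult.Partial(listOf("c")),
        ParseResult.Complete(listOf("d"))
    )

    // takeWhile stops before the Complete element, so it would never reach the accumulator.
    println(parsed.asSequence().takeWhile { it !is ParseResult.Complete }.count()) // 2

    // Scanning first and then searching keeps the completing element in the result.
    val finished = parsed.asSequence()
        .customScan(InputSet(), ::step)
        .first { it.isProcessingDone }
    println(finished) // InputSet(processed=4, isProcessingDone=true)
}
```

Because the sequence is lazy, first stops pulling batches as soon as an accumulator reports completion, so any side effects inside the step run once per consumed batch.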
Nimelrian
11/25/2019, 11:23 AM
karelpeeters
11/25/2019, 11:24 AM
Nimelrian
11/25/2019, 11:25 AM
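For readers on a newer toolchain: Kotlin 1.4 and later ship runningFold and scan on Sequence in the standard library, which postdates this thread. A small sketch, assuming Kotlin 1.4 or newer. Note one difference from the hand-written versions above: the stdlib functions emit the initial value as the first element.

```kotlin
fun main() {
    // runningFold emits the seed first, then every intermediate accumulator.
    val sums = sequenceOf(1, 2, 3).runningFold(0) { acc, x -> acc + x }.toList()
    println(sums) // [0, 1, 3, 6]

    // scan behaves the same; drop(1) skips the seed to match the versions in the thread.
    val withoutSeed = sequenceOf(1, 2, 3).scan(0) { acc, x -> acc + x }.drop(1).toList()
    println(withoutSeed) // [1, 3, 6]
}
```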