chiroptical
11/25/2022, 3:25 PMCoroutineScope.windows and dayOne (there are edge cases in strictlyIncreasing that aren't important for this conversation). Second, is it possible to fold in parallel as well? I tried creating two actors, passing inputs to them, and then combining them at the end but it just didn't do anything. I am incredibly open to suggestions here🧵chiroptical
11/25/2022, 3:27 PMcoroutineScope on CoroutineScope.windows that I added because IntelliJ suggested it while I was re-writing everything.chiroptical
11/25/2022, 3:28 PMchiroptical
11/25/2022, 3:29 PMchiroptical
11/25/2022, 3:31 PMchiroptical
11/25/2022, 3:35 PMparMap for parMapUnordered and get a boost.chiroptical
11/25/2022, 3:39 PMchiroptical
11/25/2022, 4:24 PMsimon.vergauwen
11/26/2022, 3:45 PMFlow relying on the automatic back-pressure from Flow. I talked about it this week on KotlinDevDay, will publish and share slides here in a sec 😉
Some improvements, you can safely rely on this instead of CoroutineScope + produce. parMap + fold is probably best solution here.
fun windows(numElements: Int = 2): Flow<List<String>> = flow {
Path("inputFiles/dayOne.test.txt")
.useLines { lines ->
lines.chunked(numElements).forEach { emit(it) }
}
}chiroptical
11/26/2022, 3:51 PMchiroptical
11/26/2022, 3:55 PMflowOn(<http://Dispatchers.IO|Dispatchers.IO>) can unblock the main thread. So, if I was to have a windows and lines Flow<T>, would you say these are more efficient?
fun lines(path: Path) =
path.useLines { lines -> lines.asFlow().flowOn(<http://Dispatchers.IO|Dispatchers.IO>) }
fun windows(numElements: Int = 2, path: Path) = flow {
path.useLines { lines ->
lines.chunked(numElements).forEach { emit(it) }
}
}.flowOn(<http://Dispatchers.IO|Dispatchers.IO>)chiroptical
11/26/2022, 3:58 PMchunked doesn't quite work for the rolling windows but I imagine they have a function for that.chiroptical
11/26/2022, 3:59 PMwindowed but it was easy to find now that I have seen chunked!simon.vergauwen
11/26/2022, 3:59 PM<http://Dispatchers.IO|Dispatchers.IO>, everything "above" the call to flowOn will run on IO.simon.vergauwen
11/26/2022, 4:00 PMIO. It will unblock the main thread yes.simon.vergauwen
11/26/2022, 4:00 PMchiroptical
11/26/2022, 4:01 PMsimon.vergauwen
11/26/2022, 4:01 PMchiroptical
11/26/2022, 4:10 PMMutableStateFlow?simon.vergauwen
11/26/2022, 4:19 PMparFold function? This is inspired from the original impl, but written from Slack so potentially incorrect. https://github.com/Kotlin/kotlinx.coroutines/blob/6dfabf763fe9fc91fbb73eb0f2d5b488[…]43f1/kotlinx-coroutines-core/common/src/flow/terminal/Reduce.kt
To be honest I'd still prefer parMap + fold. This breaks back-pressure on collect
public suspend inline fun <T, R> Flow<T>.parFoldMap(
ctx: CoroutineContext = EmptyCoroutineContext,
initial: R,
crossinline operation: suspend (value: T) -> R
combine: suspend (R, R) -> R
): R {
val accumulator: Atomic<R> = Atomic(initial)
coroutineScope {
collect { value ->
launch {
val res = operation(value)
accumulator.update { acc -> acc + res }
}
}
}
return accumulator.get()
}simon.vergauwen
11/26/2022, 4:19 PMUnorderedchiroptical
11/26/2022, 4:22 PMparMap + fold solution. It’s pretty readable too