chiroptical
11/25/2022, 3:25 PMCoroutineScope.windows
and dayOne
(there are edge cases in strictlyIncreasing
that aren't important for this conversation). Second, is it possible to fold in parallel as well? I tried creating two actors, passing inputs to them, and then combining them at the end but it just didn't do anything. I am incredibly open to suggestions here🧵chiroptical
11/25/2022, 3:27 PMcoroutineScope
on CoroutineScope.windows
that I added because IntelliJ suggested it while I was re-writing everything.chiroptical
11/25/2022, 3:28 PMchiroptical
11/25/2022, 3:29 PMchiroptical
11/25/2022, 3:31 PMchiroptical
11/25/2022, 3:35 PMparMap
for parMapUnordered
and get a boost.chiroptical
11/25/2022, 3:39 PMchiroptical
11/25/2022, 4:24 PMsimon.vergauwen
11/26/2022, 3:45 PMFlow
relying on the automatic back-pressure from Flow
. I talked about it this week on KotlinDevDay, will publish and share slides here in a sec 😉
Some improvements, you can safely rely on this instead of CoroutineScope
+ produce
. parMap
+ fold
is probably best solution here.
fun windows(numElements: Int = 2): Flow<List<String>> = flow {
Path("inputFiles/dayOne.test.txt")
.useLines { lines ->
lines.chunked(numElements).forEach { emit(it) }
}
}
chiroptical
11/26/2022, 3:51 PMchiroptical
11/26/2022, 3:55 PMflowOn(<http://Dispatchers.IO|Dispatchers.IO>)
can unblock the main thread. So, if I was to have a windows
and lines
Flow<T>
, would you say these are more efficient?
fun lines(path: Path) =
path.useLines { lines -> lines.asFlow().flowOn(<http://Dispatchers.IO|Dispatchers.IO>) }
fun windows(numElements: Int = 2, path: Path) = flow {
path.useLines { lines ->
lines.chunked(numElements).forEach { emit(it) }
}
}.flowOn(<http://Dispatchers.IO|Dispatchers.IO>)
chiroptical
11/26/2022, 3:58 PMchunked
doesn't quite work for the rolling windows but I imagine they have a function for that.chiroptical
11/26/2022, 3:59 PMwindowed
but it was easy to find now that I have seen chunked
!simon.vergauwen
11/26/2022, 3:59 PM<http://Dispatchers.IO|Dispatchers.IO>
, everything "above" the call to flowOn
will run on IO
.simon.vergauwen
11/26/2022, 4:00 PMIO
. It will unblock the main
thread yes.simon.vergauwen
11/26/2022, 4:00 PMchiroptical
11/26/2022, 4:01 PMsimon.vergauwen
11/26/2022, 4:01 PMchiroptical
11/26/2022, 4:10 PMMutableStateFlow
?simon.vergauwen
11/26/2022, 4:19 PMparFold
function? This is inspired from the original impl, but written from Slack so potentially incorrect. https://github.com/Kotlin/kotlinx.coroutines/blob/6dfabf763fe9fc91fbb73eb0f2d5b488[…]43f1/kotlinx-coroutines-core/common/src/flow/terminal/Reduce.kt
To be honest I'd still prefer parMap
+ fold
. This breaks back-pressure on collect
public suspend inline fun <T, R> Flow<T>.parFoldMap(
ctx: CoroutineContext = EmptyCoroutineContext,
initial: R,
crossinline operation: suspend (value: T) -> R
combine: suspend (R, R) -> R
): R {
val accumulator: Atomic<R> = Atomic(initial)
coroutineScope {
collect { value ->
launch {
val res = operation(value)
accumulator.update { acc -> acc + res }
}
}
}
return accumulator.get()
}
simon.vergauwen
11/26/2022, 4:19 PMUnordered
chiroptical
11/26/2022, 4:22 PMparMap
+ fold
solution. It’s pretty readable too