Manu Eder
03/04/2021, 4:50 PM
// f1 is a flow of somewhat random numbers, no deeper meaning
val f1 : Flow<Int> = flowOf(1, 1, 2, 3, 5, 8, 3, 1, 4, 5, 9, 4, 3, 7, 0, 7, 7, 4, 1, 5)
/*
Let's imagine that we are told f1 consists of "packets", which start with a length field followed by as many numbers
as indicated in the length field. We want to transform f1 into a flow where each element is the sum of the numbers
in the "payload" of one packet.
*/
runBlocking {
    println("packet sums:")
    f1.transformImperatively<Int, Int> {
        while (hasNext()) {
            emit(sum(next()))
        }
    }.collect(::println)
}
Code for sum:
suspend fun FlowIterator<Int>.sum(count: Int): Int {
    var s = 0
    for (i in 1..count) {
        if (hasNext()) {
            s += next()
        } else {
            break
        }
    }
    return s
}
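For what it's worth, the packet logic itself can be checked with a plain stdlib Iterator, independent of coroutines. This is a dependency-free analogue of the FlowIterator version above, not the playground code; `sumNext` and `packetSums` are names invented for this sketch.

```kotlin
// Same idea as the suspend sum above, but over a plain stdlib Iterator.
fun Iterator<Int>.sumNext(count: Int): Int {
    var s = 0
    for (i in 1..count) {
        if (hasNext()) s += next() else break
    }
    return s
}

// Parse "packets" (length field followed by payload) and sum each payload.
fun packetSums(input: List<Int>): List<Int> {
    val iter = input.iterator()
    val sums = mutableListOf<Int>()
    while (iter.hasNext()) {
        sums += iter.sumNext(iter.next()) // next() reads the length field
    }
    return sums
}

fun main() {
    // Packets: 1→[1], 2→[3,5], 8→[3,1,4,5,9,4,3,7], 0→[], 7→[7,4,1,5] (truncated)
    println(packetSums(listOf(1, 1, 2, 3, 5, 8, 3, 1, 4, 5, 9, 4, 3, 7, 0, 7, 7, 4, 1, 5)))
    // → [1, 8, 36, 0, 17]
}
```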
It's not finished, more a proof-of-concept.
(In particular, I haven't had time yet to try and understand flow cancellation, or the context preservation properties expected of flows, so don't expect it to behave correctly in that regard yet. Though I would very much expect it to be possible to make it behave correctly in that sense as well.)
I would be happy about any kind of pointers to discussions about similar things, reasons why something like this isn't or shouldn't be in kotlinx.coroutines, reasons why it might break down later on, etc.
ephemient
03/04/2021, 5:17 PM
val flow: Flow<T> = TODO()
val channel = flow.produceIn(this@coroutineScope)
val iterator = channel.iterator()
iterator.hasNext()
iterator.next()
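ephemient's fragment above, expanded into a complete sketch: produceIn needs a CoroutineScope, and ChannelIterator.hasNext() is a suspend function, so everything has to run inside a coroutine. The function name drainViaIterator is invented for this illustration.

```kotlin
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.produceIn
import kotlinx.coroutines.runBlocking

// Consume a Flow through a ChannelIterator. Note that produceIn launches a
// producer coroutine, so the flow becomes "hot" at this point.
suspend fun drainViaIterator(): List<Int> = coroutineScope {
    val channel = flowOf(10, 20, 30).produceIn(this)
    val iterator = channel.iterator()
    val result = mutableListOf<Int>()
    while (iterator.hasNext()) { // suspend fun
        result += iterator.next()
    }
    result
}

fun main() = runBlocking {
    println(drainViaIterator())
}
```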
Manu Eder
03/04/2021, 5:21 PM
ephemient
03/04/2021, 5:24 PM
flow { }
Manu Eder
03/04/2021, 5:25 PM
ephemient
03/04/2021, 5:28 PM
flow {
    val channel = inputFlow.produceIn(this@coroutineScope)
    val iterator = channel.iterator()
    while (iterator.hasNext()) {
        emit(transform(iterator.next()))
    }
}
Manu Eder
03/04/2021, 5:30 PM
ephemient
03/04/2021, 5:30 PM
Manu Eder
03/04/2021, 5:31 PM
ephemient
03/04/2021, 5:32 PM
Manu Eder
03/04/2021, 5:35 PM
ephemient
03/04/2021, 10:22 PM
.buffer(0), it would be a rendezvous channel, which makes every single send/receive blocking, waiting for the receiver/sender. But yes, it would still be "hot" and the sender runs until the next suspension point, whereas most (but not all) flows are "cold" and pull-based.
Manu Eder
03/05/2021, 1:36 PM
produce { theFlow.collect { send(it) } }, like you wrote).
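The rendezvous behaviour ephemient describes can be observed directly with a capacity-0 channel. This is an illustrative sketch (the names are invented), not code from the thread:

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// A capacity-0 (rendezvous) channel: send suspends until a matching receive.
// This is the hand-off a flow gets from .buffer(0): the sender is "hot" and
// runs concurrently, but every element is a synchronous rendezvous.
fun rendezvousEvents(): List<String> {
    val events = mutableListOf<String>()
    runBlocking {
        val channel = Channel<Int>(capacity = 0) // Channel.RENDEZVOUS
        launch {
            events += "before send"
            channel.send(1) // suspends until the receive below arrives
            events += "after send"
        }
        events += "before receive"
        events += "received ${channel.receive()}"
    }
    return events
}

fun main() = println(rendezvousEvents())
```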
But having read up on Channels and Flows and the reasoning behind Flows, I feel like this is the "wrong" implementation, and I suspect that's the reason why this function, and the "imperative flow transformation functions" (or whatever you want to call them) that can be built from it, are not in kotlinx.coroutines.
The distinction between channelBasedTransformImperatively and what I feel is the correct transformImperatively is exactly that
theFlow.channelBasedTransformImperatively { ... } == theFlow.buffer(0).correctTransformImperatively { ... }
(if you use capacity 0 for the channel in the implementation of channelBasedTransformImperatively; .buffer(N) otherwise, I guess).
Note that in general theFlow.buffer(0) != theFlow.
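The claim that theFlow.buffer(0) != theFlow can be made concrete by watching side effects. Without buffer, the emitter runs in the collector's coroutine, so emissions and collections interleave strictly; buffer(0) moves the emitter into its own coroutine connected by a rendezvous channel, which changes the interleaving. A sketch (the helper name is invented):

```kotlin
import kotlinx.coroutines.flow.buffer
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking

// Record the order of emit/collect side effects, with and without buffer(0).
fun events(buffered: Boolean): List<String> {
    val events = mutableListOf<String>()
    val f = flow {
        for (i in 1..2) {
            events += "emit $i"
            emit(i)
        }
    }
    runBlocking {
        (if (buffered) f.buffer(0) else f).collect { events += "collect $it" }
    }
    return events
}

fun main() {
    println(events(buffered = false)) // strictly alternating: emit 1, collect 1, ...
    println(events(buffered = true))  // emitter may run ahead at suspension points
}
```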
If I find the time I might submit a bug report and see what the Kotlin devs say.
There are a few bug reports on the github repo requesting specific flow combinators, which I suspect people would find easy to write themselves, if they had "permission" to write them imperatively, i.e. if there was a sanctioned, advertised transformImperatively
that people could use.
Thanks in any case for taking the time to go back and forth here.
(Oh, and just to make sure we are not talking past each other: the thing that makes this interesting is that the transform function would not have to work element-wise but would instead get access to the iterator.
The implementation you wrote earlier would be just inputFlow.buffer(0).map(transform).
I assumed you knew what I meant.
The actual implementation would be:
fun <T, R> Flow<T>.channelBasedTransformImperatively(
    transform: suspend FlowContextAndIterator<T, R>.() -> Unit
): Flow<R> {
    val inputFlow = this
    return flow {
        val flowContext = this
        coroutineScope {
            val channel = inputFlow.produceIn(this)
            val iterator = channel.iterator()
            FlowContextAndIterator(flowContext, iterator).transform()
        }
    }
}
(Where FlowContextAndIterator implements both FlowContext<R> and Iterator<T> by delegation to the passed objects, so it gives access both to next() and to emit().))
araqnid
03/05/2021, 6:42 PM
Manu Eder
03/05/2021, 7:53 PM
"In the early versions of the library, we had only channels and we tried to implement various transformations of asynchronous sequences as functions that take one channel as an argument and return another channel as a result. It means that, for example, a filter operator would run in its own coroutine.
The performance of such an operator was far from great, especially compared to just writing an if statement. In hindsight, it is not surprising, because a channel is a synchronization primitive. Any channel, even an implementation that is optimized for a single producer and a single consumer, must support concurrent communicating coroutines and a data transfer between them needs synchronization, which is expensive in modern multicore systems. When you start building your application architecture on top of the asynchronous data streams, the need to have transformations naturally appears, and the channel costs start to accrue."
The implementation of transformImperatively using channels suffers from this same problem. The slightly-more-complex-to-write variant in the playground does not need any synchronization primitives, because in
flow ( producer ).transformImperatively( transformer ).collect( ... )
the producer and the transformer will never be running at the same time.
Consider the lines function in the playground linked above, which turns a Flow<Char> into a Flow<Flow<Char>>, where the inner flows are the individual lines of the text in the original flow.
If you think about it, unless you want to buffer full arbitrary-length lines, then when collecting the Flow<Flow<Char>> there is the constraint that you have to collect all the inner flows immediately, otherwise this can't work.
The lines function checks for this and throws if you violate that constraint. (There's nothing else it can do, really.)
So if you write someFlow.lines().collect() for example, that will throw. someFlow.lines().collect { println(it.count()) } is fine.
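The same constraint shows up in a stdlib Sequence analogue of such a lines operator: each inner sequence pulls from the shared iterator, so it must be consumed before the next line is requested. This hypothetical sketch (splitLines is an invented name, not the playground code) doesn't even detect violations; it would just silently misbehave:

```kotlin
// Sequence<Char> -> Sequence<Sequence<Char>>: split on '\n' without buffering
// whole lines. Each inner sequence reads from the single shared iterator.
fun Sequence<Char>.splitLines(): Sequence<Sequence<Char>> = sequence {
    val chars = this@splitLines.iterator()
    while (chars.hasNext()) {
        yield(sequence {
            while (chars.hasNext()) {
                val c = chars.next()
                if (c == '\n') break
                yield(c)
            }
        })
    }
}

fun main() {
    // Each inner sequence is consumed immediately, as the contract requires.
    val lines = "ab\ncde\n\nf".asSequence()
        .splitLines()
        .map { it.joinToString("") }
        .toList()
    println(lines) // → [ab, cde, , f]
}
```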
Now the point: someFlow.lines().buffer(0).whatever().collect { ... } will also throw, no matter what whatever() does.
So with the channel-based implementation of transformImperatively, which is equivalent to .buffer(0).transformImperatively { ... }, you could not use channelBasedTransformImperatively after lines at all.
I would expect any operator from Flow<T> to Flow<Flow<T>> to exhibit this kind of behaviour: that it must be forbidden to call buffer after the grouping operator and that you have to collect the inner flows. These kinds of grouping operators become easy to write with transformImperatively. It would be a pity if you couldn't use transformImperatively after them any more.