# coroutines
j
how do i partition a sequential flow into predetermined buckets of flows without triggering the collection?
s
Do you mean something similar to the `groupBy` operator for RxJava?
j
so to reduce 4 `Flow<T>` into 3 `Flow<Flow<T>>` (keeping the outer interface of `Flow<*>` as a unary functor) I would a) wait for windowed selection, or b) decode and re-encode with `toFlow()`
not having had a career detour into Rx, i can't say for sure, but this is exactly implementing a `groupBy` against undecoded bytes on disk, exposed as flows
i have a fixed-width memory-mapped file for which each record is a flow that decodes the columns sent in, and i create an index by decoding the flows once, as needed, for key fields. the index is likely to fit in system RAM, but memoizing the entirety of the decoded data is less likely to fit in system RAM
i want the group-by behavior to stack, so i'm interested in keeping the interface at `Flow<Array<*>>` for any number of compounded group-by operations
o
if you need to do something different enough, I would just declare an extension function on Flow that uses `transform`, which allows essentially arbitrary `Flow<T> -> Flow<U>` translations
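As a minimal sketch of that suggestion (the `chunked` name and behavior are illustrative, not from the thread): `transform` is itself implemented as `flow { collect { value -> transform(value) } }`, so an arbitrary `Flow<T> -> Flow<List<T>>` operator written with that pattern looks like this, assuming per-collection buffer state:

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical sketch: an arbitrary Flow<T> -> Flow<List<T>> operator.
// The buffer lives inside the flow builder so each collection gets a
// fresh one; `transform` is sugar for exactly this flow/collect shape.
fun <T> Flow<T>.chunked(size: Int): Flow<List<T>> = flow {
    val buffer = mutableListOf<T>()
    collect { value ->
        buffer.add(value)
        if (buffer.size == size) {
            emit(buffer.toList())  // emit a full bucket downstream
            buffer.clear()
        }
    }
    if (buffer.isNotEmpty()) emit(buffer.toList())  // flush the tail
}

fun main() = runBlocking {
    println((1..7).asFlow().chunked(3).toList())
    // [[1, 2, 3], [4, 5, 6], [7]]
}
```

Nothing is collected until a terminal operator runs on the result, which is the "without triggering the collection" property asked about above.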
s
In Rx, groupBy is defined as follows (more or less):
fun <A, K> Observable<A>.groupBy(keySelector: (A) -> K) : Observable<GroupedObservable<K,A>>
A `GroupedObservable` is just like a regular `Observable`, but has an extra `fun getKey(): K` method. Usually, right after the `groupBy`, you'd call a `flatMap` and return the emitted (Grouped)Observable. http://reactivex.io/RxJava/javadoc/rx/Observable.html#groupBy-rx.functions.Func1-rx.functions.Func1-
You'd be able to write something similar using a `Flow` and a new `GroupedFlow`. A `groupBy` does not (yet) exist for `Flow`s. This `groupBy` would not be a terminating operator, since it emits sub-flows, which terminate only when something starts `collect`ing them.
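One possible shape for such a `GroupedFlow`, sketched under the assumption that it mirrors Rx's `GroupedObservable`; the `GroupedFlow` and `SimpleGroupedFlow` names are hypothetical, not an existing kotlinx.coroutines API:

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical: a Flow that carries the key it was grouped under,
// mirroring Rx's GroupedObservable. Not part of kotlinx.coroutines.
interface GroupedFlow<K, A> : Flow<A> {
    val key: K
}

// Trivial implementation that delegates collection to a backing flow.
class SimpleGroupedFlow<K, A>(
    override val key: K,
    source: Flow<A>
) : GroupedFlow<K, A>, Flow<A> by source

// The operator signature would then read:
//   fun <A, K> Flow<A>.groupBy(keySelector: (A) -> K): Flow<GroupedFlow<K, A>>

fun main() = runBlocking {
    val evens = SimpleGroupedFlow(true, flowOf(2, 4, 6))
    println("${evens.key}: ${evens.toList()}")  // true: [2, 4, 6]
}
```

Because `SimpleGroupedFlow` only delegates, the backing flow stays cold: nothing runs until a downstream `collect`, matching the non-terminating behavior described above.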
j
i appear to be writing a groupBy terminal operator, and i'm looking for the handle interface to manipulate flows prior to emitting the results
to be composable i am doing this as unary functors: Table in, Table out. i have previously had no problem grouping an index once, but since mapping bytes is a one-time operation this was not composable
i don't see information on how to trap a Flow before collection, and i don't have any examples of transform to know if that's what i'm looking for.
it looks like for 4 source `Flow<T>`s and 3 target groups i need to call `flow.filter` 3 times
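That filter-per-bucket approach can be sketched as follows (`partitionBy` and `keyOf` are hypothetical names); note that each filtered flow re-collects the cold source, so a disk-backed source is re-decoded once per bucket, which is the read-amplification concern:

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Sketch of the filter-per-bucket approach: each of the target groups is
// its own filtered view of the source, so a cold source is re-collected
// (re-decoded) once per group.
fun <T, K> Flow<T>.partitionBy(keys: List<K>, keyOf: (T) -> K): List<Flow<T>> =
    keys.map { k -> filter { keyOf(it) == k } }

fun main() = runBlocking {
    val source = flowOf(1, 2, 3, 4, 5, 6)  // stand-in for decoded rows
    val buckets = source.partitionBy(listOf(0, 1, 2)) { it % 3 }
    buckets.forEach { bucket -> println(bucket.toList()) }
    // [3, 6]  then  [1, 4]  then  [2, 5]
    // the source flow is scanned once per bucket
}
```

A single-pass alternative needs the channel-based `groupBy` shown later in the thread, trading the repeated scans for buffering.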
s
Just to understand your use-case well: how would you write your partitioning if it were not `Flow`s, but plain `List`s or `Collection`s instead?
j
@streetsofboston this is the current learning exercise as of this moment. https://github.com/jnorthrup/columnar/blob/c1c07606c6f593ca3abbcfd629705cc9571cbb6d/src/test/java/com/fnreport/mapper/ColumnarTest.kt#L125 the answer to your question actually exists in this source tree a couple weeks back, where i got as far as pivot->groupBy using non-flow.
so basically with materialized fields it's just a map of a map: scan for clusters in a key, then rewrite the clusters as `List<T>` rows from `T`, and come back later with a reducer
the problem with the "plain" list or collections is that we're not parallel
so 2.5 million rows slogs along and eats RAM with the added dimensions
it appears i cannot isolate the key generation from the flow grouping without read-amplification. the key generation does a full table scan and builds a map of clusters from decoded keys. each cluster also potentially costs a full table scan following the key generation. i may be able to stuff the `Flow<Flow<Array<T>>>` or something like `Flow<Lazy<Array<Array<T>>>>` and preserve the unary-operator interface (Table2). in this codebase Table1 is just the alias i use for columns that decode bytebuffers, and Table2 is Flows of Table1 operations, which should be more composable as Flows N deep if needed
s
Not sure if this would work for your use-case, but this kinda works as a ‘groupBy’ implementation:
// note: flatMapMerge (used in test() below) may require opting in to a
// preview/experimental coroutines API depending on the kotlinx.coroutines version
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.SendChannel
import kotlinx.coroutines.channels.consumeEach
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

fun <A, K> Flow<A>.groupBy(keySelector: (A) -> K): Flow<Pair<K, Flow<A>>> {
    val upstream = this

    return flow {
        val downstream = this

        val groupMap: HashMap<K, SendChannel<A>> = HashMap()

        val error = try {
            upstream.collect { value: A ->
                val key = keySelector(value)

                groupMap.getOrPut(key) {
                    Channel<A>().also { consumer ->
                        val flow = flow { consumer.consumeEach { emit(it) } }
                        downstream.emit(key to flow)
                    }
                }.send(value)
            }

            null
        } catch (t: Throwable) {
            t
        }

        groupMap.values.forEach {
            it.close(error)
        }
    }
}

fun test() {
    val numFlow = flow {
        for (i in 1..10) {
            emit(i)
            delay(100)
        }
    }

    runBlocking {
        numFlow.groupBy { it % 2 == 0 }
            .flatMapMerge { (key, groupedFlow) ->
                groupedFlow.map { "$key and $it" }
            }
            .collect {
                println(it)
            }
    }
}
j
i think it's plausible that with flows i can re-parameterize the code to be lazy, sequence, or direct without much of a gap, but if i start wiring together channels the code begins to rapidly drop in usefulness outside of a narrow utility. i am already in a narrow band of application by choosing memory mapped fixed-width files upstream of the moving parts.
this is basically sqlite-liter