jimn
11/27/2019, 7:55 PM
octylFractal
11/27/2019, 7:56 PM
streetsofboston
11/27/2019, 8:11 PM
`groupBy` operator for Rx Java?
jimn
11/27/2019, 8:15 PM
`Flow<T>` into 3 `Flow<Flow<T>>` (keeping the outer interface of `Flow<*>` as a Unary Functor) I would a) wait for Windowed selection, or b) decode and recode `toFlow()`
octylFractal
11/27/2019, 8:31 PM
`transform`, which allows essentially arbitrary `Flow<T> -> Flow<U>` translations
streetsofboston
11/27/2019, 8:33 PM
`fun <A, K> Observable<A>.groupBy(keySelector: (A) -> K): Observable<GroupedObservable<K, A>>`
A `GroupedObservable` is just like a regular `Observable`, but has the extra `fun getKey(): K` method.
Usually, right after the `groupBy`, you’d call a `flatMap` and return the emitted (Grouped)Observable.
http://reactivex.io/RxJava/javadoc/rx/Observable.html#groupBy-rx.functions.Func1-rx.functions.Func1-
You’d be able to write something similar using a `Flow` and a new `GroupedFlow`…. The `groupBy` does not (yet) exist for `Flow`s.
The `groupBy` would not be a terminating operator, since it emits the contents of sub-flows, which terminate only when something starts `collect`ing them.
jimn
11/27/2019, 8:44 PM
streetsofboston
11/27/2019, 9:10 PM
`List`s or `Collection`s instead?
jimn
11/27/2019, 9:20 PM
`List<T>` rows from `T` and come back later with a reducer
`Flow<Flow<Array<T>>>` or something like `Flow<Lazy<Array<Array<T>>>>` and preserve the Unary Operator interface (Table2)
in this codebase Table1 is just the alias I use for columns that decode bytebuffers, and Table2 is Flows of Table1 operations, which should be more composable as Flows N deep if needed
streetsofboston
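11/27/2019, 10:49 PM
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.flow.*

// Splits the upstream flow into one sub-flow per key, emitted as (key, sub-flow) pairs.
fun <A, K> Flow<A>.groupBy(keySelector: (A) -> K): Flow<Pair<K, Flow<A>>> {
    val upstream = this
    return flow {
        val downstream = this
        val groupMap: HashMap<K, SendChannel<A>> = HashMap()
        val error = try {
            upstream.collect { value: A ->
                val key = keySelector(value)
                groupMap.getOrPut(key) {
                    // First value for this key: open a channel and emit its flow downstream.
                    Channel<A>().also { consumer ->
                        val groupFlow = flow { consumer.consumeEach { emit(it) } }
                        downstream.emit(key to groupFlow)
                    }
                }.send(value) // rendezvous channel: suspends until the group's collector receives the value
            }
            null
        } catch (t: Throwable) {
            t
        }
        // Close every group, passing along the failure if there was one.
        groupMap.values.forEach {
            it.close(error)
        }
        // Rethrow so that a failure or cancellation is not silently swallowed.
        if (error != null) throw error
    }
}

fun test() {
    val numFlow = flow {
        for (i in 1..10) {
            emit(i)
            delay(100)
        }
    }
    runBlocking {
        numFlow.groupBy { it % 2 == 0 }
            // flatMapMerge collects the group flows concurrently, which the rendezvous channels need.
            .flatMapMerge { (key, groupedFlow) ->
                groupedFlow.map { "$key and $it" }
            }
            .collect {
                println(it)
            }
    }
}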
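For comparison with the `List`/`Collection` route raised earlier in the thread, here is a minimal sketch that uses only the Kotlin standard library's `groupBy` on the same 1..10 input and parity key as `test()`. The helper name `groupByParity` is hypothetical and exists only for illustration. Unlike the `Flow` version, this needs the whole input in memory before any group is available.

```kotlin
// Hypothetical helper (not from the thread): eager grouping with the stdlib.
// Iterable<T>.groupBy returns a Map<K, List<T>> whose keys keep first-seen order.
fun groupByParity(numbers: List<Int>): Map<Boolean, List<Int>> =
    numbers.groupBy { it % 2 == 0 }

fun main() {
    val groups = groupByParity((1..10).toList())
    println(groups) // prints {false=[1, 3, 5, 7, 9], true=[2, 4, 6, 8, 10]}
}
```

The trade-off is that each group is a finished `List`, so there is nothing left to `collect` lazily and no channel to close.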
jimn
11/28/2019, 2:15 PM