ursus
06/28/2019, 7:05 PMstreetsofboston
06/28/2019, 7:09 PMgroupBy
operator that creates Flows per id
and each group then would replay(1) their stream of data. A groupBy
for a Flow does not (yet) exist.ursus
06/28/2019, 7:18 PMstreetsofboston
06/28/2019, 7:22 PMitems
.groupBy { it.id }
.flatMap { group ->
val id = group.key!!
.... create and return a new stream based on 'id'
}
streetsofboston
06/28/2019, 7:26 PMursus
06/28/2019, 7:27 PMursus
06/28/2019, 7:27 PMstreetsofboston
06/28/2019, 7:28 PMursus
06/28/2019, 7:32 PMval relay = BehaviourRelay.create(emptyMap())
fun foo() {
relay.accept(relay.lastValue[someId] = newEvent(someId))
}
...
relay.map { optionalOf(it[someId]) }.filter { it.isNotEmpty }
streetsofboston
06/28/2019, 8:08 PMsuspend fun <T, K> Flow<T>.groupBy(
getKey: (T) -> K
): (K) -> Flow<T> {
val latestValuesForGroups = mutableMapOf<K, T>()
return { groupKey: K ->
flow {
latestValuesForGroups[groupKey]?.let { emit(it) }
this@groupBy.collect {
val key = getKey(it)
if (key == groupKey) {
latestValuesForGroups[key] = it
emit(it)
}
}
}
}
}
fun main() {
var items = listOf(
1 to "one",
2 to "two",
1 to "ten",
1 to "eleven",
3 to "thirtyfour",
4 to "four",
2 to "twenty",
3 to "three",
2 to "twohundred",
1 to "thousand",
4 to "forty",
5 to "five",
2 to "twenty"
)
//Just an example source-flow
val sourceFlow = flow {
for (item in items) {
emit(item)
}
}
runBlocking {
// Group by the 'first' of each Pair.
val groupedFlows = sourceFlow.groupBy(Pair<*,*>::first)
// Get a grouped-flow where 'first == 1'
val flowsOfOne1 = groupedFlows(1)
flowsOfOne1.collect {
println(it)
}
println("-----------")
// Get another grouped-flow where 'first == 1'
val flowsOfOne2 = groupedFlows(1)
flowsOfOne2.collect {
println(it)
}
println("-----------")
// Get a grouped-flow where 'first == 2'
val flowsOfTwo = groupedFlows(2)
flowsOfTwo.collect {
println(it)
}
println("-----------")
}
}
ursus
06/28/2019, 8:18 PMstreetsofboston
06/28/2019, 8:23 PM