I have a event bus type of stream/Flow, where Even...
# coroutines
u
I have a event bus type of stream/Flow, where Events have id. I need to replay last item of a chosen id to subscriber on subscription. If I only use replay(1) and filter { it == id } at subscriber, then only totally last item is replayed, so susbscribers who filter a different id, wont get their respective replays. I need a replay per id. (Alternative is to emit a map of all events and filter that down, but I hope this can be done more cleanly) Thanks
s
You’d probably need a reactive
groupBy
operator that creates Flows per
id
and each group then would replay(1) their stream of data. A
groupBy
for a Flow does not (yet) exist.
u
Would you mind giving an example in rjxava? GroupBy always confused me, I need to collect it right? Doesnt that imply completion?
s
Copy code
items
            .groupBy { it.id }
            .flatMap { group ->
                val id = group.key!!
                .... create and return a new stream based on 'id'
            }
^^^ This is just an idea ^^^ not sure if it will work. 🙂
u
Yes thats exactly what confused me 😄 I'd expect groupBy to emit a map<KEY, VALUE>
where it seems you get a for loop on all entries in the map
s
But maybe you can code it out in your own extension function on a Flow.
u
wouldnt this be cleaner?
Copy code
val relay = BehaviourRelay.create(emptyMap())

fun foo() {
   relay.accept(relay.lastValue[someId] = newEvent(someId))
}


...

relay.map { optionalOf(it[someId]) }.filter { it.isNotEmpty }
s
This seems to work. No need for synchronization of the map, since the coroutines guarantee a sequential order of execution.
Copy code
suspend fun <T, K> Flow<T>.groupBy(
    getKey: (T) -> K
): (K) -> Flow<T> {
    val latestValuesForGroups = mutableMapOf<K, T>()

    return { groupKey: K ->
        flow {
            latestValuesForGroups[groupKey]?.let { emit(it) }

            this@groupBy.collect {
                val key = getKey(it)
                if (key == groupKey) {
                    latestValuesForGroups[key] = it
                    emit(it)
                }
            }
        }
    }
}

fun main() {
    var items = listOf(
        1 to "one",
        2 to "two",
        1 to "ten",
        1 to "eleven",
        3 to "thirtyfour",
        4 to "four",
        2 to "twenty",
        3 to "three",
        2 to "twohundred",
        1 to "thousand",
        4 to "forty",
        5 to "five",
        2 to "twenty"
    )

    //Just an example source-flow
    val sourceFlow = flow {
        for (item in items) {
            emit(item)
        }
    }

    runBlocking {
        // Group by the 'first' of each Pair.
        val groupedFlows = sourceFlow.groupBy(Pair<*,*>::first)

        // Get a grouped-flow where 'first == 1'
        val flowsOfOne1 = groupedFlows(1)

        flowsOfOne1.collect {
            println(it)
        }
        println("-----------")

        // Get another grouped-flow where 'first == 1'
        val flowsOfOne2 = groupedFlows(1)

        flowsOfOne2.collect {
            println(it)
        }
        println("-----------")

        // Get a grouped-flow where 'first == 2'
        val flowsOfTwo = groupedFlows(2)

        flowsOfTwo.collect {
            println(it)
        }
        println("-----------")
    }
}
u
doesnt the map need to be immutable?
s
No, the latest value needs to be put into it given the group's key.