# coroutines
Hi all, looking for some direction for my use case. I have (I believe) a “hot” stream of data that I am consuming from Kafka. The framework provides me a Flux as input:
fun consume(flux: Flux<Message<*>>) = TODO()
I would like to filter the messages and internally “publish” them to any relevant consumers. Is the proper pattern here to share something like a BroadcastChannel(Channel.CONFLATED)? It seems that SharedFlow might be an option as well once it is released? I believe I would normally use something like Project Reactor’s EmitterProcessor, but I was wondering if I could do it with something more “native” to Kotlin. Also, what would be the pattern for consuming the channel? Something like:
class OtherConsumer(val channel: BroadcastChannel<Message<*>>) : CoroutineScope by CoroutineScope(Dispatchers.Default) {
  init {
    launch {
      channel.asFlow().onEach { doSomethingAsync() }.collect()
    }
  }
}
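For comparison, here is a rough sketch of what the SharedFlow variant of the same idea might look like, assuming a kotlinx.coroutines version where SharedFlow is available (1.4.0 or later). Msg, MessageHub, the "orders" filter and the handler lambda are made-up names for illustration, not anything from the framework. The source is a plain Flow here to keep the sketch self-contained; with a Flux, the asFlow() extension from kotlinx-coroutines-reactive should be able to provide it.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.flow.*

// Hypothetical stand-in for the framework's Message<*> type.
data class Msg(val topic: String, val payload: String)

// Hypothetical hub: filters the incoming stream and re-publishes it internally.
class MessageHub(private val scope: CoroutineScope) {
    // replay = 1 with DROP_OLDEST gives conflated-like behavior:
    // a slow subscriber skips ahead to the most recent value.
    private val _messages = MutableSharedFlow<Msg>(
        replay = 1,
        onBufferOverflow = BufferOverflow.DROP_OLDEST
    )

    // Read-only view handed to consumers.
    val messages: SharedFlow<Msg> = _messages.asSharedFlow()

    // Assumption: with a Flux input, flux.asFlow() would be passed as `source`.
    fun consume(source: Flow<Msg>): Job =
        source
            .filter { it.topic == "orders" } // hypothetical filter condition
            .onEach { _messages.emit(it) }
            .launchIn(scope)
}

// Hypothetical consumer: takes a scope instead of implementing CoroutineScope,
// so whoever owns the scope controls cancellation.
class OtherConsumer(hub: MessageHub, scope: CoroutineScope, handler: suspend (Msg) -> Unit) {
    val job: Job = hub.messages
        .onEach { handler(it) }
        .launchIn(scope)
}

fun main() = runBlocking {
    val scope = CoroutineScope(Dispatchers.Default + SupervisorJob())
    val hub = MessageHub(scope)
    OtherConsumer(hub, scope) { println("received: $it") }

    // Only the "orders" message passes the filter.
    hub.consume(flowOf(Msg("orders", "a"), Msg("audit", "b"))).join()

    delay(100)     // give the consumer a moment before shutting down
    scope.cancel() // a SharedFlow never completes, so collectors must be cancelled
}
```

onEach { ... }.launchIn(scope) does the same job as launch { flow.onEach { ... }.collect() } in a single call. Because the flow is effectively conflated, a consumer that falls behind may miss intermediate messages; if every message matters, a larger extraBufferCapacity with the default BufferOverflow.SUSPEND would probably be the safer choice.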