Why does flow stream emit a serial of items instea...
# coroutines
t
Why does a Flow emit the whole series of items instead of only the latest one (like Observable in Rx) when you collect (subscribe)? Is there any way to work around this behaviour?
g
Hmmm, I’m super confused, what do you mean? An Observable in Rx emits all elements, not only the latest one; otherwise it would be useless. Emitting a stream of events is the main use case of Observable. Or maybe you mean something else?
t
I mean I emit [1, 2, 3]. The first time I call collect I get [1, 2, 3]; the second time I call collect I expect to get [3], but I get [1, 2, 3]
g
Observable works exactly like that
Observable and Flow are cold streams
That means each subscription consumes the stream once, and only from the beginning; it doesn’t keep state or resources between runs
You’re probably talking not about Observable but about BehaviorSubject
The kotlinx.coroutines analogue of BehaviorSubject is ConflatedBroadcastChannel
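To make the cold-stream point concrete, here is a minimal sketch, assuming kotlinx-coroutines-core is on the classpath. The names `numbers` and `collectTwice` are illustrative and do not come from the thread. Every collection re-runs the builder, so both collections see all three values.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// A cold flow: the builder block runs from the start on every collection.
val numbers: Flow<Int> = flow {
    emit(1)
    emit(2)
    emit(3)
}

// Collects the same flow twice and returns what each collection saw.
fun collectTwice(): Pair<List<Int>, List<Int>> = runBlocking {
    val first = numbers.toList()
    val second = numbers.toList()
    first to second
}

fun main() {
    val (first, second) = collectTwice()
    println("first:  $first")
    println("second: $second")
}
```

Both lists come out as [1, 2, 3]; nothing is remembered between the two collections.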
t
Thanks
That blows my mind, thanks
I’ve misunderstood Observable this whole time
g
In theory you can of course implement an Observable or Flow that behaves like that, but it’s usually better to use hot stream primitives like BehaviorSubject or ConflatedBroadcastChannel
t
Like a BehaviorSubject, then convert it to an Observable?
g
BehaviorSubject is itself an Observable, but you can create a new read-only instance using .hide()
Same with ConflatedBroadcastChannel: you can use .openSubscription() to get a ReceiveChannel, or asFlow() to create a Flow instance that is backed by the ConflatedBroadcastChannel
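The idea of keeping the hot source private and handing out only a read-only view can be sketched as follows. This is an assumption-laden illustration: current kotlinx.coroutines deprecates `ConflatedBroadcastChannel` in favour of `StateFlow`, so the sketch swaps in `MutableStateFlow` with `asStateFlow()`; that substitution is not something the thread discusses, and `Counter` is a hypothetical class.

```kotlin
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.asStateFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.runBlocking

// Hypothetical holder: the mutable hot source stays private and callers
// only get a read-only view (similar in spirit to Subject.hide()).
class Counter {
    private val _value = MutableStateFlow(0)
    val value: StateFlow<Int> = _value.asStateFlow()

    fun increment() {
        _value.value += 1
    }
}

fun latestAfterThreeIncrements(): Int = runBlocking {
    val counter = Counter()
    repeat(3) { counter.increment() }
    // A late collector sees only the current value, not the history.
    counter.value.first()
}

fun main() {
    println(latestAfterThreeIncrements())
}
```

Callers of `Counter.value` cannot push values into the stream, which is the same design goal as `.hide()` on a Subject.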
t
@gildor
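import java.util.UUID
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.flow.*

@FlowPreview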
public fun <T> flowCONFLATEDViaChannel(
    block: CoroutineScope.(channel: SendChannel<T>) -> Unit
): Flow<T> {
    return flow {
        coroutineScope {
            val channel = ConflatedBroadcastChannel<T>()
            launch {
                block(channel)
            }

            channel.consumeEach { value ->
                emit(value)
            }
        }
    }
}

fun main() {
    runBlocking {

        val flow = flowCONFLATEDViaChannel<String> { channel ->
            launch {
                repeat(3) {
                    delay(250)
                    val aaa = UUID.randomUUID().toString()
                    println("emit $aaa")
                    channel.offer(aaa)
                }
            }
        }.flowOn(Dispatchers.IO)

        delay(1000)

        launch(Dispatchers.IO) {
            flow.collect {
                println("collect 1: $it")
            }
        }

        launch(Dispatchers.IO) {
            delay(1000)
            flow.collect {
                println("collect 2: $it")
            }
        }

    }
}
emit 20156fc9-d86f-4e73-a866-acf142866a5a
collect 1: 20156fc9-d86f-4e73-a866-acf142866a5a
emit ff6d37d3-787a-4a0a-8389-393df111b2ef
collect 1: ff6d37d3-787a-4a0a-8389-393df111b2ef
emit a70090cc-4dc4-405f-986e-7b205a1b537a
collect 1: a70090cc-4dc4-405f-986e-7b205a1b537a
emit b14678dc-75c3-43c3-9bb9-155137fbeee0
collect 2: b14678dc-75c3-43c3-9bb9-155137fbeee0
emit 1de80811-d027-4659-9560-1949a5e781ef
collect 2: 1de80811-d027-4659-9560-1949a5e781ef
emit c329e6cd-8867-4209-affe-87f08c72f012
collect 2: c329e6cd-8867-4209-affe-87f08c72f012
g
This code looks really wrong to me
I’m not really sure what you want to achieve
t
Not sure why the second collector still receives 3 items; I’m expecting one
g
I see
But a new flow starts again on each collect
so the conflated channel is created again
Each collect is a separate run of the flow; you cannot share state between invocations
Also, calling launch inside a flow is a bad idea
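The reason the second collector still sees three items can be shown with a small sketch: anything created inside the flow builder is created again for each collector. The names `runTwice`, `builderRuns` and `local` are hypothetical, and the list stands in for the channel in the code above.

```kotlin
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Returns what each of two collections saw, plus how often the builder ran.
fun runTwice(): Triple<List<Int>, List<Int>, Int> = runBlocking {
    var builderRuns = 0
    val perCollect = flow<Int> {
        builderRuns++
        // Created inside the builder, so every collector gets a fresh list.
        val local = mutableListOf<String>()
        repeat(3) { i ->
            local += "item $i"
            emit(local.size)
        }
    }
    val a = perCollect.toList()
    val b = perCollect.toList()
    Triple(a, b, builderRuns)
}

fun main() {
    println(runTwice())
}
```

Both collections see sizes 1, 2, 3 and the builder runs twice, so state that should survive between collectors has to live outside the builder.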
t
So how do we achieve BehaviorSubject behaviour with Flow?
g
How would you achieve BehaviorSubject behaviour with Flowable/Observable?
t
Something where, when we collect, we only get the latest item
g
The BehaviorSubject analogue in kotlinx.coroutines is ConflatedBroadcastChannel
t
But that is a hot stream, right?
g
Of course
same way as BehaviorSubject
My point is that cold streams cannot behave as BehaviorSubject; if something behaves as BehaviorSubject, it’s hot by definition
t
let me do some research
brb
g
Research? %)
But you can still collect values from a Channel
t
investigate stuff about RxJava
g
val channel = ConflatedBroadcastChannel<T>()
channel.asFlow().collect {

}
t
Does that become a cold stream?
g
No
t
or still hot?
g
Of course, the backing channel is still hot
but it can be consumed using a cold abstraction
exactly the same as in RxJava:
you create a Subject, but subscribe on an Observable
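A minimal sketch of a hot source that consumers see only as a plain `Flow`. It assumes a `MutableSharedFlow` with `replay = 1` as a rough stand-in for a BehaviorSubject without an initial value; this type is newer than the APIs named in the thread, and `lateCollectorSees` is a hypothetical name.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.runBlocking

fun lateCollectorSees(): String = runBlocking {
    // Hot source that lives outside any flow builder;
    // replay = 1 keeps only the most recent value for late collectors.
    val source = MutableSharedFlow<String>(replay = 1)
    // Consumers are handed the Flow type only.
    val events: Flow<String> = source

    // Values are produced before anyone collects.
    source.tryEmit("a")
    source.tryEmit("b")
    source.tryEmit("c")

    // The late collector receives just the latest value.
    events.first()
}

fun main() {
    println(lateCollectorSees())
}
```

The collector only receives "c": the state lives in the hot source, and the `Flow` type is just the way it is consumed.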
t
What does “consumed using a cold abstraction” mean?
Do you mean this?
val obs = BehaviorSubject.create<String>()
        .observeOn(Schedulers.io())
Is this Observable cold or hot?
g
It’s hot, but that’s not exposed to the subscriber
t
Sorry, I had a misconception about hot and cold: I thought hot means the thread is blocked until the stream is disposed. It turns out we just care about whether the stream already has a value or not. Is that correct?
g
Hot means that the stream is already started and running; it may be some resource (like a socket) or just some cached value (like BehaviorSubject). A cold stream is lazy: it starts only when someone subscribes to it and finishes when they unsubscribe (and because of this, each subscription gets its own run)
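The laziness of a cold stream can be checked directly. In this small sketch (`startedBeforeAndAfterCollect` and `started` are illustrative names), the builder body does not run when the flow is created, only when it is collected.

```kotlin
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Returns whether the producer had started before and after collection.
fun startedBeforeAndAfterCollect(): Pair<Boolean, Boolean> = runBlocking {
    var started = false
    val cold = flow<Int> {
        started = true // side effect marks that the producer is running
        emit(1)
    }
    val before = started // the flow exists, but nothing has run yet
    cold.toList()        // collecting is what starts the producer
    val after = started
    before to after
}

fun main() {
    println(startedBeforeAndAfterCollect())
}
```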
t
Ironically, I’ve already read that. I feel bad, man
g
“Consumed using a cold abstraction” means that the underlying stream of events is hot (represented by Channel in coroutines and by Subject in RxJava), but you subscribe to those events using a cold API (Flow in coroutines, Observable/Completable/Maybe/Flowable in RxJava)
t
so… my statement is correct?
about hot and cold
g
This one: “it turns out we just care about whether the stream already has a value or not”? It’s correct, but a stream doesn’t need to have a (cached) value to be hot; it may be some hot resource (like an InputStream, a socket, a user interface, etc.)
t
For example, take a stream that no one subscribes to: if it already has a value it’s hot, if not it’s cold
g
A cold stream cannot exist without subscribers; it is started only when someone subscribes
t
OK, are you an Android dev? If yes, do you use Room?
g
Yes; no, but I plan to
t
I use Room with RxJava, and I’m not sure how a DAO method that returns Observable<List<Foo>> is implemented
This should be a cold stream
g
Yes, probably
but not necessarily
t
from what I see, every time I subscribe to this observable, I only get the latest value
g
But why do you worry about it?
“From what I see, every time I subscribe to this observable, I only get the latest value” makes sense
t
So if there are 2 subscribers to this observable, when the underlying data changes, it will trigger 2 queries
2 re-queries
g
Yes
About “I only get the latest value”: I believe you don’t get a cached latest value; this Flowable requests the current value from the database
t
That’s exactly what I’m worried about
g
So yeah, 2 cold observables, each with its own instance
In RxJava you can easily share it with Flowable.share() and subscribe to that Flowable; same with Flow, just use Flow.broadcastIn
So the underlying implementation is cold, but you convert it to a hot one that supports multiple subscribers
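The effect of sharing can be sketched by counting how often the upstream runs. This is an illustration under assumptions: it uses `shareIn`, the operator that current kotlinx.coroutines recommends as the replacement for `broadcastIn`, and the flows only stand in for a database query; `countQueryRuns`, `coldRuns` and `sharedRuns` are hypothetical names.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.cancel
import kotlinx.coroutines.flow.SharingStarted
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.shareIn
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Returns (upstream runs without sharing, upstream runs with sharing).
fun countQueryRuns(): Pair<Int, Int> = runBlocking {
    var coldRuns = 0
    val coldQuery = flow<Int> {
        coldRuns++ // stands in for one database query
        emit(1); emit(2); emit(3)
    }
    coldQuery.toList()
    coldQuery.toList() // a second collector triggers a second run

    var sharedRuns = 0
    val upstream = flow<Int> {
        sharedRuns++
        emit(1); emit(2); emit(3)
    }
    // Separate scope so the sharing coroutine can be cancelled explicitly.
    val sharingScope = CoroutineScope(coroutineContext + Job())
    val shared = upstream.shareIn(sharingScope, SharingStarted.Lazily, replay = 1)
    shared.take(3).toList() // the first collector starts the upstream once
    shared.first()          // the second collector reuses it via the replayed value
    sharingScope.cancel()

    coldRuns to sharedRuns
}

fun main() {
    println(countQueryRuns())
}
```

Without sharing the upstream runs once per collector; with sharing it runs once for both.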
t
OK, I didn’t know that method exists
I’m really confused by Jake’s implementation of the Flow extension for SqlDelight
g
Why?
t
He’s using flowViaChannel, which is hot
g
Because this is an adapter for an existing listener
You cannot call emit from a non-suspend function
t
So even if no one is subscribed to the flow, we still need to trigger a query?
g
And you also need some background coroutine to keep a reference and handle queryResultsChanged callbacks
No
In this case the flow will be started when someone subscribes to it
t
Why? Channels are all hot, right?
g
Yes, but in this case it is created only when the flow is subscribed to; this is just how flowViaChannel is implemented
So it’s lazy, but once subscribed, a hot channel is created under the hood
t
Oh, I see: it is a flow wrapped around a channel, not a channel converted to a flow
g
It’s also safer: there is no way to forget to close this channel, because the channel is closed if the consuming coroutine is cancelled
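A listener adapter of this kind can be sketched with `callbackFlow`, the builder that current kotlinx.coroutines offers for wrapping callbacks; using it in place of `flowViaChannel` is an assumption of this sketch. `FakeQuery`, `changes` and `listenerLifecycle` are hypothetical and are not the SqlDelight API. The listener is registered only when the flow is collected and removed when the collector is cancelled.

```kotlin
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.callbackFlow
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.yield

// Hypothetical callback-based source standing in for a query with listeners.
class FakeQuery {
    private val listeners = mutableListOf<(Int) -> Unit>()
    val listenerCount: Int get() = listeners.size
    fun addListener(l: (Int) -> Unit) { listeners += l }
    fun removeListener(l: (Int) -> Unit) { listeners -= l }
    fun notifyChanged(v: Int) = listeners.toList().forEach { it(v) }
}

// Lazy adapter: nothing is registered until the flow is collected.
fun FakeQuery.changes(): Flow<Int> = callbackFlow {
    // trySend is not a suspend function, so it can be called from the callback.
    val listener: (Int) -> Unit = { value -> trySend(value) }
    addListener(listener)
    // Runs when the collector is cancelled or the channel is closed.
    awaitClose { removeListener(listener) }
}

// Returns listener counts (before, during, after collection) and received values.
fun listenerLifecycle(): Pair<List<Int>, List<Int>> = runBlocking {
    val query = FakeQuery()
    val flow = query.changes()
    val before = query.listenerCount

    val received = mutableListOf<Int>()
    val job = flow.onEach { received += it }.launchIn(this)
    while (query.listenerCount == 0) yield() // wait until the listener is registered
    val during = query.listenerCount

    query.notifyChanged(42)
    while (received.isEmpty()) yield()       // wait until the value is delivered
    job.cancelAndJoin()
    val after = query.listenerCount

    listOf(before, during, after) to received.toList()
}

fun main() {
    println(listenerLifecycle())
}
```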
t
Good job Andrey, you blew my mind again
Thanks
g
You are welcome!
I want to add that Channels are still a very important primitive for implementing many use cases, and even Flow operators. See this article (especially the “Declarative programming” part): https://medium.com/@elizarov/kotlin-flows-and-coroutines-256260fb3bdb