https://kotlinlang.org logo
#coroutines
Title
# coroutines
t

Tuan Kiet

05/28/2019, 2:08 AM
Why does flow stream emit a serial of items instead of only the latest one (like observable in rx) when collect (subscribe), Is there any way to work around this behaviour?
g

gildor

05/28/2019, 2:12 AM
Hmmm, I’m super confused, what do you mean? Observable in Rx emits all elements, not only latest one, otherwise it would be useless, it’s main use case of Observable, emit stream of events Or maybe you means something else?
t

Tuan Kiet

05/28/2019, 2:15 AM
I mean I emit [1, 2, 3], first time call collect I get [1, 2, 3], second time call collect expected to get [3] but get [1, 2, 3]
g

gildor

05/28/2019, 2:16 AM
Observable works exactly like that
Observable and Flow are cold streams
it means that when you create instance you can consume it only once and only from beginning, it doesn’t keep state or resources between start
you probably talk not about Observable, but about BehaviorSubject
Kotlinx.coroutines analogue of BehaviorSubject is ConflatedBroadcastChannel
t

Tuan Kiet

05/28/2019, 2:22 AM
thank
It blows my mind, thank
misunderstanding about observable for all the time
g

gildor

05/28/2019, 2:43 AM
In theory you can of course implement Observable or Flow which behaves like that, but usually better to use hot stream primitives like BehaviorSubject or ConflatedBroadcastChannel
t

Tuan Kiet

05/28/2019, 3:25 AM
Like a BehaviorSubject then convert to observable?
g

gildor

05/28/2019, 3:39 AM
BehaviorSubject implements Observable, but you can create a new immutable instance using
.hide()
Same with ConflatedBroadcastChannel you can use
.openSubscription()
to get ReceiveChannel or
asFlow()
to create Flow instance that backed by ConflatedBroadcastChannel
❤️ 2
t

Tuan Kiet

06/03/2019, 8:09 AM
@gildor
Copy code
@FlowPreview
public fun <T> flowCONFLATEDViaChannel(
    block: CoroutineScope.(channel: SendChannel<T>) -> Unit
): Flow<T> {
    return flow {
        coroutineScope {
            val channel = ConflatedBroadcastChannel<T>()
            launch {
                block(channel)
            }

            channel.consumeEach { value ->
                emit(value)
            }
        }
    }
}

fun main() {
    runBlocking {

        val flow = flowCONFLATEDViaChannel<String> { channel ->
            launch {
                repeat(3) {
                    delay(250)
                    val aaa = UUID.randomUUID().toString()
                    println("emit $aaa")
                    channel.offer(aaa)
                }
            }
        }.flowOn(<http://Dispatchers.IO|Dispatchers.IO>)

        delay(1000)

        launch(<http://Dispatchers.IO|Dispatchers.IO>) {
            flow.collect {
                println("collect 1: $it")
            }
        }

        launch(<http://Dispatchers.IO|Dispatchers.IO>) {
            delay(1000)
            flow.collect {
                println("collect 2: $it")
            }
        }

    }
}
Copy code
emit 20156fc9-d86f-4e73-a866-acf142866a5a
collect 1: 20156fc9-d86f-4e73-a866-acf142866a5a
emit ff6d37d3-787a-4a0a-8389-393df111b2ef
collect 1: ff6d37d3-787a-4a0a-8389-393df111b2ef
emit a70090cc-4dc4-405f-986e-7b205a1b537a
collect 1: a70090cc-4dc4-405f-986e-7b205a1b537a
emit b14678dc-75c3-43c3-9bb9-155137fbeee0
collect 2: b14678dc-75c3-43c3-9bb9-155137fbeee0
emit 1de80811-d027-4659-9560-1949a5e781ef
collect 2: 1de80811-d027-4659-9560-1949a5e781ef
emit c329e6cd-8867-4209-affe-87f08c72f012
collect 2: c329e6cd-8867-4209-affe-87f08c72f012
g

gildor

06/03/2019, 8:10 AM
This code looks really wrong for me
not really sure what you want to achive
t

Tuan Kiet

06/03/2019, 8:10 AM
not sure why the second colector still receive 3 item, I’m expecting one
g

gildor

06/03/2019, 8:11 AM
I see
but new flow starts again
conflated channel will be created again
flow can be used only once, you cannot share state between invocations
also, starting launch in flow is bad idea
t

Tuan Kiet

06/03/2019, 8:12 AM
so how do we achieve BehaviorSubject behaviour with flow
g

gildor

06/03/2019, 8:13 AM
How would you achive BehaviorSubject with Flowable/Observable?
t

Tuan Kiet

06/03/2019, 8:13 AM
something that when we collect, we only get the latest item
g

gildor

06/03/2019, 8:13 AM
BehaviorSubject analogue in kotlinx.coroutines is ConflatedBroadcastChannel
t

Tuan Kiet

06/03/2019, 8:14 AM
but that is a hot stream right?
g

gildor

06/03/2019, 8:14 AM
Of course
same way as BehaviorSubject
My point that cold channels cannot behave as BehaviorSubject, if it behaves as BehaviorSubject it’s hot by definition
t

Tuan Kiet

06/03/2019, 8:15 AM
let me do some research
brb
g

gildor

06/03/2019, 8:15 AM
Research? %)
but you still can collect values from Channel
t

Tuan Kiet

06/03/2019, 8:16 AM
investigate stuff about RxJava
g

gildor

06/03/2019, 8:16 AM
Copy code
val channel = ConflatedBroadcastChannel<T>()
channel.asFlow().collect {

}
t

Tuan Kiet

06/03/2019, 8:16 AM
does that become a cold stream
g

gildor

06/03/2019, 8:16 AM
No
t

Tuan Kiet

06/03/2019, 8:16 AM
or still hot?
g

gildor

06/03/2019, 8:17 AM
of course backing channel is still hot
but it can be consumed using cold abstraction
exactly the same as in RxJava
you create Subject, but subscribe on Observable
t

Tuan Kiet

06/03/2019, 8:35 AM
what does that mean
consumed using cold abstraction
you mean this?
Copy code
val obs = BehaviorSubject.create<String>()
        .observeOn(<http://Schedulers.io|Schedulers.io>())
is this Observable cold or hot?
g

gildor

06/03/2019, 8:38 AM
It’s hot, but it’s not exposed to subscriber
t

Tuan Kiet

06/03/2019, 8:46 AM
sr, a misconception about hot and cold, I though hot is thread blocking until the stream is disposed. turn out we just care about does the stream already have value or not is that correct?
g

gildor

06/03/2019, 8:48 AM
hot means that stream already started and running, it may be some resource (like socket) or just some cached value (like BehaviorSubject), cold stream is lazy and started only when someone subscribe on it and finish when unsubscribe (and because of this can be used only once)
t

Tuan Kiet

06/03/2019, 8:51 AM
ironically I’m already read that, feel bad man
g

gildor

06/03/2019, 8:51 AM
what does that mean
consumed using cold abstraction
It means that underlying stream of events is hot (which is represented by Channel in coroutines and by Subject in rxjava) but you subscribe on those event using Cold API (Flow in coroutines, Observable/Completable/Maybe/Flowable in RxJava)
t

Tuan Kiet

06/03/2019, 9:05 AM
so… my statement is correct?
about hot and cold
g

gildor

06/03/2019, 9:06 AM
turn out we just care about does the stream already have value or not is that correct
This one? It’s correct, but not necessary to have (cache) value to be hot, it may be some hot resource (like InputStream, socket, user interface etc)
t

Tuan Kiet

06/03/2019, 9:06 AM
for example, there is a stream, if no one subscribes to it, if it already has value -> hot, if not -> cold
g

gildor

06/03/2019, 9:08 AM
Cold cannot exist without subscribes, it will be started only when someone is subscribed
t

Tuan Kiet

06/03/2019, 9:09 AM
ok, are you an android dev? if yes do you use Room?
g

gildor

06/03/2019, 9:09 AM
yes, no but plan
t

Tuan Kiet

06/03/2019, 9:10 AM
I use Room with RxJava, not sure how the dao method which return Observable<List<Foo>> is implemented
this should be a cold stream
g

gildor

06/03/2019, 9:11 AM
Yes, probably
but not necessary
t

Tuan Kiet

06/03/2019, 9:12 AM
from what I see, every time I subscribe to this observable, I only get the latest value
g

gildor

06/03/2019, 9:12 AM
But why do you worry about it
?
from what I see, every time I subscribe to this observable, I only get the latest value
make sense
t

Tuan Kiet

06/03/2019, 9:14 AM
so if there are 2 subscribers to this observable, when the underlining data change, it will trigger 2 query
2 re-query
g

gildor

06/03/2019, 9:15 AM
yes
I only get the latest value
I believe you do not get latest value, this Flowable will requiest current value from database
t

Tuan Kiet

06/03/2019, 9:15 AM
that exactly what I’m worried about
g

gildor

06/03/2019, 9:16 AM
so yeah 2 cold observvables, each with own instance
In RxJava you can easily share it with
Flowable.share()
, and subscribe on this Flowable, same with Flow, just use
Flow.broadcastIn
so underlying implementation is cold, but you convvert it to hot that supports multiple subscribers
t

Tuan Kiet

06/03/2019, 9:19 AM
ok, didn’t know that method exists
I realy confuse with jake implementation on flow extension for SqlDelight
g

gildor

06/03/2019, 9:20 AM
Why?
t

Tuan Kiet

06/03/2019, 9:20 AM
he using flowViaChannel which is hot
g

gildor

06/03/2019, 9:21 AM
Because this is an adapter for existing listener
you cannot call
emit
from non-suspend function
t

Tuan Kiet

06/03/2019, 9:22 AM
so even if no one is subscribed to the flow, we still need to trigger a query
?
g

gildor

06/03/2019, 9:22 AM
and you also need some background coroutine to keep reference and handle queryResultsChanged callbacks
no
in this case flow will be started when someone subscribed on it
t

Tuan Kiet

06/03/2019, 9:22 AM
why? channel is all hot right?
g

gildor

06/03/2019, 9:23 AM
yes, but it will be created in this case only when flow is subscvirbed, this is just how flowViaChannel implemented
so it’s lazy, but when subscribed hot channel will be created under the hood
t

Tuan Kiet

06/03/2019, 9:26 AM
oh, I see, it is a flow wrap around a channel, not a channel converted to flow
g

gildor

06/03/2019, 9:28 AM
It’s also more safe, there is no way to forget close this channel, because channel will be closed if consuming coroutine will be cancelled
t

Tuan Kiet

06/03/2019, 9:28 AM
good job Andrey, you blow my mind again
thank
g

gildor

06/03/2019, 9:29 AM
You are welcome!
Want to add that Channels are still very important primitive to implement many use cases and even Flow operators See this article (especially “Declarative programming” part) https://medium.com/@elizarov/kotlin-flows-and-coroutines-256260fb3bdb
3 Views