Tuan Kiet
05/28/2019, 2:08 AMgildor
05/28/2019, 2:12 AMTuan Kiet
05/28/2019, 2:15 AMgildor
05/28/2019, 2:16 AMgildor
05/28/2019, 2:16 AMgildor
05/28/2019, 2:16 AMgildor
05/28/2019, 2:16 AMgildor
05/28/2019, 2:17 AMTuan Kiet
05/28/2019, 2:22 AMTuan Kiet
05/28/2019, 2:22 AMTuan Kiet
05/28/2019, 2:23 AMgildor
05/28/2019, 2:43 AMTuan Kiet
05/28/2019, 3:25 AMgildor
05/28/2019, 3:39 AM.hide()
gildor
05/28/2019, 3:40 AM.openSubscription()
to get ReceiveChannel or asFlow()
to create Flow instance that backed by ConflatedBroadcastChannelTuan Kiet
06/03/2019, 8:09 AMTuan Kiet
06/03/2019, 8:09 AM@FlowPreview
public fun <T> flowCONFLATEDViaChannel(
block: CoroutineScope.(channel: SendChannel<T>) -> Unit
): Flow<T> {
return flow {
coroutineScope {
val channel = ConflatedBroadcastChannel<T>()
launch {
block(channel)
}
channel.consumeEach { value ->
emit(value)
}
}
}
}
fun main() {
runBlocking {
val flow = flowCONFLATEDViaChannel<String> { channel ->
launch {
repeat(3) {
delay(250)
val aaa = UUID.randomUUID().toString()
println("emit $aaa")
channel.offer(aaa)
}
}
}.flowOn(<http://Dispatchers.IO|Dispatchers.IO>)
delay(1000)
launch(<http://Dispatchers.IO|Dispatchers.IO>) {
flow.collect {
println("collect 1: $it")
}
}
launch(<http://Dispatchers.IO|Dispatchers.IO>) {
delay(1000)
flow.collect {
println("collect 2: $it")
}
}
}
}
Tuan Kiet
06/03/2019, 8:09 AMemit 20156fc9-d86f-4e73-a866-acf142866a5a
collect 1: 20156fc9-d86f-4e73-a866-acf142866a5a
emit ff6d37d3-787a-4a0a-8389-393df111b2ef
collect 1: ff6d37d3-787a-4a0a-8389-393df111b2ef
emit a70090cc-4dc4-405f-986e-7b205a1b537a
collect 1: a70090cc-4dc4-405f-986e-7b205a1b537a
emit b14678dc-75c3-43c3-9bb9-155137fbeee0
collect 2: b14678dc-75c3-43c3-9bb9-155137fbeee0
emit 1de80811-d027-4659-9560-1949a5e781ef
collect 2: 1de80811-d027-4659-9560-1949a5e781ef
emit c329e6cd-8867-4209-affe-87f08c72f012
collect 2: c329e6cd-8867-4209-affe-87f08c72f012
gildor
06/03/2019, 8:10 AMgildor
06/03/2019, 8:10 AMTuan Kiet
06/03/2019, 8:10 AMgildor
06/03/2019, 8:11 AMgildor
06/03/2019, 8:11 AMgildor
06/03/2019, 8:11 AMgildor
06/03/2019, 8:11 AMgildor
06/03/2019, 8:12 AMTuan Kiet
06/03/2019, 8:12 AMgildor
06/03/2019, 8:13 AMTuan Kiet
06/03/2019, 8:13 AMgildor
06/03/2019, 8:13 AMTuan Kiet
06/03/2019, 8:14 AMgildor
06/03/2019, 8:14 AMgildor
06/03/2019, 8:14 AMgildor
06/03/2019, 8:15 AMTuan Kiet
06/03/2019, 8:15 AMTuan Kiet
06/03/2019, 8:15 AMgildor
06/03/2019, 8:15 AMgildor
06/03/2019, 8:16 AMTuan Kiet
06/03/2019, 8:16 AMgildor
06/03/2019, 8:16 AMval channel = ConflatedBroadcastChannel<T>()
channel.asFlow().collect {
}
Tuan Kiet
06/03/2019, 8:16 AMgildor
06/03/2019, 8:16 AMTuan Kiet
06/03/2019, 8:16 AMgildor
06/03/2019, 8:17 AMgildor
06/03/2019, 8:17 AMgildor
06/03/2019, 8:17 AMgildor
06/03/2019, 8:17 AMTuan Kiet
06/03/2019, 8:35 AMconsumed using cold abstraction
Tuan Kiet
06/03/2019, 8:36 AMval obs = BehaviorSubject.create<String>()
.observeOn(<http://Schedulers.io|Schedulers.io>())
Tuan Kiet
06/03/2019, 8:36 AMgildor
06/03/2019, 8:38 AMTuan Kiet
06/03/2019, 8:46 AMgildor
06/03/2019, 8:48 AMgildor
06/03/2019, 8:49 AMTuan Kiet
06/03/2019, 8:51 AMgildor
06/03/2019, 8:51 AMwhat does that meanIt means that underlying stream of events is hot (which is represented by Channel in coroutines and by Subject in rxjava) but you subscribe on those event using Cold API (Flow in coroutines, Observable/Completable/Maybe/Flowable in RxJava)consumed using cold abstraction
Tuan Kiet
06/03/2019, 9:05 AMTuan Kiet
06/03/2019, 9:05 AMgildor
06/03/2019, 9:06 AMturn out we just care about does the stream already have value or not is that correctThis one? It’s correct, but not necessary to have (cache) value to be hot, it may be some hot resource (like InputStream, socket, user interface etc)
Tuan Kiet
06/03/2019, 9:06 AMgildor
06/03/2019, 9:08 AMTuan Kiet
06/03/2019, 9:09 AMgildor
06/03/2019, 9:09 AMTuan Kiet
06/03/2019, 9:10 AMTuan Kiet
06/03/2019, 9:11 AMgildor
06/03/2019, 9:11 AMgildor
06/03/2019, 9:11 AMTuan Kiet
06/03/2019, 9:12 AMgildor
06/03/2019, 9:12 AMgildor
06/03/2019, 9:12 AMgildor
06/03/2019, 9:12 AMfrom what I see, every time I subscribe to this observable, I only get the latest valuemake sense
Tuan Kiet
06/03/2019, 9:14 AMTuan Kiet
06/03/2019, 9:14 AMgildor
06/03/2019, 9:15 AMgildor
06/03/2019, 9:15 AMI only get the latest valueI believe you do not get latest value, this Flowable will requiest current value from database
Tuan Kiet
06/03/2019, 9:15 AMgildor
06/03/2019, 9:16 AMgildor
06/03/2019, 9:17 AMFlowable.share()
, and subscribe on this Flowable, same with Flow, just use Flow.broadcastIn
gildor
06/03/2019, 9:17 AMTuan Kiet
06/03/2019, 9:19 AMTuan Kiet
06/03/2019, 9:20 AMgildor
06/03/2019, 9:20 AMTuan Kiet
06/03/2019, 9:20 AMgildor
06/03/2019, 9:21 AMgildor
06/03/2019, 9:21 AMemit
from non-suspend functionTuan Kiet
06/03/2019, 9:22 AMTuan Kiet
06/03/2019, 9:22 AMgildor
06/03/2019, 9:22 AMgildor
06/03/2019, 9:22 AMgildor
06/03/2019, 9:22 AMTuan Kiet
06/03/2019, 9:22 AMgildor
06/03/2019, 9:23 AMgildor
06/03/2019, 9:25 AMTuan Kiet
06/03/2019, 9:26 AMgildor
06/03/2019, 9:28 AMTuan Kiet
06/03/2019, 9:28 AMTuan Kiet
06/03/2019, 9:28 AMgildor
06/03/2019, 9:29 AMgildor
06/03/2019, 9:31 AM