Andrew Gazelka
06/10/2019, 9:18 PM

Andrew Gazelka
06/10/2019, 9:24 PM
`::currentCollected` in HOFs but I can do `{currentCollected}` ... any reason for this?

elizarov
06/10/2019, 9:25 PM

Andrew Gazelka
06/10/2019, 9:26 PM

Andrew Gazelka
06/10/2019, 9:27 PM
`::currentCollected` is cleaner

Andrew Gazelka
06/10/2019, 9:27 PM

elizarov
06/10/2019, 9:29 PM
`last()` operator? If so, please create a separate issue with your use-case at https://github.com/Kotlin/kotlinx.coroutines/issues

elizarov
06/10/2019, 9:31 PM
`currentCollected` is supposed to be used and what for?

Andrew Gazelka
06/10/2019, 9:35 PM
`launch { ... }` modifying `currentValue` works... might need a separate wrapper object...

Andrew Gazelka
06/10/2019, 9:36 PM
`currentCollected` would be a way to access the latest output of the called flow in a non-suspending manner

Andrew Gazelka
06/10/2019, 9:37 PM
`latest()` method/property in a `BroadcastChannel`?

Andrew Gazelka
06/10/2019, 9:39 PM
val bc = Flow<T>#broadcastIn(...)
functionThatNeedsBc(bc::latestValue)

Andrew Gazelka
06/10/2019, 9:50 PM

Andrew Gazelka
06/10/2019, 11:06 PM

Andrew Gazelka
06/10/2019, 11:09 PM

Andrew Gazelka
06/10/2019, 11:21 PM

jw
06/11/2019, 12:56 AM

Andrew Gazelka
06/11/2019, 1:02 AM

jw
06/11/2019, 1:03 AM

Andrew Gazelka
06/11/2019, 1:06 AM

jw
06/11/2019, 1:08 AM

Andrew Gazelka
06/11/2019, 1:10 AM

jw
06/11/2019, 1:11 AM
`Completable`, `Single`, and `Maybe` types of the library, not `Observable`/`Flowable`.

elizarov
06/11/2019, 5:31 AM

elizarov
06/11/2019, 5:44 AM
`combineLatest` does? Any chance it is what you are looking for?

Andrew Gazelka
06/11/2019, 5:45 AM
Rx `combineLatest`?

Andrew Gazelka
06/11/2019, 5:46 AM

Andrew Gazelka
06/11/2019, 5:51 AM
`Flow > ConflatedBroadcastChannel` and then `CBC.asFlow().single()` works for what I am trying to do @elizarov

gildor
06/11/2019, 5:51 AM

Andrew Gazelka
06/11/2019, 5:52 AM
val flow: Flow<T> = ...
val cbc = ConflatedBroadcastChannel<T>()
// Forward every value emitted by the flow into the conflated channel
flow.collect {
    cbc.send(it)
}

gildor
06/11/2019, 5:53 AM

Andrew Gazelka
06/11/2019, 5:53 AM

gildor
06/11/2019, 5:54 AM

Andrew Gazelka
06/11/2019, 5:54 AM

gildor
06/11/2019, 5:54 AM

gildor
06/11/2019, 5:55 AM
`bufferSize = Conflated`

gildor
06/11/2019, 5:56 AM

Andrew Gazelka
06/11/2019, 6:04 AM

gildor
06/11/2019, 6:19 AM

Andrew Gazelka
06/11/2019, 6:31 AM

Andrew Gazelka
06/11/2019, 6:31 AM

Andrew Gazelka
06/11/2019, 6:31 AM

Andrew Gazelka
06/11/2019, 6:31 AM

gildor
06/11/2019, 6:41 AM

Andrew Gazelka
06/11/2019, 6:46 AM

Andrew Gazelka
06/11/2019, 6:46 AM

gildor
06/11/2019, 6:47 AM

gildor
06/11/2019, 6:48 AM

gildor
06/11/2019, 6:48 AM

Andrew Gazelka
06/11/2019, 6:48 AM

Andrew Gazelka
06/11/2019, 6:48 AM

gildor
06/11/2019, 6:48 AM

Andrew Gazelka
06/11/2019, 6:49 AM

gildor
06/11/2019, 6:49 AM
You have a process which requires getting the latest value from the flow.
You cannot do this by default, but you can use a Conflated channel.

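A minimal sketch of the idea above: mirror a flow into a holder that always keeps the most recent value, so it can be read without suspending. It uses `MutableStateFlow`, which was added to kotlinx.coroutines after this conversation and covers the use case discussed here for `ConflatedBroadcastChannel`. `numbers()` and `mirrorInto()` are hypothetical names made up for illustration, not library API.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.runBlocking

// Hypothetical source flow standing in for the real one.
fun numbers(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(50)
        emit(i)
    }
}

// Hypothetical helper: collects the flow in `scope` and stores each value in `holder`.
fun <T> Flow<T>.mirrorInto(holder: MutableStateFlow<T?>, scope: CoroutineScope): Job =
    onEach { holder.value = it }.launchIn(scope)

fun main() = runBlocking {
    // null means "nothing has been collected yet".
    val latest = MutableStateFlow<Int?>(null)
    val job = numbers().mirrorInto(latest, this)

    println(latest.value) // plain property read, no suspension; prints null
    job.join()            // wait until the source flow completes
    println(latest.value) // prints 3
}
```

The holder only knows what has been collected so far, so a reader has to decide what an empty cache (`null` here) means for its use case.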
Andrew Gazelka
06/11/2019, 6:49 AM

gildor
06/11/2019, 6:49 AM

Andrew Gazelka
06/11/2019, 6:49 AM

Andrew Gazelka
06/11/2019, 6:50 AM

Andrew Gazelka
06/11/2019, 6:50 AM

gildor
06/11/2019, 6:57 AM

gildor
06/11/2019, 6:58 AM

Andrew Gazelka
06/11/2019, 11:49 PM
`1` over and over again

gildor
06/12/2019, 12:01 AM

Andrew Gazelka
06/12/2019, 2:09 AM

Andrew Gazelka
06/12/2019, 2:10 AM
and if you wrap it into a flow with a conflated buffer, it will be safe to use

Andrew Gazelka
06/12/2019, 2:40 AM

gildor
06/12/2019, 2:45 AM

gildor
06/12/2019, 2:45 AM

gildor
06/12/2019, 2:46 AM

gildor
06/12/2019, 3:03 AM
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.combineLatest
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Create a Flow abstraction on top of a WS connection.
// The WS is opened lazily when the flow starts and closed when the flow is cancelled.
fun wsFlow(id: String, delay: Long) = channelFlow {
    var i = 0
    while (true) {
        delay(delay) // Simulate WS service delay and new events
        i++
        send("$id:$i")
    }
    // TODO: Handle WS cancellation and reconnection
}

fun main() = runBlocking {
    val job = launch {
        // Combine 2 web sockets
        wsFlow("a", 200).combineLatest(wsFlow("b", 250)) { ws1, ws2 ->
            "$ws1 $ws2"
        }.collect {
            println(it)
        }
    }
    delay(2000)
    job.cancel()
}

Andrew Gazelka
06/12/2019, 3:09 AM

Andrew Gazelka
06/12/2019, 3:09 AM

Andrew Gazelka
06/12/2019, 3:10 AM

Andrew Gazelka
06/12/2019, 3:10 AM

gildor
06/12/2019, 3:11 AM

Andrew Gazelka
06/12/2019, 3:14 AM

Andrew Gazelka
06/12/2019, 3:15 AM

Andrew Gazelka
06/12/2019, 3:15 AM

gildor
06/12/2019, 3:16 AM

Andrew Gazelka
06/12/2019, 3:16 AM
`CBC.asFlow().single()`

gildor
06/12/2019, 3:17 AM

Andrew Gazelka
06/12/2019, 3:18 AM

gildor
06/12/2019, 3:18 AM

gildor
06/12/2019, 3:18 AM

gildor
06/12/2019, 3:19 AM

gildor
06/12/2019, 3:19 AM

Andrew Gazelka
06/12/2019, 3:19 AM

gildor
06/12/2019, 3:19 AM

gildor
06/12/2019, 3:19 AM

Andrew Gazelka
06/12/2019, 3:20 AM

gildor
06/12/2019, 3:20 AM

Andrew Gazelka
06/12/2019, 3:21 AM

gildor
06/12/2019, 3:21 AM

Andrew Gazelka
06/12/2019, 3:21 AM

Andrew Gazelka
06/12/2019, 3:22 AM

gildor
06/12/2019, 3:22 AM

Andrew Gazelka
06/12/2019, 3:22 AM

Andrew Gazelka
06/12/2019, 3:23 AM

gildor
06/12/2019, 3:23 AM

Andrew Gazelka
06/12/2019, 3:24 AM

gildor
06/12/2019, 3:24 AM

gildor
06/12/2019, 3:24 AM

gildor
06/12/2019, 3:26 AM

Andrew Gazelka
06/12/2019, 3:26 AM

Andrew Gazelka
06/12/2019, 3:26 AM
fun run(weather: Flow<Weather>){}

gildor
06/12/2019, 3:27 AM

Andrew Gazelka
06/12/2019, 3:28 AM

gildor
06/12/2019, 3:29 AM
`getCurrentValue` becoming `getCurrentValueIfSomeOtherCodeAlsoDidSomething`

gildor
06/12/2019, 3:29 AM
`getCurrentCachedValue` makes more sense; at least it is clear that it's only from the cache

Andrew Gazelka
06/12/2019, 3:30 AM

gildor
06/12/2019, 3:32 AM

gildor
06/12/2019, 3:32 AM

gildor
06/12/2019, 3:32 AM

gildor
06/12/2019, 3:33 AM
interface WeatherSource {
    suspend fun latestCachedWeather(): String
    val weatherFlow: Flow<String>
}
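
One possible way to back the `WeatherSource` interface above, as a rough sketch rather than what anyone in the thread actually wrote. It assumes a newer kotlinx.coroutines version that has `MutableStateFlow`; `CachingWeatherSource`, its `upstream` parameter and the sample values are all hypothetical.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.filterNotNull
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.runBlocking

interface WeatherSource {
    suspend fun latestCachedWeather(): String
    val weatherFlow: Flow<String>
}

// Hypothetical implementation: caches every value received from `upstream`.
class CachingWeatherSource(upstream: Flow<String>, scope: CoroutineScope) : WeatherSource {
    // null means the cache is still empty.
    private val cache = MutableStateFlow<String?>(null)

    init {
        // Keep the cache up to date for as long as `scope` is active.
        upstream.onEach { cache.value = it }.launchIn(scope)
    }

    // Returns the cached value, suspending only while the cache is still empty.
    override suspend fun latestCachedWeather(): String = cache.filterNotNull().first()

    // Emits the cached value (once there is one) followed by later updates; never completes.
    override val weatherFlow: Flow<String> = cache.filterNotNull()
}

fun main() = runBlocking {
    val source = CachingWeatherSource(flowOf("sunny"), this)
    println(source.latestCachedWeather())
}
```

Making `latestCachedWeather()` suspend keeps the "cache may be empty" case out of the signature: callers wait for the first value instead of handling `null`.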