is there an idiomatic way to do this?
# coroutines
a
is there an idiomatic way to do this?
also I can't reference `::currentCollected` in HOFs but I can do `{currentCollected}` ... any reason for this?
e
But why?
a
I am used to using this to reference fields, so I would think I would be able to use it to reference regular variables as well, but maybe I am misunderstanding it
and maybe it is just me but I think `::currentCollected` is cleaner
¯\_(ツ)_/¯
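For context: in Kotlin, `::name` creates a callable reference to a declared function or property, and local variables are not supported as reference targets. That would explain why the lambda form compiles while `::currentCollected` does not. A minimal sketch follows; `Tracker` and `readTwice` are made-up names used only for illustration.

```kotlin
// Hypothetical holder class, used only so there is a real property to reference.
class Tracker {
    var currentCollected: Int = 0
}

// Higher-order function that reads a value through a supplier, twice.
fun readTwice(supplier: () -> Int): Pair<Int, Int> = supplier() to supplier()

fun main() {
    val tracker = Tracker()
    tracker.currentCollected = 1

    // Bound property reference: allowed, because currentCollected is a property.
    println(readTwice(tracker::currentCollected)) // (1, 1)

    var local = 2
    // readTwice(::local) // rejected by the compiler: `local` is a local variable
    val supplier = { local } // a lambda that captures the local works instead
    local = 5
    println(readTwice(supplier)) // (5, 5): the lambda reads the current value
}
```

So if the value lives in a field of some wrapper object, the `::` form is available; for a plain local, the lambda is the way to go.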
e
I don't understand what you are trying to achieve. Are you looking for some kind of `last()` operator? If so, please create a separate issue with your use-case at https://github.com/Kotlin/kotlinx.coroutines/issues
How is `currentCollected` supposed to be used, and what for?
a
something like this ... I don't even know if this would work since I am not familiar with how `launch { ... }` modifying `currentValue` works... might need a separate wrapper object...
but `currentCollected` would be a way to access the latest output of the called flow in a non-suspending manner
perhaps it would be more appropriate to have a `latest()` method/property in a `BroadcastChannel`?
so if I wanted to do this I would do
val bc = Flow<T>#broadcastIn(...)
functionThatNeedsBc(bc::latestValue)
actually should be a delegated property probably
the reason I need to do this is to interact with the data of multiple flows ... one flow causes an event and the other flows I need to use to do calculations with data
j
That's what the compositional operators are for
a
@jw explain?
j
if you want the value of one flow in another, combine them in some way using an operator like zip or combineLatest
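A small sketch of the two operators mentioned here, for comparison. It assumes a recent kotlinx.coroutines release, where the operator this thread calls `combineLatest` is named `combine`; the function names `zipped` and `combined` are made up for the example.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.combine
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.flow.zip
import kotlinx.coroutines.runBlocking

// zip pairs values by position and stops when the shorter flow ends.
fun zipped(): Flow<String> =
    flowOf(1, 2, 3).zip(flowOf("a", "b")) { n, s -> "$n$s" }

// combine re-emits whenever either side produces a value,
// always using the most recent value from the other side.
fun combined(): Flow<String> =
    flowOf(1, 2, 3).onEach { delay(10) }
        .combine(flowOf("a", "b").onEach { delay(15) }) { n, s -> "$n$s" }

fun main() = runBlocking {
    println(zipped().toList())   // [1a, 2b]
    println(combined().toList()) // intermediate pairs depend on timing; the last one is 3b
}
```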
j
seems like Rx's withLatestFrom
a
hmm so you think I should just add Rx as a lib to my project? I know Roman Elizarov has written about how it is bad in comparison to coroutines because coroutines do not require a special syntax that needs to be memorized.
j
i think that miscategorizes the message he was trying to get across, but that would be focused on the `Completable`, `Single`, and `Maybe` types of the library, not `Observable`/`Flowable`.
e
@Andrew Gazelka please explain in more detail what exactly you are trying to achieve. How exactly are you using “track current”?
Have you looked at what `combineLatest` does? Any chance it is what you are looking for?
a
Rx
combineLatest?
oh nvm
a
nah I think `Flow > ConflatedBroadcastChannel` and then `CBC.asFlow().single()` works for what I am trying to do @elizarov
g
what is Flow -> ConflatedBroadcastChannel
a
val flow: Flow<T> = ...
val cbc = ConflatedBroadcastChannel<T>()
// Forward every emission so the channel always holds the latest value
flow.collect {
   cbc.send(it)
}
g
but what is the point of this code?
a
so that I can poll the latest value from cbc and suspend if a value does not exist yet
g
I think it’s kinda broken approach
a
yeah... it definitely isn't clean
g
I would wrap the websocket connection in a flow using channelFlow
if you also want to always keep the latest value of this connection, use `bufferSize = Conflated`
code that calls cbc.send() is not bad in itself, of course, but for your use case I think it’s not really correct
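As a rough illustration of the conflated-buffer suggestion: the sketch below assumes a recent kotlinx.coroutines (1.6 or later), where conflation is applied with `.buffer(Channel.CONFLATED)` rather than a `bufferSize` argument. `updates` and `collectSlowly` are hypothetical names, and the loop stands in for a real websocket.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.buffer
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.runBlocking

// Stand-in for a websocket: pushes `count` updates as fast as it can.
// With a conflated buffer, a slow collector skips stale values
// and the producer never has to wait for it.
fun updates(count: Int): Flow<Int> = channelFlow {
    for (i in 1..count) send(i)
}.buffer(Channel.CONFLATED)

// A deliberately slow consumer that records what it actually received.
suspend fun collectSlowly(source: Flow<Int>): List<Int> {
    val seen = mutableListOf<Int>()
    source.collect { value ->
        delay(50) // pretend each update takes a while to process
        seen += value
    }
    return seen
}

fun main() = runBlocking {
    // Intermediate values are dropped; the most recent one (10) is always delivered last.
    println(collectSlowly(updates(10)))
}
```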
a
how would I suspend if there is no latest val though?
g
Do you understand how combineLatest works?
a
Yes the issue is I might want to look at the same value twice
From one flow
I just want to look at the latest value
And I might poll that latest value multiple times
g
So you want to receive only new values and consume previous value?
a
I want to consume the most recent value the flow is on
And sometimes multiple times
g
isn’t that what combineLatest does?
if one flow changes, the latest value from both will be emitted
actually, you don’t even need a CBC for this
a
No, my case applies even to a single flow
You have a process which requires getting the latest value from the flow
g
I still don’t understand
a
Let’s say you are looking at current price of bitcoin
g
You have a process which requires getting the latest value from the flow
You cannot do this by default, but you can use a conflated channel
a
You are doing a periodic trade every 1 min
g
which allows you to do that and caches the current value
a
And need the price of bitcoin to do it
Yeah exactly a CBC right?
Or I guess just a CC maybe
g
Yes
and if you wrap it in a flow with a conflated buffer it will be safe to use
a
@gildor how would this work? this will just return `1` over and over again
g
This will not work, because you restart the stream each time you call `.first()`
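A minimal sketch of that behaviour, assuming a plain cold flow built with the `flow { }` builder (`counter` and `starts` are hypothetical names): every call to `first()` runs the builder block again from the top, so the same first element comes back each time.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking

// Counts how many times the flow body has been started.
var starts = 0

// A cold flow: nothing runs until a terminal operator collects it.
fun counter(): Flow<Int> = flow {
    starts++
    var i = 0
    while (true) emit(++i)
}

fun main() = runBlocking {
    val numbers = counter()
    println(numbers.first()) // 1
    println(numbers.first()) // 1 again: the flow was restarted
    println(starts)          // 2
}
```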
a
what should I do then?
and if you wrap it in a flow with a conflated buffer it will be safe to use
I must be dumb at this point honestly...
g
See, first() is a terminal operator, which means that it starts the Flow, awaits one element, and then cancels it
I just don’t understand how the code you showed relates to your WS sample
Let me illustrate how I understood your case
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.combineLatest
import kotlinx.coroutines.flow.flowViaChannel
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Create a Flow abstraction on top of the WS connection
// The WS is opened lazily when the flow starts and closed when the flow is cancelled
fun wsFlow(id: String, intervalMs: Long) = channelFlow {
    var i = 0

    while (true) {
        delay(intervalMs) // Simulate WS service delay and new events
        i++
        send("$id:$i")
    }
    //TODO: Handle WS cancellation and reconnection
}

fun main() = runBlocking {
    val job = launch {
        
        // Combine 2 web sockets
        wsFlow("a", 200).combineLatest(wsFlow("b", 250)) { ws1, ws2 ->
            "$ws1 $ws2"
        }.collect {
            println(it)
        }
    }
    delay(2000)
    job.cancel()
}
a
The issue is for me the websocket is just used for UPDATING values... I have a periodic task which needs to look at the current values
imagine a websocket looking for updates of the current weather
and there is a screen on an app which needs to get the current weather
to get the current weather it needs to look at the last output value of the websocket
g
Where do you want to get the current value from? Load it from the server or just get the latest cached value?
a
get latest cache value
(which is cached from the websocket)
however I also want to make sure I properly suspend if the value does not exist (until it exists), so this makes it harder
g
Use ConflatedBroadcastChannel then
a
and just `CBC.asFlow().single()`
g
first()
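A sketch of the "suspend until a value exists, then read the cached value as often as needed" behaviour. Note the swap: `ConflatedBroadcastChannel` has since been deprecated in kotlinx.coroutines in favour of `StateFlow`/`SharedFlow`, so this example uses a `MutableSharedFlow` with `replay = 1` instead of a CBC. `latestValueCache` is a made-up helper name and the `launch` block stands in for the websocket.

```kotlin
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Keeps only the most recent value; replay = 1 hands it to every new collector.
fun <T> latestValueCache(): MutableSharedFlow<T> =
    MutableSharedFlow<T>(replay = 1, onBufferOverflow = BufferOverflow.DROP_OLDEST)

fun main() = runBlocking {
    val prices = latestValueCache<Int>()

    // Stand-in for the websocket: the first update arrives after a short wait.
    val updater = launch {
        delay(100)
        prices.emit(42)
        delay(100)
        prices.emit(43)
    }

    println(prices.first()) // suspends until the first update arrives, then prints it
    println(prices.first()) // returns the cached value again; reading does not consume it
    updater.join()
    println(prices.first()) // 43, the latest value once the updater has finished
}
```

Unlike `single()`, which fails unless the flow emits exactly one element and then completes, `first()` returns as soon as one value is available, which is why it fits a hot source that never completes.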
a
is there an idiomatic way to turn a flow into a CBC?
g
Why would you turn flow into a CBC?
you need hot source
it really depends on how you want to manage it
there are a few ways. I would prefer a cold Flow, as I showed above
a
my flow wraps around the websocket
g
without any caching on this level
and cache on level below
a
but you think a channelFlow is better?
g
channelFlow is just a way to work with callbacks like WS
a
ok well is my way bad then?
g
Also, it really depends, do you want to make this lazy or not and how you want to share connection
a
Flow > CBC > latest value
I am not sharing a connection—the function creates the connection
g
not sure how Flow > CBC will work in your case
a
and is only called one time in code
g
But cbc will be empty until someone starts the connection
a
how is that an issue
g
If someone wants to get the current value, it will suspend forever
and will not start the connection
again, it may be fine for your case
a
so getting the current value should occur after the connection is started (the flow starts being collected)
right now I basically have `fun run(weather: Flow<Weather>){}`
g
no, you can request the current value before that, but you will not receive a result until some other code opens this flow
a
yea how is that an issue
g
because `getCurrentValue` becomes `getCurrentValueIfSomeOtherCodeAlsoDidSomething`
of course if you rename it to `getCurrentCachedValue` it makes more sense; at least it’s clear that it’s only from the cache
a
well if we are going full circle I think that function would be more appropriate on a channel than a flow
g
you can expose it as channel of course
it’s less secure and more error prone
I would instead hide channel in class
Like:
interface WeatherSource {
    suspend fun latestCachedWeather(): String
    val weatherFlow: Flow<String>
}
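One possible way to fill in that interface, as a sketch rather than the implementation anyone in this thread had in mind. It uses `shareIn` with `replay = 1` (available in recent kotlinx.coroutines) instead of a hand-rolled channel, and starts the upstream eagerly so that `latestCachedWeather()` does not wait on some other caller to open the connection. `SharedWeatherSource` and `upstream` are hypothetical names; `upstream` stands in for the websocket-backed flow.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.cancel
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.SharedFlow
import kotlinx.coroutines.flow.SharingStarted
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.shareIn
import kotlinx.coroutines.runBlocking

interface WeatherSource {
    suspend fun latestCachedWeather(): String
    val weatherFlow: Flow<String>
}

// Hypothetical implementation that hides the hot, caching part inside the class.
class SharedWeatherSource(upstream: Flow<String>, scope: CoroutineScope) : WeatherSource {
    // Starts collecting upstream right away in `scope` and replays the latest value.
    private val shared: SharedFlow<String> =
        upstream.shareIn(scope, SharingStarted.Eagerly, replay = 1)

    override val weatherFlow: Flow<String> get() = shared

    // Suspends until the first value has arrived, then returns the cached one.
    override suspend fun latestCachedWeather(): String = shared.first()
}

fun main() = runBlocking {
    val scope = CoroutineScope(Dispatchers.Default)
    val source = SharedWeatherSource(flowOf("sunny"), scope)
    println(source.latestCachedWeather()) // sunny
    println(source.latestCachedWeather()) // sunny again, served from the replay cache
    scope.cancel() // stops the upstream collection
}
```

If lazy connection is preferred, `SharingStarted.Lazily` or `SharingStarted.WhileSubscribed()` could be swapped in, at the cost of the first caller paying for connection setup.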