I guess this is a workaround although it is kinda...
# announcements
a
I guess this is a workaround although it is kinda jank
g
Yes, suspending delegates are not available now at least because properties which do not support suspend modifier, at least for now Do you have particular use case for this? I'm not really sure what you doing in your code and why do you need delegate there instead of simple assignment, just:
val a = aAsync.await()
, without any delegation
a
alright so I am taking two `Flow`s of data and I am passing them into a HOF which will do calculations over time and want the latest value of the
Flow
(which will update over time) ... the reason for suspending at first is to wait until the first value from `Flow`s are returned
g
I mean that usually, if you want to do some async operation only once (as lazy works), you can just create async, save it to some field and than clients just call await() on the same instance, each of them will receive the same result, await will be run only once
Hmm, hard to say without example of code, not really sure that you can use Flow for such case, but maybe you could give a bit more context
What is HOF?
Just if you have 2 Flows and want somehow combine their data, shouldn't you use some zip or combineLatest operator, depending on case
a
HOF is a higher order function
also I am not zipping data
I am essentially turning the two flows into sources which can be polled from another long-running suspending function
g
Polled? Hmm, but it's cold stream, there is no polling of data, it's push based source
Looks that you need some Conflated broadcast channel to poll data
But hard to day without some sample
a
I think that makes sense, I am using a websocket to get prices of currencies
so initially it is a cold stream (new websocket created with initial function) but it should then be converted to a hot stream when referenced by functions which require polling
@gildor a ConflatedBroadcastChannel looks nice but I need a good way to suspend until the first value is received... any ideas?
g
Yes, use receive()
a
@gildor but
receive()
removes the last element... what if someone called
receive()
twice for one value? ...
g
But you cannot remove element from ConflatedBroadcastChannel
ConflatedBroadcastChannel.receive() you will just get current value
a
there is no
receive()
method in
ConflatedBroadcastChannel
I mean I guess I could use Mutexes but aren't those generally seen as bad...
g
ConflatedBroadcastChannel.openSubscription().receive()
or even better just ConflatedBroadcastChannel.value
a
but I need to suspend until the first value is received
so
ConflatedBroadcastChannel.value
does not work
g
ah you right, I forgot that you want await for the first value, just
ConflatedBroadcastChannel.openSubscription().receive()
than
a
wouldn't opening a new subscription each time be resource intensive?
or it just seems weird... probably just me tho
g
or it just seems weird
It’s not weird, this stream of events, you have to open it, even if this stream is actually observable data
a
but you are opening it every time you need to poll a value
g
But careful, you probably also want to close subscription, so to make it a bit safer you can use flow instead:
Copy code
ConflatedBroadcastChannel.asFlow().single() // or singleOrNull() if you don't want to throw exception
but you are opening it every time you need to poll a value (edited)
yes, but what do you expect? Or you await it and than you need some primitive, or you just get current value
I still not sure about your case, but it’s way how data streams work, not only reactive
a
my use case is I have two websockets for data (prices) which is essentially an event-driven and makes sense to use a
Flow
(even though normally hot data... but the web socket is created with a function so it is self-contained and is still cold) ... then I am using the two flows in another function which does period calculation given the current price... the
Flow
is used for updates in price.
g
period calculation given the current price
You shouldn’t use pull for this
or just wait for new data, or request this data by doing force update (just create a method for this)
a
I am getting the data from a websocket and I do not control the API
g
So than you don’t need any kind polling
a
however, once I get the data from the event-driven web socket I need to access it in a polling-like way
g
why?
if you want to cache use conflatedbroadcast channel + flow
a
the formula needs to request what the current last value of data is
yeah ^^^
using CBC and flow is fine
g
request what the current last value of data
but you said that you cannot force request last data you do not control API
a
I am using a web socket... then have a black box in my program which looks at data in my web socket and lets me request the last value
g
so what is problem with CBC, flow and combineLatest?
a
none I think...
which is what I was trying to say... don't think I was clear tho
g
ah, sorry
I just thought that we discussing how to do that
Did you see kotlinx.corotuines 1.3 release notes? https://github.com/Kotlin/kotlinx.coroutines/releases
channelFlow is what you need to wrap webscoket
with
bufferSize = Channel.CONFLATED
param
than use combine latest for 2 flows to get updates when one flows is changed using combineLatest: https://github.com/Kotlin/kotlinx.coroutines/issues/1193
l
Also, #coroutines 🙂