What would be the proper way to have a continuous ...
# coroutines
t
What would be the proper way to have a continuous source of elements that one could take elements from without closing? Like a pool of 10 items that require heavy computation, that is only refilled by the correct amount each time one is removed.
g
A chunked operator for Flow (like the existing one for List) would be perfect for this, but it doesn't exist yet. You can check the Coroutines issue tracker; there are some ad-hoc implementations there, as I remember.
Or maybe you want to get up to 10 all the time, not chunks, so you could just limit it using the flatMap concurrency param.
t
Yes, up to X, and usually it will be 1 unless the user wants to see what will come next, and then the X are consumed. The concept would be that I pass a function to a consumer, and as the consumer consumes the items, new items are generated up to a limit, to avoid pre-generating all possible items at once.
So I need to be able to collect from different places, even if never at the same time.
g
Yeah, you can use a flatMap* operator with the concurrency param
this concurrency limits the number of simultaneously running flows
so your processing could be represented as a flow (even with 1 element), and flatMap will help limit the number of simultaneous processings
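A minimal sketch of that idea, assuming the flatMap* operator meant here is flatMapMerge from kotlinx.coroutines. The names heavyComputation and computeAll and the two counters are made up for illustration; each item becomes a one-element flow, and the concurrency argument caps how many of those flows are collected at once.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical counters: how many computations run right now, and the highest value seen.
val running = AtomicInteger(0)
val peak = AtomicInteger(0)

// Hypothetical stand-in for the heavy computation.
suspend fun heavyComputation(id: Int): Int {
    val now = running.incrementAndGet()
    peak.updateAndGet { maxOf(it, now) }
    delay(50) // pretend this is expensive
    running.decrementAndGet()
    return id * id
}

// Wraps every id in a one-element flow; at most `limit` of them are collected concurrently.
@OptIn(ExperimentalCoroutinesApi::class, FlowPreview::class)
suspend fun computeAll(ids: IntRange, limit: Int): List<Int> =
    ids.asFlow()
        .flatMapMerge(concurrency = limit) { id -> flow { emit(heavyComputation(id)) } }
        .toList()

fun main() = runBlocking {
    val results = computeAll(1..30, limit = 10)
    println("computed ${results.size} items, at most ${peak.get()} at once")
}
```

The results arrive in completion order, not input order, so sort them if the order matters.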
t
Ok but how do I collect without closing then? I'm missing some conceptual stuff about Flow it seems, do you know any advanced tutorials? (And thanks for the help)
g
Ok but how do I collect without closing then?
What do you mean? The flatMap concurrency param limits how many of the Flows it produces are being collected at the same time. It doesn't close anything, it just doesn't subscribe to new ones.
t
Everything seems terminal in Flow. What would be the equivalent of a channel's poll?
g
Flow is a cold stream
it doesn't use a poll strategy, it uses push
It's hard for me to discuss a particular use case without some example code, or just some scheme of what kind of behaviour you expect
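To make "cold" concrete, here is a small sketch (the names countStarts and numbers are illustrative only). The body of a flow { } builder does not run when the flow is created; it runs from the start once for every collect call.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Returns how often the flow body had started before and after two collect calls.
fun countStarts(): Pair<Int, Int> = runBlocking {
    var starts = 0
    val numbers = flow {
        starts++ // executed each time somebody collects
        emit(1)
        emit(2)
    }
    val before = starts // still 0: creating the flow ran nothing
    numbers.collect { }
    numbers.collect { }
    before to starts
}

fun main() {
    val (before, after) = countStarts()
    // prints "before collecting: 0, after two collects: 2"
    println("before collecting: $before, after two collects: $after")
}
```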
t
The one I tried to explain at first; you proposed Flow, so I tried to see how I can fit it 🙂 I'll try to explain better. The need is that I pass "something" to a client, and the client needs to be able to take 1 up to X items from that something that will be removed. And that something needs to always have X items pre-generated, as the generation is heavy.
There are a few ways to achieve that, but I'm pretty sure there's a "native" way to do it with channels or flow that I just can't grasp.
g
something that will be removed
What does it mean?
t
That it's no longer present in the source. Imagine the something as a diamond builder: creating a diamond takes time, so you want to have some stock before selling them, but not too much in case no one purchases them. My first idea was to have a channel that the client polls, and a loop that checks the count of the pool and emits to it when it is under X. But that sounds hacky, and I was wondering what the proper way is.
g
still not sure about the semantics: you don't want to emit values before you already have some amount of them in the buffer?
t
I can deal with both cases.
g
With Flow you can just write a custom operator using transform
so it will cache the required amount and emit when it's ready to emit
t
OK, but then the previous question is back: how do I collect only X without closing?
I really need to find a good tutorial, I'm sure I'm just missing a piece of understanding.
g
you just collect it one by one
val buffer = mutableListOf<Something>()
flow.collect {
    buffer += it
    if (buffer.size > 10) {
        emit(buffer.removeFirst())
    }
}
something like this
t
This does not support 2 collectors? Like for the diamonds: one is the guy purchasing, and another is looking at what will be available for purchase.
g
you can just merge any number of flows
flow = merge(flow1, flow2)
flow.collect {
    buffer += it
    if (buffer.size > 10) {
        emit(buffer.removeFirst())
    }
}
another is looking at what will be available for purchase
The one who is looking also collects, doesn't he?
t
Yes, he can collect (so remove) and I'll store at the shop level, or he can only look but not collect. I can deal with both cases.
g
or not collect; you can use transform, which does this. Sorry, my example above was not exactly correct, it should be:
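// Diamond, mineDiamond() and purchase() are placeholders for your own code
val diamondMiner = flow {
    while (true) {
        emit(mineDiamond())
    }
}
val buffer = ArrayDeque<Diamond>() // the store's stock
val diamondStore = diamondMiner.transform { // your collector
    buffer += it
    if (buffer.size > 10) {
        // sends value to buyer
        emit(buffer.removeFirst())
    }
}

// purchase diamonds
diamondStore.collect {
    purchase(it)
}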
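A runnable variation on the snippet above, offered as a sketch rather than the definitive form. It packages the buffering into a reusable operator, here called stocked (a made-up name), built with the flow { } builder so that every collector gets its own buffer. Integers stand in for diamonds, and draining the leftover stock when the upstream ends is an added assumption, not something discussed in the chat.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical operator: holds back `stock` items and only passes on the overflow.
fun <T> Flow<T>.stocked(stock: Int): Flow<T> = flow {
    val buffer = ArrayDeque<T>() // created per collector
    collect { item ->
        buffer.addLast(item)
        if (buffer.size > stock) emit(buffer.removeFirst())
    }
    // Assumption: when the upstream finishes, hand out what is still in stock.
    for (item in buffer) emit(item)
}

// An endless miner; each "diamond" is just a serial number here.
fun miner(): Flow<Int> = flow {
    var serial = 0
    while (true) {
        delay(10) // pretend mining is slow
        emit(++serial)
    }
}

fun main() = runBlocking {
    // take(3) stops the collection after three purchases
    val bought = miner().stocked(10).take(3).toList()
    println(bought) // prints [1, 2, 3]
}
```

The first purchase only arrives once the eleventh diamond has been mined, which is the "stock before selling" behaviour described earlier.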
t
OK, so the one who wants to look just looks at the buffer. Sounds simple. Just one last question: can I cancel the collect scope without cancelling the store?
g
no
okay, it’s more complicated
see, Flow itself is a single-consumer abstraction, so if you start collecting it, you have to either close it or collect until it ends
t
That's what I had understood before having doubts 😛
g
right now there are no operators to share a flow
but work on them is in progress
t
Reuse after a close is not that important. It would avoid regenerating the X items on migration to another client, but this is not a frequent thing.
g
share means that you can have 1 flow and just create some sort of shared flow from it, which can be closed without closing your original flow
so for now it's possible to use a BroadcastChannel and convert it to a flow
so consumers will use a flow, but under the hood it will be a channel, which will not be cancelled when the flow is cancelled
t
According to a recent post from elizarov, it's all deprecated in favour of something that will come 🙂
g
That's correct
but BroadcastChannel is still not deprecated
t
I suppose I'll wait a little for the shared flow stuff 🙂
g
it's experimental, and probably most of its use cases will be replaced with Flow and share operators
you can wait
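This conversation predates them, but the share operators mentioned here later shipped in kotlinx.coroutines as shareIn and SharedFlow. The sketch below assumes such a version of the library; sharedMine and the scope setup are illustrative names only. It shows one client finishing its collection while the miner keeps running for the next client.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Two clients buy two diamonds each, one after the other, from the same running miner.
fun sharedMine(): Pair<List<Int>, List<Int>> = runBlocking {
    val mineScope = CoroutineScope(Dispatchers.Default) // owns the miner, independent of clients
    val miner = flow {
        var serial = 0
        while (true) {
            delay(10) // pretend mining is slow
            emit(++serial)
        }
    }
    // Eagerly: mining starts right away and keeps going without subscribers.
    val store = miner.shareIn(mineScope, SharingStarted.Eagerly)

    val firstClient = store.take(2).toList()  // this collection ends here
    val secondClient = store.take(2).toList() // the miner was not restarted
    mineScope.cancel() // only this stops the miner
    firstClient to secondClient
}

fun main() {
    val (first, second) = sharedMine()
    println("first client got $first, second client got $second")
}
```

With the default replay of 0, diamonds mined while nobody is subscribed are dropped, so a real store would need a replay or buffering policy on top of this.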
t
There's no urgency in implementing this for me. I think I'll wait, now that I understand better. Thanks a lot for the details.
g
or you can implement everything we discussed with RxJava or another full-featured Reactive library
it has all the required operators
but transform will not be so easy to write
t
I'm fully on coroutines with actors and channels; those I understand 😛 I won't add Rx just for that.
g
but if you have channels, what is the problem with using BroadcastChannel?
it will be quite easy to convert it to Flow with sharing later, if that is required
t
Do I need to use BroadcastChannel + asFlow? I ask as I don't know how to refill the channel.
Anyway don't want to take too much time from you, you already helped a lot.
g
You do not refill the channel
the channel is your mine with a miner
it just produces diamonds
and continues to do this even if the client is cancelled
Anyway don’t want to take too much time from you, you already helped a lot.
Hope it helped. The reactive approach requires a bit of a different mindset, so it takes time to adapt tasks to it, but it has many advantages. Like in your case: the consumer is simple, all the caching logic is hidden above it, and there is no need to request a particular number of items.
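One way to picture "the channel is your mine", as a sketch that assumes a plain buffered channel created with produce instead of a BroadcastChannel (diamondMine and buyFromMine are made-up names). The producer keeps sending until the buffer is full and then suspends, so the stock refills by itself whenever a client takes something.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

// Hypothetical miner: a producer coroutine with a buffer of `stock` diamonds.
@OptIn(ExperimentalCoroutinesApi::class)
fun CoroutineScope.diamondMine(stock: Int): ReceiveChannel<Int> = produce(capacity = stock) {
    var serial = 0
    while (true) {
        delay(10)      // pretend mining is slow
        send(++serial) // suspends while the buffer is full, resumes when a client takes one
    }
}

// One client takes a single diamond, later another takes three from the same mine.
fun buyFromMine(): List<Int> = runBlocking {
    val mine = diamondMine(stock = 10)
    val bought = mutableListOf(mine.receive()) // take one
    delay(300)                                 // the miner fills the stock meanwhile
    repeat(3) { bought += mine.receive() }     // take three more
    mine.cancel()                              // closing the mine is a separate, explicit step
    bought
}

fun main() {
    println(buyFromMine()) // prints [1, 2, 3, 4]
}
```

Note that the producer also holds one extra diamond while it is suspended in send, so up to stock + 1 items exist ahead of demand.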
t
I have the basic mindset for simple cases 🙂 But I'm still far from handling all the complex cases with multiple endpoints 😞