#coroutines

Tolriq

05/19/2020, 12:31 PM
What would be the proper way to have a continuous source of elements that one could take elements from without closing? Like a pool of 10 items that require heavy computation, that is only refilled by the correct amount each time one is removed.

gildor

05/19/2020, 12:41 PM
A chunked operator for Flow (like the existing one for List) would be perfect for this, but it doesn't exist yet. You can check the Coroutines issue tracker; there are some ad-hoc implementations there, as I remember.
Or maybe you want to get up to 10 at any time rather than chunks, in which case you could just limit it using the concurrency param of flatMap.

Tolriq

05/19/2020, 12:51 PM
Yes, up to X, and usually it will be 1 unless the user wants to see what will come next, in which case the X are consumed. The concept is that I pass a function to a consumer, and as the consumer consumes the items, new items are generated up to a limit, to avoid pre-generating all possible items at once.
So I need to be able to collect from different places, even if never at the same time.

gildor

05/20/2020, 3:45 AM
Yeah, you can use a flatMap* operator with the concurrency param.
This concurrency limits the number of simultaneously running flows.
Your processing could be represented as a flow (even with 1 element), so flatMap will help limit the number of simultaneous processing jobs.
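
A minimal sketch of the idea above, assuming the operator meant is flatMapMerge (the flatMap variant that takes a concurrency parameter). The names heavyComputation and processWithLimit are hypothetical stand-ins, and the counters exist only to make the limit observable.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.flatMapMerge
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stand-in for the heavy per-item computation.
suspend fun heavyComputation(id: Int): String {
    delay(50)
    return "item-$id"
}

// Runs heavyComputation for every id, with at most `concurrency` inner flows
// being collected at once. Returns the results and the observed peak.
@OptIn(ExperimentalCoroutinesApi::class, FlowPreview::class)
suspend fun processWithLimit(ids: List<Int>, concurrency: Int): Pair<List<String>, Int> {
    val running = AtomicInteger(0)
    val peak = AtomicInteger(0)
    val results = ids.asFlow()
        .flatMapMerge(concurrency = concurrency) { id ->
            // Each unit of work is wrapped in its own single-element flow.
            flow {
                val now = running.incrementAndGet()
                peak.updateAndGet { maxOf(it, now) }
                try {
                    emit(heavyComputation(id))
                } finally {
                    running.decrementAndGet()
                }
            }
        }
        .toList()
    return results to peak.get()
}

fun main() = runBlocking {
    val (items, peak) = processWithLimit((1..10).toList(), concurrency = 3)
    println("produced ${items.size} items, peak concurrency was $peak")
}
```

Note that this limits how many computations run at the same time; it does not by itself keep a stock of finished items waiting for a consumer.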

Tolriq

05/20/2020, 6:49 AM
Ok but how do I collect without closing then? I'm missing some conceptual stuff about Flow it seems, do you know any advanced tutorials? (And thanks for the help)

gildor

05/20/2020, 7:21 AM
"Ok but how do I collect without closing then?"
What do you mean? flatMap's concurrency limits how many of the Flows it produces are being collected at the same time. It doesn't close anything, it just doesn't subscribe to new ones.

Tolriq

05/20/2020, 7:27 AM
Everything seems terminal in Flow. What would be the equivalent of a channel's poll?

gildor

05/20/2020, 7:34 AM
Flow is a cold stream.
It doesn't use a poll strategy, it uses push.
It's hard for me to discuss a particular use case without some example code or at least a diagram of what kind of behaviour you expect.
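
To illustrate what "cold" means for the question of taking a few items without closing, here is a small sketch. The numbers function and its onProduce callback are made up for the example. Each collect runs the producer block from scratch, and take(n) cancels the upstream once it has n items, so nothing is kept between two collections.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Builds a cold flow of 0, 1, 2, ...; `onProduce` is invoked for every
// generated item so the caller can count how much work was done.
fun numbers(onProduce: () -> Unit): Flow<Int> = flow {
    var next = 0
    while (true) {
        onProduce()
        emit(next++)
    }
}

fun main() = runBlocking {
    var produced = 0
    val source = numbers { produced++ }

    val first = source.take(2).toList()  // [0, 1]
    val second = source.take(3).toList() // [0, 1, 2]: the producer restarted

    println("first=$first second=$second produced=$produced")
}
```

The second collection starts again at 0 rather than continuing at 2, which is why a plain Flow cannot act as a pool that several callers drain over time.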

Tolriq

05/20/2020, 7:49 AM
The one I tried to explain at first. You proposed Flow, so I tried to see how I can fit it 🙂 I'll try to explain better. I need to pass "something" to a client, and the client needs to be able to take from 1 up to X items from that something, which are then removed. And that something needs to always have X items pre-generated, as the generation is heavy.
There are a few ways to achieve that, and I'm pretty sure there's a "native" way to do it with channels or flow, but I just can't grasp it.

gildor

05/20/2020, 8:17 AM
"something that will be removed"
What does it mean?

Tolriq

05/20/2020, 8:22 AM
That it's no longer present in the source. Imagine the something as a diamond builder: creating a diamond takes time, so you want to have some stock before selling them, but not too much in case no one purchases them. My first idea was to have a channel that the client polls, and a loop that checks the count of the pool and emits to it when it is under X. But that sounds hacky, and I was wondering what the proper way is.
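
One possible way to get this "stock" behaviour with plain channels, sketched under assumptions: Diamond, mineDiamond and diamondStock are hypothetical names, and the approach shown (a producer coroutine writing into a bounded channel) is an illustration, not something proposed in this thread. The buffer fills up to its capacity, send then suspends, and every receive frees a slot so the producer mines exactly one replacement.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.produce
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking

data class Diamond(val id: Int)

// Hypothetical stand-in for the slow generation step.
suspend fun mineDiamond(id: Int): Diamond {
    delay(10)
    return Diamond(id)
}

// Starts a producer whose output is buffered up to `capacity` items.
@OptIn(ExperimentalCoroutinesApi::class)
fun CoroutineScope.diamondStock(capacity: Int): ReceiveChannel<Diamond> =
    produce(capacity = capacity) {
        var id = 0
        while (true) {
            send(mineDiamond(id++)) // suspends while the buffer is full
        }
    }

fun main() = runBlocking {
    val stock = diamondStock(capacity = 10)

    val one = stock.receive()                     // take a single item
    val three = (1..3).map { stock.receive() }    // later, take a few more
    println("bought $one, then ${three.size} more")

    stock.cancel() // stop the miner so runBlocking can finish
}
```

With this shape the producer holds one extra item in the suspended send, so slightly more than `capacity` items are mined ahead. A non-suspending check for an available item is possible with tryReceive(), the successor of poll.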

gildor

05/20/2020, 8:25 AM
I'm still not sure about the semantics: you don't want to emit values before you already have some number of them in the buffer?

Tolriq

05/20/2020, 8:30 AM
I can deal with both cases.

gildor

05/20/2020, 8:42 AM
With Flow you can just write a custom operator using transform,
so it will cache the required amount and emit when it's ready to emit.

Tolriq

05/20/2020, 8:45 AM
Ok, but then the previous question is back: how do I collect only X without closing?
I really need to find a good tutorial. I'm sure I'm just missing a piece of understanding.

gildor

05/20/2020, 8:46 AM
You just collect it one by one:
val buffer = mutableListOf<Something>()
flow.collect {
    buffer += it
    if (buffer.size > 10) {
        emit(buffer.removeFirst())
    }
}
Something like this.

Tolriq

05/20/2020, 8:49 AM
This does not support 2 collectors? Like for the diamonds: one guy is purchasing and another is looking at what will be available for purchase.

gildor

05/20/2020, 8:54 AM
You can just merge any number of flows:
flow = merge(flow1, flow2)
flow.collect {
    buffer += it
    if (buffer.size > 10) {
        emit(buffer.removeFirst())
    }
}
"another is looking at what will be available for purchase"
The one who is looking is collecting too, isn't he?

Tolriq

05/20/2020, 8:57 AM
Yes, it can collect (so remove) and I'll store at the shop level, or it can only look but not collect. I can deal with both cases.

gildor

05/20/2020, 8:57 AM
Or not collect: you can use transform, which does this. Sorry, my example above was not exactly correct, it should be:
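// Assumes kotlinx.coroutines.flow.* is imported; Diamond, mineDiamond() and purchase() are placeholders
val diamondMiner = flow {
    while (true) {
        emit(mineDiamond())
    }
}
val buffer = mutableListOf<Diamond>() // stock of already mined diamonds
val diamondStore = diamondMiner.transform { // your collector
    buffer += it
    if (buffer.size > 10) {
        // sends value to buyer
        emit(buffer.removeFirst())
    }
}

// purchase diamonds (collect suspends, so this has to run inside a coroutine)
diamondStore.collect {
    purchase(it)
}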

Tolriq

05/20/2020, 9:03 AM
Ok, and so the one who wants to look just looks at the buffer. Sounds simple. Just one last question: can I cancel the collect scope without cancelling the store?

gildor

05/20/2020, 9:07 AM
no
okay, it’s more complicated
See, Flow itself is a single-consumer abstraction, so if you start collecting it you have to either close it or collect until it ends.

Tolriq

05/20/2020, 9:08 AM
That's what I had understood before having doubts 😛

gildor

05/20/2020, 9:09 AM
Right now there are no operators to share a flow,
but work on them is in progress.

Tolriq

05/20/2020, 9:10 AM
Reuse after close is not that important. It would avoid regenerating the X items on migration to another client, but this is not a frequent thing.

gildor

05/20/2020, 9:10 AM
Share means that you can have 1 flow and create some sort of shared flow from it, which can be closed without closing your original flow.
For now it's possible to use BroadcastChannel and convert it to a flow,
so consumers will use a flow, but under the hood it will be a channel, which will not be cancelled when the flow is cancelled.

Tolriq

05/20/2020, 9:11 AM
According to a recent post from elizarov, it's all being deprecated in favor of something that will come 🙂

gildor

05/20/2020, 9:11 AM
It’s correct
but broadcast channel is still not deprecated

Tolriq

05/20/2020, 9:11 AM
I suppose I'll wait a little for the shared flow stuff 🙂

gildor

05/20/2020, 9:12 AM
It's experimental, and probably most of its use cases will be replaced with Flow and share operators.
you can wait

Tolriq

05/20/2020, 9:12 AM
There's no urgency in implementing this for me. I think I'll wait, now that I understand better. Thanks a lot for the details.

gildor

05/20/2020, 9:12 AM
Or you can implement everything we discussed with RxJava or another full-featured Reactive library.
It has all the required operators,
but transform will not be so easy to write.

Tolriq

05/20/2020, 9:13 AM
I'm all-in on coroutines with actors and channels; those I understand 😛 I won't add Rx just for that.

gildor

05/20/2020, 9:16 AM
But if you have channels, what is the problem with using BroadcastChannel?
It will be quite easy to convert to Flow with sharing later, if required.

Tolriq

05/20/2020, 9:21 AM
So I need to use BroadcastChannel + asFlow? Because I don't know how to refill the channel.
Anyway don't want to take too much time from you, you already helped a lot.

gildor

05/20/2020, 9:21 AM
You do not refill the channel.
The channel is your mine with a miner:
it just produces diamonds,
and continues to do so even if the client is cancelled.
"Anyway don't want to take too much time from you, you already helped a lot."
Hope it helped. The reactive approach requires a slightly different mindset, so it takes time to adapt tasks to it, but it has many advantages. In your case, for example, the consumer is simple: all the caching logic is hidden upstream, and there is no need to request a particular number of items.

Tolriq

05/20/2020, 9:24 AM
I have the basic mindset for simple cases 🙂 But I'm still far from handling all the complex cases with multiple endpoints 😞