# coroutines
s
Guys, I want to ask for some help. I feel my problem is very simple, but I've been struggling for a while now to find the right way to implement it. The pattern I'm trying to build is: I have a list of items. I want to 'process' the items with <n> concurrency and get a list of all the results back after everything is processed. Right now my attempt is:
• put all items in channel 'todo'
• launch n workers that pull out of channel 'todo' and push the result of their processing into channel 'done'
• collect all results from channel 'done'
In the code snippet there is a simple example. It works, except for the "log progress" part. If I try to get channel.count(), the channel seems to get emptied, which is by design, if I understand the docs correctly. But how else would I get the channel's current number of items? Generally, is this correctly implemented for this pattern?
I love Kotlin, but for concurrency there are waaay too many different ways to do things. All the contexts, dispatchers, channels, flows(?), async, jobs, consumers, etc. ... Compared to e.g. golang, the concurrency is very complex.
o
I don't think `count` works how you think it does: "The operation is terminal. This function consumes all elements of the original ReceiveChannel."
Probably the only appropriate way to track this is going to be adding an atomic counter, or having a separate coroutine that you explicitly send messages to and that manages its own counter.
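The second option (a separate coroutine that owns the counter) could look roughly like the sketch below. It is written in Go, which comes up later in the thread, and it is only an illustration: `countDone` and the `ticks`/`total` channels are made-up names, not part of any library.

```go
package main

import (
	"fmt"
	"sync"
)

// countDone starts a goroutine that owns the counter. Workers never touch the
// counter directly; they send one message per finished item on the first
// returned channel. Once that channel is closed, the final total is delivered
// on the second channel.
func countDone() (chan<- struct{}, <-chan int) {
	ticks := make(chan struct{})
	total := make(chan int, 1)
	go func() {
		n := 0
		for range ticks {
			n++ // only this goroutine reads or writes n
		}
		total <- n
	}()
	return ticks, total
}

func main() {
	ticks, total := countDone()

	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			ticks <- struct{}{} // report one processed item
		}()
	}
	wg.Wait()
	close(ticks)
	fmt.Println("processed:", <-total)
}
```

Because a single goroutine owns `n`, no atomic operation or lock is needed; the cost is one extra channel send per item.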
s
Sorry, I have just edited my original text; I was aware that the docs say this. But, first of all, what good is count() then, if it consumes all elements?
I see... yes I could use a counter, true!
o
Think of it as the end result of a stream operation, which is why it is now deprecated in favor of Flow, e.g.
channel.filter { it % 2 == 0 }.count()
s
I see, thank you! So going forward, it is recommended to use flow instead of channel for this pattern?
o
I think Flow will better lend itself to this type of work, yes. The counter integration might be a little difficult though
s
I think using an atomic counter will work for now! Thanks a lot Octavia!
e
@seb Just FYI, if you know how to do it in GoLang you can usually port it to Kotlin line-by-line; all the same primitives are available, AFAIK. What primitive are you missing?
j
todo. FlowOn(done)
s
@elizarov Sorry maybe my statement wasn't clear, I'm not missing any primitives - in contrast - I feel they are overwhelmingly many. In golang, there's goroutines, channels, waitgroup and that's more or less about it. In Kotlin, there's many more things, there's coroutines, contexts, dispatchers, jobs, suspend functions, actors, flows, async/await, promises..... Don't get me wrong, as I haven't really fully learned it, I'm not saying this is good or bad, but from my perspective, Kotlin's concurrency is more complex to learn than golang's!
However, I actually do miss one thing, as we discussed in this thread: how to get the current number of items in a channel? In golang, you use len(channel). However, in Kotlin, channel.count() will empty the channel... 🙂 Anyway, Octavia helpfully suggested using an atomic counter to track my channel's progress...
o
I think the mistake that you are making is treating contexts, dispatchers, jobs, actors, flows, etc. as primitives. The only concurrency primitive is the continuation / suspend function. The other parts I could consider "primitives", but they are in another category, because they do not come with the language itself; they are a library. And actors/async/promises are all built on jobs/contexts, so I would hardly call them primitives. They are higher-level helper functions.
s
Fair enough, primitive is the wrong term, I should use concept. However my statement holds that there's a large amount of concepts compared to golang. I'm not trying to fight any pointless language wars here, and I'm happy with Kotlin, but this one confuses me a lot 🙂
e
@seb I’d be very interested to see some example code in GoLang that is using `len(chan)` to solve the problem you are facing. We did not add this ability to get the number of items in a channel because we did not see any use-case for it. We do have `chan.isEmpty`, though (there were a couple of uses identified).
As for the number of concepts we have in Kotlin, there’s one thing we are clearly missing now: systematic documentation explaining how they relate. In software, abstractions are built in layers: higher-level abstractions are built on top of lower-level ones, etc. Suspension is the lowest level. You have it in GoLang, too, but you never work with it directly, unless you interop with some native code or write the GoLang runtime itself. Then channels are built on top of it. Channels are good, but they are also somewhat low-level, just like imperative loops and arrays: you can do anything with them, but it takes a lot of repetitive and error-prone code. In practice you solve typical problems day after day, and that is where higher-level abstractions like collections and flows help. They allow you to express what you want to achieve in a declarative fashion without having to spell out all the steps in nitty-gritty detail every time. We are still building those implementations using flow for typical everyday tasks people face.
s
@elizarov thanks for the explanation! Thanks for all the work on Kotlin and coroutines, it's very appreciated! For "I’d be very interested to see an example code in GoLang that is using `len(chan)` to solve the problem you are facing.": just in my original example here in this thread. A number of workers are processing items from a channel of tasks. While processing is ongoing, I want to know how many items are left. In golang, I would launch one extra coroutine that would do `while (len(chan) > 0) log(len(chan))`. Of course I can use an atomic counter to solve this, but that is like saying that in order to know how many items are in a standard collection, I should count how many I put in...
e
@seb I really would love to see some working GoLang code. I’m really puzzled at how the code using `len(chan)` actually works. Can you write some small self-contained demo?
s
I will do!
e
I see. You only use `len(chan) > 0`. That is what `Channel.isEmpty` does in Kotlin. I was under the impression that you actually somehow use the result of `len` other than comparing it with zero.
Also, the code would not actually work all the time. Look at lines 44-45:
for len(channel) > 0 { // line 44
		item := <-channel  // line 45
Consider a case where just one task remains in the channel and two workers both execute line 44 successfully. Now one of them receives the item from the channel at line 45 and the other waits forever.
The correct solution for both Go and Kotlin is to use a `for` loop over the channel that works until the channel is closed. You need to close the channel at line 20.
The progress report at line 55 is the only correct usage, since it is non-functional.
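For readers without the original demo at hand, here is a minimal Go sketch of that fix: close the channel once everything is queued and let each worker range over it. This is not the demo from the thread; `processAll`, `todo`, and `done` are illustrative names, and the squaring function is just a stand-in for real work.

```go
package main

import (
	"fmt"
	"sync"
)

// processAll runs process over items with n concurrent workers and returns
// all results (in completion order, not input order).
func processAll(items []int, n int, process func(int) int) []int {
	todo := make(chan int, len(items))
	done := make(chan int, len(items)) // buffered so workers never block on send

	for _, item := range items {
		todo <- item
	}
	close(todo) // lets every worker's range loop terminate

	var wg sync.WaitGroup
	for w := 0; w < n; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// No len(todo) check: range receives until todo is closed and
			// drained, so no worker is left waiting on an empty channel.
			for item := range todo {
				done <- process(item)
			}
		}()
	}
	wg.Wait()
	close(done)

	results := make([]int, 0, len(items))
	for r := range done {
		results = append(results, r)
	}
	return results
}

func main() {
	squares := processAll([]int{1, 2, 3, 4, 5}, 2, func(x int) int { return x * x })
	fmt.Println(len(squares), "results")
}
```

The check-then-receive race disappears because the receive itself reports when the channel is closed and empty.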
s
Fair enough, but the use-case for len(chan) was not in the worker part but in the log progress part!
fmt.Printf("%v items left\n", len(channel))
Isn't that a fair use case, wanting to know how many items are left in a channel?
e
As we can see here, even the ability to do `len(chan) > 0` (`isEmpty`) leads to easy-to-write-but-incorrect code. That was one of the reasons we’ve even considered removing `isEmpty`, but …
It is not really a use-case, since it is non-functional; it is rather a diagnostic feature. It does not benefit from any kind of integration into the channel itself, as it can be implemented by a separate atomic variable. Note that a separate variable gives you much more flexibility and control. For example, with a separate variable you can count how many tasks you’ve completed executing, as opposed to counting how many tasks you’ve started executing.
I, for one, am usually much more interested in seeing the number of “completed” tasks in my logs.
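A rough Go sketch of that idea, using a separate atomic variable that is bumped only after a task finishes. The function name `runWithProgress` and the log format are assumptions made for illustration, not code from the thread.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// runWithProgress processes every task with n workers and returns how many
// tasks finished. The counter is incremented only after work returns, so it
// counts completed tasks rather than started ones.
func runWithProgress(tasks []int, n int, work func(int)) int64 {
	todo := make(chan int, len(tasks))
	for _, t := range tasks {
		todo <- t
	}
	close(todo)

	var completed int64
	var wg sync.WaitGroup
	for w := 0; w < n; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for t := range todo {
				work(t)
				c := atomic.AddInt64(&completed, 1)
				// Diagnostic only: nothing in the program logic depends on it.
				fmt.Printf("%d of %d completed\n", c, len(tasks))
			}
		}()
	}
	wg.Wait()
	return atomic.LoadInt64(&completed)
}

func main() {
	total := runWithProgress([]int{1, 2, 3, 4}, 2, func(int) {})
	fmt.Println("finished:", total)
}
```

Moving the `atomic.AddInt64` call before `work(t)` would turn this into a "started" counter instead, which is the flexibility a channel-length query cannot offer.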
s
I see what you mean... I understand, that makes sense! However, I guess I'm not the only 'noob' out there getting confused by a lack of any method to get the number of items in a channel, and .count() empties the channel, which is quite non-intuitive I think 🙂 Anyway, myself, I understand it now! Thanks a lot for the clarifications, Roman!
e
`count()` is deprecated and will be removed in the future.
s
👍
e
Also, we do hope that in the future you will not ever need to write code with such a low-level primitive as a channel, just like you don’t generally write in assembly nowadays.
^ There’s still a long road to that, though
s
👍 cool! Thanks for the great work!