Hello guys, any example about how to handle comple...
# coroutines
d
Hello guys, any example about how to handle complex concurrency with `actor`s?
t
Here is the playground to the sample code presented by Roman Elizarov at KotlinConf 2018: https://tinyurl.com/ycagcomy. It mainly showcases: - How to run multiple tasks in parallel using coroutines - How to communicate between coroutines - How to restrict the maximum number of workers You can watch the full presentation here:

https://www.youtube.com/watch?v=a3agLJQ6vt8

While the sample code in the playground does not use the
actor
function,
launch
with a reference to a
Channel
is functionally the same thing.
d
Yes, thanks, I already seen it before and watched again yesterday, while is true that
actor
is a
Channel
, the patterns is slightly different, since I don't think you would pass an actor as function param
t
What is your use case ?
d
I have an html
String
and some images on disk to inline in the html, I do the following steps: • Compress the images • Base64 the images • inline the images in html (
Document
from jsoup ) • Stringify the doc and deliver back Actually my class implements
CoroutineScope
from ViewModel’s scope ( constructor scope… :
CoroutineScope by ( scope + Default )
) and every step is a
val worker = actor {  }
. Actually I want to split some jobs in a N of workers and use the last worker with a “reverse debounce” operator ( when is received drop the current work and start again )
t
When creating an
actor
, the returned
Channel
matches the
capacity
parameter, which is
Channel.RendezVous
by default. This means that each step can process one element at the time. You can specify
capacity = N
to define a worker that can process up to
N
tasks in parallel.
If you need a reverse debounce actor, you could use the following structure to emulate one :
Copy code
fun CoroutineScope.reverseDebounceActor() = actor<String>(capacity = Channel.CONFLATED) {
    var currentJob: Job? = null

    for (element in channel) {
        currentJob?.cancel()
        currentJob = launch {
            // Do your work on element here...
            // Make sure that this work is cancellable!
        }
    }
}
d
Great, setting a capacity should make the trick, didn't think about that 🙂 actually I used an external channel with
UNLIMITED
. Something like
for element in list, send to queueChannel
, then
repeat( WORKERS_COUNT) queueChannel.receiverOrNull -> do async
In this way the elements are processed concurrently in the actor but delivered together to the next one ( since I realised is not a good idea to deliver one by one, since every time the UI is refreshed it bump to the top ). Have you got any better idea?
Copy code
private val imageCompressor = actor<List<EmbeddedImage>> {
        for (embeddedImages in channel) {
            val outputs = mutableListOf<ImageStream>()

            for (embeddedImage in embeddedImages) {
                imageSelector.send(embeddedImage)
            }

            (1..WORKERS_COUNT).forEachAsync {
                <process>
                outputs += embeddedImage to output
            }

            imageStringifier.send(outputs)
        }
    }

    private val imageSelector = Channel<EmbeddedImage>(UNLIMITED)
t
After reading the sample code again, I noticed that my explanation was wrong: the
capacity
parameter of
actor
actually controls the number of elements that can be queued, not the number of elements processed simultaneously by the actor. You need to create an actor for each required worker, just like in the example.
d
Yes, right: capacity = queue size. That’s why I didn’t tough about that 😛