# coroutines
b
so i watched this video

https://www.youtube.com/watch?v=a3agLJQ6vt8

and i tried to recreate what he was doing with downloading, but with an uploader: https://gist.github.com/bitkid/c66d198c2930bd4c5dbc753cf96b4c2f . i have uploadWorkers (input channel with files, output channel with uploadResponse), an uploadResultHandler which listens to the uploadResponseChannel, and a "main" method which combines the workers with the handler. the problem i have now .. how do i write a main method for that? how does the resultHandler know that it is done listening to upload results? i can't close the response channel in the worker .. basically i am confused 🙂
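for reference, the pieces have roughly this shape (not the exact gist code .. UploadResponse here is just a stand-in for the real result type):
```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.SendChannel
import kotlinx.coroutines.launch
import java.io.File

// stand-in for the real result type
data class UploadResponse(val file: File, val success: Boolean)

// each worker reads files from the input channel and emits one response per file
fun CoroutineScope.uploadWorker(files: ReceiveChannel<File>,
                                responses: SendChannel<UploadResponse>) = launch {
    for (file in files) {
        // the actual upload would happen here
        responses.send(UploadResponse(file, true))
    }
}

// the handler consumes responses until the channel is closed
fun CoroutineScope.uploadResultHandler(responses: ReceiveChannel<UploadResponse>) = launch {
    for (response in responses) {
        println(response)
    }
}
```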
i came up with something like this
```kotlin
object Uploader {
    @JvmStatic
    fun main(args: Array<String>) {
        val files = Channel<File>()
        GlobalScope.launch {
            uploader(files)
        }
        files.sendBlocking(File("a"))
    }
}
```
but this only waits until the file is sent .. not until the response has been processed
i mean what i could do is upload and process each file in one step ... then i'd know i'm done once all the sendBlocking() calls have returned
is there maybe a channel implementation which expects a certain number of elements?
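just a sketch of what i mean (handleExactly is made up, UploadResponse is the stand-in from above):
```kotlin
import kotlinx.coroutines.channels.ReceiveChannel

// hypothetical: if the total count n were known upfront, the handler could
// receive exactly n responses and then simply return
suspend fun handleExactly(n: Int, responses: ReceiveChannel<UploadResponse>) {
    repeat(n) {
        println(responses.receive())
    }
}
```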
r
Use `runBlocking`, e.g.
```kotlin
fun main(args: Array<String>) = runBlocking { ... }
```
b
that doesn't really solve the problem. so basically .. i have workloads (a finite amount) .. i submit them to a worker pool through a channel, the pool processes the work and then submits a work-done message to a different channel. this channel is then consumed by a different coroutine. how can i make sure that this coroutine got a message for each workload submitted to the pool? when i close the channel after all tasks have been submitted to the workers i might lose messages on the consuming side.
i mean i could use something like a CountDownLatch or CyclicBarrier .. but i thought maybe coroutines has something OOTB.
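the race i'm worried about looks roughly like this (allFiles / workChannel / responses are placeholder names):
```kotlin
// inside some CoroutineScope .. sketch of the race: the producer closes the
// response channel right after submitting all the work, while some workers
// are still uploading
launch {
    allFiles.forEach { workChannel.send(it) }
    workChannel.close() // fine .. the workers just drain what's left
    responses.close()   // too early! a worker still mid-upload will get a
                        // ClosedSendChannelException on its last send
}
```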
r
I would simply move things around
```kotlin
object Uploader {
    @JvmStatic
    fun main(args: Array<String>) = runBlocking {
        val files = Channel<File>()
        GlobalScope.launch {
            files.send(File("a"))
        }
        uploader(files)
    }
}
```
now your main function waits for `uploader()` to finish
I just realized your `uploader()` is an extension of `CoroutineScope`, correct?
b
y
maybe using channels in this context is not the best abstraction
basically i want to upload a lot of files concurrently while keeping control over the concurrency .. basically the nr of threads. when using channels i can control the concurrency by controlling the number of consumers of the channel. but i guess i could also just use a fixedThreadPool.asCoroutineDispatcher() and launch(pool) {}
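e.g. something like this (just a sketch .. the pool caps the threads, repeat(4) caps the number of concurrent uploads):
```kotlin
import kotlinx.coroutines.asCoroutineDispatcher
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import java.io.File
import java.util.concurrent.Executors

fun main() = runBlocking {
    // fixed pool caps the nr of threads
    val pool = Executors.newFixedThreadPool(4).asCoroutineDispatcher()
    val files = Channel<File>()
    coroutineScope {
        // nr of consumers caps how many uploads run at once
        repeat(4) {
            launch(pool) {
                for (file in files) {
                    // the actual upload would happen here
                    println("uploading $file on ${Thread.currentThread().name}")
                }
            }
        }
        (1..10).forEach { files.send(File("$it")) }
        files.close()
    }
    pool.close() // safe .. coroutineScope waited for all consumers
}
```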
i am also wondering .. when i submit let's say 100 items to a channel and then call channel.close() .. how can i make sure that the items have been processed on the reading side?
the approach i used here does what it should do .. with a CountDownLatch .. but i have the feeling that's somehow an anti-pattern https://gist.github.com/bitkid/68fd50575f35ab456f5b03a7746437f2
r
yeah, remove your latch. I whipped something up.
```kotlin
// inside runBlocking (or some other CoroutineScope)
val files = (1..10).map { File("$it") }
val channel = Channel<File>()

launch {
    files.forEach {
        channel.send(it)
    }
    channel.close()
}

val time = measureTimeMillis {
    coroutineScope {
        uploader(channel)
    }
}
println(time)
```
then add `responses.close()` inside `uploadWorker()` after your for loop
I’m not 100% sure if using `coroutineScope` like this here is the right way to go about it, but the only alternative I can think of is making `uploader()` suspending
```kotlin
suspend fun uploader(files: ReceiveChannel<File>,
                     nrOfWorkers: Int = 4) = coroutineScope { ... }
```
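then the call site would look something like this (sketch, reusing the channel setup from above):
```kotlin
// usage: runBlocking only returns after uploader() (and every worker
// inside its coroutineScope) has completed
fun main(args: Array<String>) = runBlocking {
    val files = Channel<File>()
    launch {
        (1..10).forEach { files.send(File("$it")) }
        files.close()
    }
    uploader(files)
}
```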
b
I have a similar structure in my system, also using Channels. Basically I have a single actor managing state (in memory or persisted). Send the batch of items to it to manage, have it track what is outstanding per batch, and also ensure each completed item is processed by it. Essentially a reconciliation issue. It fans out to a limited number of workers (coroutines) to control concurrency, rather than just spinning up new coroutines per item
b
but when i call responses.close() after running the loop .. how can i make sure uploadResultHandler has already processed everything? plus, when i have multiple workers .. one worker is potentially still working and wants to send its last result to responses .. while another worker already received the last element.
what i mean is .. even if you are the one getting the last file it doesn't mean that you are the last to finish processing
so uploadWorker-1 gets element n-1 .. it takes 10 seconds to process .. uploadWorker-2 gets element n .. but it only takes 1 second to process, and it closes the channel, so uploadWorker-1 can't send its remaining result
so i need something external to synchronize the results
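maybe the simplest "something external" is just keeping the worker jobs around and joining them .. a sketch (same placeholder types as above):
```kotlin
// sketch: keep the worker jobs, and close the response channel only
// after every worker has finished its last send
fun CoroutineScope.uploader(files: ReceiveChannel<File>,
                            responses: SendChannel<UploadResponse>,
                            nrOfWorkers: Int = 4) = launch {
    val workers = (1..nrOfWorkers).map {
        launch {
            for (file in files) {
                // upload happens here .. exactly one response per file
                responses.send(UploadResponse(file, true))
            }
        }
    }
    workers.forEach { it.join() } // no worker is mid-upload anymore
    responses.close()             // now the result handler can drain and stop
}
```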