# coroutines
b
produce<String>(capacity = 1024) {
    input.forEachLine { async { send(it) } }
}.map {
    async {
        val tokenized = tokenizerFactory.create(pp.preProcess(it)).tokens.joinToString(" ")
        tokenized + "\n"
    }
}.map {
    output.appendText(it.await())
}
g
I suppose you are using sequences; this will not work, a sequence is a sequential primitive
You have to get rid of forEachLine or write your own suspend version
The source of this problem is that sequence extensions are not inlined (and they cannot be inlined due to the lazy nature of sequences). Because of that you cannot use suspend functions inside them, so you have to run async on each item, which in general is not the best idea in terms of performance, and you also cannot use map and the other primitives properly
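A minimal sketch of that "own suspend version" idea, assuming input is a java.io.File (lineProducer is just an illustrative name): useLines is an inline function, so send() can be called directly from the produce body, with no async per line.

import java.io.File
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.produce

// Read the file lazily and send each line from the suspending produce body.
// send() suspends when the 1024-line buffer is full, so reading naturally
// slows down to the speed of the consumers.
fun CoroutineScope.lineProducer(input: File): ReceiveChannel<String> =
    produce<String>(capacity = 1024) {
        input.useLines { lines ->
            for (line in lines) {
                send(line)
            }
        }
    }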
b
hmmm
s
produce<String>(capacity = 1024) {
    val that = this
    listOf("1","2","3").asSequence().forEach { async { that.send(it) } }
}
would a closure work?
b
forEachLine is coming from a file
and the file is too big to be loaded in memory
val reader = produce<String>(Dispatchers.IO, capacity = 1024) {
    input.forEachLine { runBlocking { send(it) } }
    this.close()
}

val writer = produce<String>(context, capacity = 8) {
    reader.consumeEach {
        val tokenized = tokenizerFactory.create(pp.preProcess(it)).tokens.joinToString(" ")
        send(tokenized + "\n")
    }
}

writer.consumeEach {
    output.appendText(it)
}
now I tried that, with runBlocking on the send (because if capacity is 1024, it should really block only if it is full, right?)
s
runBlocking shouldn't be used here
b
the problem I had with using async is that I would get billions of asyncs (at least that's what memory usage looked like)
s
maybe try: (untested)
val that = this
input.forEachLine { that.send(it) }
b
if I use async, my problem is that I cannot close the stream either
if I do what you wrote
it says that suspension functions can only be called within a coroutine body
(which is the reason why I found that runBlocking was working)
s
offer?
input.forEachLine { offer(it) }
b
but then I lose the lines that weren't added…
so if I start buffering (which is likely, as processing is much slower than file reading), then I lose lines randomly
s
you could check the result of offer() and loop until true with a delay/sleep ?
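Roughly what that could look like inside the produce block, untested (and offer was later deprecated in favour of trySend): the forEachLine lambda is not suspending, so the only way to wait is to block the thread, which is tolerable only on Dispatchers.IO.

produce<String>(Dispatchers.IO, capacity = 1024) {
    input.forEachLine { line ->
        // offer() returns false when the buffer is full; back off and retry.
        // Thread.sleep blocks the IO thread, not the whole coroutine machinery.
        while (!offer(line)) {
            Thread.sleep(10)
        }
    }
}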
b
what is wrong with using runBlocking?
s
I was under the impression there should only be one runBlocking, as it turns the current thread into an event loop; I'm probably out of my depth here
from docs
Runs a new coroutine and blocks the current thread interruptibly until its completion. This function should not be used from a coroutine. It is designed to bridge regular blocking code to libraries that are written in suspending style, to be used in main functions and in tests. The default CoroutineDispatcher for this builder is an internal implementation of event loop that processes continuations in this blocked thread until the completion of this coroutine. See CoroutineDispatcher for the other implementations that are provided by kotlinx.coroutines.
b
I see…
so it works, but only by luck…
s
Let us know what you find. I would figure, since the library is not using coroutines and you're running in the Dispatchers.IO pool, it would be acceptable to Thread.sleep(10) until you are able to re-offer inside forEachLine
but that's only a guess
b
I have to admit I am a bit lost with all of that… I wanted to do a [Read Lines]->[Process lines in parallel]->[Write processed lines]
I can make it work with map, except that then the line processing is not parallel…
s
produce gives you a channel
pass the channel to consumers
repeat(8) { for(x in channel) { consume(x) } }
8 consumers
b
does that stop only when channel is closed?
s
runBlocking {
    val channel = produce<Double>(capacity = 1024) {
        while (true) {
            send(Math.random())
        }
    }

    repeat(8) {
        launch {
            for (item in channel) {
                println(item)
            }
        }
    }
}
yes
runBlocking {
    val channel = produce<Double>(capacity = 1024) {
        while (true) {
            send(Math.random())
        }
    }

    repeat(8) {
        launch {
            for (item in channel) {
                println(item)
            }
        }
    }

    delay(500)
    channel.cancel()
}
a better example
runBlocking {
    val channel = produce<Double>(capacity = 1024) {
        var x = 0
        while (true) {
            send(Math.random())
            x++
            if (x > 100) {
                break // closing
            }
        }
    }

    repeat(8) {
        launch {
            for (item in channel) {
                println(item)
            }
        }
    }
}
println("all done")
I think I recall an example like this in

https://www.youtube.com/watch?v=a3agLJQ6vt8

b
If I do that it just gets stuck in the repeat
ooh I see why
because I was writing to a writer channel
s
how do you mean?
b
I was writing to a writer channel but not reading from it
val reader = produce<String>(capacity = 1024) {
    input.useLines {
        val iterat = it.iterator()
        while (iterat.hasNext()) {
            send(iterat.next())
        }
    }
    println("Can close now")
    this.close()
}

val writer = launch(Dispatchers.IO) {
    writerChannel.consumeEach {
        output.appendText(it)
    }
}

repeat(8) {
    println("Analyzer number $it")
    for (x in reader) {
        val tokenized = tokenizerFactory.create(pp.preProcess(x)).tokens.joinToString(" ")
        writerChannel.send(tokenized + "\n")
    }
}

writerChannel.close()
writer.join()
that's what I have so far
but it prints:
Analyzer number 0 (spent all the processing time here)
Can close now
Analyzer number 1 (go really quickly across all of the following)
Analyzer number 2
Analyzer number 3
Analyzer number 4
Analyzer number 5
Analyzer number 6
Analyzer number 7
I tried to put repeat(8) { launch { … }} but it doesn't seem to improve
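One thing to watch for with repeat(8) { launch { … } } in the snippet above: writerChannel.close() then runs as soon as the eight jobs are launched, not after they finish, so the workers' remaining send() calls fail. A sketch of a joined version, reusing the names from that snippet (reader, writerChannel, writer, tokenizerFactory, pp):

val analyzers = List(8) {
    launch(Dispatchers.Default) {
        // all workers share the reader channel, so each line is processed exactly once
        for (x in reader) {
            val tokenized = tokenizerFactory.create(pp.preProcess(x)).tokens.joinToString(" ")
            writerChannel.send(tokenized + "\n")
        }
    }
}
analyzers.joinAll()     // wait until the reader channel is fully drained
writerChannel.close()   // only then can the writer's consumeEach complete
writer.join()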
g
I believe for this use case the channel just adds unnecessary overhead, because this operation is completely blocking by nature, so I would rewrite it in a sequential way. I'm not sure what you want to achieve and why you need this channel, so I will show 2 examples
👍 2
b
[Reading Lines]->[n threads each processing a line]->[Writing Lines]
g
withContext(Dispatchers.IO) {
    output.bufferedWriter().use { out ->
        input.forEachLine { line ->
            val tokenized = tokenizerFactory.create(pp.preProcess(line)).tokens.joinToString(" ")
            out.write(tokenized + "\n")
        }
    }
}
also it's significantly more efficient, because appendText opens and closes the file on each operation, which causes huge overhead
b
With the test file I go (with my solution) from 24s to 9s (on a 2 CPU machine)
so yes that seem to help
but then on the full file on the production machine, I end up with only a single CPU used
the full run would take hours, but it seems to run as fast as what I had initially (really close to your solution except using appendText)
And I'm back to my initial idea to be able to use the 12 threads of that machine (24 w/ HT but it is disabled) to do the tokenizer part, which is what takes the most time in that whole thing
g
I see what you mean, it's more efficient, but not parallel. If your processing is really heavy you can use an async solution where the file is read line by line, multiple workers process the lines, and then they are written out, but it only makes sense for really heavy processing
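For reference, a sketch of that shape (not code from this thread): one producer reading lines on Dispatchers.IO, a configurable number of CPU workers, and a single writer that owns the output file; tokenize stands in for the tokenizerFactory/preProcess call.

import java.io.File
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.produce

fun process(input: File, output: File, workers: Int, tokenize: (String) -> String) = runBlocking {
    // producer: reads lines lazily, suspends when 1024 lines are buffered
    val lines = produce<String>(Dispatchers.IO, capacity = 1024) {
        input.useLines { seq -> for (line in seq) send(line) }
    }
    val results = Channel<String>(capacity = 1024)

    // workers: CPU-bound tokenizing on the Default pool
    val jobs = List(workers) {
        launch(Dispatchers.Default) {
            for (line in lines) results.send(tokenize(line) + "\n")
        }
    }
    launch { jobs.joinAll(); results.close() } // close the result channel once every worker is done

    // single writer keeps the output file open for the whole run
    output.bufferedWriter().use { out ->
        for (line in results) out.write(line) // note: output order is not preserved
    }
}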
b
well I'm processing text files that are >20G
g
I understand, but I mean the "processing" of a single line
otherwise the cost of multithreading and atomics in channels may be higher than, or close to, the cost of your processing
and you will end up with a solution that utilises all cores but is not efficient, and maybe not significantly faster, or even slower
b
yep that's exactly what happened to me 😄
g
and it makes sense
b
so maybe I should do processing by blocks
take x thousand lines
and have workers doing that
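A rough sketch of that block idea (the chunk size is an arbitrary guess, and tokenize again stands in for the real tokenizer call), assuming the same input and output files as above: each block of lines is fanned out to a handful of async tasks on the CPU pool, so the coordination cost is paid once per few thousand lines instead of once per line.

runBlocking {
    output.bufferedWriter().use { out ->
        input.useLines { lineSeq ->
            for (block in lineSeq.chunked(8 * 10_000)) {
                // split the block into 8 parts and tokenize them in parallel
                val parts = block.chunked(10_000).map { part ->
                    async(Dispatchers.Default) { part.map { tokenize(it) + "\n" } }
                }
                // write out block by block; order within a block is preserved
                parts.awaitAll().forEach { tokenizedPart ->
                    tokenizedPart.forEach { out.write(it) }
                }
            }
        }
    }
}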
g
yeah, if you have a lot of files it's not a problem, each core will just work on its own file
for cases when you have one huge file, splitting it up into multiple files would be a good solution
it's actually the usual problem of parallelising any work: parallel processing is efficient only in some use cases. You can find many talks on Google about this related to ForkJoinPool and parallel streams from Java 8, but the general approach also applies to coroutines
One more addition to my code above: it would probably be better to support coroutine cancellation in this code, so you can rewrite it like this:
withContext(Dispatchers.IO) {
    output.bufferedWriter().use { out ->
        input.bufferedReader().use { inp ->
            while (isActive) {
                val line = inp.readLine() ?: break
                out.write(line + "\n")
            }
        }
    }
}
take x thousand lines
Or even more, depending on line length, because you pay a significant price for IO operations, especially for opening/closing files
b
Or using parallel streams:
output.bufferedWriter().use { out ->
    input.bufferedReader().lines().parallel().forEach {
        out.write(tokenizerFactory.create(pp.preProcess(it)).tokens.joinToString(" ") + "\n")
    }
}
This is by far the fastest option, at least 10-15 times faster than anything else I tried. It uses more memory though… But I can spare some for that
g
did you compare it with a single-thread version that just splits the file?
also, are you sure that the order of lines is not important for your case? because forEach does not emit items in the original order
Anyway, though parallel streams are heavily optimized for parallelising this kind of job, I'm really not sure that it makes sense for this use case, only if the tokenizer is really heavy
but it's a relatively easy way to parallelise it; splitting the file and processing each part separately may be even more efficient, since you do not pay the price of context switches
also you can do something similar with coroutines, but it would probably be less efficient, just because parallel streams are optimized for such tasks
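A sketch of that split-first idea (the part count, file names and tokenize are all assumptions, not anything from this thread): write the big input into N part files round-robin in one pass, then let each part be processed by plain sequential code, one coroutine per file.

runBlocking {
    // 1) split: one pass over the big file, cheap line-by-line writes
    val partFiles = List(12) { File("part-$it.txt") }
    val writers = partFiles.map { it.bufferedWriter() }
    input.useLines { lines ->
        lines.forEachIndexed { i, line -> writers[i % writers.size].write(line + "\n") }
    }
    writers.forEach { it.close() }

    // 2) process: each part is handled sequentially by its own coroutine on the CPU pool
    partFiles.mapIndexed { idx, part ->
        launch(Dispatchers.Default) {
            File("out-$idx.txt").bufferedWriter().use { out ->
                part.useLines { lines -> lines.forEach { out.write(tokenize(it) + "\n") } }
            }
        }
    }.joinAll()
}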
b
order of lines is not important, no
tokenizer is pretty heavy
also parallel streams have an issue with batch size that increases over time
works well
but yes, splitting the file once it is generated may be an option in the future
(the input file)
m
I had to solve something like this recently. I ended up writing https://bitbucket.org/marshallpierce/task-throttle/src/master/ to allow me to have bounded concurrency so I could have some i/o work happening before the CPU was ready for it so the CPU was never idle.
I can go into more detail if you're still looking for further optimization
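Not that library, just a sketch of the general bounded-concurrency idea with kotlinx.coroutines' Semaphore (mapBounded is an illustrative name): at most limit transforms run at once, so I/O can stay a little ahead of the CPU without an unbounded number of tasks executing simultaneously.

import kotlinx.coroutines.*
import kotlinx.coroutines.sync.Semaphore
import kotlinx.coroutines.sync.withPermit

// Map a collection with at most `limit` transforms running concurrently.
// Note: every item still gets its own coroutine; only execution is throttled.
suspend fun <T, R> Iterable<T>.mapBounded(limit: Int, transform: suspend (T) -> R): List<R> =
    coroutineScope {
        val gate = Semaphore(limit)
        map { item ->
            async(Dispatchers.Default) { gate.withPermit { transform(item) } }
        }.awaitAll()
    }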
b
neat! thanks. That's something I tried to do, but I made it slower than anything else