bjonnh
03/11/2019, 4:05 AMproduce<String>(capacity = 1024) {
// Stage 1: read `input` line by line into a channel that buffers up to 1024 lines.
// `send` suspends but the `forEachLine` callback is a plain lambda, which is presumably
// why every call is wrapped in `async` — this starts one child coroutine per line.
input.forEachLine { async { send(it) } }
}.map {
// Stage 2: start tokenizing each received line in its own `async`; this stage emits the
// Deferred handles, not the finished strings.
async {
val tokenized = tokenizerFactory.create(pp.preProcess(it)).tokens.joinToString(" ")
tokenized + "\n"
}
}.map {
// Stage 3: wait for each tokenized line and append it to `output`.
// NOTE(review): `appendText` reopens and closes the file for every line, which is the
// overhead discussed later in this thread.
// NOTE(review): the channel returned by this final `map` is never read from, so this
// stage may suspend once nothing receives its results — confirm every line reaches `output`.
output.appendText(it.await())
}
gildor
03/11/2019, 4:08 AM
bjonnh
03/11/2019, 4:18 AM
sitepodmatt
03/11/2019, 4:18 AMproduce<String>(capacity = 1024) {
// Explicit alias for the producer scope, so the `send` inside the nested `async`
// visibly targets the producer's channel.
// NOTE(review): the `async` block's own receiver has no `send`, so a plain `send(it)` should
// already resolve to this same producer scope; the alias only adds readability.
val that = this
// Each element is sent from its own child coroutine.
// NOTE(review): `Sequence.forEach` is inline, so `send` could be called directly here
// without the `async` wrapper.
listOf("1","2","3").asSequence().forEach { async { that.send(it) } }
}
Would a closure work?
bjonnh
03/11/2019, 4:18 AMval reader = produce<String>(Dispatchers.IO, capacity = 1024) {
    // Read stage, running on the IO dispatcher with room for 1024 buffered lines.
    // `forEachLine` takes a plain (non-suspending) callback, so each suspending `send`
    // is bridged with `runBlocking`, which blocks this IO thread until the send completes.
    input.forEachLine { runBlocking { send(it) } }
    // NOTE(review): `produce` closes its channel when this block returns, so this explicit
    // close() is likely redundant; kept to match the original intent.
    this.close()
}
// Tokenize stage: consumes `reader` and forwards one tokenized line at a time through an
// 8-element buffer.
// NOTE(review): `context` is not defined in this snippet — presumably a CoroutineContext
// from the surrounding code.
val writer = produce<String>(context, capacity=8) {
    reader.consumeEach {
        val tokenized = tokenizerFactory.create(pp.preProcess(it)).tokens.joinToString(" ")
        send(tokenized + "\n")
    }
}
// Write stage: appends each tokenized line to `output`.
// NOTE(review): `appendText` opens and closes the file on every call; a single
// `output.bufferedWriter().use { ... }` avoids that per-line cost.
writer.consumeEach {
    output.appendText(it)
}
sitepodmatt
03/11/2019, 4:20 AM
bjonnh
03/11/2019, 4:20 AM
sitepodmatt
03/11/2019, 4:21 AMval that = this
// NOTE(review): `send` suspends, but the `forEachLine` callback is not a suspending lambda
// (the other snippets in this thread wrap the call in `async`/`runBlocking` for that reason),
// so this presumably does not compile as written. `input.useLines { ... }` does accept
// suspending calls in its body.
input.forEachLine { that.send(it) }
bjonnh
03/11/2019, 4:22 AM
sitepodmatt
03/11/2019, 4:25 AM
bjonnh
03/11/2019, 4:26 AM
sitepodmatt
03/11/2019, 4:30 AM
bjonnh
03/11/2019, 4:30 AM
sitepodmatt
03/11/2019, 4:33 AM
bjonnh
03/11/2019, 4:34 AM
sitepodmatt
03/11/2019, 4:39 AM
bjonnh
03/11/2019, 4:45 AM
sitepodmatt
03/11/2019, 4:46 AM
bjonnh
03/11/2019, 4:47 AM
sitepodmatt
03/11/2019, 4:48 AMrunBlocking {
    // Producer with no exit condition: it keeps pushing random numbers into a 1024-slot
    // buffer, so this demo presumably never finishes on its own.
    val randoms = produce<Double>(capacity = 1024) {
        while (true) send(Math.random())
    }
    // Fan-out: eight consumers pull from the one shared channel.
    repeat(times = 8) {
        launch { for (value in randoms) println(value) }
    }
}
runBlocking {
    // Unbounded producer of random numbers with a 1024-slot buffer.
    val randoms = produce<Double>(capacity = 1024) {
        while (true) send(Math.random())
    }
    // Fan-out: eight consumers pull from the one shared channel.
    repeat(times = 8) {
        launch { for (value in randoms) println(value) }
    }
    // Let the pipeline run for half a second, then cancel the channel; this stops the
    // producer and ends the consumers' loops, letting runBlocking return.
    delay(500)
    randoms.cancel()
}
runBlocking {
    // Finite producer: exactly 101 random numbers. When the block returns, `produce`
    // closes the channel.
    val randoms = produce<Double>(capacity = 1024) {
        repeat(times = 101) { send(Math.random()) }
    }
    // Eight consumers share the channel; their loops end once it is closed and drained,
    // so runBlocking returns and the final message is printed.
    repeat(times = 8) {
        launch { for (value in randoms) println(value) }
    }
}
println("all done")
bjonnh
03/11/2019, 4:53 AM
sitepodmatt
03/11/2019, 4:57 AM
bjonnh
03/11/2019, 4:59 AMval reader = produce<String>(capacity = 1024) {
    // `useLines` is inline, so the suspending `send` can be called directly in its body.
    input.useLines {
        val iterat = it.iterator()
        while (iterat.hasNext()) {
            send(iterat.next())
        }
    }
    println("Can close now")
    this.close()
}
// Single writer coroutine on the IO dispatcher: drains `writerChannel` into `output`.
// NOTE(review): `writerChannel` is declared outside this snippet, and `appendText` opens
// and closes the file for every line.
val writer = launch(Dispatchers.IO) {
    writerChannel.consumeEach {
        output.appendText(it)
    }
}
// NOTE(review): nothing here starts a coroutine per analyzer, so the eight iterations run
// one after another. Analyzer 0 drains `reader` completely, and analyzers 1-7 find it already
// closed and empty; that is what the captured output shows. To run the analyzers concurrently,
// wrap the loop body in `launch { ... }` and join those jobs before closing `writerChannel`.
repeat(8) {
    println("Analyzer number $it")
    for (x in reader) {
        val tokenized = tokenizerFactory.create(pp.preProcess(x)).tokens.joinToString(" ")
        writerChannel.send(tokenized + "\n")
    }
}
// All lines have been handed to the writer; close its channel and wait for it to finish.
writerChannel.close()
writer.join()
Analyzer number 0 (spent all the processing time here)
Can close now
Analyzer number 1 (go really quickly across all of the following)
Analyzer number 2
Analyzer number 3
Analyzer number 4
Analyzer number 5
Analyzer number 6
Analyzer number 7
gildor
03/11/2019, 5:12 AM
bjonnh
03/11/2019, 5:15 AM
gildor
03/11/2019, 5:16 AMwithContext(Dispatchers.IO) {
    // Open the output once and stream every tokenized line through one buffered writer,
    // instead of reopening the file per line with `appendText`; `use` closes it at the end.
    output.bufferedWriter().use { out ->
        // Lines are read and tokenized sequentially on this coroutine.
        input.forEachLine { line ->
            val tokenized = tokenizerFactory.create(pp.preProcess(line)).tokens.joinToString(" ")
            out.write(tokenized + "\n")
        }
    }
}
`appendText` opens the file and closes it on each operation, which causes huge overhead.
bjonnh
03/11/2019, 5:19 AM
gildor
03/11/2019, 5:23 AM
bjonnh
03/11/2019, 5:23 AM
gildor
03/11/2019, 5:24 AM
bjonnh
03/11/2019, 5:25 AM
gildor
03/11/2019, 5:26 AM
bjonnh
03/11/2019, 5:26 AM
gildor
03/11/2019, 5:26 AMoutput.bufferedWriter().use { out ->
// Both files are opened once and closed by `use`; lines are copied one at a time.
input.bufferedReader().use { inp ->
// `isActive` lets the copy loop stop early if the enclosing coroutine is cancelled.
while (isActive) {
// `readLine()` returns null at end of input, which ends the loop.
val line = inp.readLine() ?: break
// NOTE(review): this sketch copies lines unchanged; the tokenization step would go here.
out.write(line + "\n")
}
}
}
// NOTE(review): this closing brace has no matching opening brace in the snippet; presumably
// it closes an enclosing coroutine builder (e.g. `withContext(...) {`) that was left out.
}
take x thousands of lines.
Or even more, depending on line length, because you pay a significant price for IO operations, especially for opening/closing.
bjonnh
03/11/2019, 7:41 AMoutput.bufferedWriter().use { out ->
    // Wrap the input reader in `use` as well, so the input file is closed once all lines
    // have been processed instead of being left open.
    input.bufferedReader().use { reader ->
        // Lines are tokenized in parallel via a Java parallel stream.
        // NOTE(review): `forEach` on a parallel stream does not preserve line order, and
        // `tokenizerFactory`/`pp` are invoked from several threads at once; confirm both are
        // thread-safe. BufferedWriter locks internally per write call, so whole lines should not
        // interleave — verify if exact output layout matters.
        reader.lines().parallel().forEach {
            out.write(tokenizerFactory.create(pp.preProcess(it)).tokens.joinToString(" ") + "\n")
        }
    }
}
This is by far the fastest option: at least 10-15 times faster than anything else I tried. It uses more memory, though… but I can spare some for that.
gildor
03/11/2019, 7:44 AM
bjonnh
03/11/2019, 7:59 AM
mp
03/11/2019, 12:47 PM
bjonnh
03/11/2019, 3:56 PM