df
11/26/2020, 5:01 PM
fun getContent(dropN: Long) = flow {
    val counter = AtomicLong(dropN)
    val resource = javaClass.getResource("/products.csv").toURI()
    File(resource).useLines { lines ->
        lines.filter { it.isNotEmpty() }
            .drop(dropN.toInt())
            .map { it.toLong() }
            .chunked(100)
            .onEach { counter.addAndGet(it.size.toLong()) }
            .map { getContentByProductIds(it) }
            .onEach { println("Processed ${counter.get()} lines from input file") }
            .forEach { emit(it) }
    }
}
The flow is collected by multiple coroutines, and I see exactly the same lines printed multiple times:
Processed 100 lines from input file
Processed 100 lines from input file
Given the flow definition, I would assume the second onEach should only be invoked when the first onEach was invoked before, which should always increment the counter. Why do I still see the same counter value multiple times?
tseisel
11/26/2020, 5:32 PM
shareIn operator)
df
11/26/2020, 6:44 PM
File(resource).useLines (my example given) twice?!
Dominaezzz
11/26/2020, 6:47 PM
counter is being reset for each flow#
Adam Powell
11/26/2020, 6:48 PM
df
11/26/2020, 6:51 PM
Dominaezzz
11/26/2020, 6:57 PM
produceIn(...).receiveAsFlow(). Recommend looking up the docs for these functions.
df
11/26/2020, 6:59 PM
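
A minimal sketch of the behaviour discussed in this thread, assuming kotlinx-coroutines-core is on the classpath. The names countedChunks, collectTwice and fanOut are hypothetical, and a small in-memory list stands in for products.csv and getContentByProductIds. It shows that a flow { } block runs again from the top for every collector, so a counter created inside it starts over each time, and that produceIn(...).receiveAsFlow() is one way to run the upstream once and hand each value to exactly one collector. Depending on the library version, produceIn may still carry a preview opt-in marker.

```kotlin
import java.util.concurrent.atomic.AtomicLong
import kotlinx.coroutines.async
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.produceIn
import kotlinx.coroutines.flow.receiveAsFlow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for getContent(): the counter lives inside the
// flow { } block, so every collection creates a fresh one.
fun countedChunks(): Flow<Long> = flow {
    val counter = AtomicLong(0)
    listOf(1, 2, 3, 4).chunked(2).forEach { chunk ->
        // Emit the running total after each chunk of two items.
        emit(counter.addAndGet(chunk.size.toLong()))
    }
}

// Collecting the same cold flow twice re-runs the block twice,
// so both collectors see the same counter values.
fun collectTwice(): Pair<List<Long>, List<Long>> = runBlocking {
    val source = countedChunks()
    source.toList() to source.toList()
}

// Run the upstream once into a channel, then let two collectors
// share it. Each value is delivered to only one of them.
fun fanOut(): List<Long> = runBlocking {
    val shared = countedChunks().produceIn(this).receiveAsFlow()
    val a = async { shared.toList() }
    val b = async { shared.toList() }
    (a.await() + b.await()).sorted()
}

fun main() {
    val (first, second) = collectTwice()
    println(first)    // [2, 4]
    println(second)   // [2, 4]
    println(fanOut()) // [2, 4]
}
```

Which collector receives which value in fanOut is not deterministic, which is why the sketch sorts the combined result before printing it.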