Luis Munoz
05/29/2020, 7:18 AM
Spent the last 2 hours trying to figure out why this doesn't work (it prints nothing) with newFixedThreadPoolContext(1), but works with 2 or more threads. Can someone explain why that is? Maybe I am not using callbackFlow correctly?
@Test
fun `read range`() {
    val ctx = newFixedThreadPoolContext(1, "tThread") // test only works if 2 or more threads
    val scope = CoroutineScope(ctx)
    runBlocking {
        repeat(500) {
            repo.save("test$it")
        }
        scope.launch {
            repo.readRange().collect {
                println(Belief.fromBytes(it))
            }
        }.join()
    }
}
fun readRange(): Flow<ByteArray> {
    return callbackFlow {
        database.readAsync { tr ->
            // reference: https://apple.github.io/foundationdb/javadoc/com/apple/foundationdb/async/AsyncIterable.html
            // will iterate over all values in the database
            val itr = tr.getRange(path.range()).iterator()
            // hasNext() is blocking (will do a network call and bring back a few items)
            while (itr.hasNext()) {
                sendBlocking(itr.next().value)
            }
            channel.close()
            CompletableFuture.completedFuture(Unit)
        }
        awaitClose { }
    } // end of callbackFlow
}
gildor
05/29/2020, 7:51 AM
repo.readRange().onEach { println(Belief.fromBytes(it)) }.collect()
gildor
05/29/2020, 7:57 AM
callbackFlow { }.flowOn(Dispatchers.IO)
You can also use your newFixedThreadPoolContext(1), but then you cannot reuse it for anything else. I would just use the IO dispatcher instead.
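The flowOn suggestion above can be sketched roughly as follows. This is a minimal, self-contained illustration, not the original repo code: the FoundationDB read is replaced by a hypothetical loop over integers, and trySendBlocking (available in kotlinx.coroutines 1.5 and later) stands in for the older sendBlocking. The presumed cause of the hang is that the blocking producer occupies the only thread of the single-threaded context, so the collector never gets a chance to run on it; moving the producer to Dispatchers.IO leaves the collector's thread free.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.channels.trySendBlocking
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.callbackFlow
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for readRange(): emits `count` integers using
// blocking sends, the way the database iterator loop does.
fun readRange(count: Int): Flow<Int> = callbackFlow {
    for (i in 0 until count) {
        // Blocks the current thread while the channel buffer is full.
        trySendBlocking(i)
    }
    // Signal completion to the collector.
    close()
    // Returns right away here because the channel is already closed.
    awaitClose { }
}.flowOn(Dispatchers.IO) // run the blocking producer on an IO thread

fun main(): Unit = runBlocking {
    // The collector runs on the runBlocking thread, the producer on Dispatchers.IO.
    val items = readRange(500).toList()
    println(items.size)
}
```

With this shape the collector can run in any context, including a single-threaded one, because the blocking work never runs on the collector's thread.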