Spent the last 2 hours trying to figure out w...
# coroutines
l
Spent the last 2 hours trying to figure out why this won't work (it doesn't print anything) when I use newFixedThreadPoolContext(1), but it works if I have 2 or more threads. Can someone explain why that is? Maybe I am not using callbackFlow correctly?


 
    @Test
    fun `read range`() {
        val ctx = newFixedThreadPoolContext(1, "tThread") // test only works with 2 or more threads
        val scope = CoroutineScope(ctx)
        runBlocking {
            repeat(500) {
                repo.save("test$it")
            }
            scope.launch {
                repo.readRange().collect {
                    println(Belief.fromBytes(it))
                }
            }.join()
        }
    }

    fun readRange(): Flow<ByteArray> {
        return callbackFlow {
            database.readAsync { tr ->
                // reference: https://apple.github.io/foundationdb/javadoc/com/apple/foundationdb/async/AsyncIterable.html
                // will iterate over all values in the database
                val itr = tr.getRange(path.range()).iterator()

                // hasNext() is blocking (it makes a network call and brings back a few items)
                while (itr.hasNext()) {
                    sendBlocking(itr.next().value)
                }
                channel.close()
                CompletableFuture.completedFuture(Unit)
            }
            awaitClose { }
        } // end of callbackFlow
    }
g
Just a question to make your sample clearer: why do you need scope.launch?
You can just do:
repo.readRange().onEach { println(Belief.fromBytes(it)) }.collect()
It deadlocks because of sendBlocking.
You block your only thread, so it cannot process the next event and dispatch the collecting coroutine.
Essentially, your problem is that readRange is blocking.
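To make the single-thread problem concrete, here is a minimal sketch that uses plain JDK executors instead of coroutines (an analogy, not the code from the question). The `handOffSucceeds` helper is hypothetical, and the timeouts are there only so the demo terminates instead of hanging. The producer plays the role of sendBlocking, and the consumer plays the role of the collector that needs a thread to run on.

```kotlin
import java.util.concurrent.Callable
import java.util.concurrent.Executors
import java.util.concurrent.SynchronousQueue
import java.util.concurrent.TimeUnit

// Hypothetical helper: returns true if the producer managed to hand its item
// to the consumer when both run on a pool with the given number of threads.
fun handOffSucceeds(threads: Int): Boolean {
    val pool = Executors.newFixedThreadPool(threads)
    val queue = SynchronousQueue<String>() // rendezvous: offer waits for a taker
    try {
        // Producer: blocks its thread until a consumer takes the item.
        // The timeout replaces the indefinite block of a real deadlock.
        val produced = pool.submit(Callable { queue.offer("item", 500, TimeUnit.MILLISECONDS) })
        // Consumer: submitted to the same pool. With a single thread it stays
        // queued behind the producer and cannot start until the producer gives up.
        pool.submit(Callable { queue.poll(500, TimeUnit.MILLISECONDS) })
        return produced.get()
    } finally {
        pool.shutdownNow()
    }
}

fun main() {
    println("1 thread: ${handOffSucceeds(1)}")   // prints "1 thread: false"
    println("2 threads: ${handOffSucceeds(2)}")  // prints "2 threads: true"
}
```

With one thread the producer occupies the only worker, so the consumer never gets a chance to take the item. With two threads the hand-off completes, which matches the behaviour described in the question.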
You can probably fix it with:
callbackFlow { }.flowOn(Dispatchers.IO)
You can also use your newFixedThreadPoolContext(1) for this, but then you cannot reuse it for anything else. I would just use the IO dispatcher instead.
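A minimal sketch of the flowOn approach, assuming kotlinx-coroutines-core is on the classpath. `readAllBlocking` is a hypothetical stand-in for the blocking database read (it is not the FoundationDB API), and `trySendBlocking` is used in place of the older `sendBlocking`. The collector runs on a single-threaded dispatcher, while the blocking producer is moved to Dispatchers.IO.

```kotlin
import java.util.concurrent.Executors
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.asCoroutineDispatcher
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.channels.trySendBlocking
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.callbackFlow
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext

// Hypothetical stand-in for the blocking database read: it calls back
// synchronously on whatever thread invokes it.
fun readAllBlocking(onItem: (Int) -> Unit) {
    for (i in 1..500) onItem(i)
}

fun readRange(): Flow<Int> = callbackFlow {
    readAllBlocking { item ->
        // Blocks the producer thread while the buffer is full (backpressure).
        trySendBlocking(item)
    }
    close()
    awaitClose { }
}.flowOn(Dispatchers.IO) // the blocking producer runs on an IO thread

// Collects on a dispatcher with exactly one thread, as in the original test.
fun collectOnSingleThread(): List<Int> {
    val single = Executors.newSingleThreadExecutor().asCoroutineDispatcher()
    return single.use { ctx ->
        runBlocking { withContext(ctx) { readRange().toList() } }
    }
}

fun main() {
    println(collectOnSingleThread().size) // prints 500
}
```

The producer still blocks a thread, but it is a thread borrowed from the IO pool rather than the one the collector needs.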
Also, you can make the send non-blocking by adding a buffer to your flow with buffer(Channel.UNLIMITED). It's not perfectly safe, of course: it gives you no backpressure for your reads from the db and may crash with OOM.
So maybe using a dedicated thread in this case is not a bad idea.
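A minimal sketch of the unlimited-buffer variant, again assuming kotlinx-coroutines-core and a hypothetical `readAllBlocking` stand-in for the database read. Because the buffer is unbounded, `trySend` does not fail for lack of space, so the single thread is never blocked on a send. The whole result set sits in memory until the collector drains it, which is the OOM risk mentioned above.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.buffer
import kotlinx.coroutines.flow.callbackFlow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for the database read: it calls back synchronously
// on whatever thread invokes it.
fun readAllBlocking(onItem: (Int) -> Unit) {
    for (i in 1..500) onItem(i)
}

fun readRange(): Flow<Int> = callbackFlow {
    readAllBlocking { item ->
        // Non-blocking: with an unlimited buffer there is always room.
        trySend(item)
    }
    close()
    awaitClose { }
}.buffer(Channel.UNLIMITED) // no backpressure: every item is queued in memory

fun main() = runBlocking {
    // Producer and collector share the single runBlocking thread.
    println(readRange().toList().size) // prints 500
}
```

The producer queues all 500 items before the collector sees the first one, so this only suits result sets that comfortably fit in memory.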
l
I don't need scope.launch; I added that while experimenting to figure out the problem.
callbackFlow never yields the thread and there is no way to make it yield. That is the problem: there is no way to suspend it so that the .collect can make progress. That library is supposed to be async: it has onHasNext(), which returns a future to indicate a value is ready (instead of blocking to wait for it). The problem is that I can't use that inside callbackFlow because I can't suspend there. Apparently the only way around this is to have 2 threads so it can make progress.
Will use flowOn, thanks Andrey.
g
You can get rid of the additional thread, but only by using an unlimited buffer, which in the case of a socket may be a problem. I would just use the additional thread.
Also, the problem actually comes from the library API itself: if it provided some backpressure mechanism, it would be possible to implement this without blocking on hasNext and sendBlocking.
You actually can implement something non-blocking with onHasNext and the future, but you need more custom code. I'm thinking about something like an internal channel where you emit this future and await it outside of the callback, or launching a coroutine from the callback. But I'm not sure it would be faster in practice.
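A rough sketch of the "launch a coroutine from the callback" idea, assuming the `kotlinx.coroutines.future` integration (`future` and `await`) is on the classpath. `AsyncIter`, `ListAsyncIter` and `readAsync` are hypothetical in-memory stand-ins that only mimic the shape of the API discussed above; this has not been checked against the real FoundationDB client or its transaction lifetime rules.

```kotlin
import java.util.concurrent.CompletableFuture
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.future.await
import kotlinx.coroutines.future.future
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for an async iterator with a future-returning onHasNext().
interface AsyncIter<T> {
    fun onHasNext(): CompletableFuture<Boolean>
    fun next(): T
}

// Hypothetical in-memory implementation; the check runs on another thread
// to imitate a network round trip.
class ListAsyncIter<T>(items: List<T>) : AsyncIter<T> {
    private val itr = items.iterator()
    override fun onHasNext(): CompletableFuture<Boolean> =
        CompletableFuture.supplyAsync { itr.hasNext() }
    override fun next(): T = itr.next()
}

// Hypothetical stand-in for a readAsync-style call that expects a future back.
fun <R> readAsync(block: (AsyncIter<String>) -> CompletableFuture<R>): CompletableFuture<R> =
    block(ListAsyncIter(List(500) { "test$it" }))

fun readRange(): Flow<String> = channelFlow {
    readAsync { itr ->
        // Launch a coroutine from the callback and hand it back as a future.
        future {
            // await() suspends instead of blocking; send() suspends when the
            // buffer is full, so the collector gets backpressure.
            while (itr.onHasNext().await()) {
                send(itr.next())
            }
        }
    }.await() // channelFlow closes the channel once this block returns
}

fun main() = runBlocking {
    // Everything runs on the single runBlocking thread without deadlocking.
    println(readRange().toList().size) // prints 500
}
```

Nothing here blocks a thread, so one thread is enough. Whether it is faster in practice than a dedicated thread is a separate question.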