# coroutines
a
Hi all, can anyone help me understand why this terminal collector does not seem to cancel the flow to which it's attached? https://pl.kotl.in/wB5Pt1jsQ
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import java.util.concurrent.Executors


fun main(args: Array<String>) = runBlocking {
    val requests = flow {
        var i = 0
        val serverDispatcher = Executors.newSingleThreadExecutor().asCoroutineDispatcher()
        val handlerDispatcher = Executors.newFixedThreadPool(4).asCoroutineDispatcher()
        launch(serverDispatcher) {
            while (true) {
                i += 1
                val result = i
                launch(handlerDispatcher) {
                    delay(500)
                    launch(serverDispatcher) {
                        emit(result)
                    }
                }
            }
        }
    }
    
    // I want this to:
    //
    // 1. Suspend while collecting each emitted value.
    // 2. Continue to collect until seeing a value >= 4
    // 3. At that point continue.
    // 4. Cancel the flow (thereby closing the server's socket).
    //
    // But instead it suspends, the while loop keeps running and indicating it's emitting values, and the onEach print
    // statement never executes.
    requests.takeWhile { it < 4 }.onEach { println("got $it") }.collect()
}
a
You're launching into the outer runBlocking scope, not one scoped to the flow
Most variants on that sort of bug end up tripping a runtime safety check in the flow implementation and throw
e
I'm assuming the flow block completes, resulting in an empty flow, and the executor + running coroutines are no longer referenced, so they're free to get garbage collected before any emit (which would trip the runtime safety checks)
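For reference, a minimal sketch of a variant that does trip the runtime safety check mentioned above. The names emitsFromChild and tripsSafetyCheck are made up for illustration. The flow emits from a child coroutine launched inside the flow block, which the flow {} builder rejects with an IllegalStateException.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical example flow: emits from a concurrently launched child
// coroutine instead of from the collector's own coroutine.
val emitsFromChild: Flow<Int> = flow {
    coroutineScope {
        launch { emit(1) } // emission from another coroutine is prohibited
    }
}

// Returns true if collecting the flow fails the flow invariant check.
suspend fun tripsSafetyCheck(): Boolean =
    try {
        emitsFromChild.collect { }
        false
    } catch (e: IllegalStateException) {
        println("safety check: ${e.message}")
        true
    }

fun main() = runBlocking {
    println(tripsSafetyCheck())
}
```

The posted code avoids this path because its launch calls resolve to the outer runBlocking scope rather than to anything tied to the collector.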
a
Yea the runtime safety checks are never hit, and values are actually emitted. I think you're right Adam that I'm launching into the wrong context, but I can't figure out how to launch to the correct one.
a
I'm not sure why this isn't just:
flow {
  var i = 0
  while (true) {
    emit(++i)
  }
}
what are the thread pools intended to do in here? I think the intent got accidentally optimized out in simplifying for the example
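A small sketch of how the simplified flow above behaves with the takeWhile collector from the question. The function names counter and collectBelowFour are assumptions for illustration. Because every emit happens in the collector's own coroutine, takeWhile can stop the upstream as soon as the predicate fails.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// An infinite counter that emits from the collector's own coroutine.
fun counter(): Flow<Int> = flow {
    var i = 0
    while (true) {
        emit(++i)
    }
}

// Collects until the first value >= 4; takeWhile then aborts the upstream
// flow, so the infinite loop above stops at that emit call.
suspend fun collectBelowFour(): List<Int> =
    counter().takeWhile { it < 4 }.toList()

fun main() = runBlocking {
    println(collectBelowFour()) // prints [1, 2, 3]
}
```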
a
It's a test fake for a server. serverDispatcher is accepting the sockets, handlerDispatcher is where the coroutines that read requests and write responses are intended to dispatch. Come to think of it this is all a bit much...
a
If you really want those in the mix here I'd probably try to pull them out of this flow into other dependencies with their own suspend functions that this flow invokes. The core flow {} builder goes way out of its way to get you not to break context preservation by emitting from some other dispatcher or concurrent child job, which puts you either into channelFlow {} territory or your own reimplementation of it if you start hopping dispatchers
and if this is a test fake it does sound more than a little overcomplicated
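A rough sketch of what the channelFlow {} route might look like for the fake server, under some assumptions: fakeRequests and firstRequests are hypothetical names, the delay(100) pacing stands in for suspending on a socket accept, and only the handler pool is kept. Inside channelFlow, launch resolves to the flow's own scope, so the child coroutines are cancelled when the collector stops.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import java.util.concurrent.Executors

// Hypothetical test fake: each "request" is handled on handlerDispatcher and
// its result is sent back through the channelFlow's channel.
fun fakeRequests(handlerDispatcher: CoroutineDispatcher): Flow<Int> = channelFlow {
    var i = 0
    while (isActive) {
        val result = ++i
        // launch is called on the ProducerScope, so this child is cancelled
        // together with the flow when the collector stops.
        launch(handlerDispatcher) {
            delay(500)   // simulated request handling
            send(result) // send may be called from another dispatcher
        }
        delay(100) // assumed pacing; gives the loop a suspension point
    }
}

// Collects until the first value >= 4, then closes the thread pool.
suspend fun firstRequests(): List<Int> =
    Executors.newFixedThreadPool(4).asCoroutineDispatcher().use { handlers ->
        fakeRequests(handlers).takeWhile { it < 4 }.toList()
    }

fun main() = runBlocking {
    println("got ${firstRequests()}")
}
```

The loop needs a suspension point (or an isActive check) for cancellation to take effect; a tight while (true) that never suspends would keep running after the collector is done.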
a
Ah thank you, looks like channelFlow would be the thing I'd want.
Assuming I actually want any of this
Which I probably do not hah