I’m struggling with `flowOn`. I’m working up again...
# coroutines
r
I’m struggling with
flowOn
. I’m working up against an API, so I’m not ruling that out, but the basic premise is
Copy code
fun <T> CoroutineScope.someCall(): Flow<T?> {
    val api = getThreadLockedApi()
    api.apiCall()
      .toFlow()
      .onComplete { api.threadLockedCleanUp() }
      .flowOn(coroutineContext)
      .map {}
}
My consumer looks something like
Copy code
object Consumer {
  private val coroutineScope = CoroutineScope(<http://Dispatchers.IO|Dispatchers.IO> + SupervisorJob())

  val someCallFlow: Flow<MyType>
      get() = coroutineScope.someCall()
}
I’m getting an exception of
Flow context cannot contain job in it
. I get the sense that the argument for
flowOn
must be a Dispatcher -only-. But I’m having a hard time isolating or reproducing the issue. Regardless of the core issue, the thing I’m trying to accomplish is ensure that the
onComplete
block is run in the same thread as the
someCall()
local block. Am I using the correct technique, or is there something I’m missing?
e
even with the same Dispatcher you aren't guaranteed to run on the same Thread in general
n
If you are dealing with thread affinity, then I suggest you use a single threaded dispatcher (consider
Executors.newSingleThreadExecutor().asCoroutineDispatcher()
) and wrap the calls that need to run on that thread in
withContext(mySingleThreadDispatcher)
. Also, getting the flow and collecting the items from the flow are two different things.
flowOn
affects the context in which the upstream flow is collected, it has no affect on the code you call in order to get a reference to the flow (it doesn't affect the thread that calls
api.apiCall()
).
e
unclear if this is the desired behavior, but
Copy code
flow {
    val api = getThreadLockedApi()
    try {
        emitAll(api.apiCall().toFlow())
    } finally {
        api.threadLockedCleanUp()
    }
}
    .flowOn(mySingleThreadDispatcher)
will run the suspend block on a given dispatcher, every time the resulting flow is collected
r
Interesting. I’ll give that a shot. The flow generated by the api is hot, so the cancellation would come from cancelling the scope of the downstream subscription. How does that affect the nested flow?
I stumbled on something relatively close to that example, but I’m going to try to isolate the behavior the way you did @ephemient. Thanks for the help.
@Nick Allen Thank you! I may have to resort to a single-threaded dispatcher that way. It’s a bit of a tricky situation since the API developers didn’t manage the instancing/closing of the resource in a flow situation, so I have to do something of a dance to work around the limitation.
e
emitAll
is a suspend function; cancelling a collector will cancel the suspend block which will fall through to the
finally
💯 1
r
I thought that was the case, thank you @ephemient!