rook
11/01/2021, 5:57 PM
flowOn. I’m working up against an API, so I’m not ruling that out, but the basic premise is:
fun <T> CoroutineScope.someCall(): Flow<T?> {
    val api = getThreadLockedApi()
    api.apiCall()
        .toFlow()
        .onComplete { api.threadLockedCleanUp() }
        .flowOn(coroutineContext)
        .map {}
}
My consumer looks something like
object Consumer {
    private val coroutineScope = CoroutineScope(Dispatchers.IO + SupervisorJob())
    val someCallFlow: Flow<MyType>
        get() = coroutineScope.someCall()
}
I’m getting an exception of "Flow context cannot contain job in it". I get the sense that the argument for flowOn must be a Dispatcher -only-, but I’m having a hard time isolating or reproducing the issue. Regardless of the core issue, what I’m trying to accomplish is to ensure that the onComplete block runs on the same thread as the someCall() local block. Am I using the correct technique, or is there something I’m missing?
ephemient
11/01/2021, 8:50 PM
Nick Allen
11/01/2021, 10:53 PM
(Executors.newSingleThreadExecutor().asCoroutineDispatcher()) and wrap the calls that need to run on that thread in withContext(mySingleThreadDispatcher).
Also, getting the flow and collecting the items from the flow are two different things. flowOn affects the context in which the upstream flow is collected; it has no effect on the code you call in order to get a reference to the flow (it doesn't affect the thread that calls api.apiCall()).
ephemient
11/01/2021, 11:05 PM
flow {
    val api = getThreadLockedApi()
    try {
        emitAll(api.apiCall().toFlow())
    } finally {
        api.threadLockedCleanUp()
    }
}.flowOn(mySingleThreadDispatcher)
will run the suspend block on a given dispatcher, every time the resulting flow is collected.
rook
11/01/2021, 11:19 PM
rook
11/01/2021, 11:21 PM
rook
11/01/2021, 11:22 PM
ephemient
11/01/2021, 11:22 PM
emitAll is a suspend function; cancelling a collector will cancel the suspend block, which will fall through to the finally.
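A minimal sketch of the behaviour described above, assuming kotlinx.coroutines is on the classpath. FakeApi, threadLockedFlow, and the events list are hypothetical stand-ins for the real thread-locked API, which isn't shown in this thread. The idea is to create the API inside flow { }, clean it up in finally, and apply flowOn with a single-thread dispatcher, so that setup and cleanup should land on the same thread even when the collector stops early.

```kotlin
import java.util.concurrent.CopyOnWriteArrayList
import java.util.concurrent.Executors
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.asCoroutineDispatcher
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.emitAll
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for the thread-locked API from the question.
// It records which thread performed setup and which performed cleanup.
class FakeApi(private val events: MutableList<Pair<String, Thread>>) {
    init {
        events += "setup" to Thread.currentThread()
    }

    // Never completes on its own, so collection can only end by cancellation.
    fun apiCall(): Flow<Int> = flow {
        var next = 0
        while (true) {
            emit(next++)
            delay(10)
        }
    }

    fun threadLockedCleanUp() {
        events += "cleanup" to Thread.currentThread()
    }
}

// The pattern from the thread: the API is created inside the flow builder,
// cleaned up in finally, and the whole block is moved onto one dispatcher.
fun threadLockedFlow(
    dispatcher: CoroutineDispatcher,
    events: MutableList<Pair<String, Thread>>
): Flow<Int> = flow {
    val api = FakeApi(events)
    try {
        emitAll(api.apiCall())
    } finally {
        api.threadLockedCleanUp()
    }
}.flowOn(dispatcher)

fun main() {
    val events = CopyOnWriteArrayList<Pair<String, Thread>>()
    val dispatcher = Executors.newSingleThreadExecutor().asCoroutineDispatcher()
    try {
        // take(3) stops collecting early, which cancels the upstream block.
        val items = runBlocking { threadLockedFlow(dispatcher, events).take(3).toList() }
        println(items)                                   // [0, 1, 2]
        println(events.map { it.first })                 // [setup, cleanup]
        println(events[0].second === events[1].second)   // true
    } finally {
        dispatcher.close() // shuts down the underlying executor thread
    }
}
```

Nothing here runs on the caller's thread except the collection itself: the body of flow { } only executes once the flow is collected, and flowOn decides where that body runs.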
rook
11/02/2021, 2:27 PM