Hey guys, im looking to migrate an old app from a ...
# coroutines
g
Hey guys, im looking to migrate an old app from a pub-sub system to a flow. I'd be grateful for any thoughts or feedback or suggestions: https://gist.github.com/Groostav/6aa431a28e8ac84e52c504b5004325a2
s
As far as the deadlock goes, I've written about one possible cause in the past: https://medium.com/better-programming/how-i-fell-in-kotlins-runblocking-deadlock-trap-and-how-you-can-avoid-it-db9e7c4909f1. The basic problem is caused by blocking a thread inside an existing coroutine. You've said that you want to apply backpressure to your native code by blocking its thread. In that case, you must make sure that the thread being blocked is not a thread that belongs to a coroutine dispatcher. This probably means making sure you don't call
startLargeWork()
from inside a coroutine. Give it its own thread if necessary.
👍 1
Once you have done that, you should be able to use a channelFlow in the way you described, or just create a channel and have the subscriber consume it as a flow. If you use channelFlow, remember to remove its default buffer so that you get immediate backpressure.
The producer side's SendChannel can use trySendBlocking to block the producer thread. It's essentially a shortcut for
runBlocking { trySend() }
, so same rules apply—never call it from a thread that belongs to a coroutine or coroutine dispatcher.
As far as stacktraces, you might want to look into Kotlin's stacktrace recovery feature. This will automatically reconstitute your asynchronous call traces when you're running in debug/test environments. You mentioned that your producer is running too fast for the consumer, so it sounds like by definition it's running in a separate thread. That means the producer can never truly be "up stack" from the consumer—stack trace recovery is the next best thing.
Finally you mentioned custom (stateful?) flow operators and subclassing. Subclassing a flow is generally a bad idea. The solution for building stateful operators is normally simpler—just use the
flow {…}
builder to wrap a call to
collect {…}
.
Copy code
fun <T> Flow<T>.myOperator() = flow {
  collect { upstream -> 
    emit(doStuffWith(upstream))
  }
}
I'm not sure if this solves the problem you were facing with your
paused
messages, but as a general pattern it's extremely powerful.
👍 1