# coroutines
p
Coroutine concepts have been one of the more difficult things to wrap my head around, so bear with me. Are there any good examples of mixing blocking I/O operations in coroutines? I am working with TarArchiveOutputStream from Apache commons-compress and it is not thread safe in the slightest. When you want to write something to it, you must create an entry, add it to the stream, and close the entry as a single unit before you can move on to the next such cycle. When I look at the description of Dispatchers.IO, it does not fit this use case in the slightest, as there is still a thread pool backing it.
This is also the point at which coroutines start to confuse me some more. Even if this IO dispatcher were single threaded, any suspending operations that happen can do just that, which could result in improper interleaving of two coroutine instances wanting to create an entry in this TarArchiveOutputStream. On the face of it, that would imply I should use runBlocking to ensure suspending operations don't suspend but get run to completion. But that still leaves the fact that more than one thread in the IO dispatcher can be doing this at the same time.
That then also implies I could create my own dedicated single thread executor service as a dispatcher. Then with any coroutine launched inside of it, use runBlocking here to complete the create entry, write stream close entry elements. But then this still tells me I am running afoul of the runBlocking rule that I should not use runBlocking from within a coroutine.
So overall, are there any good examples of using something like the IO dispatcher for I/O on something that is not thread safe?
j
I think you're mixing up 2 problems here: IO operations and thread-safety/atomicity. The fact that
TarArchiveOutputStream
is not thread safe isn't related to the fact that it's based on blocking IO.
any suspending operations that happen can do just that
Do what? Suspend? Yes, but if you're dealing with a blocking API, there is likely no suspend operation here. But in any case, I think what you're looking for is a coroutine Mutex. You can just do your operations under a mutex associated with your output stream (from any coroutine), and you'll be fine. You can also nest this under a
withContext(Dispatchers.IO)
to solve the blocking-IO-in-coroutine problem (but these are really 2 different problems)
Something like this:
val tarArchiveStream = TODO("whatever constructor or method gives you the tar archive stream")
val tarStreamMutex = Mutex()

withContext(Dispatchers.IO) {
    tarStreamMutex.withLock {
        // create an entry, add it to the stream, close the entry
    }
}
If you need this repeatedly throughout your code, you might want to create a wrapper for the
TarArchiveOutputStream
that contains the mutex inside and provides atomic operations as methods
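A minimal sketch of what such a wrapper could look like, assuming kotlinx.coroutines and commons-compress are on the classpath. The class name SafeTarWriter and its method names are hypothetical, made up for illustration, and entries are assumed to fit in memory as byte arrays:

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import kotlinx.coroutines.withContext
import org.apache.commons.compress.archivers.tar.TarArchiveEntry
import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream
import java.io.OutputStream

// Hypothetical wrapper (not part of any library): owns the stream and its mutex.
class SafeTarWriter(out: OutputStream) {
    private val tar = TarArchiveOutputStream(out)
    private val mutex = Mutex()

    // Writes one complete entry. The mutex keeps every other coroutine out
    // until the entry has been created, written, and closed.
    suspend fun writeEntry(name: String, bytes: ByteArray) = withContext(Dispatchers.IO) {
        mutex.withLock {
            val entry = TarArchiveEntry(name)
            entry.size = bytes.size.toLong() // the tar header needs the size up front
            tar.putArchiveEntry(entry)
            tar.write(bytes)
            tar.closeArchiveEntry()
        }
    }

    // Closes the underlying stream, again under the mutex.
    suspend fun close() = withContext(Dispatchers.IO) {
        mutex.withLock { tar.close() }
    }
}
```

Callers can then invoke writeEntry from any number of coroutines without knowing about the mutex. Note that Mutex is not reentrant, so writeEntry must not be called again from inside the locked block.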
That then also implies I could create my own dedicated single thread executor service as a dispatcher. Then with any coroutine launched inside of it, use runBlocking here to complete the create entry, write stream close entry elements. But then this still tells me I am running afoul of the runBlocking rule that I should not use runBlocking from within a coroutine
Indeed, you should not use
runBlocking
inside a coroutine. It's true that on a single-threaded dispatcher, it would kind of have the effect of locking, but it's not supposed to be used as a synchronization primitive. Mutex is meant for synchronization, so you should rather use this.
e
often Channel or Flow is a better way of ensuring that certain operations are not interleaved
for example, if you launch one coroutine that reads from a channel and performs the tar operations in a loop, then you can have any number of concurrent writers to the channel and it doesn't matter
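A rough sketch of that channel approach, assuming kotlinx.coroutines and commons-compress are available. TarItem and writeTar are hypothetical names, and the producers here just send placeholder content:

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.launch
import org.apache.commons.compress.archivers.tar.TarArchiveEntry
import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream
import java.io.OutputStream

// Hypothetical message type: one message per tar entry.
class TarItem(val name: String, val bytes: ByteArray)

// Hypothetical helper: many producers, exactly one coroutine touching the stream.
suspend fun writeTar(out: OutputStream, names: List<String>) = coroutineScope {
    val items = Channel<TarItem>()

    // The only coroutine that uses the stream, so entries cannot interleave.
    val writer = launch(Dispatchers.IO) {
        TarArchiveOutputStream(out).use { tar ->
            for (item in items) { // loop ends when the channel is closed
                val entry = TarArchiveEntry(item.name)
                entry.size = item.bytes.size.toLong()
                tar.putArchiveEntry(entry)
                tar.write(item.bytes)
                tar.closeArchiveEntry()
            }
        }
    }

    // Any number of concurrent producers; they only send messages.
    val producers = names.map { name ->
        launch { items.send(TarItem(name, "content of $name".toByteArray())) }
    }
    producers.forEach { it.join() }
    items.close() // no more items: lets the writer finish and close the stream
    writer.join()
}
```

The order of entries in the archive follows the order in which messages arrive on the channel, which is not deterministic with concurrent producers.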
j
Yes it's an option too, using an actor coroutine. But I guess it's more complicated to abstract that away than abstracting a mutex (because of the scoping of that coroutine)
u
Or simply use a thread.... If you do not want to suspend and do not want any concurrency whatsoever, what do you want to gain from using coroutines?
j
I believe @Phil Richardson wants concurrency (and maybe even parallelism) for the rest of the coroutine's code
e
one thing to note. even though code in one coroutine runs sequentially, it may still switch threads at any suspend point, depending on the dispatcher. if you need to avoid that (doesn't sound like it matters for the
TarArchiveOutputStream
case, but it might for other APIs that use their own recursive locks or thread-local variables, for example), then you either need to switch to a single-threaded dispatcher or ensure that there are no suspend points within your "critical section" (can be guaranteed by writing it in a non-suspend fun, for example)
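Both options can be sketched roughly like this, assuming kotlinx.coroutines. The function names criticalSection and onSingleThread are hypothetical, and the list of threads exists only to make the behaviour observable:

```kotlin
import kotlinx.coroutines.asCoroutineDispatcher
import kotlinx.coroutines.delay
import kotlinx.coroutines.withContext
import java.util.concurrent.Executors

// Option 1: a plain (non-suspend) function cannot contain suspend points,
// so the whole body runs on whichever thread called it.
fun criticalSection(seen: MutableList<Thread>) {
    seen += Thread.currentThread()
    seen += Thread.currentThread()
}

// Option 2: a dedicated single-threaded dispatcher. The coroutine resumes on
// the same thread after every suspend point inside withContext.
suspend fun onSingleThread(): List<Thread> {
    val dispatcher = Executors.newSingleThreadExecutor().asCoroutineDispatcher()
    return dispatcher.use { single ->
        withContext(single) {
            val seen = mutableListOf<Thread>()
            seen += Thread.currentThread()
            delay(10) // suspend point; we still come back on the same thread
            seen += Thread.currentThread()
            criticalSection(seen)
            seen
        }
    }
}
```

A single-threaded dispatcher only pins the thread. Other coroutines on that dispatcher can still run while this one is suspended, so it does not replace a mutex when operations must not interleave.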