I'm getting `Flow invariant is violated` trying to...
# coroutines
d
I'm getting
Flow invariant is violated
trying to flow lines from aws kotlin s3 sdk getObject. I've tried nesting the
flow
at all different levels and get stream closed if it's nested (altho getting rid of the
use
and any other auto close may fix those as I suspect any outside the flow go through their
finally
blocks when we're returning the flow)
Copy code
fun <T : Any> S3Client.flowObject(
    bucketName: String,
    key: String,
    deserializer: (String) -> T
): Flow<T> {
    val request = GetObjectRequest {
        this.bucket = bucketName
        this.key = key
    }
    return flow {
        this@flowObject.getObject(request) { response ->
            checkNotNull(response.body, { "Failed to get object from S3: $bucketName/$key" }).let { inputStream ->
                inputStream.toInputStream().bufferedReader(Charsets.UTF_8).useLines { lines ->
                    lines.forEach { emit(deserializer(it)) }
                }
            }
        }
    }.flowOn(<http://Dispatchers.IO|Dispatchers.IO>)
}
any obvious fixes or better explanations?
y
I think that means
getObject
runs in a different coroutine. Simplest solution is to use a
channelFlow
and use
send
instead of
emit
1
thank you color 1
d
@Sam’s new book says the same thing (I was looking at that in parallel w submitting this)
very nice 1
a
Or you can flowOf(request).flatMap { getObject(it)}
Writing from mobile, sorry
y
I don't think that would work because
getObject
is written in callback-style. What you can do is first convert
getObject
to a suspend function using
suspendCoroutine
, then use it normally
a
Ah okay, then yep, then yep a callback wraper would be needed