Don Mitchell
06/12/2025, 8:32 PM
Flow invariant is violated
I'm trying to flow lines from the AWS Kotlin S3 SDK's getObject. I've tried nesting the flow at various levels and get "stream closed" when it's nested (although getting rid of the use and any other auto-close may fix that, since I suspect anything outside the flow runs its finally block as soon as we return the flow).
fun <T : Any> S3Client.flowObject(
    bucketName: String,
    key: String,
    deserializer: (String) -> T
): Flow<T> {
    val request = GetObjectRequest {
        this.bucket = bucketName
        this.key = key
    }
    return flow {
        this@flowObject.getObject(request) { response ->
            checkNotNull(response.body, { "Failed to get object from S3: $bucketName/$key" }).let { inputStream ->
                inputStream.toInputStream().bufferedReader(Charsets.UTF_8).useLines { lines ->
                    lines.forEach { emit(deserializer(it)) }
                }
            }
        }
    }.flowOn(Dispatchers.IO)
}
any obvious fixes or better explanations?
Youssef Shoaib [MOD]
06/12/2025, 8:34 PM
getObject runs in a different coroutine. The simplest solution is to use a channelFlow and use send instead of emit.
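Editor's sketch of the channelFlow suggestion, under stated assumptions: `withLines` and `flowLines` are hypothetical names, and `withLines` is only a stand-in for a suspend API that, like getObject, takes a block and may invoke it in a different coroutine context. It is not the S3 SDK. Applied to the snippet above, the same change would roughly amount to swapping `flow` for `channelFlow` and `emit` for `send`.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext

// Hypothetical stand-in for a block-taking suspend API such as getObject:
// it runs the caller's block in a different coroutine context.
suspend fun <T> withLines(block: suspend (Sequence<String>) -> T): T =
    withContext(Dispatchers.IO) { block(sequenceOf("1", "2", "3")) }

// channelFlow hands values over through a channel, so send may be called
// from another context. Calling emit inside a plain flow { } here would
// fail with "Flow invariant is violated".
fun <T : Any> flowLines(deserializer: (String) -> T): Flow<T> = channelFlow {
    withLines { lines ->
        for (line in lines) send(deserializer(line))
    }
}

fun main() = runBlocking {
    println(flowLines { it.toInt() }.toList()) // prints [1, 2, 3]
}
```

The channel adds a small buffer between producer and collector, which is usually acceptable for line-by-line reads.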
Don Mitchell
06/12/2025, 8:35 PM
Alexandru Caraus
06/16/2025, 2:54 PM
Alexandru Caraus
06/16/2025, 2:55 PM
Youssef Shoaib [MOD]
06/16/2025, 3:42 PM
getObject is written in callback style. What you can do is first convert getObject to a suspend function using suspendCoroutine, then use it normally.
Alexandru Caraus
06/18/2025, 3:02 PM
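Editor's sketch of the suspendCoroutine suggestion from 06/16, under stated assumptions: `fetchText`, `fetchTextSuspending`, and `lineFlow` are hypothetical names, and `fetchText` is a made-up callback-style function rather than anything from the S3 SDK. The idea is to wrap the callback once, after which a plain `flow { }` can call the wrapper and emit from its own coroutine.

```kotlin
import kotlin.coroutines.suspendCoroutine
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical callback-style API: reports its result through onResult.
fun fetchText(key: String, onResult: (Result<String>) -> Unit) {
    onResult(Result.success("line one\nline two"))
}

// Wrap the callback in a suspend function. resumeWith forwards either
// the value or the failure carried by the Result to the caller.
suspend fun fetchTextSuspending(key: String): String =
    suspendCoroutine { cont ->
        fetchText(key) { result -> cont.resumeWith(result) }
    }

// The wrapper resumes in the collecting coroutine, so emit is legal here.
fun lineFlow(key: String): Flow<String> = flow {
    fetchTextSuspending(key).lineSequence().forEach { emit(it) }
}

fun main() = runBlocking {
    println(lineFlow("some-key").toList()) // prints [line one, line two]
}
```

This sketch returns the whole payload before emitting anything, so it trades streaming for simplicity. If cancellation support matters, `suspendCancellableCoroutine` from kotlinx.coroutines is the usual alternative.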