Don Mitchell
06/12/2025, 8:32 PMFlow invariant is violated trying to flow lines from aws kotlin s3 sdk getObject. I've tried nesting the flow at all different levels and get stream closed if it's nested (altho getting rid of the use and any other auto close may fix those as I suspect any outside the flow go through their finally blocks when we're returning the flow)
fun <T : Any> S3Client.flowObject(
bucketName: String,
key: String,
deserializer: (String) -> T
): Flow<T> {
val request = GetObjectRequest {
this.bucket = bucketName
this.key = key
}
return flow {
this@flowObject.getObject(request) { response ->
checkNotNull(response.body, { "Failed to get object from S3: $bucketName/$key" }).let { inputStream ->
inputStream.toInputStream().bufferedReader(Charsets.UTF_8).useLines { lines ->
lines.forEach { emit(deserializer(it)) }
}
}
}
}.flowOn(<http://Dispatchers.IO|Dispatchers.IO>)
}
any obvious fixes or better explanations?Youssef Shoaib [MOD]
06/12/2025, 8:34 PMgetObject runs in a different coroutine. Simplest solution is to use a channelFlow and use send instead of emitDon Mitchell
06/12/2025, 8:35 PMAlexandru Caraus
06/16/2025, 2:54 PMAlexandru Caraus
06/16/2025, 2:55 PMYoussef Shoaib [MOD]
06/16/2025, 3:42 PMgetObject is written in callback-style.
What you can do is first convert getObject to a suspend function using suspendCoroutine, then use it normallyAlexandru Caraus
06/18/2025, 3:02 PM