Espen
05/06/2024, 7:01 AMfun put(stream: InputStream): Flow<Progress>
I have a working test-case, but the API is not what I need/want. See thread 👇
The gist of my problem seems to be the wiring of the asynchronous execution correctly between the upload IO coroutine/thread and the Flow collector..
If you have some time to help me out, I'd be super thankful, I spent my whole Sunday and (much more) on this. 🙇Espen
05/06/2024, 7:02 AMEspen
05/06/2024, 7:06 AMInputStream
object and emitting from a MutableSharedFlow<Long>
CLOVIS
05/06/2024, 9:11 AMdata class ByteProgress(
val bytesRead: Long,
) : Progress.Loading.Quantified {
override val normalized: Double
get() = bytesRead.toDouble() / totalSize
}
In your upload loop, report your progress to the caller:
suspend fun upload(input: InputStream, output: OutputStream): … {
stream.forEachIndexed { byteIndex, it ->
report(ByteProgress(byteIndex)) // ←
output.write(it)
}
}
And finally, the caller can extract progress events:
fun put(stream: InputStream): Flow<Progress> = flow {
val reporter = StateFlowProgressReporter()
withContext(reporter.asCoroutineContext()) { // intercepts calls to 'report'
upload(stream, …)
}
emitAll(reporter.progress)
}
Espen
05/06/2024, 9:14 AMEspen
05/06/2024, 11:24 AMoverride suspend fun put(stream: InputStream, ...): Flow<Progress> = channelFlow {
val progressTracker = ProgressTracker(size)
launch {
progressTracker.observe().collect { send(it) }
}
doPut(ProgressStream.wrap(stream, progressTracker), name, contentType, size)
}.transformWhile {
emit(it)
it.percentage < 1F
}
Not super happy with it. Any hints on improving it would still be much appreciated 🙂Espen
05/06/2024, 11:31 AMDaniel Pitts
05/08/2024, 2:58 PMfun put(stream: InputStream, progress: (bytesRead: Long)->Unit)