# codereview
e
Hi! I've been struggling with a certain problem for a bit too long now, and I can't seem to find a solution. The use case is pretty easy: I want to track progress of a file upload with an API similar to this:
fun put(stream: InputStream): Flow<Progress>
I have a working test case, but the API is not what I need/want. See thread 👇 The gist of my problem seems to be wiring the asynchronous execution correctly between the upload IO coroutine/thread and the Flow collector. If you have some time to help me out, I'd be super thankful; I spent my whole Sunday (and much more) on this. 🙇
Here's the test case with some comments on what the API should look like. Here's the method invoked.
Here's the progress tracking part. Basically wrapping an InputStream object and emitting from a MutableSharedFlow<Long>
c
Hey! I don't have the time to study your code, but from afar it looks very similar to Pedestal Progress. Declare your custom progress information type:
data class ByteProgress(
    val bytesRead: Long,
    val totalSize: Long, // total number of bytes to upload
) : Progress.Loading.Quantified {
    override val normalized: Double
        get() = bytesRead.toDouble() / totalSize
}
In your upload loop, report your progress to the caller:
suspend fun upload(input: InputStream, output: OutputStream, totalSize: Long): … {
    var bytesRead = 0L
    input.buffered().iterator().forEach { byte ->
        output.write(byte.toInt())
        bytesRead++
        report(ByteProgress(bytesRead, totalSize)) // ←
    }
}
And finally, the caller can extract progress events:
fun put(stream: InputStream): Flow<Progress> = flow {
    val reporter = StateFlowProgressReporter()

    withContext(reporter.asCoroutineContext()) { // intercepts calls to 'report'
        upload(stream, …)
    }

    emitAll(reporter.progress)
}
e
That looks interesting! Will have a look, thanks! 🙇
Ok, I was able to solve it by adding a replay buffer and using a channelFlow.
override suspend fun put(stream: InputStream, ...): Flow<Progress> = channelFlow {

    val progressTracker = ProgressTracker(size)
    launch {
        progressTracker.observe().collect { send(it) }
    }
    doPut(ProgressStream.wrap(stream, progressTracker), name, contentType, size)
}.transformWhile {
    emit(it)
    it.percentage < 1F
}
Not super happy with it. Any hints on improving it would still be much appreciated 🙂
Yeah I think it's flaky, depending on the buffer size 😞
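One possible way to stop depending on the replay buffer size, as a rough sketch only: skip the separate MutableSharedFlow and let the upload report straight into the channelFlow's own channel. Everything named here (UploadProgress, blockingUpload, the sink parameter, demo) is a hypothetical stand-in, since the real Progress, doPut and ProgressTracker are not shown in the thread. It assumes kotlinx.coroutines is on the classpath.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.flow.conflate
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking
import java.io.ByteArrayInputStream
import java.io.ByteArrayOutputStream
import java.io.InputStream
import java.io.OutputStream

// Hypothetical progress type; the thread's own Progress class is not shown.
data class UploadProgress(val bytesSent: Long, val totalBytes: Long) {
    val percentage: Float
        get() = if (totalBytes == 0L) 1F else bytesSent.toFloat() / totalBytes
}

// Hypothetical stand-in for doPut: a blocking copy that reports the running byte count.
fun blockingUpload(input: InputStream, output: OutputStream, onBytes: (Long) -> Unit) {
    val buffer = ByteArray(8 * 1024)
    var sent = 0L
    while (true) {
        val count = input.read(buffer)
        if (count == -1) break
        output.write(buffer, 0, count)
        sent += count
        onBytes(sent)
    }
}

fun put(stream: InputStream, sink: OutputStream, size: Long): Flow<UploadProgress> =
    channelFlow {
        // trySend never suspends, so it can be called from the blocking copy loop.
        // With conflate() below, a pending update is replaced by the newer one.
        blockingUpload(stream, sink) { sent -> trySend(UploadProgress(sent, size)) }
        // Returning from this block closes the channel, which completes the flow.
    }
        .conflate()               // keep only the latest update if the collector is slow
        .flowOn(Dispatchers.IO)   // run the blocking upload off the collector's thread

// Usage sketch: collect every delivered update for an in-memory payload.
fun demo(payload: ByteArray): List<UploadProgress> = runBlocking {
    put(ByteArrayInputStream(payload), ByteArrayOutputStream(), payload.size.toLong()).toList()
}
```

Because the flow completes when the upload returns, no transformWhile is needed to terminate it, and there is no subscription race with a SharedFlow. A slow collector may skip intermediate updates but should still see the last one. One caveat: a blocking copy like this does not stop when the collector cancels.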
d
Would a simple callback lambda be better?
fun put(stream: InputStream, progress: (bytesRead: Long) -> Unit)
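A minimal sketch of how that callback variant could look, using only the JDK and the Kotlin standard library. CountingInputStream and the sink parameter are made up for illustration (the real upload target is not shown in the thread), so treat this as one possible shape rather than the actual implementation.

```kotlin
import java.io.FilterInputStream
import java.io.InputStream
import java.io.OutputStream

// Hypothetical wrapper: counts bytes as they are read and reports the running total.
class CountingInputStream(
    delegate: InputStream,
    private val progress: (bytesRead: Long) -> Unit,
) : FilterInputStream(delegate) {
    private var total = 0L

    override fun read(): Int {
        val byte = super.read()
        if (byte != -1) progress(++total)
        return byte
    }

    override fun read(buffer: ByteArray, offset: Int, length: Int): Int {
        val count = super.read(buffer, offset, length)
        if (count > 0) {
            total += count
            progress(total)
        }
        return count
    }
}

// Stand-in for the real upload: copies the wrapped stream into `sink`
// (an assumed parameter) and closes the input when done.
fun put(stream: InputStream, sink: OutputStream, progress: (bytesRead: Long) -> Unit) {
    CountingInputStream(stream, progress).use { it.copyTo(sink) }
}
```

The callback runs synchronously on whichever thread performs the read, so there is no buffer to size and no collector to race with. The trade-off is that the caller has to handle any thread hopping, for example before touching UI state.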