Hi, I'm trying to get progress when loading data i...
# coroutines
a
Hi, I'm trying to get progress when loading data in my application and I've come to two solutions One of them uses flow:
Copy code
suspend fun flowProgress(block: (Flow<Int>) -> Unit) {
    block(
        flow {
            do {
                val progress: Int // .... getting progress
                emit(progress)
            } while (true)
        }
    )
}
The other uses plain callback function:
Copy code
suspend fun intProgress(block: (Int) -> Unit) {
    do {
        val progress: Int // .... getting progress
        block(progress)
    } while (true)
}
Can somebody tell me what's the difference between these two and why should I go with one over the other?
g
first one is lazy
but for this example no big difference Though, it’s very strange to have this flow as function callback, it wouldn’t be very convinient to use it from non-suspend lambda (you have to launch coroutine)
but if you also want to return result from this operation, both become quite questionable
a
Thanks for your answer. They both gets called in a suspending function
What do you suggest then?
g
They both gets called in a suspending function
Nope, because
block: (Flow<Int>)
is non-suspend lambda, so you cannot do this:
Copy code
flowProgress { it.collect { publishProgress() } }
I really don’t have perfect solution for this
a
Copy code
fun flowProgress(block: suspend (Flow<Int>) -> Unit)
what if it's like that?
g
Then it will block your progress
it’s definitely not what you want to do
because your flowProgress should be alos suspend
a
oh sorry, intProgress and flowProgress both should be suspending functions.
g
anyway, usually I use one of 2 solutions: 1. Just use your second (intProgress) solution, it’s simple enough for consumer to use 2. Convert whole function to flow builder, so to launch it you have to subscribe on returned Flow, which emits sealed class of Progress|Success|error
a
the second point is what I'm trying to achieve by calling flowProgress
g
if you just call suspend function, it will block until progress is complete, I really not sure that it will work (essentially your function will wait until progress block is complete)
But then it should return this flow as function result
no need to use lambda, it doesn’t improve API, instead makes it harder to use
Copy code
fun flowProgress(): Flow<Int> {
        return flow {
            do {
                val progress: Int // .... getting progress
                emit(progress)
            } while (true)
        }
}
Also function is not suspend anymore, because it’s not required
a
What if it's implemented in an interface like this or it doesn't matter?
Copy code
interface Request{
        suspend fun progress(flow: Flow<Int>)
        suspend fun intProgress(value: Int)
}
g
the problem (and also a good thing at the same time) with such API, is that consumer shouldn’t forget to subscribe on it., which is not a problem if flowProgress also returns some result (let’s say it computes something), but if flow used only for progress it’s extremely easy to forget about it. Let’s say you have function
updateAllData()
so usage will be:
Copy code
updateAllData()
but if it returns Flow, it will do nothing, user should do:
Copy code
updateAllData().collect()
What if it’s implemented in an interface like this or it doesn’t matter
What is the point of this interface?
I don’t think it’s needed
a
in my case it's needed to support configuration for the request like this
Copy code
override suspend fun fetch(data: Url, config: HttpRequestConfig): Result<ByteReadChannel> = runCatching {
        with(config) {
            client.get(data) {
                val statement = HttpStatement(this, client)
                statement.execute { response ->
                    val channel = response.receive<ByteReadChannel>()
                    val contentLength = response.contentLength()

                    requireNotNull(contentLength) { "Content-Length header needs to be set by the server." }

                    progress(
                        flow {
                            var total = 0

                            var readBytes: Int

                            val buffer = ByteArray(contentLength.toInt())

                            do {
                                readBytes = channel.readAvailable(buffer, total, 4096)
                                total += readBytes
                                emit(total)
//                                intProgress(total)
                            } while (readBytes > 0)

                        }
                    )

                }

                builder()
            }
        }
    }

    interface HttpRequestConfig : Fetcher.Config {
        fun HttpRequestBuilder.builder()

        suspend fun progress(flow: Flow<Int>)
        suspend fun intProgress(value: Int)
    }
g
also what this Request represents is not clear for me
Keep in mind, this code is blocking
a
yeah I know that
g
It’s not how Flow or suspend function should work, it never should block
a
well the fetch function is blocking already
g
what do you mean?
a
It's perform network requests
g
I don’t see how you return result
It’s perform network requests
It’s not necessary true, it depends on HTTP client implementation
and even if so, you should wrap it to IO dispatcher, though for HTTP better to use just non-blocking API
a
Of course
at the end this all should be asynchronous
g
what client do you use?
a
Ktor
g
ahh, it’s already non-blockling, that you are fine
a
yeah I'm trying to provide progress to the consumer while requesting the data
g
though, I’m not sure what you try to achieve with this code honestly, you return ByteReadChannel
so both my suggestions applied, wrap it to flow (but it will be very different API) or just provide lamba argument (standard or suspend, depending on case, but careful, suspend may slowdown your reading if client does heavy operations)
flow solution is much more flexible though
a
Ok thank you so much for your help! I guess I'll go with the flow.
g
But not your original flow solution with lambda, It flexible only when you return lazy flow which starts processing only on subscribe and returns sealed class for progress/result
Also it would require to use completely different approach comparing to your implementation above, which is suspend function which returns Result<T>
a
When implementing the Config interface the flow doesn't get processed
until it's collected
g
Also in most of cases there is shouldn't be a suspend functions which return Flow, though, there are cases which can be encoded with such api, I don't think it's a good solution and causes a lot of confusion for user
Not sure about what kind config interface you are talking though, there are many variants above
a
Copy code
HttpRequestConfig
g
In general it's completely right, Flow is not started until it's collected
I don't know what is HttpRequestConfig
a
Copy code
interface HttpRequestConfig {
        fun HttpRequestBuilder.builder()
        suspend fun progress(flow: Flow<Int>)
    }
g
This interface looks super strange for me, and I didn't suggest to use it
a
I know
g
There are many super strange things there
a
This is part of my API Implementation
Don't worry about them.
g
I don't know, I would use this solution anyway
a
Copy code
public fun fetch(data: Url): Flow<Resource<ByteReadChannel>> = flow {

    try {

        val requestBuilder = HttpRequestBuilder().apply {
            url(data)
        }

        HttpStatement(requestBuilder, client).execute { response ->

            val channel = response.receive<ByteReadChannel>()

            val contentLength = response.contentLength()?.toInt()

            requireNotNull(contentLength) { "Content-Length header needs to be set by the server." }

            var totalBytes = 0

            var readBytes: Int

            val buffer = ByteArray(contentLength)

            do {

                readBytes = channel.readAvailable(buffer, totalBytes, 4096)

                totalBytes += readBytes

                emit(Resource.Loading(totalBytes.toFloat().calculatePercentage(contentLength)))

            } while (readBytes > 0)

            emit(Resource.Success(channel))
        }

    } catch (exception: Throwable) {
        emit(Resource.Failure(exception))
    }
}
This is what you were saying correct? You're right the code earlier looked weird. The thing is at the beginning it didn't support Loading progress, so I only implemented it with Result. However when adding the progress the whole implementation changes as you said.