https://kotlinlang.org logo
#coroutines
Title
# coroutines
a

Ali Albaali

01/11/2021, 10:53 AM
Hi, I'm trying to get progress when loading data in my application and I've come to two solutions One of them uses flow:
Copy code
suspend fun flowProgress(block: (Flow<Int>) -> Unit) {
    block(
        flow {
            do {
                val progress: Int // .... getting progress
                emit(progress)
            } while (true)
        }
    )
}
The other uses plain callback function:
Copy code
suspend fun intProgress(block: (Int) -> Unit) {
    do {
        val progress: Int // .... getting progress
        block(progress)
    } while (true)
}
Can somebody tell me what's the difference between these two and why should I go with one over the other?
g

gildor

01/11/2021, 10:54 AM
first one is lazy
but for this example no big difference Though, it’s very strange to have this flow as function callback, it wouldn’t be very convinient to use it from non-suspend lambda (you have to launch coroutine)
but if you also want to return result from this operation, both become quite questionable
a

Ali Albaali

01/11/2021, 10:57 AM
Thanks for your answer. They both gets called in a suspending function
What do you suggest then?
g

gildor

01/11/2021, 11:02 AM
They both gets called in a suspending function
Nope, because
block: (Flow<Int>)
is non-suspend lambda, so you cannot do this:
Copy code
flowProgress { it.collect { publishProgress() } }
I really don’t have perfect solution for this
a

Ali Albaali

01/11/2021, 11:04 AM
Copy code
fun flowProgress(block: suspend (Flow<Int>) -> Unit)
what if it's like that?
g

gildor

01/11/2021, 11:04 AM
Then it will block your progress
it’s definitely not what you want to do
because your flowProgress should be alos suspend
a

Ali Albaali

01/11/2021, 11:06 AM
oh sorry, intProgress and flowProgress both should be suspending functions.
g

gildor

01/11/2021, 11:06 AM
anyway, usually I use one of 2 solutions: 1. Just use your second (intProgress) solution, it’s simple enough for consumer to use 2. Convert whole function to flow builder, so to launch it you have to subscribe on returned Flow, which emits sealed class of Progress|Success|error
a

Ali Albaali

01/11/2021, 11:07 AM
the second point is what I'm trying to achieve by calling flowProgress
g

gildor

01/11/2021, 11:07 AM
if you just call suspend function, it will block until progress is complete, I really not sure that it will work (essentially your function will wait until progress block is complete)
But then it should return this flow as function result
no need to use lambda, it doesn’t improve API, instead makes it harder to use
Copy code
fun flowProgress(): Flow<Int> {
        return flow {
            do {
                val progress: Int // .... getting progress
                emit(progress)
            } while (true)
        }
}
Also function is not suspend anymore, because it’s not required
a

Ali Albaali

01/11/2021, 11:11 AM
What if it's implemented in an interface like this or it doesn't matter?
Copy code
interface Request{
        suspend fun progress(flow: Flow<Int>)
        suspend fun intProgress(value: Int)
}
g

gildor

01/11/2021, 11:11 AM
the problem (and also a good thing at the same time) with such API, is that consumer shouldn’t forget to subscribe on it., which is not a problem if flowProgress also returns some result (let’s say it computes something), but if flow used only for progress it’s extremely easy to forget about it. Let’s say you have function
updateAllData()
so usage will be:
Copy code
updateAllData()
but if it returns Flow, it will do nothing, user should do:
Copy code
updateAllData().collect()
What if it’s implemented in an interface like this or it doesn’t matter
What is the point of this interface?
I don’t think it’s needed
a

Ali Albaali

01/11/2021, 11:13 AM
in my case it's needed to support configuration for the request like this
Copy code
override suspend fun fetch(data: Url, config: HttpRequestConfig): Result<ByteReadChannel> = runCatching {
        with(config) {
            client.get(data) {
                val statement = HttpStatement(this, client)
                statement.execute { response ->
                    val channel = response.receive<ByteReadChannel>()
                    val contentLength = response.contentLength()

                    requireNotNull(contentLength) { "Content-Length header needs to be set by the server." }

                    progress(
                        flow {
                            var total = 0

                            var readBytes: Int

                            val buffer = ByteArray(contentLength.toInt())

                            do {
                                readBytes = channel.readAvailable(buffer, total, 4096)
                                total += readBytes
                                emit(total)
//                                intProgress(total)
                            } while (readBytes > 0)

                        }
                    )

                }

                builder()
            }
        }
    }

    interface HttpRequestConfig : Fetcher.Config {
        fun HttpRequestBuilder.builder()

        suspend fun progress(flow: Flow<Int>)
        suspend fun intProgress(value: Int)
    }
g

gildor

01/11/2021, 11:13 AM
also what this Request represents is not clear for me
Keep in mind, this code is blocking
a

Ali Albaali

01/11/2021, 11:14 AM
yeah I know that
g

gildor

01/11/2021, 11:14 AM
It’s not how Flow or suspend function should work, it never should block
a

Ali Albaali

01/11/2021, 11:14 AM
well the fetch function is blocking already
g

gildor

01/11/2021, 11:15 AM
what do you mean?
a

Ali Albaali

01/11/2021, 11:15 AM
It's perform network requests
g

gildor

01/11/2021, 11:15 AM
I don’t see how you return result
It’s perform network requests
It’s not necessary true, it depends on HTTP client implementation
and even if so, you should wrap it to IO dispatcher, though for HTTP better to use just non-blocking API
a

Ali Albaali

01/11/2021, 11:16 AM
Of course
at the end this all should be asynchronous
g

gildor

01/11/2021, 11:17 AM
what client do you use?
a

Ali Albaali

01/11/2021, 11:17 AM
Ktor
g

gildor

01/11/2021, 11:17 AM
ahh, it’s already non-blockling, that you are fine
a

Ali Albaali

01/11/2021, 11:18 AM
yeah I'm trying to provide progress to the consumer while requesting the data
g

gildor

01/11/2021, 11:18 AM
though, I’m not sure what you try to achieve with this code honestly, you return ByteReadChannel
so both my suggestions applied, wrap it to flow (but it will be very different API) or just provide lamba argument (standard or suspend, depending on case, but careful, suspend may slowdown your reading if client does heavy operations)
flow solution is much more flexible though
a

Ali Albaali

01/11/2021, 11:21 AM
Ok thank you so much for your help! I guess I'll go with the flow.
g

gildor

01/11/2021, 11:56 AM
But not your original flow solution with lambda, It flexible only when you return lazy flow which starts processing only on subscribe and returns sealed class for progress/result
Also it would require to use completely different approach comparing to your implementation above, which is suspend function which returns Result<T>
a

Ali Albaali

01/11/2021, 11:57 AM
When implementing the Config interface the flow doesn't get processed
until it's collected
g

gildor

01/11/2021, 11:58 AM
Also in most of cases there is shouldn't be a suspend functions which return Flow, though, there are cases which can be encoded with such api, I don't think it's a good solution and causes a lot of confusion for user
Not sure about what kind config interface you are talking though, there are many variants above
a

Ali Albaali

01/11/2021, 11:59 AM
Copy code
HttpRequestConfig
g

gildor

01/11/2021, 11:59 AM
In general it's completely right, Flow is not started until it's collected
I don't know what is HttpRequestConfig
a

Ali Albaali

01/11/2021, 12:00 PM
Copy code
interface HttpRequestConfig {
        fun HttpRequestBuilder.builder()
        suspend fun progress(flow: Flow<Int>)
    }
g

gildor

01/11/2021, 12:01 PM
This interface looks super strange for me, and I didn't suggest to use it
a

Ali Albaali

01/11/2021, 12:01 PM
I know
g

gildor

01/11/2021, 12:01 PM
There are many super strange things there
a

Ali Albaali

01/11/2021, 12:02 PM
This is part of my API Implementation
Don't worry about them.
g

gildor

01/11/2021, 12:03 PM
I don't know, I would use this solution anyway
a

Ali Albaali

01/11/2021, 4:35 PM
Copy code
public fun fetch(data: Url): Flow<Resource<ByteReadChannel>> = flow {

    try {

        val requestBuilder = HttpRequestBuilder().apply {
            url(data)
        }

        HttpStatement(requestBuilder, client).execute { response ->

            val channel = response.receive<ByteReadChannel>()

            val contentLength = response.contentLength()?.toInt()

            requireNotNull(contentLength) { "Content-Length header needs to be set by the server." }

            var totalBytes = 0

            var readBytes: Int

            val buffer = ByteArray(contentLength)

            do {

                readBytes = channel.readAvailable(buffer, totalBytes, 4096)

                totalBytes += readBytes

                emit(Resource.Loading(totalBytes.toFloat().calculatePercentage(contentLength)))

            } while (readBytes > 0)

            emit(Resource.Success(channel))
        }

    } catch (exception: Throwable) {
        emit(Resource.Failure(exception))
    }
}
This is what you were saying correct? You're right the code earlier looked weird. The thing is at the beginning it didn't support Loading progress, so I only implemented it with Result. However when adding the progress the whole implementation changes as you said.
4 Views