#coroutines

ribesg

03/25/2019, 10:14 AM
How would I implement a sequence that does HTTP requests using the Ktor client? I get an error about restricted suspending functions when using
sequence { }

gildor

03/25/2019, 10:16 AM
sequence doesn’t support any kind of async events; it’s a synchronous-only abstraction
you can use Channels instead

ribesg

03/25/2019, 10:16 AM
Oh, ok

gildor

03/25/2019, 10:17 AM
In the future you’ll also be able to use cold streams, once they’re released; for now, Channels are the only abstraction for data streams compatible with coroutines

ribesg

03/25/2019, 10:26 AM
I’m basically implementing pagination, and I don’t know much about channels. Is there an example somewhere of how to use channels for that?

gildor

03/25/2019, 10:27 AM
briefly: instead of `sequence { }` use `CoroutineScope.produce { }`
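A minimal sketch of that substitution, assuming kotlinx.coroutines is on the classpath; `fetchPage` is a hypothetical stand-in for a suspending Ktor request:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

// A sequence { } builder only allows its own restricted suspend functions
// (yield/yieldAll); a producer coroutine can call any suspend function:
fun CoroutineScope.pages(): ReceiveChannel<String> = produce {
    for (i in 1..3) {
        send(fetchPage(i)) // arbitrary suspend calls are fine here
    }
}

// hypothetical stand-in for a suspending Ktor request
suspend fun fetchPage(n: Int): String {
    delay(10)
    return "page $n"
}

fun main() = runBlocking {
    for (page in pages()) println(page)
}
```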

ribesg

03/25/2019, 10:29 AM
Yeah, that’s what I get from those docs. So `send` would not be called until there’s a consumer, right?

gildor

03/25/2019, 10:29 AM
Depends on the channel type
there are different ones, but the default one is the rendezvous channel, and it works as you described
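A small sketch of the rendezvous behavior described here, assuming kotlinx.coroutines: the default `Channel()` has no buffer, so `send` suspends until a matching `receive`:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

fun main() = runBlocking {
    val channel = Channel<Int>() // default capacity is RENDEZVOUS (no buffer)
    launch {
        println("sending")
        channel.send(1) // suspends here until the receive() below happens
        println("send resumed")
    }
    delay(100) // by now the sender is parked inside send()
    println("receiving")
    println("got " + channel.receive())
}
```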

Fredrik Larsen

03/25/2019, 12:43 PM
Sorry to hijack the thread, but you mentioned cold streams. Would they work similarly to Rx observables?

gildor

03/25/2019, 1:34 PM
@Fredrik Larsen Yes, the main idea is the same, but based on coroutines. More details: https://github.com/Kotlin/kotlinx.coroutines/issues/254

Fredrik Larsen

03/25/2019, 1:34 PM
Interesting. Thank you @gildor

ribesg

03/25/2019, 2:15 PM
@gildor when using a producer, it does one more request than needed, because it waits for a `receive` when I call `send`, but by the time I call `send` I’ve already done the next request. Is there a way to start the next request when `receive` is called, and not before, to prevent downloading potentially useless data?

bdawg.io

03/25/2019, 3:03 PM
I'm not entirely sure if `onSend` would get you that. @gildor do you happen to know if `onSend` is only a listener event, or if it actually puts an item on the queue as well?
> Is there a way to start the next request when `receive` is called, and not before
Copy code
fun CoroutineScope.loadPages() = produce<List<MyItem>> {
    var currentPage = 1
    while (true) {
        val page = loadPageNum(currentPage)
        if (page.isEmpty()) break
        // suspends until a consumer receives the page
        send(page)
        currentPage += 1
    }
}

ribesg

03/25/2019, 3:12 PM
@bdawg.io this does download the next page before it’s needed. Also `onSend`’s prototype is scary

bdawg.io

03/25/2019, 3:13 PM
@ribesg I'm not understanding why. Unless you're specifying a capacity on your producer, it should suspend until someone `receive`s it. That's why it's called a Rendezvous channel
I've never used `onSend` haha, so I wasn't sure if it'd actually be useful or not lol

ribesg

03/25/2019, 3:16 PM
You’re calling `loadPageNum` before suspending on `send`
So the page is loaded before we start waiting for someone to consume it
I don’t want to download it unless something wants it

bdawg.io

03/25/2019, 3:21 PM
That sounds like a cold stream. An alternative is to return a `Lazy<ReceiveChannel<YourType>>` and then have your consumer unwrap the value before consuming
Copy code
val pages by lazy { loadPages() }
pages.consumeEach { ... }

ribesg

03/25/2019, 3:25 PM
That doesn’t change anything as we’re still loading one more page than needed.
I totally need a cold stream here it seems, but there’s no such thing right now

bdawg.io

03/25/2019, 3:46 PM
Why doesn't it change anything? Only one page is loaded at a time, and it doesn't get loaded until the first access of the `Lazy` (for the above, not until `pages.consumeEach` is called)?

ribesg

03/26/2019, 9:11 AM
@bdawg.io I’m not sure how to explain it better, but you always retrieve a page before calling `send`, which is the suspension point. So you always have an unused page in memory, which may never be used. Your `lazy` trick only works for the first page.

gildor

03/26/2019, 9:11 AM
@ribesg Yes, exactly, this is how hot streams (channels) work and this will be properly solved only by cold streams
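For reference, the cold streams discussed in this thread later shipped as `Flow` in kotlinx.coroutines. A sketch of how the pagination could look with a cold stream; `MyItem` and `loadPageNum` are hypothetical stand-ins matching the names used earlier in the thread:

```kotlin
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.flow.*

// Hypothetical stand-ins for the item type and HTTP call from the thread
data class MyItem(val id: Int)
suspend fun loadPageNum(page: Int): List<MyItem> =
    if (page <= 3) listOf(MyItem(page)) else emptyList()

// Cold: nothing is fetched until a collector asks for values,
// and each page is loaded only after the previous one was consumed.
fun loadPages(): Flow<List<MyItem>> = flow {
    var currentPage = 1
    while (true) {
        val page = loadPageNum(currentPage)
        if (page.isEmpty()) break
        emit(page) // resumes only when the collector is ready for more
        currentPage += 1
    }
}

fun main() = runBlocking {
    // take(2) cancels the flow after two pages, so page 3 is never requested
    loadPages().take(2).collect { println(it) }
}
```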

ribesg

03/26/2019, 9:13 AM
In the meantime, here’s my solution:
Copy code
interface PageIterator<T> {
    suspend fun next(): List<T>
}
Copy code
object : PageIterator<MyType> {

    private var lastReturnedPage = -1
    private var done = false

    override suspend fun next(): List<MyType> {
        if (done) return emptyList()
        val res = doMyHttpRequest(++lastReturnedPage)
        done = lastReturnedPage == res.nbPages - 1
        return res.results
    }
}
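A sketch of how this iterator might be driven, assuming kotlinx.coroutines for `runBlocking`; `MyType`, the response type, and `doMyHttpRequest` are hypothetical stand-ins for the code above:

```kotlin
import kotlinx.coroutines.runBlocking

interface PageIterator<T> {
    suspend fun next(): List<T>
}

// Hypothetical stand-ins for the types and HTTP call in the snippet above
data class MyType(val name: String)
data class SearchResponse(val results: List<MyType>, val nbPages: Int)
suspend fun doMyHttpRequest(page: Int): SearchResponse =
    SearchResponse(listOf(MyType("item-$page")), nbPages = 2)

val pages = object : PageIterator<MyType> {

    private var lastReturnedPage = -1
    private var done = false

    override suspend fun next(): List<MyType> {
        if (done) return emptyList()
        val res = doMyHttpRequest(++lastReturnedPage)
        done = lastReturnedPage == res.nbPages - 1
        return res.results
    }
}

fun main() = runBlocking {
    // each call fetches exactly one page, and only when asked
    while (true) {
        val page = pages.next()
        if (page.isEmpty()) break
        println(page)
    }
}
```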

gildor

03/26/2019, 9:13 AM
Sure, this is your own solution for this
Actually, this problem is described in the first and second comments of the Cold Streams issue, and it’s one of the main use cases

ribesg

03/26/2019, 9:15 AM
Yes, I know that cold streams are the solution, but I needed an actual real solution for now, not a future one 🙂

gildor

03/26/2019, 9:16 AM
It’s not so far in the future; it’s planned for April, as I heard. But I agree, there’s nothing wrong with your own case-specific solution, especially if you don’t integrate with other data streams

ribesg

03/26/2019, 9:18 AM
A lot of the things I need will be out very soon; that’s unfortunate, but I need to make progress on my implementation

gildor

03/26/2019, 9:21 AM
Again, I think it’s completely fine; some kind of high-level abstraction for streams is only required if you have to manage multiple data streams, and that can be much trickier with an ad-hoc solution

ribesg

03/26/2019, 9:23 AM
Yeah, in my case I just needed a `suspend` `next` function doing an HTTP request, so I did just that. Hopefully I’ll be able to use existing things in the future.