Hi all. Need some help. What is the best way to re...
# coroutines
i
Hi all. Need some help. What is the best way to return result of Listener callback function as result of my function which create it? Should I use channel to achive this or something like that?
Copy code
fun someFunction() : SomeResult {
    val someObj =  SomeObj(SomeListenr { result ->
              SomeResult(result) // <- WANT THIS
        }
    )
    someObj.start()
}
g
You don’t have return
what exactly do you want to achive? Is it related to coroutines or more general question?
i
I'm think it more general but I try to solve in in courutine context. Yes I don't have return because I don't shure hot to return SomeObject from someFunctuion()
Actualy it look more like this
Copy code
fun someFunction() : SomeResult {
 launch(UI) {
    val someObj =  SomeObj(SomeListenr { result ->
              SomeResult(result) // <- WANT THIS
        }
    )
    someObj.start()
}
return SomeResult from listener
}
i
@uliluckas wow, looking like what I want. Thx! Check it)
u
You still need to figure out how to bridge between the suspending and none suspending world.
I.e. do you want someFunction to wait for the callback?
i
Yes and yes
u
Then you could call the suspending wrapper from runBlocking. But as it says, it will block your thread, waiting for the callback
i
SomeObj(SomeListenr objects from library But operation working really fast so I should have no problems with bloking thread
g
There is also suspendCancellableCoroutine from Kotlinx.Coroutines that allows you to support cancellation (if your callback supports this)
i
@gildor Thanks for the advice, I will know)
Missed that need to collect the result of callback to list. Here is my solution(Partly compleated). Maybe there is some better way to do that.
Copy code
private suspend fun baseFindCamera(): List<SearchSDK.SearchResult> = suspendCoroutine { cont ->
        val list: ArrayList<SearchSDK.SearchResult> = ArrayList()
        lateinit var searchSDK: SearchSDK
        var isResumed = false
        val timer = Timer()
        var task = timerTask {}

        launch(UI) {
            searchSDK = SearchSDK(SearchSDK.ISearchResult {
                result ->
                Log.d(TAG, "searchSDK get result ${result.uid}")
                if(!isResumed) {
                    if (!result.uid.isEmpty())
                        list.add(result)
                    else
                        cont.resumeWithException(NullPointerException())
                    task.cancel()
                    task = timerTask {
                        cont.resume(list)
                        isResumed = true
                        timer.cancel()
                        timer.purge()
                        searchSDK.stop()
                    }
                    timer.schedule(task, 2000)
                }
            })
            searchSDK.search2()
        }
    }

    override fun findCameras(): Either<Failure, List<MyCameraEntity>> {
        val emptyCamera = MyCameraEntity("", "", "", "")
        val list = runBlocking(CommonPool) { baseFindCamera() }
        list.forEach {r ->
            Log.d(TAG, "uid ${r.uid} | ip ${r.ip} | port ${r.port} | name ${r.name} | version ${r.version} ")
        }
        return Either.Right(listOf(emptyCamera))
    }
u
This is hard to read inside the thread. Could you provide a gist or slack-snippet? You could even post to https://try.kotlinlang.org/
Without understanding what you are doing yet, I would think that a
launch
inside
suspendCoroutine
is probably not what you want. Do one thing at a time. Wrap your callback into a coroutine. Then take that coroutine and do other stuff ... schedule to UI thread, stop the sdk, ...
g
Agree with Uli, in general your snippet doesn’t look correct, all thos runBlocking, launch(UI) etc.
u
something like this should be your starting point. Then you do your loop and list building, using this suspending
search
function. Your timer task probably reduces to a simple delay:
Copy code
private suspend fun search(): SearchSDK.SearchResult = suspendCoroutine { cont ->
        searchSDK = SearchSDK(SearchSDK.ISearchResult { result ->
            if (!result.uid.isEmpty())
                cont.resumeWith(result)
            else
                cont.resumeWithException(NullPointerException())
        } 
        searchSDK.search2()
    }
}
Please ignore my last post. Just looked at your code again on a big screen. So you are getting continuous callbacks form your search which you want to collect until there is silence for 2 seconds, right?
multiple values can be represented by channels in the coroutine world. You could try something like this:
Copy code
private suspend fun findMyCameras(): Channel<SearchSDK.SearchResult> {
    val channel = Channel<SearchSDK.SearchResult>()
    val searchSDK = SearchSDK(SearchSDK.ISearchResult { result ->
        Log.d(TAG, "searchSDK get result ${result.uid}")
        if (!result.uid.isEmpty())
            channel.sendBlocking(result)
        else
            channel.close(NullPointerException())
    })
    launch(UI) {
        searchSDK.search2()
    }
    channel.invokeOnClose {
        searchSDK.stop()
    }
    return channel
}

fun findCameras(): Either<Authenticator.Failure, List<MyCameraEntity>> {
    val list: ArrayList<SearchSDK.SearchResult> = ArrayList()
    val emptyCamera = MyCameraEntity("", "", "", "")
    runBlocking(CommonPool) {
        findMyCameras().use { cameraChannel ->
            while (true) {
                val nextCamera = withTimeoutOrNull(2, TimeUnit.SECONDS) {
                    cameraChannel.receiveOrNull()x 
                }
                if (nextCamera == null) break
                list.add(result)
            }
        }
    }
    list.forEach { r ->c
        Log.d(TAG, "uid ${r.uid} | ip ${r.ip} | port ${r.port} | name ${r.name} | version ${r.version} ")
    }
    return Either.Right(listOf(emptyCamera))
}
Not sure if you need to catch the potential exception from
sendBlocking
i
@uli Thanks for your help!) I'll use implementation you suggest.
u
be aware though, it is not tested at all. Especially installing
invokeOnClose
was a wild guess. Maybe `searchSDK.stop()`needs to be called from a try catch around
channel.sendBlocking(result)
if you get it to work, please let us know 🙂
i
Yes It works! But I'm replace findMyCameras().use with .consume because use marked as deprecated. invokeOnClose - seems also working but need more tests.)
@gildor @uli Working version for me looking like that https://gist.github.com/ioKun/115bb4799e0bcbab429aacb13fd8c738 Thx for help again! First time using Gist.
g
I think you don't need internal launch for send, looks like unnecessary overhead, I would just use channel with buffer and/or use offer instead of send (depends on required semantics, check offer and send docs and channel buffering docs)
i
@gildor Ok! *.offer() looks like what i need. I'll try it but for some reasons not always return the full data set. ( Also working a little bit slowly then launch { send() })
g
Sure it's not guaranteed, because if channel is full and you don't suspend, channel cannot receive this data, but even send returns instantly in case of buffered channel
How do you measure speed? Creating one more coroutine on each even and especially dispatching of this coroutine should be more expensive than just send to a channel directly
i
@gildor Actually speed not actualy really problem in my case. And I'll prefer to make more clean code. Test it - calculate system time ms diff before and after, 5 times in row 2403/2581/2128/2178/2184 ms - send() wrapped with launch{} 2899/2863/2134/2127/2169 ms - using offer() But as I say offer also not return not full result And this is problem. (Have no experience with channels - maybe with offer() need to use other type of Channel instead of Channel<T>() or change code which consume data)
g
As s I mentioned, in case of offer you need channel with buffer to cache events until delivery
5 measurements without any warming cannot be used for such benchmarking, I would just ignore those results, especially for this simple code, for microbenchmarking you need proper test harness
i
I'm dont try to prove something about this functions. Just check it, for interest. Left to read about the channels 🙂
u
That's why I used sendBlocking in my example. Your performance issues probably come from using a rendezvous channel. If you pass a buffer size to the channel constructor, sendBlocking will not block unless the buffer is full. But opposed to offer, it will also not loose data if the buffer is full. This is a lot cleaner then using launch, as with buffered channels, you just queue the data, not the whole coroutine to process it.
i
used launch because was't know about buffer for channel. And with default (RendezvousChannel) sendBlocking cause error. Now when changed to buffered channel. Both functions offer and sendBlocking working good.
g
Why do you use sendBlocking in general?
i
Currently I'm using offer. Send blocking was in the original exapmle by Uli and HiSearchSDK was out of launch(UI) corutine. But HiSearchSDK must be initialized inside main thread only so after that I move it inide launch(UI) and it cause problem because sendBlocking should not call from the main tread. So I change it to send wrapped with one more launch. And finilly you tell me about offer() and channel with buffer.
g
I think it’s pretty rare when you need sendBlocking, only probably for tests or if you want to use coroutines from some existing blocking code
u
I guess, sending to a channel from a callback ( i.e. not suspending) with backpressure is a usecase but only makes sense on a background thread
g
yes, as an adapter it will work, but usually writing coroutine wrapper for existing (even blocking) callback API would be more clean solution