https://kotlinlang.org logo
Title
i

iLobanov

09/07/2018, 4:40 AM
Hi all. Need some help. What is the best way to return result of Listener callback function as result of my function which create it? Should I use channel to achive this or something like that?
fun someFunction() : SomeResult {
    val someObj =  SomeObj(SomeListenr { result ->
              SomeResult(result) // <- WANT THIS
        }
    )
    someObj.start()
}
g

gildor

09/07/2018, 4:48 AM
You don’t have return
what exactly do you want to achive? Is it related to coroutines or more general question?
i

iLobanov

09/07/2018, 4:59 AM
I'm think it more general but I try to solve in in courutine context. Yes I don't have return because I don't shure hot to return SomeObject from someFunctuion()
Actualy it look more like this
fun someFunction() : SomeResult {
 launch(UI) {
    val someObj =  SomeObj(SomeListenr { result ->
              SomeResult(result) // <- WANT THIS
        }
    )
    someObj.start()
}
return SomeResult from listener
}
i

iLobanov

09/07/2018, 5:03 AM
@uliluckas wow, looking like what I want. Thx! Check it)
u

uli

09/07/2018, 5:05 AM
You still need to figure out how to bridge between the suspending and none suspending world.
I.e. do you want someFunction to wait for the callback?
i

iLobanov

09/07/2018, 5:07 AM
Yes and yes
u

uli

09/07/2018, 5:08 AM
Then you could call the suspending wrapper from runBlocking. But as it says, it will block your thread, waiting for the callback
i

iLobanov

09/07/2018, 5:09 AM
SomeObj(SomeListenr objects from library But operation working really fast so I should have no problems with bloking thread
g

gildor

09/07/2018, 5:19 AM
There is also suspendCancellableCoroutine from Kotlinx.Coroutines that allows you to support cancellation (if your callback supports this)
i

iLobanov

09/07/2018, 5:22 AM
@gildor Thanks for the advice, I will know)
Missed that need to collect the result of callback to list. Here is my solution(Partly compleated). Maybe there is some better way to do that.
private suspend fun baseFindCamera(): List<SearchSDK.SearchResult> = suspendCoroutine { cont ->
        val list: ArrayList<SearchSDK.SearchResult> = ArrayList()
        lateinit var searchSDK: SearchSDK
        var isResumed = false
        val timer = Timer()
        var task = timerTask {}

        launch(UI) {
            searchSDK = SearchSDK(SearchSDK.ISearchResult {
                result ->
                Log.d(TAG, "searchSDK get result ${result.uid}")
                if(!isResumed) {
                    if (!result.uid.isEmpty())
                        list.add(result)
                    else
                        cont.resumeWithException(NullPointerException())
                    task.cancel()
                    task = timerTask {
                        cont.resume(list)
                        isResumed = true
                        timer.cancel()
                        timer.purge()
                        searchSDK.stop()
                    }
                    timer.schedule(task, 2000)
                }
            })
            searchSDK.search2()
        }
    }

    override fun findCameras(): Either<Failure, List<MyCameraEntity>> {
        val emptyCamera = MyCameraEntity("", "", "", "")
        val list = runBlocking(CommonPool) { baseFindCamera() }
        list.forEach {r ->
            Log.d(TAG, "uid ${r.uid} | ip ${r.ip} | port ${r.port} | name ${r.name} | version ${r.version} ")
        }
        return Either.Right(listOf(emptyCamera))
    }
u

uli

09/07/2018, 2:26 PM
This is hard to read inside the thread. Could you provide a gist or slack-snippet? You could even post to https://try.kotlinlang.org/
Without understanding what you are doing yet, I would think that a
launch
inside
suspendCoroutine
is probably not what you want. Do one thing at a time. Wrap your callback into a coroutine. Then take that coroutine and do other stuff ... schedule to UI thread, stop the sdk, ...
g

gildor

09/08/2018, 2:46 AM
Agree with Uli, in general your snippet doesn’t look correct, all thos runBlocking, launch(UI) etc.
u

uli

09/08/2018, 9:00 AM
something like this should be your starting point. Then you do your loop and list building, using this suspending
search
function. Your timer task probably reduces to a simple delay:
private suspend fun search(): SearchSDK.SearchResult = suspendCoroutine { cont ->
        searchSDK = SearchSDK(SearchSDK.ISearchResult { result ->
            if (!result.uid.isEmpty())
                cont.resumeWith(result)
            else
                cont.resumeWithException(NullPointerException())
        } 
        searchSDK.search2()
    }
}
Please ignore my last post. Just looked at your code again on a big screen. So you are getting continuous callbacks form your search which you want to collect until there is silence for 2 seconds, right?
multiple values can be represented by channels in the coroutine world. You could try something like this:
private suspend fun findMyCameras(): Channel<SearchSDK.SearchResult> {
    val channel = Channel<SearchSDK.SearchResult>()
    val searchSDK = SearchSDK(SearchSDK.ISearchResult { result ->
        Log.d(TAG, "searchSDK get result ${result.uid}")
        if (!result.uid.isEmpty())
            channel.sendBlocking(result)
        else
            channel.close(NullPointerException())
    })
    launch(UI) {
        searchSDK.search2()
    }
    channel.invokeOnClose {
        searchSDK.stop()
    }
    return channel
}

fun findCameras(): Either<Authenticator.Failure, List<MyCameraEntity>> {
    val list: ArrayList<SearchSDK.SearchResult> = ArrayList()
    val emptyCamera = MyCameraEntity("", "", "", "")
    runBlocking(CommonPool) {
        findMyCameras().use { cameraChannel ->
            while (true) {
                val nextCamera = withTimeoutOrNull(2, TimeUnit.SECONDS) {
                    cameraChannel.receiveOrNull()x 
                }
                if (nextCamera == null) break
                list.add(result)
            }
        }
    }
    list.forEach { r ->c
        Log.d(TAG, "uid ${r.uid} | ip ${r.ip} | port ${r.port} | name ${r.name} | version ${r.version} ")
    }
    return Either.Right(listOf(emptyCamera))
}
Not sure if you need to catch the potential exception from
sendBlocking
i

iLobanov

09/10/2018, 6:55 AM
@uli Thanks for your help!) I'll use implementation you suggest.
u

uli

09/10/2018, 8:00 AM
be aware though, it is not tested at all. Especially installing
invokeOnClose
was a wild guess. Maybe `searchSDK.stop()`needs to be called from a try catch around
channel.sendBlocking(result)
if you get it to work, please let us know 🙂
i

iLobanov

09/10/2018, 8:26 AM
Yes It works! But I'm replace findMyCameras().use with .consume because use marked as deprecated. invokeOnClose - seems also working but need more tests.)
@gildor @uli Working version for me looking like that https://gist.github.com/ioKun/115bb4799e0bcbab429aacb13fd8c738 Thx for help again! First time using Gist.
g

gildor

09/10/2018, 12:03 PM
I think you don't need internal launch for send, looks like unnecessary overhead, I would just use channel with buffer and/or use offer instead of send (depends on required semantics, check offer and send docs and channel buffering docs)
i

iLobanov

09/10/2018, 12:10 PM
@gildor Ok! *.offer() looks like what i need. I'll try it but for some reasons not always return the full data set. ( Also working a little bit slowly then launch { send() })
g

gildor

09/10/2018, 12:15 PM
Sure it's not guaranteed, because if channel is full and you don't suspend, channel cannot receive this data, but even send returns instantly in case of buffered channel
How do you measure speed? Creating one more coroutine on each even and especially dispatching of this coroutine should be more expensive than just send to a channel directly
i

iLobanov

09/10/2018, 12:41 PM
@gildor Actually speed not actualy really problem in my case. And I'll prefer to make more clean code. Test it - calculate system time ms diff before and after, 5 times in row 2403/2581/2128/2178/2184 ms - send() wrapped with launch{} 2899/2863/2134/2127/2169 ms - using offer() But as I say offer also not return not full result And this is problem. (Have no experience with channels - maybe with offer() need to use other type of Channel instead of Channel<T>() or change code which consume data)
g

gildor

09/10/2018, 12:45 PM
As s I mentioned, in case of offer you need channel with buffer to cache events until delivery
5 measurements without any warming cannot be used for such benchmarking, I would just ignore those results, especially for this simple code, for microbenchmarking you need proper test harness
i

iLobanov

09/10/2018, 12:51 PM
I'm dont try to prove something about this functions. Just check it, for interest. Left to read about the channels 🙂
u

uli

09/10/2018, 3:32 PM
That's why I used sendBlocking in my example. Your performance issues probably come from using a rendezvous channel. If you pass a buffer size to the channel constructor, sendBlocking will not block unless the buffer is full. But opposed to offer, it will also not loose data if the buffer is full. This is a lot cleaner then using launch, as with buffered channels, you just queue the data, not the whole coroutine to process it.
i

iLobanov

09/11/2018, 4:24 AM
used launch because was't know about buffer for channel. And with default (RendezvousChannel) sendBlocking cause error. Now when changed to buffered channel. Both functions offer and sendBlocking working good.
g

gildor

09/11/2018, 4:50 AM
Why do you use sendBlocking in general?
i

iLobanov

09/11/2018, 6:16 AM
Currently I'm using offer. Send blocking was in the original exapmle by Uli and HiSearchSDK was out of launch(UI) corutine. But HiSearchSDK must be initialized inside main thread only so after that I move it inide launch(UI) and it cause problem because sendBlocking should not call from the main tread. So I change it to send wrapped with one more launch. And finilly you tell me about offer() and channel with buffer.
g

gildor

09/11/2018, 6:19 AM
I think it’s pretty rare when you need sendBlocking, only probably for tests or if you want to use coroutines from some existing blocking code
u

uli

09/11/2018, 6:29 AM
I guess, sending to a channel from a callback ( i.e. not suspending) with backpressure is a usecase but only makes sense on a background thread
g

gildor

09/11/2018, 6:30 AM
yes, as an adapter it will work, but usually writing coroutine wrapper for existing (even blocking) callback API would be more clean solution