Yoav Gross

10/02/2019, 2:10 PM
I’m trying to subscribe to rxjava events and consume them as a flow. It seems I’m either not subscribing correctly with flow or missing the action that corresponds to rx subscribe.
This is the flow usage. I’m expecting to collect each change to the data from the flowable I’m listening to:
runBlocking {
            getSubtitleFromProjectDataManager()
        }

private suspend fun getSubtitleFromProjectDataManager() {
        scope.launch(Dispatchers.IO) {

            val flow = projectDataManager.getProjectNameFlowable().asFlow()
            flow.collect {
                subtitle = it

            }
            launch(Dispatchers.Main) {
                view?.rebindToolbar()
            }
        }
    }
This is the rxjava code I’m trying to listen to. The event doesn’t fire again when the data changes, so, as I said before, I suspect a subscription issue:
public Flowable<String> getProjectNameFlowable() {
        return RxJavaInterop.toV2Flowable(
                getSelectedProject()
                        .switchMap(projectEntity -> {
                                    Timber.e("fire projectEntity " + projectEntity.title());
                                    return Observable.just(projectEntity.title());
                                }
                        ));
    }

louiscad

10/02/2019, 2:13 PM
You should use triple backticks for code snippets that are not just referencing a short single line.

Yoav Gross

10/02/2019, 2:15 PM
edited. thanks.

louiscad

10/02/2019, 2:18 PM
Also, I think decreasing the indent and making it match standards (4 spaces) would help readability for others that might want to help you.

reline

10/02/2019, 3:17 PM
This launch won’t run until your flow completes, because flow.collect { ... } suspends until then:
launch(Dispatchers.Main) {
    view?.rebindToolbar()
}
I would also suggest that you wrap your suspend function body in coroutineScope { ... } to prevent leaks.

https://youtu.be/a3agLJQ6vt8?list=LL-8jQIus7GSbj5dAU0jB7rg&t=365
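
A minimal sketch of the two points above, not the original app code. It assumes a hypothetical projectNames() flow standing in for projectDataManager.getProjectNameFlowable().asFlow(), and records the order of events to show that a statement placed after collect only runs once the flow completes, and that coroutineScope waits for its launched child before returning:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical stand-in for the converted Flowable; finite so the demo terminates.
fun projectNames(): Flow<String> = flow {
    emit("Project A")
    delay(50)
    emit("Project B")
}

// Returns the order in which things happened.
suspend fun observeNames(): List<String> {
    val events = mutableListOf<String>()
    coroutineScope {
        // Child of this scope: it cannot outlive observeNames().
        launch {
            projectNames().collect { events += "collected $it" }
            // Reached only after the flow has completed.
            events += "after collect"
        }
        events += "launched"
    }
    // coroutineScope returns only when the child coroutine is done.
    events += "scope finished"
    return events
}

fun main() = runBlocking {
    observeNames().forEach(::println)
}
```

With a flow that never completes, as a subscription to project changes presumably would be, "after collect" would never be recorded, which matches the toolbar never being rebound.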

Yoav Gross

10/02/2019, 6:33 PM
Thanks @reline, any idea how I make the flow listen to changes and emit?
What can I use other than collect?

reline

10/02/2019, 6:42 PM
You can still use collect here. I believe your goal is to set the subtitle and rebind the toolbar each time you get a new project? maybe do something like
flow.collect {
    withContext(Dispatchers.Main) {
        subtitle = it
        view?.rebindToolbar()
    }
}
also take a look here if you can use experimental APIs https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/flow-on.html

Yoav Gross

10/02/2019, 6:50 PM
Yes, that’s what I’m trying to do: migrate from rx but keep the idea of subscription. Will give it a shot, thanks @reline!

louiscad

10/02/2019, 8:44 PM
This:
flow.collect {
    withContext(Dispatchers.Main) {
        subtitle = it
        view?.rebindToolbar()
    }
}
switches dispatcher on each new value, which is inefficient. Switch dispatcher once and for all at the collect site (wrapping the whole collect call) if you want to do that. For the flow source/emitter, there's the flowOn operator, as said earlier.
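
A hedged sketch of switching once at the collect site. Dispatchers.Main needs a UI platform, so a single-threaded executor named "ui-thread" is used as a stand-in here, and projectNames() is a hypothetical source; neither comes from the original code:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import java.util.concurrent.Executors

// Hypothetical source standing in for the converted Flowable.
fun projectNames(): Flow<String> = flowOf("Project A", "Project B")

// Collects on the given dispatcher and reports which thread handled each value.
suspend fun collectOn(ui: CoroutineDispatcher): List<String> {
    val handledOn = mutableListOf<String>()
    // One dispatcher switch wraps the whole collect call,
    // instead of a withContext call inside the lambda for every value.
    withContext(ui) {
        projectNames().collect { name ->
            handledOn += "$name on ${Thread.currentThread().name}"
        }
    }
    return handledOn
}

fun main() {
    // Single-threaded executor used as a stand-in for Dispatchers.Main.
    val ui = Executors.newSingleThreadExecutor { r -> Thread(r, "ui-thread") }
        .asCoroutineDispatcher()
    try {
        runBlocking { collectOn(ui).forEach(::println) }
    } finally {
        ui.close() // shuts down the executor so the JVM can exit
    }
}
```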

reline

10/02/2019, 8:51 PM
Thanks for pointing out the differences, Louis. If I’m not mistaken, won’t wrapping the collect call inside withContext(Main) cause it to observe on the main thread as well? How would you go about observing on IO and then consuming events on Main (without using flowOn)?

louiscad

10/02/2019, 9:13 PM
Flow using another dispatcher internally (including using flowOn) is an implementation detail. The Flow should be safe to collect on the main thread.
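
A small sketch of that last point, under the assumption that the producer side is moved to Dispatchers.IO with flowOn. The Hop class and function names are made up for illustration. The upstream block runs on an IO worker, while everything downstream of flowOn, including the collector, stays in the caller's context:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical record of where a value was produced and where it was consumed.
data class Hop(val producedOn: String, val consumedOn: String)

// The emitter runs on Dispatchers.IO; callers do not need to know that.
fun threadNamesFromIo(): Flow<String> = flow {
    emit(Thread.currentThread().name)
}.flowOn(Dispatchers.IO)

// map sits downstream of flowOn, so it runs in the collector's context.
suspend fun traceThreads(): List<Hop> =
    threadNamesFromIo()
        .map { producer -> Hop(producer, Thread.currentThread().name) }
        .toList()

fun main() = runBlocking {
    // Producer and consumer thread names differ: only the upstream moved to IO.
    traceThreads().forEach(::println)
}
```

Collected from a coroutine already running on Dispatchers.Main, the same flow would deliver its values on the main thread without any withContext call at the collect site.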