I’m trying to subscribe to `rxjava` events and con...
# coroutines
y
I’m trying to subscribe to `rxjava` events and consume them as a `flow`. It seems I’m not subscribing correctly with the flow, or I’m missing whatever corresponds to the rx `subscribe` call.
This is the `flow` usage. I’m expecting to collect each change to the data from the `flowable` I’m listening to:
runBlocking {
    getSubtitleFromProjectDataManager()
}

private suspend fun getSubtitleFromProjectDataManager() {
    scope.launch(Dispatchers.IO) {
        val flow = projectDataManager.getProjectNameFlowable().asFlow()
        flow.collect {
            subtitle = it
        }
        launch(Dispatchers.Main) {
            view?.rebindToolbar()
        }
    }
}
This is the RxJava code I’m trying to listen to. The event indeed isn’t firing again when the data changes, so, as I said above, I suspect a subscription issue:
public Flowable<String> getProjectNameFlowable() {
    return RxJavaInterop.toV2Flowable(
            getSelectedProject()
                    .switchMap(projectEntity -> {
                        Timber.e("fire projectEntity " + projectEntity.title());
                        return Observable.just(projectEntity.title());
                    }));
}
👋 1
l
You should use triple backticks for code snippets that are more than a short single-line reference.
y
edited. thanks.
l
Also, I think decreasing the indent and making it match standards (4 spaces) would help readability for others that might want to help you.
r
This line won’t run until your flow completes, because `flow.collect { ... }` suspends the coroutine until then:
launch(Dispatchers.Main) {
    view?.rebindToolbar()
}
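A minimal sketch of that ordering, assuming a finite `flowOf(...)` as a hypothetical stand-in for the project-name stream (the real stream never completes, so the statement after `collect` would never be reached):

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Records the order of events so it can be inspected afterwards.
fun collectThenContinue(): List<String> = runBlocking {
    val events = mutableListOf<String>()
    // Hypothetical stand-in for the project-name stream; it completes after two values.
    val names = flowOf("first", "second")
    names.collect { events.add("collected $it") }
    // Reached only once the flow above has completed.
    events.add("after collect")
    events
}

fun main() {
    println(collectThenContinue())
    // prints [collected first, collected second, after collect]
}
```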
I would also suggest that you wrap your suspend function body in `coroutineScope { ... }` to prevent leaks.

https://youtu.be/a3agLJQ6vt8?list=LL-8jQIus7GSbj5dAU0jB7rg&t=365
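A rough sketch of the `coroutineScope { ... }` suggestion, assuming a hypothetical `refreshSubtitle` function and a `log` list used only to show ordering. The point is that a coroutine launched inside `coroutineScope` is a child of the caller, so the suspend function does not return (and nothing keeps running behind its back) until that child is done:

```kotlin
import kotlinx.coroutines.*

// Hypothetical suspend function; `log` exists only to make the ordering visible.
suspend fun refreshSubtitle(log: MutableList<String>) {
    coroutineScope {
        // Child coroutine tied to this scope instead of an external one.
        launch {
            delay(50)
            log.add("child done")
        }
        log.add("body done")
    } // suspends here until the child has finished
    log.add("function returning")
}

fun main() = runBlocking {
    val log = mutableListOf<String>()
    refreshSubtitle(log)
    println(log)
    // prints [body done, child done, function returning]
}
```

If the caller is cancelled, the child is cancelled with it, which is what guards against the leak.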

✔️ 1
y
Thanks @reline, any idea how I can make the flow listen to changes and emit? What can I use other than collect?
r
You can still use collect here. I believe your goal is to set the subtitle and rebind the toolbar each time you get a new project? Maybe do something like:
flow.collect {
    withContext(Dispatchers.Main) {
        subtitle = it
        view?.rebindToolbar()
    }
}
Also take a look at `flowOn` here, if you can use experimental APIs: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/flow-on.html
😲 1
y
Yes, that’s what I’m trying to do: migrate from rx but keep the idea of a subscription. Will give it a shot, thanks @reline!
👍 1
l
This:
flow.collect {
    withContext(Dispatchers.Main) {
        subtitle = it
        view?.rebindToolbar()
    }
}
switches dispatcher on each new value, which is inefficient. Switch dispatcher once and for all at the collect site (wrapping the whole `collect` call) if you want to do that. For the flow source/emitter, there's the `flowOn` operator, as said earlier.
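A sketch of the "switch once at the collect site" variant. `Dispatchers.Main` needs a UI platform, so this assumes a single-threaded executor named `ui-stand-in` as a hypothetical replacement, and `flowOf(...)` as a stand-in source:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import java.util.concurrent.Executors

fun collectOnce(): List<String> {
    // Hypothetical stand-in for Dispatchers.Main so the sketch runs on a plain JVM.
    val ui = Executors.newSingleThreadExecutor { r -> Thread(r, "ui-stand-in") }
        .asCoroutineDispatcher()
    try {
        return runBlocking {
            val seen = mutableListOf<String>()
            val names = flowOf("Project A", "Project B")
            // One context switch for the whole collection, not one per value.
            withContext(ui) {
                names.collect { name ->
                    seen.add("$name on ${Thread.currentThread().name}")
                }
            }
            seen
        }
    } finally {
        ui.close() // shut the executor down so the JVM can exit
    }
}

fun main() {
    // Each entry should name the stand-in UI thread.
    println(collectOnce())
}
```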
👍 1
r
Thanks for pointing out the differences, Louis. If I’m not mistaken, won’t wrapping the `collect` call inside `withContext(Main)` cause it to observe on the main thread as well? How would you go about observing on IO and then consuming events on Main (without using `flowOn`)?
l
A Flow using another dispatcher internally (including via `flowOn`) is an implementation detail. The Flow should be safe to collect on the main thread.
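To illustrate that last point, here is a small sketch assuming a hypothetical `projectNames` producer that keeps its `flowOn(Dispatchers.IO)` to itself. The thread lists are only there to observe where each side runs: emission happens on an IO worker, while the collector body stays on whatever thread the caller collects from.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical producer: the dispatcher choice is hidden inside the function.
fun projectNames(emitterThreads: MutableList<Thread>): Flow<String> = flow {
    emitterThreads.add(Thread.currentThread())
    emit("Project A")
    emit("Project B")
}.flowOn(Dispatchers.IO) // affects only the upstream emitter

// Returns true when emission ran off the caller's thread while
// every value was handled on the caller's thread.
fun collectsOnCallerThread(): Boolean = runBlocking {
    val caller = Thread.currentThread()
    val emitterThreads = mutableListOf<Thread>()
    val collectorThreads = mutableListOf<Thread>()
    projectNames(emitterThreads).collect {
        collectorThreads.add(Thread.currentThread())
    }
    emitterThreads.isNotEmpty() &&
        emitterThreads.none { it == caller } &&
        collectorThreads.size == 2 &&
        collectorThreads.all { it == caller }
}

fun main() {
    println(collectsOnCallerThread()) // prints true
}
```

So the caller can collect from the main thread without adding its own `flowOn` or knowing which dispatcher the producer uses.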
👍 1