#rx
u

ursus

11/13/2018, 4:29 AM
I want to ask about thread safety in RxJava: where do I need to add it manually? I have a hunch that subscribers, i.e. the onNext lambdas, are not thread-safe, right?
u

ursus

11/13/2018, 4:46 AM
I googled that. Are you sure it's for RxJava2? (Or does it not matter?)
g

gildor

11/13/2018, 4:48 AM
Yes, it’s still true for RxJava2
or mostly true, with possible changes, but the situation is the same
u

ursus

11/13/2018, 4:51 AM
Is there a way to make subscribe serialized with lambdas, or do I have to use a SerializedObserver instance?
g

gildor

11/13/2018, 4:52 AM
What do you mean “with lambdas”?
u

ursus

11/13/2018, 4:52 AM
.subscribe { ... }
g

gildor

11/13/2018, 4:57 AM
I see what you mean. No, I think you have to use SerializedObserver explicitly.
u

ursus

11/13/2018, 4:57 AM
.subscribe(new SerializedObserver<>(new Observer<Activation>() {
    @Override public void onSubscribe(Disposable d) {
    }

    @Override public void onNext(Activation activation) {
    }

    @Override public void onError(Throwable e) {
    }

    @Override public void onComplete() {
    }
}));
g

gildor

11/13/2018, 4:57 AM
Or use SerializedSubject (in RxJava2 that means calling toSerialized() on a Subject)
u

ursus

11/13/2018, 4:57 AM
Okay, I'll create an extension, thanks.
g

gildor

11/13/2018, 4:58 AM
But it really depends on your case and what exactly you want to synchronize
u

ursus

11/13/2018, 5:00 AM
Well, I have a specific use case: I have a queue of actions backed by a single-thread scheduler. I use a relay and observeOn, and everything is good. In subscribe I then pass the action to a state machine, which determines whether that action is allowed in the current state and mutates its state.
However, because of a stupid C API I am wrapping, I need one specific action to run on the same thread as the C callback, so I cannot post it to my queue. So I need something like this:
queuedProcessActionRelay
                .observeOn(scheduler)
                .mergeWith(directProcessActionRelay)
                .throwingSubscribe {
                    synchronized(this) {
                        stateMachine2.process(it)
                    }
                }
So, as you can see, queued actions always come from one thread, so they alone would need no synchronization. But this unlucky direct action can come in at any time, hence I need synchronization around the shared mutable state, i.e. the state machine.
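[Editor's sketch] The pattern described above (one stream of actions arriving from a single-thread queue, plus an occasional direct call on whatever thread the callback uses, both guarded by a lock around the state machine) can be illustrated without RxJava. The following is a minimal sketch using plain java.util.concurrent in place of the relay and observeOn; the class GuardedStateMachine, its states, and the action names "toggle", "start" and "stop" are all hypothetical and do not come from the original code.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the state machine in the chat: shared mutable state
// that accepts or rejects an action depending on the current state.
final class GuardedStateMachine {
    enum State { IDLE, RUNNING }

    private final Object lock = new Object();
    private State state = State.IDLE; // guarded by lock
    private int transitions = 0;      // guarded by lock

    // Returns true if the action was legal in the current state.
    // Safe to call from the queue thread and from a callback thread alike.
    boolean process(String action) {
        synchronized (lock) {
            State next;
            if (action.equals("toggle")) {
                next = (state == State.IDLE) ? State.RUNNING : State.IDLE;
            } else if (action.equals("start") && state == State.IDLE) {
                next = State.RUNNING;
            } else if (action.equals("stop") && state == State.RUNNING) {
                next = State.IDLE;
            } else {
                return false; // action not allowed in this state
            }
            state = next;
            transitions++;
            return true;
        }
    }

    State state() { synchronized (lock) { return state; } }

    int transitions() { synchronized (lock) { return transitions; } }

    // Sends n actions through a single-thread queue while the calling thread
    // sends n more directly, so both paths contend for the same state machine.
    static GuardedStateMachine demo(int n) {
        GuardedStateMachine machine = new GuardedStateMachine();
        ExecutorService queue = Executors.newSingleThreadExecutor();
        for (int i = 0; i < n; i++) {
            queue.execute(() -> machine.process("toggle")); // "queued" path
        }
        for (int i = 0; i < n; i++) {
            machine.process("toggle");                      // "direct" path, caller thread
        }
        queue.shutdown();
        try {
            if (!queue.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("queue did not drain in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return machine;
    }

    public static void main(String[] args) {
        GuardedStateMachine m = demo(10_000);
        System.out.println(m.transitions() + " transitions, final state " + m.state());
    }
}
```

Putting the lock inside process, rather than in the subscriber as in the snippet above, means a caller cannot forget it; whether that fits the real code depends on who else touches the state machine.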
k

kioba

11/13/2018, 12:30 PM
Isn't the stateMachine the same problem as a database? You could do something like this:
queuedProcessActionRelay
    .mergeWith(directProcessActionRelay)
    .flatMap {
        stateMachineObserver.processOnSubscribe(it)
             .subscribeOn(synchronizedScheduler)
    }.subscribe()
stateMachineObserver just wraps the stateMachine and runs it when the subscription happens. You could also check which Scheduler you are running on.
u

ursus

11/13/2018, 1:39 PM
Unfortunately that direct action needs to run on the caller thread. I cannot change that, as it's a stupid C API under the hood.
What's a synchronized scheduler anyway?
k

kioba

11/13/2018, 1:44 PM
Well, you can move the directProcessActionRelay into the flatMap as well. The synchronized scheduler is just a scheduler on which all your state machine code will run.
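[Editor's sketch] The "scheduler where all your state machine code will run" idea is thread confinement: the state is only ever touched on one dedicated thread, so no lock is needed. Below is a minimal sketch using a plain single-thread executor rather than an RxJava Scheduler; ConfinedStateMachine, post, postAndWait and the thread name "state-machine" are hypothetical names chosen for illustration. Note the trade-off raised earlier in the thread: even when the caller blocks for the result, the action itself runs on the executor thread, not on the caller's thread, so this does not help if the C API requires the work to happen on the callback thread.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class ConfinedStateMachine {
    // Single worker thread: every call to handle() runs here, one at a time.
    private final ExecutorService scheduler = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "state-machine");
        t.setDaemon(true); // do not keep the JVM alive for this sketch
        return t;
    });

    private int processed = 0; // confined to the "state-machine" thread, so no lock

    // Fire-and-forget, like an action pushed through the queue.
    void post(String action) {
        scheduler.execute(() -> handle(action));
    }

    // Blocks the caller until the action is handled and returns the name of the
    // thread that handled it (the worker thread, not the caller).
    String postAndWait(String action) {
        Callable<String> task = () -> handle(action);
        return runAndGet(task);
    }

    // Reads the counter on the worker thread to respect confinement.
    int processedCount() {
        Callable<Integer> task = () -> processed;
        return runAndGet(task);
    }

    private String handle(String action) {
        processed++;
        return Thread.currentThread().getName();
    }

    private <T> T runAndGet(Callable<T> task) {
        try {
            return scheduler.submit(task).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }

    public static void main(String[] args) {
        ConfinedStateMachine machine = new ConfinedStateMachine();
        machine.post("queued");
        String ranOn = machine.postAndWait("direct");
        System.out.println("direct action ran on " + ranOn
                + ", processed " + machine.processedCount());
    }
}
```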