praful0203
05/29/2020, 6:45 AM
dimsuz
06/18/2020, 10:26 AM
val events = Observable.just("event1", "event2", "event3")
val initialState = Single.just(emptyList<String>())
// given the current state, produces the next state's Single
val reducerFactory = { currentState: List<String>, event: String ->
    Single.fromCallable { /* do work */ currentState.plus(event) }
}
events
    .scan(
        initialState,
        { currentStateSingle, event ->
            currentStateSingle
                .flatMap { currentState -> reducerFactory(currentState, event) }
                // required to avoid resubscribing to all previously emitted Singles on each new scan iteration
                .cache() // (1)
        }
    )
    .flatMapSingle { it }
    .subscribe { state -> println("state updated to $state") }
What bothers me is that each event adds a new "flatMap" (marked as (1)) to the existing chain, so the chain grows unboundedly, consuming memory and never disposing the intermediate Singles, even though once they have emitted a new state they are no longer needed and can safely be disposed. The only other option I see is to hold the state atomically outside the rx chain and change it from within the operators, but this feels a bit messy.
antoniomarin
06/24/2020, 11:31 AM
Completable flow for login, and at some point in the middle I need to call the AppAuth library to performTokenRequest. That library uses callbacks for results, so I'm curious how to adapt it to my architecture. The idea is that when the callback is successful, I resume with doOnSuccess or doNext to the next Completable function.
private fun login(
    authorisationResponse: AuthorisationResponse
): Completable {
    return Completable.defer {
        // do some validations..
        if (authorisationResponse == null) {
            return@defer Completable.error(...)
        }
        // app auth stuff
        val tokenRequest = authorisationResponse.createTokenExchangeRequest()
        val clientAuthentication ....
        // performing token request, AppAuth library works with callbacks, so I'm
        // curious how to "pause" my Rx flow until I get result from that callback
        // and then I need to continue completable
        return@defer ???
        authorisationService.performTokenRequest(tokenRequest, clientAuthentication)
        { response, ex ->
            // callbacks
            if (response != null) {
                //... I'm going to save token here
                authStateManager.saveToken(response)
            }
            .doOnSuccess or .doNext (continueLoginFlow)
        }
    }
}

private fun continueLoginFlow(): Completable {
    return authStateManager.getToken()
        .map { ... }
        .flatMap { ... }
        .doOnSuccess { ... }
        ...
        ...
}
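One common way to bridge a callback API into a Completable is Completable.create, which hands you an emitter to signal from inside the callback. The sketch below is only an illustration under assumptions: FakeAuthService and TokenCallback are hypothetical stand-ins for the AppAuth service and its callback, the token is a plain String, and the imports assume RxJava 3 (RxJava 2 uses the io.reactivex package instead).

```kotlin
import io.reactivex.rxjava3.core.Completable

// Hypothetical stand-in for a callback interface such as the one AppAuth uses.
fun interface TokenCallback {
    fun onResult(response: String?, error: Exception?)
}

// Hypothetical stand-in for the callback-based service.
class FakeAuthService {
    fun performTokenRequest(request: String, callback: TokenCallback) {
        // A real service would call back asynchronously; this fake answers immediately.
        callback.onResult("token-for-$request", null)
    }
}

// Completes after the token has been saved, fails if the callback reports no response.
fun requestToken(
    service: FakeAuthService,
    request: String,
    saveToken: (String) -> Unit
): Completable = Completable.create { emitter ->
    service.performTokenRequest(request) { response, error ->
        if (response != null) {
            saveToken(response)
            emitter.onComplete()
        } else {
            emitter.tryOnError(error ?: IllegalStateException("no response and no error"))
        }
    }
}

fun main() {
    val saved = mutableListOf<String>()
    requestToken(FakeAuthService(), "request") { saved.add(it) }
        // andThen plays the role of continueLoginFlow() from the question
        .andThen(Completable.fromAction { println("continue login flow with ${saved.first()}") })
        .blockingAwait()
}
```

With this shape, login() could return the created Completable followed by andThen(continueLoginFlow()), so the rest of the flow only starts once the callback has fired.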
dimsuz
06/25/2020, 5:54 PM
val subject = PublishSubject.create<Int>()
Observable.merge(
    Observable.just(1, 2, 3).doOnSubscribe { println("subscribed to observable") },
    subject.doOnSubscribe { println("subscribed to subject") }
).subscribe {
    println("received $it")
}
prints
subscribed to observable
received 1
received 2
received 3
subscribed to subject
Can I somehow ensure that subscription to all merge arguments happens first and only then emission starts? I.e. I want this to print
subscribed to observable
subscribed to subject
received 1, 2, 3
Why: for example I want to post to subject from within Observable.just(1,3,4).doOnLast { subject.onNext(88) }
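One possible approach, sketched here under assumptions, is to turn the cold source into a ConnectableObservable with publish(), so that it stays silent until connect() is called after merge has subscribed to everything. The function name mergeThenConnect is made up for the illustration, and the imports assume RxJava 3.

```kotlin
import io.reactivex.rxjava3.core.Observable
import io.reactivex.rxjava3.subjects.PublishSubject

// Returns the log of events instead of printing, so the order is easy to inspect.
fun mergeThenConnect(): List<String> {
    val log = mutableListOf<String>()
    val subject = PublishSubject.create<Int>()
    // publish() defers emission of the source until connect() is called.
    val numbers = Observable.just(1, 2, 3).publish()

    Observable.merge(
        numbers.doOnSubscribe { log.add("subscribed to observable") },
        subject.doOnSubscribe { log.add("subscribed to subject") }
    ).subscribe { log.add("received $it") }

    // Both merge inputs are subscribed at this point; now let the source emit.
    numbers.connect()
    return log
}

fun main() {
    mergeThenConnect().forEach(::println)
}
```

Note that doOnSubscribe is applied downstream of publish() here, so it reports the moment merge subscribes rather than the moment the underlying source starts.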
Bruno_
06/27/2020, 1:06 PM
ursus
06/29/2020, 12:14 AM
subscribeOn(Schedulers.single())
but that obviously fails when somebody inside that chain switches schedulers
dimsuz
07/07/2020, 6:51 PM
publishSubject.concatMap { Observable.delay(3000) }
If the subject emits a lot of items very quickly, what happens? Will they get buffered up to a certain point?
Or not at all? Can one expect that quickly emitting 3 items will be OK, but 1 000 000 items will throw MissingBackpressureException?
Where's the point at which OK becomes an exception? :)
I tried searching the javadoc for this info, but didn't succeed. If this is documented, please point me to the place in the docs.
ursus
07/09/2020, 2:11 AM
Bruno_
07/10/2020, 10:37 AM
elizarov
07/21/2020, 11:05 AM
Flow.buffer operator that lets the publisher produce faster than the consumer, but puts backpressure on it when the buffer overflows. So like onBackpressureBuffer, but without an error on overflow. What can I do?
Joan Colmenero
07/27/2020, 11:04 AM
Observable and Single at the same time?
I have this
Observable.zip(repo1.getStream(), repoSingle.getStream().toObservable(), (repo1, repo2) -> repo1)
But it's not being called. The thing is that I'd like to execute both and subscribe to both in the same subscribe; it works if I create two separate subscribes, though.
Also, I need to use the result of repo1.getStream() to make some calls on it; for example, if it returns a user, I'd like to do user -> user.getFoo().
Thanks
Joan Colmenero
07/27/2020, 12:05 PM
Observable, let's call it repoObservable, and another one that returns a Single, let's call it repoSingle. The thing is, everything before was calling only repoObservable, but now we realised that we need repoSingle to be called as well, so what I'm trying to do is subscribe to both in the same Observable.
I've tried to do it with two subscribes and it works, but I wanted to check if it's possible to do it in one observer.
What I'm trying is
repoObservable.getStream().zipWith(repoSingle.get().toObservable(), (BiFunction<User, User, Object>) (user, user1) -> {
    updateUiWithUser(user); // <-- should be from repoObservable.getStream()
    showMessage();
    return user;
});
but the thing is that it's not called. I wanted to know if there is any possibility to avoid doing something like:
repoObservable.observeOn(AndroidSchedulers.mainThread())
    .subscribe(...)
repoSingle.observeOn(AndroidSchedulers.mainThread())
    .subscribe(...)
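For what it's worth, zip pairs items one-to-one, so zipping a stream with a Single converted to an Observable can emit at most one pair. A possible alternative, sketched in Kotlin under assumptions, is to start from the Single and flatMapObservable into the stream, so that a single subscribe covers both and every stream item arrives together with the Single's value. The User class, userStream() and extraSingle() are hypothetical stand-ins for the repositories above, and the imports assume RxJava 3; this may or may not be the cause of the callback not firing.

```kotlin
import io.reactivex.rxjava3.core.Observable
import io.reactivex.rxjava3.core.Single

data class User(val name: String)

// Hypothetical stand-ins for repoObservable.getStream() and repoSingle.get().
fun userStream(): Observable<User> = Observable.just(User("ann"), User("bob"))
fun extraSingle(): Single<String> = Single.just("welcome")

// One chain: wait for the Single, then pair its value with every item of the stream.
fun combined(): Observable<Pair<User, String>> =
    extraSingle().flatMapObservable { message ->
        userStream().map { user -> user to message }
    }

fun main() {
    combined().subscribe(
        { (user, message) -> println("${user.name}: $message") },
        { error -> println("failed: $error") }
    )
}
```

The trade-off is that the stream is only subscribed after the Single succeeds; if both must start at once, Observable.combineLatest with the Single converted via toObservable() is another option to consider.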
Sam
07/28/2020, 5:08 AM
Observable<Long> clock = Observable.interval(500, TimeUnit.MILLISECONDS);
Subject<Long, Long> subjectLong = PublishSubject.create();
clock.subscribe(subjectLong);
dimsuz
09/23/2020, 3:28 PM
itemsObservable.delay { triggerObservable /* Observable<Unit> */ }
how can I delay only starting with the second item?
I.e. I want to emit the first item immediately and after that emit only after triggerObservable fires. Somehow zip it onto itself shifted by one, or have an AtomicBoolean set to true until the first item fires, or something like this? Maybe I'm missing something more elegant.
iex
10/05/2020, 3:02 PM
Completable) subscription on click?
I developed the habit lately of using a PublishSubject trigger and subscribing to it in init, but I saw code again that uses the "subscription on tap" pattern and I'm wondering whether my pattern is overkill?
oday
10/11/2020, 8:59 AM
StavFX
10/19/2020, 9:21 PM
fun <T> Observable<T>.foo(): ConnectableObservable<T> {
    return someOperators...
        .replay(1)
        .someMoreOperators...
}
expensivebelly
10/27/2020, 9:46 PM
Cache?
Cache<K, Observable<T>>
Then you can replayingShare() each observable within the cache (https://github.com/JakeWharton/RxReplayingShare)
dephinera
10/31/2020, 7:55 AM
ursus
11/04/2020, 11:49 PM
Gabe Kauffman
11/18/2020, 11:45 PM
Sami Eljabali
12/02/2020, 4:53 AM
iex
12/02/2020, 2:41 PM
Single with success/error state. Thinking about how to implement this. Something like
val successObservable: Observable<SomeType>
val errorObservable: Observable<AnotherType> // AnotherType can be mapped into a Throwable
request.flatMap {
    // First event of successObservable OR errorObservable. successObservable -> just(event), errorObservable -> error(error)
}
Not sure what operators to use here; zip with take(1) and startWith(none) crossed my mind, but it doesn't feel right...
oday
12/03/2020, 9:47 PM
oday
12/08/2020, 10:31 AM
purchaseFlowStore
    .getTransaction(transactionId)
    .takeUntil { it.state == "done" || it.state == "error" || it.state == "authorized" }
    .doOnError {
        logger.debug("TransactionInfo error! $it")
    }
    .repeatWhen { Observable.timer(5, TimeUnit.SECONDS) }
    .viewModelSubscription({
        logger.debug("TransactionInfo is ${it.state}")
        when (it.state) {
            "done", "authorized" -> {
                getOrder()
            }
            "error" -> {
                ShowRequestFailedDialog()
            }
        }
    }, {
        logger.debug("TransactionInfo error! $it")
    })
oday
12/08/2020, 12:29 PM
Observable.interval(2000, TimeUnit.MILLISECONDS)
    .repeatUntil {
        System.currentTimeMillis() - startTime > 10000
        true
    }
    .subscribe {
        purchaseFlowStore
            .getTransaction(transactionId)
            .viewModelSubscription({
                when (it.state) {
                    "done", "authorized" -> {
                        getOrder()
                    }
                    "error" -> {
                        showErrorDialog()
                    }
                }
            }, {
                showErrorDialog()
            })
    }
Maciek
12/11/2020, 4:54 PM
doWork during the work being done for the first entity, I want it to be queued and wait for the first one to finish. At first, I was trying to figure out some clever internal queue based on a subject, but failed. Then I thought that a simple semaphore would do the job just fine (but it will also block the thread). Do you see any problems with such an implementation, or can it be achieved with similar simplicity with streams, without thread blocking?
class Foo {
    private val sem = Semaphore(1) // java.util.concurrent.Semaphore with a single permit
    fun doWork(): Completable =
        Completable.fromAction { sem.acquire() } // blocks the subscribing thread until the permit is free
            // defer so the work is only created after the permit has been acquired
            .andThen(Completable.defer { doWorkInternal() }
                // release on completion, error, or disposal of the work
                .doFinally { sem.release() })
}
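A non-blocking variant could push each unit of work into a subject and let concatMapCompletable run the units one at a time. This is only a sketch under assumptions: SerialWorker is a made-up name, the work is passed in as a lambda rather than calling doWorkInternal(), the imports assume RxJava 3, and work that was already queued is not cancelled when the caller disposes.

```kotlin
import io.reactivex.rxjava3.core.Completable
import io.reactivex.rxjava3.subjects.CompletableSubject
import io.reactivex.rxjava3.subjects.PublishSubject

class SerialWorker {
    // Each queued element is one unit of work; toSerialized() allows onNext from several threads.
    private val queue = PublishSubject.create<Completable>().toSerialized()

    init {
        // concatMapCompletable subscribes to one inner Completable at a time, in arrival order.
        queue.concatMapCompletable { it }.subscribe()
    }

    // The returned Completable terminates when this particular unit of work has finished.
    fun doWork(work: () -> Completable): Completable = Completable.defer {
        val done = CompletableSubject.create()
        queue.onNext(
            Completable.defer { work() }
                .doOnComplete { done.onComplete() }
                .doOnError { done.onError(it) }
                .onErrorComplete() // keep the queue alive when one unit fails
        )
        done
    }
}

fun main() {
    val worker = SerialWorker()
    val gate = CompletableSubject.create()
    worker.doWork { Completable.fromAction { println("first started") }.andThen(gate) }.subscribe()
    worker.doWork { Completable.fromAction { println("second ran") } }.subscribe()
    println("releasing first")
    gate.onComplete() // only now does the second unit start
}
```

No thread is parked while waiting here; pending units simply sit in the operator's internal queue until the running one terminates.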
Sam
01/20/2021, 7:09 AM
Jintin
01/27/2021, 3:52 PM
Single.just is a hot observable or not.
I feel it's not, but it seems many people think it is. Is there any rule we can follow to determine it?
Grant Park
03/02/2021, 10:52 PM