Bruno_
06/27/2020, 1:06 PM
ursus
06/29/2020, 12:14 AM
subscribeOn(Schedulers.single()), but that obviously fails when somebody inside that chain switches schedulers
dimsuz
07/07/2020, 6:51 PM
publishSubject.concatMap { Observable.delay(3000) }
If the subject emits a lot of items very quickly, what happens? Will they get buffered up to a certain point?
Or not at all? Can one expect that quickly emitting 3 items will be OK, but 1,000,000 items will throw MissingBackpressureException?
Where's the point at which OK becomes an exception? :)
I tried searching the javadoc for this info, but didn't succeed. If this is documented, please point me to the place in the docs.
ursus
07/09/2020, 2:11 AM
Bruno_
07/10/2020, 10:37 AM
elizarov
07/21/2020, 11:05 AM
Flow.buffer operator that lets the publisher produce faster than the consumer, but applies backpressure to it when the buffer overflows. So like onBackpressureBuffer, but without an error on overflow. What can I do?
Joan Colmenero
07/27/2020, 11:04 AM
Observable and Single at the same time?
I have this
Observable.zip(repo1.getStream(), repoSingle.getStream().toObservable(), (repo1, repo2) -> repo1)
But it's not being called. The thing is that I'd like to execute both and subscribe to both in the same subscribe;
it works if I create two separate subscribes, though.
Also, I need to use the result of repo1.getStream() to make some calls on it; for example, if it returns a user, I'd like to do user -> user.getFoo().
Thanks
Joan Colmenero
07/27/2020, 12:05 PM
Observable, let's call it repoObservable, and another one that returns a Single, let's call it repoSingle.
So the thing is, everything before was calling only the repoObservable, but now we realised that we need that repoSingle to be called as well, so what I'm trying to do is to subscribe to both in the same Observable.
I've tried to do it with two subscribes and it works, but I wanted to check if it's possible to do it in one observer.
What I'm trying is
repoObservable.getStream().zipWith(repoSingle.get().toObservable(), (BiFunction<User, User, Object>) (user, user1) -> {
    updateUiWithUser(user); // user should be the one from repoObservable.getStream()
    showMessage();
    return user; // a BiFunction has to return a value
})
but the thing is that it's not called. I wanted to know if there is any possibility to avoid doing something like:
repoObservable.observeOn(AndroidSchedulers.mainThread())
.subscribe(...)
repoSingle.observeOn(AndroidSchedulers.mainThread())
.subscribe(...)
Sam
07/28/2020, 5:08 AM
Observable<Long> clock = Observable.interval(500, TimeUnit.MILLISECONDS);
Subject<Long, Long> subjectLong = PublishSubject.create();
clock.subscribe(subjectLong);
dimsuz
09/23/2020, 3:28 PM
itemsObservable.delay { triggerObservable /* Observable<Unit> */ }
how can I delay only starting with the second item?
I.e. I want to emit the first item immediately and after that emit only after triggerObservable fires. Somehow zip it onto itself shifted by one, or have an AtomicBoolean set to true until the first item fires, or something like this? Maybe I'm missing something more elegant.
iex
10/05/2020, 3:02 PM
Completable) subscription on click?
I've lately developed the habit of using a PublishSubject trigger and subscribing to it in init, but saw code again that uses the "subscription on tap" pattern and I'm wondering whether my pattern is overkill?
oday
10/11/2020, 8:59 AM
StavFX
10/19/2020, 9:21 PM
fun <T> Observable<T>.foo(): ConnectableObservable<T> {
    return someOperators...
        .replay(1)
        .someMoreOperators...
}
expensivebelly
10/27/2020, 9:46 PM
Cache?
Cache<K, Observable<T>>
Then you can replayingShare() each observable within the cache (https://github.com/JakeWharton/RxReplayingShare)
dephinera
10/31/2020, 7:55 AM
ursus
11/04/2020, 11:49 PM
Gabe Kauffman
11/18/2020, 11:45 PM
Sami Eljabali
12/02/2020, 4:53 AM
iex
12/02/2020, 2:41 PM
Single with success/error state. Thinking about how to implement this. Something like
val successObservable: Observable<SomeType>
val errorObservable: Observable<AnotherType> // AnotherType can be mapped into a Throwable
request.flatMap {
// First event of successObservable OR errorObservable. successObservable -> just(event), errorObservable -> error(error)
}
Not sure what operators to use here; zip with take(1) and startWith(none) crossed my mind but it doesn't feel right...
oday
12/03/2020, 9:47 PM
oday
12/08/2020, 10:31 AM
purchaseFlowStore
    .getTransaction(transactionId)
    .takeUntil { it.state == "done" || it.state == "error" || it.state == "authorized" }
    .doOnError {
        logger.debug("TransactionInfo error! $it")
    }
    .repeatWhen { Observable.timer(5, TimeUnit.SECONDS) }
    .viewModelSubscription({
        logger.debug("TransactionInfo is ${it.state}")
        when (it.state) {
            "done", "authorized" -> {
                getOrder()
            }
            "error" -> {
                ShowRequestFailedDialog()
            }
        }
    }, {
        logger.debug("TransactionInfo error! $it")
    })
oday
12/08/2020, 12:29 PM
Observable.interval(2000, TimeUnit.MILLISECONDS)
    .repeatUntil {
        System.currentTimeMillis() - startTime > 10000
        true
    }
    .subscribe {
        purchaseFlowStore
            .getTransaction(transactionId)
            .viewModelSubscription({
                when (it.state) {
                    "done", "authorized" -> {
                        getOrder()
                    }
                    "error" -> {
                        showErrorDialog()
                    }
                }
            }, {
                showErrorDialog()
            })
    }
Maciek
12/11/2020, 4:54 PM
doWork during the work being done for the first entity, I want it to be queued and wait for it to be finished. At first, I was trying to figure out some clever internal queue based on a subject, but failed. Then I thought that a simple semaphore would do the job just fine (but it will also block the thread). Do you see any problems with such an implementation, or can it be achieved with similar simplicity with streams, without thread blocking?
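class Foo {
    // java.util.concurrent.Semaphore takes the permit count as a positional argument
    private val sem = Semaphore(1)

    fun doWork(): Completable =
        Completable.fromCallable { sem.acquire() } // blocks the subscribing thread until a permit is free
            // andThen expects a CompletableSource, not a lambda; defer keeps doWorkInternal() lazy
            .andThen(Completable.defer { doWorkInternal() })
            .doOnError { sem.release() }
            .doOnComplete { sem.release() }
}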
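One way to picture the non-blocking variant asked about above is to chain every new task onto the tail of the previous one instead of parking a thread on a semaphore. The sketch below is only an illustration under stated assumptions: it uses the JDK's CompletableFuture rather than RxJava so that it runs without extra dependencies, and SerialQueue, enqueue and demo are hypothetical names that do not come from the thread. In RxJava a similar shape might be built from a serialized PublishSubject of work items consumed with concatMapCompletable, which is not shown or tested here.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Supplier;

/** Hypothetical helper: runs async tasks strictly one after another without blocking the caller. */
public class SerialQueue {
    // Tail of the chain; every new task is appended after whatever is currently last.
    private CompletableFuture<Void> tail = CompletableFuture.completedFuture(null);

    /** Enqueues a task; it is started only after all previously enqueued tasks have finished. */
    public synchronized CompletableFuture<Void> enqueue(Supplier<CompletionStage<Void>> task) {
        CompletableFuture<Void> result = tail
                .exceptionally(error -> null)          // a failed predecessor must not poison the queue
                .thenCompose(ignored -> task.get());   // start the task once the predecessor is done
        tail = result;
        return result;
    }

    /** Enqueues three tasks back to back and returns the order in which they actually ran. */
    public static List<String> demo() {
        SerialQueue queue = new SerialQueue();
        List<String> log = new CopyOnWriteArrayList<>();
        CompletableFuture<Void> last = CompletableFuture.completedFuture(null);
        for (String name : new String[] {"a", "b", "c"}) {
            last = queue.enqueue(() -> CompletableFuture.runAsync(() -> {
                log.add("start " + name);
                log.add("end " + name);
            }));
        }
        last.join(); // only the demo waits; enqueue itself never blocks
        return log;
    }
}
```

The caller of enqueue gets a future back immediately, so nothing waits on a lock; the cost is that the queue is unbounded, which may or may not be acceptable for the use case in question.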
Sam
01/20/2021, 7:09 AM
Jintin
01/27/2021, 3:52 PM
Single.just is a hot observable or not.
I feel it's not, but it seems many people think it is. Is there any rule we can follow to determine it?
Grant Park
03/02/2021, 10:52 PM
Andy Gibel
03/26/2021, 6:34 PM
Andy Gibel
03/29/2021, 8:06 PM
Matias Reparaz
08/20/2021, 5:47 PM
fun getResources(): Single<Resources> and with that I need to do 2 things: 1️⃣ a flatMap with another call fun doSomething(r: Resources): Single<OtherThing> and 2️⃣ send a metric if some condition occurs fun sendMetric(r: Resources): Completable.
I’m looking for something better than this:
getResources()
    .doOnSuccess {
        if (someCondition(it)) sendMetric(it).subscribe()
    }.flatMap { doSomething(it) }
Note that if sendMetric fails it’s not critical and I don’t want to interrupt the doSomething flow. Is there a more elegant way to do this?
Keith Mayoral
05/06/2022, 12:05 AM
Single.amb(sourcesList) which behaves just like amb when any input Single source succeeds but only errors out after all input sources have failed (as opposed to amb(), which errors out immediately if the first source to complete errored out)
ephemient
05/08/2022, 7:18 AM
devKshitijJain
06/04/2022, 6:24 PM
ephemient
06/04/2022, 9:54 PM
Single.amb(sourcesList).retry() or equivalent with onErrorResumeNext, one source failing cancels everything else that's currently in-flight, forcing them to be re-subscribed on retry/resume
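To make the requested semantics concrete (first success wins, and an error is reported only once every source has failed, without cancelling the sources still in flight), here is a minimal sketch. It is an assumption-laden illustration, not the RxJava operator: it uses the JDK's CompletableFuture so that it runs on its own, and FirstSuccess, firstSuccess and demo are hypothetical names. With RxJava itself, something along the lines of Single.mergeDelayError(sourcesList).firstOrError() might give comparable behaviour; that has not been tested here.

```java
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

public class FirstSuccess {
    /** Completes with the first successful source; fails only once every source has failed. */
    public static <T> CompletableFuture<T> firstSuccess(List<CompletableFuture<T>> sources) {
        CompletableFuture<T> result = new CompletableFuture<>();
        if (sources.isEmpty()) {
            result.completeExceptionally(new NoSuchElementException("no sources"));
            return result;
        }
        // Counts the sources that have not failed yet.
        AtomicInteger remaining = new AtomicInteger(sources.size());
        for (CompletableFuture<T> source : sources) {
            source.whenComplete((value, error) -> {
                if (error == null) {
                    result.complete(value);               // first success wins; later calls are no-ops
                } else if (remaining.decrementAndGet() == 0) {
                    result.completeExceptionally(error);  // every source failed; report the last error
                }
            });
        }
        return result; // note: losing sources are left running, not cancelled
    }

    /** The first source fails, the second succeeds: the overall result is the second value. */
    public static String demo() {
        CompletableFuture<String> a = new CompletableFuture<>();
        CompletableFuture<String> b = new CompletableFuture<>();
        CompletableFuture<String> winner = firstSuccess(List.of(a, b));
        a.completeExceptionally(new IllegalStateException("a failed")); // plain amb would fail here
        b.complete("b");
        return winner.join();
    }
}
```

Because a failing source only decrements a counter, the other sources keep running, which avoids the re-subscription that retry() or onErrorResumeNext forces on everything still in flight.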