ursus
06/21/2019, 3:28 PMursus
06/21/2019, 4:00 PMadams2
06/21/2019, 4:34 PMkioba
06/24/2019, 4:32 PMhttps://youtu.be/64rQ9GKphTg▾
class ChatViewModel(private val messageManager: MessageManager): ViewModel {
private val binder: PublishProcessor<MessageEvent> = PublishProcessor.create()
private val state =
binder
.flatMap{ messageManager.editMessage(it.messageId, it.newText) }
// #1
.replay(1)
.autoConnect(0)
fun editMessageClick(messageId: Long, newText: String) {
binder.onNext(MessageEvent(messageId, newText))
}
override fun state(): Observable = state
}
class MessageManager {
fun editMessage(messageId, newText) =
database.update(messageId, newText, State.SENDING)
.flatMap { apiClient.editMessage(...)}
.flatMap { database.update(messageId, newText, State.SENT )}
.asEvents()
}
data class MessageEvent(val messageId: Long, val newText: String)
jw
06/25/2019, 4:29 PMA.skipUntil(B.ignoreElements().concatWith(just(Unit)))
kioba
06/25/2019, 4:46 PMA
while B
is triggeringmyanmarking
06/26/2019, 11:41 AMreturn Observable.just(1)
.doOnSubscribe{
this.disposable = observable.just(2)
.subscribe({...}, {...})
}
rook
07/01/2019, 1:13 PMBiFunction
, but I can’t figure out what specifically I’ve done incorrectly
val one = Observable.fromIterable(listOf(1,2,3))
val two = Observable.fromIterable(listOf(4,5,6))
Observable.combineLatest(one, two, BiFunction{ p1:Int, p2:Int -> p1+p2})
fangzhzh
07/03/2019, 11:53 PMfun printCurrentThread(tag: String) = println("$tag: ${Thread.currentThread().name.substringBefore("-")}")
fun main(args: Array<String>) {
printCurrentThread("main thread name")
val subject = BehaviorSubject.create(Unit)
subject
.subscribeOn(Schedulers.computation())
.subscribe {
printCurrentThread("subscribe onNext")
}
Thread.sleep(100L)
subject.onNext(Unit)
Thread.sleep(100L)
}
// The author said the code gives
main thread name: main
subscribe onNext: RxComputationScheduler
subscribe onNext: main
The author explains:
Correct answer is A because the default value is based on the subscribeOn (computation thread) and .onNext() of the subject is called from the main thread. Therefore the second emission will ignore the subscribeOn, change thread and will be emitted at the main thread.
But from my running, it gives
main thread name: main
subscribe onNext: main
my question is that is there really a Where the first RxComputationScheduler onNext()
?Joan Colmenero
07/04/2019, 9:57 PMJoan Colmenero
07/08/2019, 3:59 PMivano
07/12/2019, 12:47 PMursus
07/15/2019, 12:29 AMreline
07/15/2019, 5:25 PMfun parseObservable(f: File): Observable<Item> { ... }
Observable.fromIterable(List<File>)
.concatMapEagerDelayError({ parseObservable(it) }, true)
.toList()
.subscribeOn(<http://Schedulers.io|Schedulers.io>())
.observeOn(AndroidSchedulers.mainThread())
.subscribe { items -> ... }
Daniel Rodak
07/18/2019, 7:14 AMactionIntent()
returns Observable and loadFile()
returns Completable. I want to zip them and return Boolean value of the observable. Is this the right way to fit Completable?pteale
07/24/2019, 8:40 AMJoe
07/25/2019, 10:02 PMPublisher<T>
, and am doing buildPublisher().subscribe(subscriber)
from an "outside of rx" thread/context. To free up that thread, I'd like to move the buildPublisher()
call "into rx". Currently have the following, but not sure if there's a clearer, less error prone, or otherwise better way:
Flowable.create<T>({ emitter ->
try {
buildPublisher().subscribe(
object : Subscriber<T> {
override fun onComplete() {emitter.onComplete() }
override fun onSubscribe(s: Subscription) { s.request(Long.MAX_VALUE) }
override fun onNext(t: T) { emitter.onNext(t) }
override fun onError(t: Throwable) { emitter.onError(t) }
}
} catch (e: Exception) { emitter.onError(e) }
}, BackpressureStrategy.BUFFER)
.subscribeOn(<http://Schedulers.io|Schedulers.io>())
.subscribe(subscriber)
tseisel
07/29/2019, 9:32 AMFlowable
that emits values and multiple errors (for example, after merging back the stream with flatMapSingle
), how can I prevent errors from terminating the Flowable
, so that I can print errors along values ?ursus
08/02/2019, 1:45 PMam414
08/15/2019, 9:06 PMval data = MutableLiveData<RegisterResponse>()
disposables.add(
ServiceRepo.register(firstName, lastName, mobileno, email, password)
.subscribeOn(<http://Schedulers.io|Schedulers.io>())
.observeOn(AndroidSchedulers.mainThread())
.subscribe({
data.value = it
}, { throwable ->
val error: HttpException = throwable as HttpException
if (error.code() == 400) {
val jObj = error.response().errorBody() as RegisterResponse//the issue here, i can't cast it
data.value = jObj
}
})
)
blakelee
08/16/2019, 9:20 PMilyagulya
08/19/2019, 4:08 PMval a: Completable
val b: Completable
val c: Completable
val merged = Completable.merge(listOf(a,b))
I need to subscribe to both merged
and c
but if merged
completes before the c
, I need to unsubscribe from c
.blakelee
08/19/2019, 4:19 PMfun doThings(): Maybe<Boolean> {
Log.d("TEST", "HIT")
return Maybe.just(true)
}
return Maybe.just("")
.map { false }
.switchIfEmpty(doThings())
My log message still shows even though my Maybe is not emptyam414
08/23/2019, 8:23 PMdisposables.add(
ServiceRepo.getProductReviews(context, pruductId)
.subscribeOn(<http://Schedulers.io|Schedulers.io>())
.observeOn(AndroidSchedulers.mainThread())
.subscribe({
//how i can get them all here ♥
}, { throwable ->
Log.e("tag", throwable.message)
})
)
thanh
08/27/2019, 6:40 PMBMG
08/29/2019, 7:45 AMSingle.fromCallable {
getUser().isVerified
}
.flatMapObservable {
if(it) Observable.just(it)
else getUserObservable().map { it.isVerified }
}
.subscribe ({
if(it) view.showVerified
})
Grant Park
08/29/2019, 6:43 PM.doOnError
of upstream Singles will be invokedFredrik Larsen
09/04/2019, 12:00 PM@Test
fun `validate test`() {
val scheduler = TestScheduler()
val upstream = Flowable.fromCallable { "foo" }
.subscribeOn(scheduler)
.observeOn(scheduler)
.delay(2, TimeUnit.SECONDS)
val implementation = upstream
.map { true }
.startWith(false)
.subscribeOn(scheduler)
.observeOn(scheduler)
.doOnEach { println(it) }
.test()
implementation
.assertNotTerminated()
.assertNoValues()
scheduler.advanceTimeBy(1, TimeUnit.SECONDS)
implementation
.assertNotTerminated()
.assertValues(false)
scheduler.advanceTimeBy(2, TimeUnit.SECONDS)
implementation
.assertNotComplete()
.assertValues(false, true) /* assertion fails*/
/*
* Output:
*
* OnNextNotification[false]
*
* java.lang.AssertionError: Value count differs; expected: 2 [false, true] but was: 1 [false] (latch = 1, values = 1, errors = 0, completions = 0)
* Expected :2 [false, true]
* Actual :1 [false] (latch = 1, values = 1, errors = 0, completions = 0)
*/
}
BMG
09/04/2019, 2:17 PMblockingGet()
?alexsullivan114
09/16/2019, 1:54 PMalexsullivan114
09/16/2019, 1:54 PMstarke
09/16/2019, 5:54 PMsubscribe
multiple times and potentially deal with managing multiple disposables doesn’t add much value