https://kotlinlang.org logo
Join the conversationJoin Slack
Channels
100daysofcode
100daysofkotlin
100daysofkotlin-2021
advent-of-code
aem
ai
alexa
algeria
algolialibraries
amsterdam
android
android-architecture
android-databinding
android-studio
androidgithubprojects
androidthings
androidx
androidx-xprocessing
anime
anko
announcements
apollo-kotlin
appintro
arabic
argentina
arkenv
arksemdevteam
armenia
arrow
arrow-contributors
arrow-meta
ass
atlanta
atm17
atrium
austin
australia
austria
awesome-kotlin
ballast
bangladesh
barcelona
bayarea
bazel
beepiz-libraries
belgium
berlin
big-data
books
boston
brazil
brikk
budapest
build
build-tools
bulgaria
bydgoszcz
cambodia
canada
carrat
carrat-dev
carrat-feed
chicago
chile
china
chucker
cincinnati-user-group
cli
clikt
cloudfoundry
cn
cobalt
code-coverage
codeforces
codemash-precompiler
codereview
codingame
codingconventions
coimbatore
collaborations
colombia
colorado
communities
competitive-programming
competitivecoding
compiler
compose
compose-android
compose-desktop
compose-hiring
compose-ios
compose-mp
compose-ui-showcase
compose-wear
compose-web
connect-audit-events
corda
cork
coroutines
couchbase
coursera
croatia
cryptography
cscenter-course-2016
cucumber-bdd
cyprus
czech
dagger
data2viz
databinding
datascience
dckotlin
debugging
decompose
decouple
denmark
deprecated
detekt
detekt-hint
dev-core
dfw
docs-revamped
dokka
domain-driven-design
doodle
dsl
dublin
dutch
eap
eclipse
ecuador
edinburgh
education
effective-kotlin
effectivekotlin
emacs
embedded-kotlin
estatik
event21-community-content
events
exposed
failgood
fb-internal-demo
feed
firebase
flow
fluid-libraries
forkhandles
forum
fosdem
fp-in-kotlin
framework-elide
freenode
french
fritz2
fuchsia
functional
funktionale
gamedev
ge-kotlin
general-advice
georgia
geospatial
german-lang
getting-started
github-workflows-kt
glance
godot-kotlin
google-io
gradle
graphic
graphkool
graphql
graphql-kotlin
graviton-browser
greece
grpc
gsoc
gui
hackathons
hacktoberfest
hamburg
hamkrest
helios
helsinki
hexagon
hibernate
hikari-cp
hire-me
hiring
hongkong
hoplite
http4k
hungary
hyderabad
image-processing
india
indonesia
inkremental
intellij
intellij-plugins
intellij-tricks
internships
introduce-yourself
io
ios
iran
israel
istanbulcoders
italian
jackson-kotlin
jadx
japanese
jasync-sql
java-to-kotlin-refactoring
javadevelopers
javafx
javalin
javascript
jdbi
jhipster-kotlin
jobsworldwide
jpa
jshdq
juul-libraries
jvm-ir-backend-feedback
jxadapter
k2-early-adopters
kaal
kafka
kakao
kalasim
kapt
karachi
karg
karlsruhe
kash_shell
kaskade
kbuild
kdbc
kgen-doc-tools
kgraphql
kinta
klaxon
klock
kloudformation
kmdc
kmm-español
kmongo
knbt
knote
koalaql
koans
kobalt
kobweb
kodein
kodex
kohesive
koin
koin-dev
komapper
kondor-json
kong
kontent
kontributors
korau
korean
korge
korim
korio
korlibs
korte
kotest
kotest-contributors
kotless
kotlick
kotlin-asia
kotlin-beam
kotlin-by-example
kotlin-csv
kotlin-data-storage
kotlin-foundation
kotlin-fuel
kotlin-in-action
kotlin-inject
kotlin-latam
kotlin-logging
kotlin-multiplatform-contest
kotlin-mumbai
kotlin-native
kotlin-pakistan
kotlin-plugin
kotlin-pune
kotlin-roadmap
kotlin-samples
kotlin-sap
kotlin-serbia
kotlin-spark
kotlin-szeged
kotlin-website
kotlinacademy
kotlinbot
kotlinconf
kotlindl
kotlinforbeginners
kotlingforbeginners
kotlinlondon
kotlinmad
kotlinprogrammers
kotlinsu
kotlintest
kotlintest-devs
kotlintlv
kotlinultimatechallenge
kotlinx-datetime
kotlinx-files
kotlinx-html
kotrix
kotson
kovenant
kprompt
kraph
krawler
kroto-plus
ksp
ktcc
ktfmt
ktlint
ktor
ktp
kubed
kug-leads
kug-torino
kvision
kweb
lambdaworld_cadiz
lanark
language-evolution
language-proposals
latvia
leakcanary
leedskotlinusergroup
lets-have-fun
libgdx
libkgd
library-development
linkeddata
lithuania
london
losangeles
lottie
love
lychee
macedonia
machinelearningbawas
madrid
malaysia
mathematics
meetkotlin
memes
meta
metro-detroit
mexico
miami
micronaut
minnesota
minutest
mirror
mockk
moko
moldova
monsterpuzzle
montreal
moonbean
morocco
motionlayout
mpapt
mu
multiplatform
mumbai
munich
mvikotlin
mvrx
myndocs-oauth2-server
naming
navigation-architecture-component
nepal
new-mexico
new-zealand
newname
nigeria
nodejs
norway
npm-publish
nyc
oceania
ohio-kotlin-users
oldenburg
oolong
opensource
orbit-mvi
osgi
otpisani
package-search
pakistan
panamá
pattern-matching
pbandk
pdx
peru
philippines
phoenix
pinoy
pocketgitclient
polish
popkorn
portugal
practical-functional-programming
proguard
prozis-android-backup
pyhsikal
python
python-contributors
quasar
random
re
react
reaktive
realm
realworldkotlin
reductor
reduks
redux
redux-kotlin
refactoring-to-kotlin
reflect
refreshversions
reports
result
rethink
revolver
rhein-main
rocksdb
romania
room
rpi-pico
rsocket
russian
russian_feed
russian-kotlinasfirst
rx
rxjava
san-diego
science
scotland
scrcast
scrimage
script
scripting
seattle
serialization
server
sg-user-group
singapore
skia-wasm-interop-temp
skrape-it
slovak
snake
sofl-user-group
southafrica
spacemacs
spain
spanish
speaking
spek
spin
splitties
spotify-mobius
spring
spring-security
squarelibraries
stackoverflow
stacks
stayhungrystayfoolish
stdlib
stlouis
strife-discord-lib
strikt
students
stuttgart
sudan
swagger-gradle-codegen
swarm
sweden
swing
swiss-user-group
switzerland
talking-kotlin
tallinn
tampa
teamcity
tegal
tempe
tensorflow
terminal
test
testing
testtestest
texas
tgbotapi
thailand
tornadofx
touchlab-tools
training
tricity-kotlin-user-group
trójmiasto
truth
tunisia
turkey
turkiye
twitter-feed
uae
udacityindia
uk
ukrainian
uniflow
unkonf
uruguay
utah
uuid
vancouver
vankotlin
vertx
videos
vienna
vietnam
vim
vkug
vuejs
web-mpp
webassembly
webrtc
wimix_sentry
wwdc
zircon
Powered by Linen
rx
  • b

    Bruno_

    06/27/2020, 1:06 PM
    do you have any rules as to where to place subscribeOn/observeOn? some examples that come to my mind • e.g. subscribeOn io next to the io operation • always next to the subscribe • at the end of the function which creates the publisher • always right after creating the publisher by rules I mean conventions that you apply at work, in one of your projects or even your own ones
    m
    • 2
    • 1
  • u

    ursus

    06/29/2020, 12:14 AM
    Hi, is there a way to enforce only 1 subscriber per cold observable? I'm looking at
    subscriberOn(Schedulers.single())
    but that obviously fails, when inside that chain somebody switches schedulers
    m
    • 2
    • 4
  • d

    dimsuz

    07/07/2020, 6:51 PM
    What happens by default in absense of backpressure support? For example here:
    publishSubject.concatMap { Observable.delay(3000) }
    If subject will very quickly emit a lot of items, what happens? Will they get buffered up to a certain point? Or not at all? Can one expect that quickly emitting 3 items will be OK, but 1 000 000 items will throw MissingBackrpressure? Where's that point when OK becomes an exception? :) I tried searching javadoc for this info, but didn't succeed. If this is documented, please point me to the place in the docs.
    a
    u
    g
    • 4
    • 8
  • u

    ursus

    07/09/2020, 2:11 AM
    I think this maps nicely, as confirmation by user is a sort of emit in time, but I've never seen people do it like this
    g
    • 2
    • 5
  • b

    Bruno_

    07/10/2020, 10:37 AM
    Can't find the operator I need so I have to ask. I have a facade and I want to return something like a hot (already subscribed) completable from it. Any ideas? Basically I want to call the facade from a controller and just let it do it's work. But there's other use case in which I want to wait for the completion.
    a
    u
    • 3
    • 4
  • e

    elizarov

    07/21/2020, 11:05 AM
    Looking for help. I need an Rx analogue of
    Flow.buffer
    operator that lets the publisher produce faster than consumer, but puts a backpressure on it when buffer overflows. So like
    onBackpressureBuffer
    , but without an error on overflow. What can I do?
    a
    a
    +2
    • 5
    • 6
  • j

    Joan Colmenero

    07/27/2020, 11:04 AM
    Hello how do I subscribe to an
    Observable
    and
    Single
    at the same time? I have this
    Observable.zip(repo1.getStream(), repoSingle.getStream().toObservable(), (repo1, repo2) -> repo1)
    But it's not calling, the thing is that I'd like to execute both and subscribe to both in the same
    subscribe
    it works if I create two separate subscribe though. Also I need to use the result of the
    repo1.getStream()
    to make some calls from the result like if is returning an user I'd like to do user -> user.getFoo(). Thanks
    g
    • 2
    • 38
  • j

    Joan Colmenero

    07/27/2020, 12:05 PM
    Regarding last question because there are lot of comments and replies... My problem is that I have two repositories one that returns an
    Observable
    let's call it
    repoObservable
    and another one that returns a
    Single
    let's call it
    repoSingle
    so the thing is, everything before was calling only the
    repoObservable
    but now we realised that we need that
    repoSingle
    to be called as well, so what I'm trying to do is to subscribe both in the same
    Observable
    I've tried to make it with two subscribe and it works, but I wanted to check if it's possible to make it in one observer. What I'm trying is
    repoObservable.getStream().zipWith(repoSingle.get().toObservable(), (BiFunction<User, User, Object>)(user, user1) -> {
       updateUiWithUser(user); <-- should be from the 
    repoObservable.getStream()
       showMessage();
    }
    but the thing is that it's not called, I wanted to know if is there any possibility to avoid doing something like :
    repoObservable.observeOn(AndroidSheculers.mainThread())
    .subscribe(...)
    repoSingle.observeOn(AndroidSchedules.mainThread())
    .subscribe(...)
    g
    • 2
    • 3
  • s

    Sam

    07/28/2020, 5:08 AM
    Hello everyone, I want to detect my user is typing on EditText My logical: Send event to server when user type first letter and wait x period ( ex: 3s ) for sending next event to server. Ex: User A 1. Types the word 1 at 154ms => send event to server ->wait 50 ms 2. Types the word 2 at 155ms => no send anything to server 3. Types the word 3 at 156ms => no send anything to server 4. Types the word 4 at 250ms => send event to server ->wait 50 ms Does anyone have an idea about this? Thanks My research about that, maybe can use like this code, but I don’t want to use interval
    Observable<Long> clock = Observable.interval(500, TimeUnit.MILLISECONDS);
    Subject<Long, Long> subjectLong = PublishSubject.create();
    
    clock.subscribe(subjectLong);
    z
    • 2
    • 2
  • d

    dimsuz

    09/23/2020, 3:28 PM
    Hi! In the following
    itemsObservable.delay { triggerObservable /* Observable<Unit> */ }
    how can I delay only starting with the second item? I.e. I want to immediately emit the first item and after that emit only after
    triggerObservable
    fires. Somehow zip onto itself with 1-shifted or having
    AtomicBoolean
    set to true until first item fires, or something like this? Maybe I miss something more elegant.
    g
    a
    • 3
    • 3
  • i

    iex

    10/05/2020, 3:02 PM
    Is it a good practice to create a new (
    Completable
    ) subscription on click? I developed the habit lately to use a
    PublishSubject
    trigger and subscribe to it in
    init
    , but saw code again that uses the "subscription on tap" pattern and I'm wondering whether my pattern is overkill?
    a
    u
    • 3
    • 5
  • o

    oday

    10/11/2020, 8:59 AM
    who will it prevent from using its Observer methods
    a
    c
    • 3
    • 5
  • s

    StavFX

    10/19/2020, 9:21 PM
    Hey, I'm trying to write an extension method that uses .replay internally. My problem is that if I don't follow .replay with autoConnect/refCount etc, and move on to other operators, then I lose the ConnectableObservable reference. I would like to not include refCount as part of the method impl, and return ConnectableObservable to let consumers decide. Is there a way to achieve this?
    fun <T> Observable<T>.foo(): ConnectableObservable<T> {
      return someOperators...
             .replay(1)
             .someMoreOperators...
    }
    • 1
    • 1
  • e

    expensivebelly

    10/27/2020, 9:46 PM
    Have you tried just using a
    Cache
    ?
    Cache<K, Observable<T>>
    Then you can
    replayingShare()
    each observable within the cache (https://github.com/JakeWharton/RxReplayingShare)
    s
    • 2
    • 1
  • d

    dephinera

    10/31/2020, 7:55 AM
    What are you missing from RxJava when using flow?
    a
    • 2
    • 3
  • u

    ursus

    11/04/2020, 11:49 PM
    (its basically -- reading from cache if possible, else fetching from network I wanna show a progressbar in both cases, however when its a cache hit, the emit comes too fast and progressbar just "blinks" and is bad UX )
    s
    • 2
    • 2
  • g

    Gabe Kauffman

    11/18/2020, 11:45 PM
    Maybe I shouldn't even be using PublishSubject here, not sure
    g
    s
    • 3
    • 14
  • s

    Sami Eljabali

    12/02/2020, 4:53 AM
    I'm trying to achieve the following in a Rx command: 1) Get all locations & their cached temperatures from db. 2) For every missing cached temp, fetch temp from api. 3) Aggregate temps & return. I did #1, leaving me with a mixed bag of areas with cached temperatures and areas needing api fetches. • How do I split this stream into 2? • How do I then aggregate the results into 1 list after the stream needing api fetches is done?
    g
    c
    • 3
    • 4
  • i

    iex

    12/02/2020, 2:41 PM
    I have 2 separate observables that emit respectively success and error for a specific request. I'm wrapping this in a new observable, that triggers the request and returns a
    Single
    with success/error state. Thinking about how to implement this. Something like
    val successObservable: Observable<SomeType>
    val errorObservable: Observable<AnotherType> // AnotherType can be mapped into a Throwable
    
    request.flatMap { 
        // First event of successObservable OR errorObservable. successObservable -> just(event), errorObservable -> error(error)
    }
    Not sure what operators to use here,
    zip
    with
    take(1)
    and
    startsWith(none)
    crossed my mind but it doesn't feel right...
    m
    • 2
    • 3
  • o

    oday

    12/03/2020, 9:47 PM
    how do I poll with Rx? I want to keep querying an endpoint until its content include some value, and I want to be querying it 2 seconds apart for example until that result is found
    b
    m
    • 3
    • 8
  • o

    oday

    12/08/2020, 10:31 AM
    purchaseFlowStore
                .getTransaction(transactionId)
                .takeUntil { it.state == "done" || it.state == "error" || it.state == "authorized" }
                .doOnError {
                    logger.debug("TransactionInfo error! $it")
                }
                .repeatWhen { Observable.timer(5, TimeUnit.SECONDS) }
                .viewModelSubscription({
                    logger.debug("TransactionInfo is ${it.state}")
    
                    when (it.state) {
                        "done", "authorized" -> {
                            getOrder()
                        }
                        "error" -> {
                            ShowRequestFailedDialog()
                        }
                    }
                }, {
                    logger.debug("TransactionInfo error! $it")
                })
    s
    • 2
    • 1
  • o

    oday

    12/08/2020, 12:29 PM
    there
    Observable.interval(2000, TimeUnit.MILLISECONDS)
                    .repeatUntil {
                        System.currentTimeMillis() - startTime > 10000
                        true
                    }
                    .subscribe {
                        purchaseFlowStore
                            .getTransaction(transactionId)
                            .viewModelSubscription({
                                when (it.state) {
                                    "done", "authorized" -> {
                                        getOrder()
                                    }
                                    "error" -> {
                                        showErrorDialog()
                                    }
                                }
                            }, {
                                showErrorDialog()
                            })
                    }
    m
    • 2
    • 3
  • m

    Maciek

    12/11/2020, 4:54 PM
    I've got a class that does some work and returns the completable. But the work must be sequential and synchronized. So if the work takes 10 seconds to complete and some other entity calls
    doWork
    during the work being done for the first entity I want it to be queued and wait for it to be finished. At first, I was trying to figure out some clever internal queue based on subject, but failed. Then I thought that simple semaphore will do the job just fine (but also will block the thread). Do you see any problems with such implementations or can it be achieved with similar simplicity with streams without thread blocking?
    class Foo {
    
    	private val sem = Semaphore(permit = 1)
    
    	fun doWork(): Completable = 
          Completable.fromCallable { sem.acquire() }
            .andThen { doWorkInternal() }
    	    .doOnError { sem.release() }
            .doOnComplete { sem.release() }
    }
    a
    s
    • 3
    • 2
  • s

    Sam

    01/20/2021, 7:09 AM
    Good morning everyone, I have to find a way to handle this logic : Step 1: get data from cache -> show on UI Step 2: call APIs and update cache Step 3: update UI with lasted Data -> so which way simplify can do it for updating UI? Thanks.
    r
    s
    • 3
    • 6
  • j

    Jintin

    01/27/2021, 3:52 PM
    hi guys, having a question on whether
    Single.just
    is a hot observable or not. I feel it's not but seems many people think it is. Is there any rule we can follow how to determine it?
    z
    • 2
    • 5
  • g

    Grant Park

    03/02/2021, 10:52 PM
    I have an observable that emits Timers. I also have an observable that emits Ints. How can I combine them such that an Int will be emitted followed by the timer? i.e. given an array of ints [1,2,3] and timers [2s, 4s, 3s] 1 (then wait 2 s) 2 (then wait 4 s) 3 (then wait 3 s)
    g
    • 2
    • 4
  • a

    Andy Gibel

    03/26/2021, 6:34 PM
    Given two streams A and B, I want to merge them such that the merge emits items from A only until B emits. At that point it emits B only. How can I do this? Thanks!
    e
    u
    • 3
    • 4
  • a

    Andy Gibel

    03/29/2021, 8:06 PM
    Is there a way to buffer stream outputs for some amount of time and return a list of emissions? Let's say I'm wrapping a callback from an API that calls the callback each time a device is found on the network. I'd like the client code to listen for events for N seconds and then return a SIngle<List<Device>> of elements detected in that time.
    u
    • 2
    • 2
  • m

    Matias Reparaz

    08/20/2021, 5:47 PM
    Hi, I have a
    fun getResources(): Single<Resources>
    and with that I need to do 2 things, 1️⃣ a flatMap with another call
    fun doSomething(r: Resources): Single<OtherThing>
    and 2️⃣ send a metric if some condition occurs
    fun sendMetric(r: Resources): Completable
    . I’m looking for something better than this:
    getResources()
        .doOnSuccess { 
          if (someCondition(it)) sendMetric(it).subscribe()
        }.flatMap { doSomething(it) }
    Note that if
    sendMetrics
    fails it’s not critical and I don’t want to interrupt the
    doSomething
    flow. Is there a more elegant way to do this?
    e
    • 2
    • 5
  • k

    Keith Mayoral

    05/06/2022, 12:05 AM
    Hey all, I’m trying to think of a way to have a RxJava2 operator that is something like
    Single.amb(sourcesList)
    which behaves just like amb when any input Single source succeeds but only errors out after all input sources have failed (as opposed to
    amb()
    which errors out immediately if the first source to complete errored out)
    e
    d
    • 3
    • 3
Powered by Linen
Title
k

Keith Mayoral

05/06/2022, 12:05 AM
Hey all, I’m trying to think of a way to have a RxJava2 operator that is something like
Single.amb(sourcesList)
which behaves just like amb when any input Single source succeeds but only errors out after all input sources have failed (as opposed to
amb()
which errors out immediately if the first source to complete errored out)
e

ephemient

05/08/2022, 7:18 AM
been ages since I've looked at rx (why not switch to kotlinx.coroutines?) and I haven't anything setup to test this, but but in principle this shouldn't be too difficult
rxjava2-single-amb.kt
d

devKshitijJain

06/04/2022, 6:24 PM
Can you not use ErrorResumeNext? I guess it continue with some default item if any of source throw error.
e

ephemient

06/04/2022, 9:54 PM
even if you do something like
Single.amb(sourcesList).retry()
or equivalent with
onErrorResumeNext
, one source failing cancels everything else that's currently in-flight, forcing them to be re-subscribed on retry/resume
View count: 7