fangzhzh
08/07/2019, 9:44 PMkotlin
//
var subject = PublishSubject.create<Location>()
subject
.buffer(1L, SECONDS)
.subscribe( {
list ->
list.lastOrNull()?.let {}
}, {}, {})
The subscribe's on Next is called every 1 second, and it process only the first one. This is helpful that I have too many log and I just want print every seconds
How I do it in coroutines? ThanksDominaezzz
08/07/2019, 9:56 PMfangzhzh
08/07/2019, 9:56 PMDominaezzz
08/07/2019, 9:58 PMflowOf(Location(), .....)
.conflate()
.delayEach(1000)
.collect { lastItem ->
// Do stuff
}
flowOf(Location(), .....)
.debounce(1000)
.collect { firstItem ->
// Do stuff
}
var subject = PublishSubject.create<Location>()
subject.asFlow()
.debounce(1000)
.collect { firstItem ->
// Do stuff
}
This is immediately usable. 🙂fangzhzh
08/07/2019, 10:06 PMsubject.onNext(location)
should works, is it correct?LocationListener
Dominaezzz
08/07/2019, 10:14 PMBroadcastChannel
!val subject = BroadcastChannel()
// This is a subscription.
subject.asFlow()
.debounce(1000)
.collect { firstItem ->
// Do stuff
}
// This is `onNext`
subject.send(location)
fangzhzh
08/07/2019, 10:21 PMDominaezzz
08/07/2019, 10:22 PMsubject.close()
once/if you're done publishing. 😅fangzhzh
08/07/2019, 10:25 PMDominaezzz
08/07/2019, 10:42 PMgildor
08/08/2019, 1:00 AMfangzhzh
08/08/2019, 1:31 AMprivate val sensorListener = object : SensorEventListener {
override fun onAccuracyChanged(sensor: Sensor?, accuracy: Int) {
}
override fun onSensorChanged(event: SensorEvent?) {
event?.let {
localScope.launch {
subject.send(it)
}
}
}
}
localScope.launch {
subject.asFlow()
.debounce(1000)
.collect { event ->
logger.debug("ELDServiceDaemon accelerometer sensor ${event?.values?.get(0)}" +
"${event?.values?.get(1)} ${event?.values?.get(2)}")
}
}
gildor
08/08/2019, 1:47 AMsubject
? Looks like a channelsample(1000)
insteadfangzhzh
08/08/2019, 1:52 AMgildor
08/08/2019, 1:55 AMback to rxjava wayCould you elaborate, do you use both flow and rxjava?
fangzhzh
08/08/2019, 1:55 AMprivate val subject = PublishSubject.create<SensorEvent>()
//....
disposable.add(subject
.buffer(1L, TimeUnit.SECONDS)
.subscribe { events ->
logger.debug("ELDServiceDaemon accelerometer sensor ${event[0]?.values?.get(0)}" +
"${event[0]?.values?.get(1)} ${event[0]?.values?.get(2)}")
})
// ...
override fun onSensorChanged(event: SensorEvent?) {
subject.onNext(it)
}
gildor
08/08/2019, 1:57 AMfangzhzh
08/08/2019, 1:57 AMprivate val flowSubject = BroadcastChannel<SensorEvent>(1000)
///...
localScope.launch {
flowSubject.asFlow()
.sample(1000)
.collect { event ->
logger.debug("ELDServiceDaemon flowSubject accelerometer sensor ${event?.values?.get(0)}" +
"${event?.values?.get(1)} ${event?.values?.get(2)}")
}
}
//...
localScope.launch {
flowSubject.send(it)
}
gildor
08/08/2019, 1:57 AMfangzhzh
08/08/2019, 1:58 AMgildor
08/08/2019, 1:58 AMfangzhzh
08/08/2019, 2:00 AMbetter to wrap callback to Flow (or to Observable, depends on what is better for you) directly, it will be more safe, no need to close and expose global state
regarding this, could you pls elaberate more or a link to an articlegildor
08/08/2019, 2:01 AMsample()
operatorfangzhzh
08/08/2019, 2:03 AM