fangzhzh
08/07/2019, 9:44 PMkotlin
//
var subject = PublishSubject.create<Location>()
subject
.buffer(1L, SECONDS)
.subscribe( {
list ->
list.lastOrNull()?.let {}
}, {}, {})
The subscribe's on Next is called every 1 second, and it process only the first one. This is helpful that I have too many log and I just want print every seconds
How I do it in coroutines? ThanksDominaezzz
08/07/2019, 9:56 PMfangzhzh
08/07/2019, 9:56 PMDominaezzz
08/07/2019, 9:58 PMflowOf(Location(), .....)
.conflate()
.delayEach(1000)
.collect { lastItem ->
// Do stuff
}
Dominaezzz
08/07/2019, 9:58 PMDominaezzz
08/07/2019, 10:00 PMflowOf(Location(), .....)
.debounce(1000)
.collect { firstItem ->
// Do stuff
}
Dominaezzz
08/07/2019, 10:00 PMDominaezzz
08/07/2019, 10:02 PMvar subject = PublishSubject.create<Location>()
subject.asFlow()
.debounce(1000)
.collect { firstItem ->
// Do stuff
}
This is immediately usable. 🙂fangzhzh
08/07/2019, 10:06 PMfangzhzh
08/07/2019, 10:07 PMsubject.onNext(location)
should works, is it correct?fangzhzh
08/07/2019, 10:08 PMLocationListener
fangzhzh
08/07/2019, 10:12 PMDominaezzz
08/07/2019, 10:14 PMDominaezzz
08/07/2019, 10:15 PMDominaezzz
08/07/2019, 10:15 PMBroadcastChannel
!Dominaezzz
08/07/2019, 10:17 PMval subject = BroadcastChannel()
// This is a subscription.
subject.asFlow()
.debounce(1000)
.collect { firstItem ->
// Do stuff
}
// This is `onNext`
subject.send(location)
Dominaezzz
08/07/2019, 10:18 PMfangzhzh
08/07/2019, 10:21 PMfangzhzh
08/07/2019, 10:21 PMDominaezzz
08/07/2019, 10:22 PMsubject.close()
once/if you're done publishing. 😅fangzhzh
08/07/2019, 10:25 PMDominaezzz
08/07/2019, 10:42 PMgildor
08/08/2019, 1:00 AMgildor
08/08/2019, 1:04 AMfangzhzh
08/08/2019, 1:31 AMprivate val sensorListener = object : SensorEventListener {
override fun onAccuracyChanged(sensor: Sensor?, accuracy: Int) {
}
override fun onSensorChanged(event: SensorEvent?) {
event?.let {
localScope.launch {
subject.send(it)
}
}
}
}
fangzhzh
08/08/2019, 1:31 AMlocalScope.launch {
subject.asFlow()
.debounce(1000)
.collect { event ->
logger.debug("ELDServiceDaemon accelerometer sensor ${event?.values?.get(0)}" +
"${event?.values?.get(1)} ${event?.values?.get(2)}")
}
}
fangzhzh
08/08/2019, 1:32 AMgildor
08/08/2019, 1:47 AMsubject
? Looks like a channelgildor
08/08/2019, 1:47 AMgildor
08/08/2019, 1:49 AMsample(1000)
insteadgildor
08/08/2019, 1:50 AMgildor
08/08/2019, 1:50 AMfangzhzh
08/08/2019, 1:52 AMfangzhzh
08/08/2019, 1:52 AMgildor
08/08/2019, 1:55 AMback to rxjava wayCould you elaborate, do you use both flow and rxjava?
fangzhzh
08/08/2019, 1:55 AMfangzhzh
08/08/2019, 1:56 AMprivate val subject = PublishSubject.create<SensorEvent>()
//....
disposable.add(subject
.buffer(1L, TimeUnit.SECONDS)
.subscribe { events ->
logger.debug("ELDServiceDaemon accelerometer sensor ${event[0]?.values?.get(0)}" +
"${event[0]?.values?.get(1)} ${event[0]?.values?.get(2)}")
})
// ...
override fun onSensorChanged(event: SensorEvent?) {
subject.onNext(it)
}
gildor
08/08/2019, 1:57 AMfangzhzh
08/08/2019, 1:57 AMprivate val flowSubject = BroadcastChannel<SensorEvent>(1000)
///...
localScope.launch {
flowSubject.asFlow()
.sample(1000)
.collect { event ->
logger.debug("ELDServiceDaemon flowSubject accelerometer sensor ${event?.values?.get(0)}" +
"${event?.values?.get(1)} ${event?.values?.get(2)}")
}
}
//...
localScope.launch {
flowSubject.send(it)
}
gildor
08/08/2019, 1:57 AMfangzhzh
08/08/2019, 1:58 AMgildor
08/08/2019, 1:58 AMgildor
08/08/2019, 1:58 AMfangzhzh
08/08/2019, 2:00 AMfangzhzh
08/08/2019, 2:00 AMfangzhzh
08/08/2019, 2:01 AMbetter to wrap callback to Flow (or to Observable, depends on what is better for you) directly, it will be more safe, no need to close and expose global state
regarding this, could you pls elaberate more or a link to an articlegildor
08/08/2019, 2:01 AMsample()
operatorgildor
08/08/2019, 2:02 AMgildor
08/08/2019, 2:03 AMfangzhzh
08/08/2019, 2:03 AMfangzhzh
08/08/2019, 2:03 AMfangzhzh
08/08/2019, 2:12 AM