Hi, I did search in this channel, and google, but ...
# coroutines
f
Hi, I did search in this channel, and google, but not found. SO my question is
Copy code
kotlin
//
    var subject = PublishSubject.create<Location>()

 subject
            .buffer(1L, SECONDS)
            .subscribe( {
            list ->
               list.lastOrNull()?.let {}
            }, {}, {})
The subscribe's on Next is called every 1 second, and it process only the first one. This is helpful that I have too many log and I just want print every seconds How I do it in coroutines? Thanks
d
So you have a stream of values and you only want the first item of a 1 second window?
f
yes
d
Copy code
flowOf(Location(), .....)
    .conflate()
    .delayEach(1000)
    .collect { lastItem ->
        // Do stuff
    }
Hold on, that's wrong. It gives last instead of first.
Copy code
flowOf(Location(), .....)
    .debounce(1000)
    .collect { firstItem ->
        // Do stuff
    }
That's better.
Copy code
var subject = PublishSubject.create<Location>()

subject.asFlow()
    .debounce(1000)
    .collect { firstItem ->
        // Do stuff
    }
This is immediately usable. 🙂
f
Thanks, I did use PublishSubject before, but I don't like the dispose
In the case of PublishSubject.asFlow case,
subject.onNext(location)
should works, is it correct?
The location is feed by other place, like
LocationListener
@Dominaezzz would you mind taking another look, thank you
d
If I'm being honest, I don't know too much about Rx.
One sec, I'll do some googling.
Ah! You need a
BroadcastChannel
!
Copy code
val subject = BroadcastChannel()

// This is a subscription.
subject.asFlow()
    .debounce(1000)
    .collect { firstItem ->
        // Do stuff
    }

// This is `onNext`
subject.send(location)
👍 1
Why don't you like the dispose?
f
boilerplate
Cool. The broadcastChannel is exactly what I'm looking for. Thank you
d
Don't forget to call
subject.close()
once/if you're done publishing. 😅
f
😅 Okay, so they are some in the sense of need to manual release
d
It's not strictly required but it's good practice if you're able to do it.
g
You don’t need broadcast channel for this, it’s just implementation detail of source, what is your source of events? Do you have sample of you code which you want improve? Also debounce() is wrong operator for this, of you have stream of events which emit more often than your denounce delay it will never emit
So could you show example of your existing code and semantics which you want, essentially you want not more than 1 event per N seconds, right?
f
@gildor sorry, was in a meeting.
Copy code
private val sensorListener = object : SensorEventListener {
        override fun onAccuracyChanged(sensor: Sensor?, accuracy: Int) {
        }

        override fun onSensorChanged(event: SensorEvent?) {

            event?.let {
                localScope.launch {
                    subject.send(it)
                }
            }
        }
    }
Copy code
localScope.launch {
            subject.asFlow()
                .debounce(1000)
                .collect { event ->
                    logger.debug("ELDServiceDaemon accelerometer sensor ${event?.values?.get(0)}" +
                        "${event?.values?.get(1)} ${event?.values?.get(2)}")
                }
        }
sensorListener is the source of event. Because sensor events are way too many(more than ten per second), I want to throttle it and process one per second
g
What is
subject
? Looks like a channel
see, debounce is wrong choice in this case, if your subject.asFlow() emits alwas faster than 1 element per 1000 you will get no events
You need
sample(1000)
instead
But it also not the same as your Rx sample, which not drop events, but buffer them and emit as collection
This is why I asked about required semantics
f
😂 Yes, I got no event, so I changed it back to rxjava way
cool. let me try sample
g
back to rxjava way
Could you elaborate, do you use both flow and rxjava?
f
```
// rxjava way
Copy code
private val subject = PublishSubject.create<SensorEvent>()
//....
disposable.add(subject
                .buffer(1L, TimeUnit.SECONDS)
                .subscribe { events ->
                    logger.debug("ELDServiceDaemon accelerometer sensor ${event[0]?.values?.get(0)}" +
                        "${event[0]?.values?.get(1)} ${event[0]?.values?.get(2)}")
                })

// ...
        override fun onSensorChanged(event: SensorEvent?) {
                subject.onNext(it)
}
g
But it’s exactly the same. it debounce and drop events
f
// flow way
Copy code
private val flowSubject = BroadcastChannel<SensorEvent>(1000)
///...
localScope.launch {
            flowSubject.asFlow()
                .sample(1000)
                .collect { event ->
                    logger.debug("ELDServiceDaemon flowSubject accelerometer sensor ${event?.values?.get(0)}" +
                        "${event?.values?.get(1)} ${event?.values?.get(2)}")
                }
        }
//... 
localScope.launch {
                    flowSubject.send(it)
                }
g
also you don’t need BroadcastChannel
f
yes, I failed to explain my semantic. I don't need save all events as array. It's good to drop others.
g
better to wrap callback to Flow (or to Observable, depends on what is better for you) directly, it will be more safe, no need to close and expose global state
okay, your Rx snippet is also wrong, you will get no events if source is faster than debounce
f
Yep, I forget replace the debouce, what I used is buffer
updated
Copy code
better to wrap callback to Flow (or to Observable, depends on what is better for you) directly, it will be more safe, no need to close and expose global state
regarding this, could you pls elaberate more or a link to an article
g
Rx also has
sample()
operator
❤️ 1
it shows how to wrap callback to flow using callbackFlow builder
f
THANKS.
sample is way better than buffer in my case
Thanks @gildor @Dominaezzz, it works now