alex.krupa
05/27/2018, 3:18 PM
`Channel` that works like the RxJava `Observable.sample()` operator?
http://reactivex.io/RxJava/javadoc/io/reactivex/Observable.html#sample-io.reactivex.ObservableSource-boolean-
Use case:
I have a pair of suspend functions: `write()` and `read()`. Under the hood, writing fires an action and the action's result gets pushed to a `Channel` that I consume using `receive()` in `read()`. I could just as well combine them into a `writeForResult()` that just returns the value, but I don't think it makes a difference here.
The problem is that while I can invoke `write()` many times in a short period (let's say 100/second), `read()` results are delayed by ~100ms each (so 10/second) in their `Channel` (which I consume).
Example steps:
1. Call `write()` and `read()` one after the other.
2. Repeat the above BEFORE ~100ms pass: `write()` is called before the `read()` in step 1 has returned a value, so the second `read()` should return `null` immediately.
3. Repeat the above AFTER ~100ms pass: `write()` is called after the `read()` in step 1 has returned a value, so the second `read()` should return a normal non-null value.
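The first two steps above can be sketched roughly as follows. This is only an illustration under stated assumptions: `SlowDevice`, the `String` payloads, and the simulated 100ms delay are hypothetical, and `Mutex.tryLock()` is used as a simple "read in flight" flag rather than as anything the original code is known to do.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.sync.Mutex

// Hypothetical stand-in for the write()/read() pair described above.
class SlowDevice(private val scope: CoroutineScope) {
    // Conflated: keeps only the most recent result.
    private val results = Channel<String>(Channel.CONFLATED)
    // Locked while a read() is waiting for its result.
    private val readInFlight = Mutex()

    // Fires the action; its result shows up in the channel ~100ms later (simulated).
    fun write(payload: String) {
        scope.launch {
            delay(100)
            results.send("result of $payload")
        }
    }

    // Returns null immediately if another read() is still waiting.
    suspend fun read(): String? {
        if (!readInFlight.tryLock()) return null
        try {
            return results.receive()
        } finally {
            readInFlight.unlock()
        }
    }
}

fun main(): Unit = runBlocking {
    val device = SlowDevice(this)

    // Step 1: write() and read() one after the other.
    device.write("a")
    val first = async { device.read() }
    yield() // let the first read() start waiting

    // Step 2: repeat before the first read() has returned.
    device.write("b")
    println(device.read())   // null
    println(first.await())   // result of a
}
```

One caveat with this sketch: the result of a write whose `read()` returned `null` still lands in the channel, so a later `read()` may see that stale value first. Draining it is left out here.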
I guess I could achieve something similar with `wait()`/`notify()` on a Java object, but I'm trying to solve it using `Channel`s. And as in my initial question, `Observable.sample()` seems perfect for this case, but I don't want to import Rx just for this one issue.
I tried various experiments with a proxy channel that uses `ConflatedChannel` (which always holds the latest value). There's also a thing called `Mutex`, but I'm not sure if it's right to use here.
Any other ideas?
Vsevolod Tolstopyatov [JB]
05/28/2018, 8:07 AM
`sample` operator can be implemented using `select` with `onReceive` on two channels. Our idea is not to introduce dozens of Rx-like operators, but to provide building blocks which can be used to build any operator without much complexity.
alex.krupa
05/28/2018, 6:28 PM
`Channel`, a regular collection and simple lock/flag was enough. But I'll still check what's possible with your suggestion. 🙂
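For reference, the `select`-over-two-channels idea suggested above might look roughly like the sketch below. It is not an official operator: the `sample` function and its parameter names are hypothetical, and it assumes a recent kotlinx.coroutines version, using `onReceiveCatching` (instead of plain `onReceive`) so that a closed channel ends the loop rather than throwing.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.selects.select

// Hypothetical helper: each time `sampler` ticks, emits the latest value
// received from `source` since the previous tick (if there is one).
@OptIn(ExperimentalCoroutinesApi::class)
fun <T> CoroutineScope.sample(
    source: ReceiveChannel<T>,
    sampler: ReceiveChannel<Unit>
): ReceiveChannel<T> = produce {
    var latest: T? = null
    var hasValue = false
    var open = true
    while (open) {
        select<Unit> {
            // Remember the newest value; stop when the source is closed.
            source.onReceiveCatching { result ->
                if (result.isSuccess) {
                    latest = result.getOrThrow()
                    hasValue = true
                } else {
                    open = false
                }
            }
            // On a tick, forward the remembered value once; stop when the sampler is closed.
            sampler.onReceiveCatching { tick ->
                if (tick.isSuccess) {
                    if (hasValue) {
                        send(latest as T)
                        hasValue = false
                    }
                } else {
                    open = false
                }
            }
        }
    }
}

fun main(): Unit = runBlocking {
    val source = Channel<Int>()
    val ticks = Channel<Unit>()
    val sampled = sample(source, ticks)

    source.send(1)
    source.send(2)
    source.send(3)
    ticks.send(Unit)
    println(sampled.receive()) // 3

    source.close() // ends the sampling loop, which closes `sampled`
    ticks.close()
}
```

Here the sampler is a plain channel driven by hand so the behavior is easy to follow; in practice it would be fed by some periodic source.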