alex.krupa
05/27/2018, 3:18 PMChannel that works like RxJava Observable.sample() operator?
http://reactivex.io/RxJava/javadoc/io/reactivex/Observable.html#sample-io.reactivex.ObservableSource-boolean-
Use case:
I have a pair of 2 suspend functions: write() and read(). Under the hood writing fires an action and the action's result gets pushed to a Channel that I consume using receive() in read() . I could as well combine them into a writeForResult() that just returns the value, but I don't think it makes a difference here.
Problem is that while I can invoke write() many times in a short period (let's say 100/second), read() results are delayed by ~100ms each (so 10/second) in their Channel (which I consume).
Example steps:
1. Call write() and read() one after the other.
2. Repeat above BEFORE ~100ms pass - write() called before read() in 1 returned a value, second read() should return null immediately.
3. Repeat above AFTER ~100ms pass - write() called after read() in 1 returned a value, second read() should return a normal non-null value.
I guess I could achieve something similar with `wait()`/`notify()` on a Java object, but I'm trying to solve it using `Channel`s. And as in my initial question - Observable.sample() seems perfect for this case, but I don't want to import Rx just for this one issue.
I tried various experiments with a proxy channel that uses ConflatedChannel (which always holds the latest value). There's also a thing called Mutex, but I'm not sure if it's right to use here.
Any other ideas?Vsevolod Tolstopyatov [JB]
05/28/2018, 8:07 AMsample operator can be implemented using select with onReceive on two channels. Our idea is not to introduce dozens of Rx-like operators, but to provide building blocks which can be used to build any operator without much complexityalex.krupa
05/28/2018, 6:28 PMChannel, a regular collection and simple lock/flag was enough. But I'll still check what's possible with your suggestion. 🙂