#flow

althaf

11/01/2021, 2:45 PM
Hi, I'm having a hard time converting this RxJS code to Kotlin Flow. In JavaScript, the events come out of the buffer step as an array, but in Flow I'm not receiving them as an array of events. Is this because buffer() behaves differently from bufferWhen in RxJS?
var button = document.querySelector('.button');
var label = document.querySelector('h4');
var clickStream = Rx.Observable.fromEvent(button, 'click');
var doubleClickStream = clickStream
  .bufferWhen(() => clickStream.debounceTime(250))
  .map(arr => arr.length)
  .filter(len => len === 2);
doubleClickStream.subscribe(event => {
  label.textContent = 'double click';
});
doubleClickStream
  .delay(1000)
  .subscribe(suggestion => {
    label.textContent = '-';
  });

Nick Allen

11/01/2021, 11:10 PM
Those methods are not remotely similar.
Flow.buffer affects back-pressure; it never changes what is emitted. RxJS's bufferWhen groups items together into arrays. I think you'll need to build your own operator, something like:
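import kotlinx.coroutines.flow.*

// Marker object that stands in for every emission of the signal flow.
private object MySignal

// channelFlow instead of callbackFlow: callbackFlow throws unless the block ends with awaitClose.
fun <T, S> Flow<T>.myBufferWhen(signal: Flow<S>): Flow<List<T>> = channelFlow<List<T>> {
    // Inside the builder `this` is the ProducerScope, so the receiver flow needs its label.
    val merged = merge<Any?>(this@myBufferWhen, signal.map { MySignal })
    var nextList = mutableListOf<T>()
    merged.collect {
        if (it is MySignal) {
            if (nextList.isNotEmpty()) {
                send(nextList)
                nextList = mutableListOf<T>()
            }
        } else {
            @Suppress("UNCHECKED_CAST")
            nextList.add(it as T)
        }
    }
    // Flush the remaining items once both flows have completed.
    if (nextList.isNotEmpty()) {
        send(nextList)
    }
}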
(not tested, just typed this out in slack)
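To show how an operator like the one above could reproduce the RxJS double-click pipeline, here is a minimal sketch. Everything in it is an assumption made for illustration: the operator name bufferUntil, the Boundary marker, the demo function, and the use of a MutableSharedFlow<Unit> to stand in for button clicks. It uses the plain flow builder rather than a channel-based builder, which works here because merge delivers values in the collector's own coroutine. The click source has to be a hot flow, since it is collected twice (once as the items, once through debounce as the signal).

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical marker for "the signal flow emitted".
private object Boundary

// Hypothetical operator: groups upstream items into a list and emits the
// list each time `signal` emits, similar in spirit to RxJS bufferWhen.
fun <T, S> Flow<T>.bufferUntil(signal: Flow<S>): Flow<List<T>> = flow {
    var pending = mutableListOf<T>()
    merge<Any?>(this@bufferUntil, signal.map { Boundary }).collect { value ->
        if (value === Boundary) {
            if (pending.isNotEmpty()) {
                emit(pending)
                pending = mutableListOf()
            }
        } else {
            @Suppress("UNCHECKED_CAST")
            pending.add(value as T)
        }
    }
    // Flush leftovers if both flows complete.
    if (pending.isNotEmpty()) emit(pending)
}

// Simulates two quick clicks followed by one isolated click and returns
// the group sizes that passed the "exactly two" filter.
@OptIn(FlowPreview::class)
suspend fun demo(): List<Int> = coroutineScope {
    val clicks = MutableSharedFlow<Unit>() // stand-in for the button click stream
    val doubleClicks = clicks
        .bufferUntil(clicks.debounce(250))
        .map { it.size }
        .filter { it == 2 }

    val seen = mutableListOf<Int>()
    val job = launch { doubleClicks.collect { seen += it } }

    // Wait until both the item side and the debounce side are subscribed.
    clicks.subscriptionCount.first { it == 2 }

    clicks.emit(Unit)
    delay(100)
    clicks.emit(Unit) // second click within 250 ms: a double click
    delay(400)
    clicks.emit(Unit) // isolated click: its group has size 1 and is filtered out
    delay(400)

    job.cancel() // a SharedFlow never completes, so stop collecting explicitly
    seen
}

fun main() = runBlocking<Unit> {
    println(demo())
}
```

With the timings above, the two quick clicks end up in one group and the isolated click in another, so demo() should return [2]. Note that debounce is still marked @FlowPreview in kotlinx.coroutines, hence the opt-in.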