agta1991
elizarov
fun <T1, T2, R : Any> combineLatest(flow1: Flow<T1>, flow2: Flow<T2>, block: suspend (T1, T2) -> R): Flow<R> = channelFlow { val latest = atomic<Pair<Data<T1?>, Data<T2?>>>(Data.NotAvailable to Data.NotAvailable) launch { flow1.collect { value -> val (t1, t2) = latest.updateAndGet { (_, t2) -> Data.Available(value) to t2 } if (t1 is Data.Available && t2 is Data.Available) { if (t1.value != null && t2.value != null) { send(block(t1.value, t2.value)) } } } } launch { flow2.collect { value -> val (t1, t2) = latest.updateAndGet { (t1, _) -> t1 to Data.Available(value) } if (t1 is Data.Available && t2 is Data.Available) { if (t1.value != null && t2.value != null) { send(block(t1.value, t2.value)) } } } } } sealed class Data<out T> { data class Available<out T>(val value: T) : Data<T>() object NotAvailable : Data<Nothing>() }
A modern programming language that makes developers happier.