agta1991
03/05/2020, 11:41 AMelizarov
03/05/2020, 11:47 AMagta1991
03/05/2020, 1:27 PMelizarov
03/05/2020, 1:27 PMagta1991
03/05/2020, 1:32 PMfun <T1, T2, R : Any> combineLatest(flow1: Flow<T1>, flow2: Flow<T2>, block: suspend (T1, T2) -> R): Flow<R> =
channelFlow {
val latest = atomic<Pair<Data<T1?>, Data<T2?>>>(Data.NotAvailable to Data.NotAvailable)
launch {
flow1.collect { value ->
val (t1, t2) = latest.updateAndGet { (_, t2) -> Data.Available(value) to t2 }
if (t1 is Data.Available && t2 is Data.Available) {
if (t1.value != null && t2.value != null) {
send(block(t1.value, t2.value))
}
}
}
}
launch {
flow2.collect { value ->
val (t1, t2) = latest.updateAndGet { (t1, _) -> t1 to Data.Available(value) }
if (t1 is Data.Available && t2 is Data.Available) {
if (t1.value != null && t2.value != null) {
send(block(t1.value, t2.value))
}
}
}
}
}
sealed class Data<out T> {
data class Available<out T>(val value: T) : Data<T>()
object NotAvailable : Data<Nothing>()
}