agta1991
03/05/2020, 11:41 AMelizarov
03/05/2020, 11:47 AMagta1991
03/05/2020, 1:27 PMelizarov
03/05/2020, 1:27 PMagta1991
03/05/2020, 1:32 PMagta1991
// 03/05/2020, 1:32 PM (message timestamp carried over from the surrounding chat log)
/**
 * Combines the most recent values of [flow1] and [flow2].
 *
 * Both flows are collected concurrently inside a [channelFlow]. Each time either
 * flow emits, its newest value is stored; once each flow has produced at least one
 * value, [block] is invoked with the latest pair and its result is sent downstream.
 *
 * A `null` emitted by a flow whose element type is nullable counts as a real value:
 * "nothing emitted yet" is tracked by [Data.NotAvailable], not by `null`.
 *
 * NOTE(review): the two collectors are not serialized with each other between taking
 * a snapshot and sending the result, so a result computed from an older snapshot may
 * be sent after one computed from a newer snapshot — confirm whether callers need
 * strict ordering.
 *
 * @param flow1 source of the first argument passed to [block].
 * @param flow2 source of the second argument passed to [block].
 * @param block suspending transform applied to the latest pair of values.
 * @return a flow of the results of [block].
 */
fun <T1, T2, R : Any> combineLatest(flow1: Flow<T1>, flow2: Flow<T2>, block: suspend (T1, T2) -> R): Flow<R> =
    channelFlow {
        // Latest value seen from each flow; both slots start out as NotAvailable.
        val latest = atomic<Pair<Data<T1>, Data<T2>>>(Data.NotAvailable to Data.NotAvailable)

        // Sends a combined result once both slots of the snapshot hold a value.
        suspend fun emitIfReady(snapshot: Pair<Data<T1>, Data<T2>>) {
            val (first, second) = snapshot
            if (first is Data.Available && second is Data.Available) {
                send(block(first.value, second.value))
            }
        }

        launch {
            flow1.collect { value ->
                // Replace only the first slot, keeping whatever flow2 last produced.
                emitIfReady(latest.updateAndGet { (_, second) -> Data.Available(value) to second })
            }
        }
        launch {
            flow2.collect { value ->
                // Replace only the second slot, keeping whatever flow1 last produced.
                emitIfReady(latest.updateAndGet { (first, _) -> first to Data.Available(value) })
            }
        }
    }
/**
 * Either a present value ([Available]) or the absence of one ([NotAvailable]).
 *
 * Unlike a nullable type, this keeps "no value" distinct from a value that is
 * itself `null`.
 */
sealed class Data<out T> {
    /** Marker for "no value"; as a `Data<Nothing>` it fits any element type. */
    object NotAvailable : Data<Nothing>()

    /** Wrapper around a present [value]. */
    data class Available<out T>(
        val value: T
    ) : Data<T>()
}