Timo Drick
// 09/02/2020, 2:26 PM
/**
 * Creates a [Flow2StateList] that is backed by this flow.
 *
 * @return a new [Flow2StateList] wrapping the receiver
 */
fun <T> Flow<T>.stateList(): Flow2StateList<T> = Flow2StateList(this)
/**
 * State of a single list entry: still loading, failed, or holding a value.
 *
 * The hierarchy is covariant in [T], so a `FlowState<String>` can be used where a
 * `FlowState<Any>` is expected.
 */
sealed class FlowState<out T> {
    /** Entry that marks data as still being loaded. */
    object Loading : FlowState<Nothing>()

    /** Entry that carries the [error] which was raised instead of a value. */
    class Error(val error: Throwable) : FlowState<Nothing>()

    /**
     * Entry that carries a successfully produced [value].
     *
     * Declared covariant like its parent, so `Item<String>` is also an `Item<Any>`.
     */
    class Item<out T>(val value: T) : FlowState<T>()
}
/**
 * Collects a [Flow] on demand into the observable [list] of [FlowState] entries.
 *
 * Collection is launched in `init` but suspends until the first [requestItems] call.
 * Items are then appended until the list has reached the requested size, at which point
 * the collector suspends again until a new request arrives. A trailing placeholder
 * ([FlowState.Loading] or [FlowState.Error]) is replaced by the next collected item.
 *
 * When the upstream flow throws, a [FlowState.Error] is put at the end of the list and the
 * collector waits for a retry signal (see [retry]) before collecting the flow again.
 *
 * @param flow the source flow whose values are appended to [list]
 */
class Flow2StateList<T>(
    private val flow: Flow<T>
) : CoroutineScope {
    private val job = Job()

    // Scope context: main dispatcher plus the job that finalize() cancels.
    override val coroutineContext = Dispatchers.Main + job

    /** Observable result list; its last entry may be a Loading or Error placeholder. */
    val list: SnapshotStateList<FlowState<T>> = mutableStateListOf()

    // Both channels are conflated: only the most recent request / retry signal is kept.
    private val requestItemsChannel = Channel<Int>(Channel.CONFLATED)
    private val retryChannel = Channel<Boolean>(Channel.CONFLATED)

    // List size up to which the collector keeps consuming before it waits for a new request.
    private var requestedListSize = 0

    init {
        launch {
            // NOTE(review): inside launch, `this` is the launched coroutine's scope rather than
            // the Flow2StateList instance that finalize() logs - confirm which one is intended.
            log("$this - Start collecting for $flow")
            if (requestedListSize <= 0) {
                // Nothing requested yet: wait for the first requestItems() call.
                requestedListSize = requestItemsChannel.receive()
            }
            list.add(FlowState.Loading) // set last list item to loading
            // True while the last list entry is a placeholder (Loading or Error) that the
            // next collected item or error has to replace.
            var inProgress = true
            flow.buffer(5)
                .retryWhen { cause, _ ->
                    log("Retry", cause)
                    if (inProgress) {
                        // removeAt(lastIndex) instead of removeLast(): the latter can bind to the
                        // JDK 21 List.removeLast() member, which is missing on older runtimes.
                        list.removeAt(list.lastIndex)
                    }
                    list.add(FlowState.Error(cause)) // Add error item to end of list
                    inProgress = true
                    // Suspend until a retry signal is available; its value decides whether to retry.
                    retryChannel.receive()
                }.collect {
                    if (inProgress) {
                        list.removeAt(list.lastIndex)
                        inProgress = false
                    }
                    list.add(FlowState.Item(it))
                    if (list.size >= requestedListSize) {
                        log("Wait until new items needed. List size: ${list.size} requested: $requestedListSize")
                        requestedListSize = requestItemsChannel.receive()
                    }
                }
            log("$this - collection ready")
        }
    }

    /**
     * Request more items to consume from the flow.
     *
     * The new target size is the list size at the time the request is processed plus [count].
     * Does nothing when the job of this scope is no longer active.
     *
     * @param count number of additional entries the list should grow by
     */
    internal fun requestItems(count: Int) {
        if (job.isActive) {
            launch {
                requestItemsChannel.send(list.size + count)
            }
        }
    }

    /**
     * Sends a retry signal to the collector, which waits for it after an upstream error.
     */
    fun retry() {
        launch {
            retryChannel.send(true)
        }
    }

    /**
     * Cancels the job and both channels when the instance is finalized.
     */
    protected fun finalize() {
        job.cancel()
        requestItemsChannel.cancel()
        // Cancel the retry channel as well so that both channels are cleaned up alike.
        retryChannel.cancel()
        log("$this - Channel and job canceled")
    }
}