expensivebelly
07/21/2021, 11:17 AM
Observable.concatArrayEager(...) in Flow?
alexfacciorusso
07/26/2021, 8:45 AM
MyThing1(val id: String)
I have another data class whose content we don’t care about; let’s call it MyThing2.
I have a Flow that returns a list of MyThing1: fun myEmitter1(): Flow<List<MyThing1>>
I have another Flow that takes an ID as input and returns a flow of objects: fun myEmitter2(id: String): Flow<MyThing2>
For each MyThing1 received from the flow, I want to combine it with the latest MyThing2 emitted by myEmitter2 and return a Flow<List<CombinedThing>>, given that CombinedThing is data class CombinedThing(val myThing1: MyThing1, val myThing2: MyThing2).
Any thoughts on doing it efficiently?
Arpan Sarkar
07/31/2021, 8:18 PM
LiveData to how can I convert the MediatorLiveData part to StateFlow
import androidx.lifecycle.LiveData
import androidx.lifecycle.MediatorLiveData
import androidx.lifecycle.MutableLiveData
abstract class InputFrom {
    private val _isFromValid = MediatorLiveData<Boolean>()
    val isFromValid: LiveData<Boolean>
        get() = _isFromValid
    abstract val fields: List<LiveInputField<*>>
    fun init() {
        fields.forEach { inputField ->
            _isFromValid.addSource(inputField.value) { changedValue ->
                inputField.validate(changedValue)
                _isFromValid.value = fields.all {
                    it.isValid
                }
            }
        }
    }
    inner class LiveInputField<T>(
        private val errorMessage: String? = null,
        private val predicate: (T?) -> Boolean
    ) {
        val value = MutableLiveData<T>()
        private val _error = MutableLiveData<String>()
        val errorText: LiveData<String>
            get() = _error
        var isValid: Boolean = false
        fun validate(value: Any?) {
            @Suppress("UNCHECKED_CAST")
            return if (predicate(value as? T)) {
                _error.value = null
                isValid = true
            } else {
                _error.value = errorMessage
                isValid = false
            }
        }
    }
}
Example Usages:
import java.util.regex.Pattern
class LoginFrom : InputFrom() {
    val username =
        LiveInputField<String>("At least 4 characters, only letters, numbers, ., _ allowed.") {
            !it.isNullOrBlank() &&
                Pattern.compile("[a-zA-Z0-9_.]{4,}")
                    .matcher(it)
                    .matches()
        }
    val password =
        LiveInputField<String>(
            "8 characters or more, at least one number, one uppercase, one lowercase."
        ) {
            !it.isNullOrBlank()/* &&
                Pattern.compile("((?=.*\\d)(?=.*[a-z])(?=.*[A-Z]).{8,})")
                    .matcher(it)
                    .matches()*/
        }
    override val fields: List<LiveInputField<*>>
        get() = listOf(
            username,
            password
        )
}
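One possible StateFlow-based shape for the MediatorLiveData part, as a minimal sketch rather than a drop-in replacement. `FlowInputField` and `formValidity` are hypothetical names that do not appear in the code above, and the scope passed in is assumed to be something like a viewModelScope. Each field derives its validity from its current value, and `combine` recomputes the form state whenever any field changes. Unlike the LiveData version, the initial null value is validated right away.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical StateFlow-based counterpart of LiveInputField.
class FlowInputField<T>(
    private val errorMessage: String? = null,
    private val predicate: (T?) -> Boolean
) {
    val value = MutableStateFlow<T?>(null)

    // Validity and error text are derived from the current value
    // instead of being stored in separate mutable fields.
    val isValid: Flow<Boolean> = value.map { predicate(it) }
    val errorText: Flow<String?> = value.map { if (predicate(it)) null else errorMessage }
}

// Stands in for the MediatorLiveData: re-evaluated whenever any field's validity changes.
fun formValidity(fields: List<FlowInputField<*>>, scope: CoroutineScope): StateFlow<Boolean> =
    combine(fields.map { it.isValid }) { states -> states.all { it } }
        .stateIn(scope, SharingStarted.Eagerly, false)

fun main() = runBlocking {
    val username = FlowInputField<String>("At least 4 characters") { (it?.length ?: 0) >= 4 }
    val scope = CoroutineScope(Dispatchers.Default)
    val isFormValid = formValidity(listOf(username), scope)

    username.value.value = "alice"
    println(isFormValid.first { it }) // suspends until the form becomes valid
    scope.cancel()
}
```

Because `stateIn` needs a scope, the form state lives only as long as that scope does, which is the main design difference from a MediatorLiveData that is driven by its observers.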
Justin Tullgren
08/02/2021, 1:15 PM
root flow: OperationStart ------ AnotherOperation ------ OperationCancel -------- AnotherBOperation
next flow: OperationStart, perform long running action while !OperationCancel
Marcin Wisniowski
08/11/2021, 10:12 PM
debounce() and distinctUntilChanged(). I want to debounce elements if they are the same. So A B B C D in quick succession will become A B C D. I can't just use distinctUntilChanged(), because I want to allow sending the same element twice, as long as it's not in quick succession. How would I achieve that?
CLOVIS
08/22/2021, 2:48 PM
val flow = myFlow()
flow.collect {
    if (it is Success) {
        // I found the value I was waiting for,
        // stop collecting values and go back to executing the function
    }
}
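A minimal sketch of one way to stop at the first matching value, assuming a hypothetical `Outcome` hierarchy in place of the real result type. `first()` is a terminal operator: it returns the first matching element and cancels the upstream collection, so the calling function simply continues afterwards.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.filterIsInstance
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking

// Hypothetical result types standing in for the real ones.
sealed interface Outcome
object Loading : Outcome
data class Success(val value: String) : Outcome

// Hypothetical source; the last emission is never reached by awaitSuccess().
fun myFlow(): Flow<Outcome> = flow {
    emit(Loading)
    emit(Success("done"))
    emit(Success("never collected"))
}

// Suspends until the first Success arrives, then stops collecting.
suspend fun awaitSuccess(): Success =
    myFlow().filterIsInstance<Success>().first()

fun main() = runBlocking {
    val success = awaitSuccess()
    println(success.value) // prints "done"
}
```

If the flow can complete without ever emitting a Success, `first()` throws NoSuchElementException; `firstOrNull()` is the variant that returns null instead.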
Marek Kubiczek
08/23/2021, 3:46 PM
viewLifecycleOwner.lifecycleScope.launch {
    val result = someFlow.first()
    doSomethingInUI(result)
}
where doSomethingInUI is not a suspend function.
What will happen if the scope is destroyed/cancelled while there is an ongoing operation in someFlow.first() (e.g. a database read)? Is first() cancellable? Will it exit the launch block gracefully, or do I always need to call ensureActive() before calling a non-suspending function in such a case?
The same goes for
someFlow.onEach {
    doSomethingInUI(it)
}.launchIn(viewLifecycleOwner.lifecycleScope)
Is there any guarantee onEach won’t be called after the scope is destroyed/cancelled?
Chris Fillmore
08/25/2021, 6:24 PM
MutableStateFlow.compareAndSet(), but with a `sealed interface`:
sealed interface State {
    object NotConnected : State
    class Connected(val connection: SomeConnection) : State
}
val stateFlow = MutableStateFlow<State>(State.NotConnected)
...
// Obviously doesn't work, can't use State.Connected in this way
stateFlow.compareAndSet(State.Connected, State.NotConnected)
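A minimal sketch of one way to get a type-based compare-and-set: read the current value, check its type, and retry `compareAndSet` with that exact instance as the expected value. `disconnectIfConnected` is a made-up helper name, and `SomeConnection` is stubbed here only so the snippet compiles.

```kotlin
import kotlinx.coroutines.flow.MutableStateFlow

// Stub for illustration; the real SomeConnection is not shown in the thread.
class SomeConnection

sealed interface State {
    object NotConnected : State
    class Connected(val connection: SomeConnection) : State
}

// Hypothetical helper: moves Connected -> NotConnected atomically.
// Returns false if the state was not Connected when it was read.
fun MutableStateFlow<State>.disconnectIfConnected(): Boolean {
    while (true) {
        val current = value
        if (current !is State.Connected) return false
        // Succeeds only if nobody replaced `current` in the meantime; otherwise retry.
        if (compareAndSet(current, State.NotConnected)) return true
    }
}
```

Usage would look like `stateFlow.disconnectIfConnected()`. The loop only computes the new value after the type check has passed, which also gives a lazily evaluated update.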
darkmoon_uk
08/26/2021, 7:55 AM
.filterNotNull() Flow operator
Semyon Zadorozhnyi
08/26/2021, 12:44 PM
Chris Fillmore
08/30/2021, 5:32 PM
MutableStateFlow.compareAndSet(expect, update) where update is evaluated lazily?
Alex
08/31/2021, 10:39 AM
.map a StateFlow and have it still be a StateFlow? Why the conversion to a regular Flow?
Chris Fillmore
09/01/2021, 1:06 AM
StateFlow.value very often? For example, every 15-30 ms. I realize I could just keep a reference, but I worry about it getting stale. Thanks for any insight
Alexander Black
09/02/2021, 12:29 AM
val scope = MainScope() + Job()
val scope2 = MainScope() + Job()
val someEmitter = MutableStateFlow("")
fun test3() = runBlocking {
    scope.launch {
        someEmitter.emit("1")
        someEmitter.emit("2")
        someEmitter.emit("3")
    }
    scope2.launch {
        someEmitter.collect {
            println(it)
        }
    }
}
Why does the collect operation in scope2 get blocked by scope? Do they have to be on different threads? Am I missing something? I’ve gotten this type of thing to work before, but only by using another thread as the collector.
Marc Knaup
09/10/2021, 6:25 PM
Flow.buffer() there’s BufferOverflow.SUSPEND, .DROP_OLDEST and .DROP_LATEST.
Is there a way to fail if the buffer is full and stop the collector?
Simon Lin
09/14/2021, 3:47 AM
Instead of MutableLiveData with default value to MutableStateFlow
Instead of MutableLiveData without default value to MutableSharedFlow
Instead of SingleLiveEvent to Channel.receiveAsFlow
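The three replacements listed above could look roughly like this. It is a sketch under assumptions: `ExampleState` and its members are made-up names, and `replay = 1` with `DROP_OLDEST` is one way to approximate LiveData's "latest value" behaviour, which the message itself does not specify.

```kotlin
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.*

// Hypothetical holder; in an Android app this would typically be a ViewModel.
class ExampleState {
    // MutableLiveData with a default value -> MutableStateFlow
    private val _counter = MutableStateFlow(0)
    val counter: StateFlow<Int> = _counter.asStateFlow()

    // MutableLiveData without a default value -> MutableSharedFlow.
    // replay = 1 keeps the last value for new collectors (an assumption, see above).
    private val _name = MutableSharedFlow<String>(
        replay = 1,
        onBufferOverflow = BufferOverflow.DROP_OLDEST
    )
    val name: SharedFlow<String> = _name.asSharedFlow()

    // SingleLiveEvent -> Channel exposed as a Flow; each event goes to one collector.
    private val _events = Channel<String>(Channel.BUFFERED)
    val events: Flow<String> = _events.receiveAsFlow()

    fun increment() { _counter.value += 1 }
    fun rename(newName: String) { _name.tryEmit(newName) }
    fun notify(message: String) { _events.trySend(message) }
}
```

One difference worth keeping in mind is that StateFlow conflates equal values, so setting the same value twice does not notify collectors again, whereas MutableLiveData would.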
Max Kachinkin
09/15/2021, 5:07 PM
edenman
09/17/2021, 2:10 AM
flowOn that takes a CoroutineScope? I basically want a way to have a flow { } block that runs inside an existing scope
Chris Fillmore
09/21/2021, 3:28 PM
pipe function (which I just made up) available in the coroutines library? (Sends emissions from one Flow to another FlowCollector):
suspend fun <T, U> Flow<T>.pipe(destination: FlowCollector<U>, transform: (T) -> U) {
    collect {
        destination.emit(transform(it))
    }
}
It feels so obvious but I’ve been looking at the available extension functions and haven’t spotted it. Thanks!
Hank
09/24/2021, 2:03 AM
Simon Lin
09/29/2021, 3:59 AM
CoroutineExceptionHandler in flow?
For example:
val flow = flow { emit("data") }
    .shareIn(viewModelScope, SharingStarted.WhileSubscribed(), 1)
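One common way to deal with upstream failures in a shared flow is to place `catch` before `shareIn`, so the error is turned into a value before it can reach the sharing scope. The sketch below assumes a plain CoroutineScope instead of viewModelScope so that it runs outside Android; `failingSource` and `sharedWithFallback` are hypothetical names.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical upstream that fails after its first emission.
fun failingSource(): Flow<String> = flow {
    emit("data")
    throw IllegalStateException("upstream failed")
}

// `catch` sits above shareIn, so the failure becomes an emitted value
// rather than an uncaught exception in the sharing coroutine.
fun sharedWithFallback(scope: CoroutineScope): SharedFlow<String> =
    failingSource()
        .catch { e -> emit("error: ${e.message}") }
        .shareIn(scope, SharingStarted.WhileSubscribed(), 1)

fun main() = runBlocking {
    val scope = CoroutineScope(SupervisorJob() + Dispatchers.Default)
    // Collect the regular value and the fallback produced by `catch`.
    val values = sharedWithFallback(scope).take(2).toList()
    println(values)
    scope.cancel()
}
```

Note that `catch` only sees exceptions from operators above it; anything thrown by collectors of the shared flow still has to be handled on the collecting side.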
Chris Fillmore
10/04/2021, 9:00 PM
data class MyDomainObject(
    val id: String,
    val enabled: StateFlow<Boolean>
)
val myDomainObjects: StateFlow<List<MyDomainObject>> = /* This data comes from somewhere */
// I would like a list of objects which have `enabled.value == true`, and
// I would like this to be updated any time any `enabled` state changes
val enabledObjects: Flow<List<MyDomainObject>> = myDomainObjects.transform { objects ->
    combine(*objects.map { it.enabled }.toTypedArray()) { enabledStates ->
        emit(objects.filterIndexed { index, _ -> enabledStates[index] })
    }
}
The implementation seems slightly wrong, the way I’m using index to emit objects which are enabled. But I don’t see another way to do this. Any advice? Thanks in advance.
Mohamed Ibrahim
10/07/2021, 1:38 PM
private fun SearchScreenEvent.eventToUsecase(): Flow<SearchState> {
    return when (this) {
        is SearchClicked -> searchUsecase(this.query)
        is SearchQueryChanged ->
            flowOf(this.query)
                .debounce(5000)
                .flatMapConcat { searchUsecase(this.query) }
    }
}
Why is debounce not working here?
Matti MK
10/07/2021, 1:45 PM
private val networkResponse = onContinueClicked.filter { it }.zip(
    combine(
        email.filter { it.count() >= MIN_INPUT_LENGTH },
        password.filter { it.count() >= MIN_INPUT_LENGTH }
    ) { e, p -> Pair(e, p) }
) { _, cred ->
    val (user, pass) = cred
    repository.login(username = user, password = pass)
}
I’m playing around with flows: the aim is to do a simple login network request. email and password are MutableStateFlow. This works fine, except I’d like to emit a Loading value before the second-to-last line. The login call does not return a flow.
I think I’m missing the correct operator here in place of zip, or I should have my repository return a flow (and startWith a given state in the repo).
parth
10/13/2021, 2:44 PM
MutableStateFlow. Looks something like this:
fun whatever() {
    viewModelScope.launch {
        inputFlow
            .map {...}
            .collect { value ->
                mutableStateFlow.value = value
            }
    }
}
(Assume that I need to push other values into the MSF outside this collect block)
One issue I’ve run into with the above construction is that when the observer of the MSF cancels collection (e.g. the Fragment observing the ViewModel falls off the screen), it doesn’t cancel this intermediate collection. This makes absolute sense: that cancellation signal is not being propagated upstream!
So digging into the stateIn sources, I’ve come up with what I think is a way to push the cancellation “upstream”. Can y’all give thoughts/comments/suggestions on the following approach?
fun <T> Flow<T>.stateInto(downstream: MutableStateFlow<T>, scope: CoroutineScope) {
    val upstream = this
    scope.launch {
        downstream.subscriptionCount
            .mapLatest { count -> //this is a simplified version of [StartedWhileSubscribed#command]
                if (count > 0) true
                else {
                    delay(800.milliseconds)
                    false
                }
            }
            .dropWhile { active -> !active }
            .distinctUntilChanged()
            .collectLatest { active ->
                when (active) {
                    true -> upstream.collect(downstream) //will be cancelled upon new emission
                    false -> Unit /*just cancel and do nothing*/
                }
            }
    }
}
dephinera
10/25/2021, 9:25 AM
althaf
11/01/2021, 2:45 PM
var button = document.querySelector('.button');
var label = document.querySelector('h4');
var clickStream = Rx.Observable.fromEvent(button, 'click');
var doubleClickStream = clickStream
    .bufferWhen(() => clickStream.debounceTime(250))
    .map(arr => arr.length)
    .filter(len => len === 2);
doubleClickStream.subscribe(event => {
    label.textContent = 'double click';
});
doubleClickStream
    .delay(1000)
    .subscribe(suggestion => {
        label.textContent = '-';
    });
Diego
11/03/2021, 8:18 PM
data class UserDTO(val id: String)
data class PetDTO(val name: String)
interface Api {
    suspend fun users(): List<UserDTO>
    suspend fun pets(userId: String): List<PetDTO>
}
data class User(
    val id: String,
    val pets: List<Pet>
)
data class Pet(val name: String)
class UsersRepository(api: Api) {
    val users: Flow<List<User>> = TODO()
}
In RxJava I would do something like this:
val users: Observable<List<User>> = api.users()
    .flatMapIterable { it }
    .concatMap { userDto ->
        api.pets(userDto.id)
            .map { petsListDto ->
                User(userDto.id, petsListDto.map { Pet(it.name) })
            }
    }.toList()
How can I implement UsersRepository and return the list of `User`s using Kotlin Flow?
Daniel Ryan
11/08/2021, 12:30 PM
darkmoon_uk
11/09/2021, 2:02 AM
callbackFlow { ... } if we throw a plain Exception this halts the VM immediately, and is not caught as an in-Flow `.catch`able error, while throwing a RuntimeException is `.catch`able.
While this isn't so surprising given the conventional relationship between Exception and RuntimeException, I can't find this documented in respect to Flows - should it be? Is it?
ephemient
11/09/2021, 2:17 AM
darkmoon_uk
11/09/2021, 2:46 AM