Andrea
02/23/2022, 11:43 PMdata class HomeUiState(
val users: MutableList<User>
)
class HomeViewModel(
) : ViewModel(){
private val repository: Repository = Repository()
private val viewModelState = MutableStateFlow(HomeUiState(mutableListOf()))
val uiState: StateFlow<HomeUiState> = viewModelState
.stateIn(
viewModelScope,
SharingStarted.Eagerly,
viewModelState.value
)
val userId = MutableStateFlow(1)
// 1 solutions: Keep the list out the UIState and collect it in compose with collectAsState
val userList = userId.flatMapLatest {
repository.getUsersList()
}.stateIn(
viewModelScope,
SharingStarted.WhileSubscribed(5000),
emptySet()
)
// 2 solution, flatMapLatest for the filtered list
init {
userId.flatMapLatest {
repository.getUsersList(it)
}.onEach { users ->
viewModelState.update {
it.copy(users = users) }
}.launchIn(viewModelScope)
}
}
expensivebelly
03/01/2022, 7:14 PM@ObsoleteCoroutinesApi
public fun <E> CoroutineScope.broadcast(
context: CoroutineContext = EmptyCoroutineContext,
capacity: Int = 1,
start: CoroutineStart = CoroutineStart.LAZY,
onCompletion: CompletionHandler? = null,
@BuilderInference block: suspend ProducerScope<E>.() -> Unit
): BroadcastChannel<E>
to SharedFlow
?Michael Marshall
03/03/2022, 4:12 AMFlow<UnmappedValue>
into a Map<SomeKey, Flow<MappedValue>>
? Like the opposite of merge
? Maybe explode
or subFlows
or something like that?Colton Idle
03/08/2022, 12:43 AMvar queryText = MutableStateFlow("")
init {
viewModelScope.launch {
myState.queryText.debounce(250).collect {
search(it)
}
}
}
private fun search(term: String) {
viewModelScope.launch {
val result = service.searchApiCall(term)
...
Now that I think about it. I probably want my old network calls to be cancelled too. Can anyone point me in the right direction about how I should rethink this?miqbaldc
03/15/2022, 5:02 AMsingle
is equivalent to first
?
There’s seems a weird issue in my part. When using single
it returns null
, but exist otherwise with first
Nicolai
03/17/2022, 10:54 AMsignupUseCase.signUp(createPostRegister(email, password))
.flatMapConcat {
when (it) {
is NetworkResult.Success -> loginUseCase.login(
createPostToken(
email,
password
)
)
is NetworkResult.Error -> {
it.rbError?.let { error -> handleErrors(error) }
emptyFlow()
}
else -> emptyFlow()
}
}
.flatMapConcat {
when (it) {
is NetworkResult.Success -> {
prefManager.saveString(AppConstants.TOKEN_STORAGE, it.data)
prefManager.saveString(AppConstants.EMAIL_STORAGE, email)
prefManager.savePasswordEncrypted(password)
updateCountryUseCase.updateCountry()
return@flatMapConcat patchSelfUseCase.patchSelf(name, ApiConstants.fullName)
}
else -> emptyFlow()
}
}.flatMapConcat {
when (it) {
is NetworkResult.Success -> getSelfUseCase.getSelf()
is NetworkResult.Error -> {
it.rbError?.let { error -> handleErrors(error) }
emptyFlow()
}
else -> emptyFlow()
}
}.flatMapConcat {
when (it) {
is NetworkResult.Success -> {
it.data?.let { self -> instance.setSelf(self) }
legalRevisions.value?.let { revisions ->
return@flatMapConcat putLegalAcceptUseCase.putLegalAccept(revisions)
} ?: run {
return@flatMapConcat emptyFlow()
}
}
is NetworkResult.Error -> {
it.rbError?.let { error -> handleErrors(error) }
emptyFlow()
}
else -> emptyFlow()
}
}.collect { signupCompleted.postValue(true) }
Matthew Laser
03/24/2022, 5:48 PMcombine
operator, but it’s not getting me quite the result I’m after.
Here’s an example of what I’ve got now
flowOne()
.combine(flowTwo, { a, b -> Pair(a,b) })
.onEach { ... }
While the actual values of the Pair
i’m creating are correct, as far as I can tell, the values are only updated when flowOne
emits a value, only then combining with the latest value from flowTwo
. Is there an elegant operator (or approach) I can use to propagate my Pair
immediately when either flowOne
or flowTwo
update, not just flowOne
?
update: turns out I combine
is what I want! except, rather than flowOne.combine(flowTwo)
, I need combine(flowOne, flowTwo)
:)Alexander Maryanovsky
03/24/2022, 6:22 PMshareIn(scope.newChildScope())
and closing with scope.coroutineContext.cancelChildren()
Ryosuke Yamada
03/28/2022, 4:13 PMFlow
or StateFlow
.
Many codes expose Flow
in repository, and convert to StateFlow
in ViewModels like this.
val userState: StateFlow<User?> = userRepository.userFlow.stateIn(viewModelScope, Eagerly, null)
This case always show null state on UI started, even if repository has initial value.
This problem may be solved by exposing StateFlow
instead of regular Flow
in repository, is it bad practice?Arun Joseph
03/28/2022, 6:56 PMAlexander Maryanovsky
03/29/2022, 8:06 AMrcd27
04/02/2022, 3:12 PMshareIn
functionality. Trying to share the flow emitions betwenn 3 subscribers, but only one of them (first) is collecting data:
@Test
fun testSharedFlowEmitions() = runBlocking {
var tick = 0
val f: Flow<Int> = flow {
while (tick < 10) {
emit(tick)
tick++
}
}
val sharedFlow: SharedFlow<Int> = f.shareIn(this, SharingStarted.Eagerly, replay = 1)
val firstSubscriber = sharedFlow.collect { Truth.assertThat(it).isGreaterThan(0) }
val secondSubscriber = sharedFlow.collect { Truth.assertThat(it).isGreaterThan(0) }
val thirdSubscriber = sharedFlow.collect { Truth.assertThat(it).isGreaterThan(0) }
}
Need help so that all three get ticks.Arun Joseph
04/03/2022, 12:51 PMSharedFlow
only to emit to its first subscriber?Mikael Alfredsson
04/07/2022, 9:32 AMdarkmoon_uk
04/07/2022, 2:38 PMObservable.combineLatest(...)
with n Flows to combine? There has to be a better way than this 👉 🧵julian
04/07/2022, 8:59 PMamb/race
where I don't allow myself to use channels directly. Or, to put it differently, I use SharedFlow
instead of channels.
It runs fine except that final flow collection suspends indefinitely, even after the jobs used to share the losing flows have completed and the winning flow has emitted all its items.
Why?
I spent a couple days trying to figure out what might be going on, without success.
Here's a gist of my work, including the code I'm using to test it, and the output generated.
You'll see from the output that the flow correctly emits the exception thrown by the first flow, and cancels the third flow because it emits last (so it loses the race), and all the items in the winning flow are emitted.
This is a learning exercise, so I'm interested in the shortest path to getting this implementation to work. I'm not looking for a completely different implementation that works.
Also happy to create a project on Github with dependencies (I use Arrow's Either data-type) if anyone's interested in running the code directly. Let me know.
Thank you!Mohamed Ibrahim
04/11/2022, 8:07 PMObservable.compose()
in Kotlin Flow, so we don’t break the chain,
the idea is to offer only callbacks to consumer code, and internally I’m using Stateflow
and SharedFlow
. in Rxjava I was using Observable.compose()
with Rx transformers.Greg Rynkowski
04/11/2022, 8:25 PMenighma
04/14/2022, 9:05 AMSam Stone
04/15/2022, 2:45 AMflatMapMerge{}
? Android SQLite has a limit of 999 params per query, so I window
my params by 999, call asFlow
, then flatMapMerge{ids->db.flowOfEntities(ids)}
. Is that wrong?Victor Collod
04/15/2022, 2:53 PMNick Williams
04/17/2022, 10:51 AMfun flowFrom(api: CallbackBasedApi): Flow<T> = callbackFlow {
val callback = object : Callback { // Implementation of some callback interface
override fun onNextValue(value: T) {
// To avoid blocking you can configure channel capacity using
// either buffer(Channel.CONFLATED) or buffer(Channel.UNLIMITED) to avoid overfill
trySendBlocking(value)
.onFailure { throwable ->
// Downstream has been cancelled or failed, can log here
}
}
override fun onApiError(cause: Throwable) {
cancel(CancellationException("API Error", cause))
}
override fun onCompleted() = channel.close()
}
api.register(callback)
/*
* Suspends until either 'onCompleted'/'onApiError' from the callback is invoked
* or flow collector is cancelled (e.g. by 'take(1)' or because a collector's coroutine was cancelled).
* In both cases, callback will be properly unregistered.
*/
awaitClose {
require(isClosedForSend) // <--- THIS FAILS
api.unregister(callback)
}
}
Sam Stone
04/18/2022, 4:02 AMList<Y>.flatMapMerge { channelFlow { send(it.toX()) } }.buffer()
while preserving concurrency?Sam Stone
04/24/2022, 5:21 PMflow2
being called in flow1.map{ flow2(it) }.flattenConcat()
?
flow1
is a MutableStateFlow<MutableSet<Int>>
of db entity IDs, flow2
is a Flow of lists of entities, which is updated when any of the flow's entities' properties change. I am updating the properties and then calling flow1.emit(set)
, but only flow2.onEach
is being called, not flow1.map{}
. Any suggestions?Mohamed Ibrahim
04/27/2022, 9:19 PMMutableSharedFlow<T>
?Landry Norris
05/20/2022, 6:50 PMAlexander Maryanovsky
05/23/2022, 8:42 AMcombine(flows, transform)
not call transform
if flows
is empty? Is that intended? Seems like it would be more natural to call it with an empty array.Alexander Maryanovsky
05/30/2022, 3:06 PMFlow{ fun collect() }
and Flow.collect
(an extension function) needs to have a stern talking to.AmrJyniat
06/01/2022, 9:12 AMsharedFlow
and later return the value that I want, Do Reply
help me in this task?AmrJyniat
06/06/2022, 2:10 PMdistinctUntilChanged()
default behavior when using shareIn()
and StateIn()
?