Ansh Tyagi
11/09/2021, 7:31 PMDiego
11/17/2021, 4:05 PMval flowA: Flow<String>
suspend fun funB(): Int
How can I combine the result of both into a Flow?miqbaldc
11/22/2021, 6:41 AM.collect
-in flows inside .collect
works this ways?
Would love to know if there’s any suggestion/feedback to improve below snippet code ❤️
lifecycleScope.launchWhenCreated {
val uploadsBody = mutableListOf<File>()
files
.asFlow()
.map { originalFile ->
originalFile
.compressImageAsFlow(this)
.collect { compressedImageFile ->
uploadsBody.add(compressedImageFile)
}
uploads
}
.collect { result ->
viewModel.uploadFile(this, result)
}
}
joadar
11/22/2021, 5:56 PMprivate val _uiState = MutableStateFlow(DeveloperUiState(isLoading = true, sectionsItems = DeveloperSections()))
val uiState: StateFlow<DeveloperUiState> = _uiState.asStateFlow()
...
_uiState.update {
it.copy(
sectionsItems = it.sectionsItems.copy(
userSection = it.sectionsItems.userSection?.copy(
options = it.sectionsItems.userSection.options.copy(
userPremium = it.sectionsItems.userSection.options.userPremium.copy(
isSelected = isChecked,
summary = if (isChecked) "Yes" else "No"
)
)
)
)
)
}
Is there a way to reduce this path to edit a specific variable from an object?
Thanks for your inputs!Chris Fillmore
11/25/2021, 2:35 PMfun <T> Flow<T>.onChange(action: (oldValue: T, newValue: T) -> Unit): Flow<T>
rachael
11/30/2021, 5:42 PMjoadar
11/30/2021, 9:01 PMdateFlow.collectLatest { date ->
repository.getContentFromDate(date).collect { result ->
I can change the date so the content update.
But I have a favorite feature on the content.
On the default date, the favorite is working well (I collect the new value) but when I update the date, the collect doesn’t work on favorite events.
What am I doing wrong?
Thanks.expensivebelly
12/01/2021, 9:51 AMsuspend fun waitForNetworkConnection() = isConnectedFlow.filter { it }.single()
isConnectedFlow
is a StateFlow, initially false
(let’s assume we are in airplane mode in an Android app), then network connection is back, isConnectedFlow.value = true
, however, the waitForNetworkConnection
never ends and I don’t know why, I can see it only enters in the filter
once and the second emission is ignored? I’ve also tried
isConnectedFlow.first { it }
to no avail. Any help would be appreciatedSimon Lin
12/03/2021, 2:36 AMrefresh()
to update userFlow
(let emit(userRepository.getUser())
call again.)
How do I implement refresh()
?
// ViewModel
val userFlow = flow {
emit(userRepository.getUser())
}.shareIn(viewModelScope, SharingStarted.WhileSubscribed(), replay = 1)
fun refresh() {
// How to reload user?
}
Mikael Alfredsson
12/09/2021, 7:26 AM.value
parameter so that I can query the parameter at any point in time? (sort of a stateful share flow 🙂 )Václav Škorpil
12/12/2021, 7:44 PMclass KeywordService(val keywordRepo: KeywordRepo) {
fun getKeywordData(
keywordId: UUID,
): Flow<String> {
val name = keywordRepo.getKeywordName() // suspend function
return keywordRepo.getKeywordData(name) // function that returns flow
}
}
Currently i am calling it with a flow builder, but i am not sure if this is correct.
class KeywordService(val keywordRepo: KeywordRepo) {
fun getKeywordData(
keywordId: UUID,
): Flow<String> = flow {
val name = keywordRepo.getKeywordName() // suspend function
val dataFlow = keywordRepo.getKeywordData(name) // function that returns flow
emitAll(dataFlow)
}
}
This way it feels little bit clunky, is there any better way to do it?Stylianos Gakis
12/13/2021, 8:56 AMStylianos Gakis
12/17/2021, 11:52 AM.collectLatest { //here }
I am not under a coroutine scope, but simply a suspend function, therefore I do not have access to CoroutineScope.async
. I should be able to do this somehow right?
ps. I’m using collectLatest with the idea that if a new value comes, the two async calls I am doing should cancel themselves since I will not be needing their result anymore, is this a correct assumption?John Aoussou
12/17/2021, 3:04 PMprivate val _stateFlow1 = MutableStateFlow(0)
Is there a way to define _stateFlow2
so that _stateFlow2.value = _stateFlow1.value * _stateFlow1.value
?darkmoon_uk
12/21/2021, 3:11 PMflatMap-
family operator that ignores new upstream emissions while a mapping operation is being processed (i.e. mapped stream is not completed)? How could this be achieved? I don't want emissions buffered as with concat
, but thrown away/ignored.elye
12/22/2021, 4:57 AMval _liveData = MutableLiveData(0)
val _stateFlow = MutableStateFlow(0)
But In LiveData, we can ensure the data is saved and restored by using
val _liveData: MutableLiveData<Int> = savedStateHandle.getLiveData("Key", 0)
For StateFlow, is there a way (or API) to keep the last value and restore it, like what we have in LiveData?Maciek
12/22/2021, 11:50 AMcatch { }
in the chain? If I'd want to have a infinite flow I must handle the exceptions with try catch
in the lambdas because flow will close after any exception "leaking" to the stream, right? Or is there a way to do some explicit recover from the exception? Example code, only 0
will be emitted, next emit 1
is lost due to flow completion after exception.
flow {
emit(0)
emit(1) // this emit is never reached
}
.onEach { if (it == 0) throw Throwable() }
.catch { println(it) }
.collect()
Florian Walther (live streaming)
01/01/2022, 10:50 AMgetOrAwayValue()
(the helper function from Google's testing samples) on Flow#asLiveData()
? I'm not getting a value.Stylianos Gakis
01/04/2022, 9:45 AMstatein
? I’ll show you my use case and try to explain what I mean in the thread 🧵Slackbot
01/05/2022, 8:42 PMStylianos Gakis
01/07/2022, 12:23 PMsealed interface Types {
object One : Types
object Two : Types
}
val someFlow: Flow<Types>
someFlow
.filterIsInstance<Types.One>()
.distinctUntilChanged()
.collectLatest() { one ->
// I want this to be cancelled on an emission of `Two` from `someFlow` but keep on going alive without interruption on a repeat emission of `One` if it was `One` before too
coroutineScope {
while(isActive) {
doWork(one)
}
}
}
Alexander Black
01/19/2022, 2:53 AMsomeFlow.onEach { /*Do something*/}.takeUntil(someOtherFlowAsPredicate).collect()
Ciprian Grigor
01/19/2022, 9:29 PMJustin Tullgren
01/21/2022, 4:27 PMChannel.receiveAsFlow
can process individual items in parallel by collectors? Or is it an internal "fan out". I just want to process a queue of emissions in parallel. Thanks!
val flow = channel.receiveAsFlow()
repeat(5) { launch { flow.collect { /*receive channel item in separate coroutine */ } } }
joadar
02/05/2022, 4:54 PMvar running: Flow<Pair<Event, String?>> = flow {}
suspend fun execute(node: NodeInterface) {
running = node.run()
}
The first execute()
is working, I collect it from running, but others call to execute()
are not giving anything. How can I make it working correctly? What do I need to use? I tried with shareFlow, tried to collect the node.run()
flow and emit to the running
but nothing is collected 😕 Do I need to use channel instead of flow?
Thanks for your input!harry.singh
02/08/2022, 5:04 PMStateFlow
. My test looks like this:
@Test
fun `test state flow`() = runBlocking {
val stateFlow = flowOf(1, 2, 3)
.onEach {
println(it)
}
.stateIn(
scope = TestCoroutineScope(),
started = SharingStarted.WhileSubscribed(),
initialValue = 0
)
val results = mutableListOf<Int>()
val job = stateFlow.collectAsync(TestCoroutineScope()) {
results.add(it)
}
assertEquals(listOf(0, 1, 2, 3, 4), results)
job.cancel()
}
I was expecting this test to pass but it is failing. The actual size of results is 1
and the value it contains is 3
. I debugged my test with onEach
and in console every value from the source flow is printed which means the value was emitted but it was never received in the collector. Does anybody know why am I only receiving the last value in my collector in the above code?Tunji Dahunsi
02/12/2022, 4:19 PMA mutable state flow is created using MutableStateFlow(value) constructor function with the initial value. The value of mutable state flow can be updated by setting its value property. Updates to the value are always conflated. So a slow collector skips fast updates, but always collects the most recently emitted value.
I was wondering does this behavior
So a slow collector skips fast updates, but always collects the most recently emitted value.
also apply to StateFlow
instances created with the stateIn
method?Alexander Maryanovsky
02/16/2022, 10:44 AMStateFlow/MutableStateFlow
doesn’t implement ReadOnlyProperty/ReadWriteProperty
? It seems like it’s a perfect fit for a thread-safe property.Nick Allen
02/17/2022, 8:25 AMcallbackFlow
Can Korkmaz
02/22/2022, 2:30 PMCan Korkmaz
02/22/2022, 2:30 PMval suIdKeyFlow = dataStore.data.map { // this is dataStoreRepository class member
it[PreferenceKeys.suIdKey] ?: 0
}
ViewModel member function:
private suspend fun readSuId(){
storeRepository.suIdKeyFlow.collect {
currentUserSuId.value = it
Log.d("main", "current user su id flow: ${currentUserSuId.value}") // log is printed to logcat
}
}
ViewModel init block:
init {
Log.d("main","Main View Model created")// this gets printed to logcat
viewModelScope.launch(<http://Dispatchers.IO|Dispatchers.IO>){
Log.d("main","inside viewmodel coroutine scope 1")// this also gets printed to logcat
readSuId()
Log.d("main","inside viewmodel coroutine scope 2: --> ${currentUserSuId.value}}")// can't observe this log at logcat
if(currentUserSuId.value != 0){
getCurrentUser(currentUserSuId.value)
}
}
isUserLoggedIn()// this sets isLoggedIn mutableState<Boolean> in viewmodel, and observed at composable for automatic login, not necessarily related to problem
Log.d("main", " user logged in : ${userLoggedIn.value}")
Tim Oltjenbruns
02/22/2022, 6:04 PMCan Korkmaz
02/23/2022, 7:55 AMTim Oltjenbruns
02/23/2022, 12:44 PM