Tomas Kormanak
12/09/2021, 9:16 AMParent job is Cancelling
but I can't figure out which coroutine failed and where.Thomas Flad
12/09/2021, 12:42 PMshareIn
operator + WhileSubscribed
to cancel/resume the SharedFlow in repeatOnLifecycle
when the Android App goes to background. It’s working fine so far. I also combine this flow with a MutableStateFlow in my ViewModel to react on user input and update my state.
class MyViewModel(
dataSource: MyDataSource,
sharingStarted: SharingStarted = WhileSubscribed(5000)
) : ViewModel() {
var userInput = MutableStateFlow(1)
val state = dataSource.dataFlow()
.combine(userInput) { data, userInput ->
OutputClass(data, userInput)
}
.shareIn(viewModelScope, sharingStarted, 1)
}
My issue is that I want to test this. I’ve tried to use Turbine for this but it does not help with the combine.
@Test
fun testMyViewModel() = runBlockingTest {
val cut = MyViewModel(
mockk { coEvery { dataFlow() } returns flowOf(Data()) },
sharingStarted = SharingStarted.Eagerly
)
cut.state.test {
// Also an emit to the userInput here does not help
// cut.userInput.emit(2)
assertEquals(OutputClass(...), awaitItem())
}
}
I’m getting a timeout instead of the result.Andrew Kuryan
12/10/2021, 12:01 AMMichael Clancy
12/10/2021, 2:34 PMdimsuz
12/10/2021, 4:00 PMsuspend fun userList(): List<User>
fun userList(): Flow<List<User>>
Any idea on naming here, so that they don't conflict? in Rx we used to call the "blocking" one userListSync
, and "sync" were rare beasts actually. But in coroutine world calling suspend
function "sync" is not correct.Javier
12/10/2021, 4:20 PM> Task :semver-core:linkDebugTestLinuxX64 FAILED
e: Compilation failed: Cannot get target from triple x86_64-unknown-linux-gnu.
mcpiroman
12/10/2021, 5:20 PMTolriq
12/10/2021, 6:39 PMjeff
12/10/2021, 8:46 PMClament John
12/11/2021, 10:20 AMprivate val searchString = MutableStateFlow("")
val rooms = searchString.flatMapLatest {
if (it.isBlank()) {
fetchPrivateRooms()
} else {
fetchPrivateRooms().combineTransform(fetchPublicRooms()) { priv, pub ->
emit(filterRooms(priv, pub, it))
}
}
}.stateIn(viewModelScope, SharingStarted.Eagerly, emptyList())
Q: How do I do this if fetchPrivateRooms
returns a LiveData<List<Rooms>>
instead of a flow? I can't use observeAsState directly as I wish to merge and transform the data coming from the data source
What is my usecase?
I wish to use a library that uses livedata extensively in Compose. I've previously use the above code when I was working with flow
But don't know how to merge and transform livedata within a viewmodel. Note: This business logic is to be seperated from the UI logic and so I can't use a lifecycleowner (I think)mbonnin
12/12/2021, 8:15 PMcatch
or retry
in IntelliJ take me to a decompiled stub instead of the source. Is there anything I can do about this?ursus
12/13/2021, 4:29 AMJoakim Forslund
12/13/2021, 10:01 AM.collect {}
on, however, I want to wait for a StateFlow/Boolean to be true before the collection can proceed, what is the recommended way of currently handling this? (None of the emits can be dropped, they need to wait for the StateFlow/Boolean to be true and emit as soon as it changes)tylerwilson
12/13/2021, 1:46 PMdimsuz
12/13/2021, 4:13 PMval flowA: MutableSharedFlow<Int>
flowB.collectTo(flowA)
I know there's shareIn
, but it creates a new flow, while I have an existing one.George
12/13/2021, 8:11 PMSimon Lin
12/14/2021, 6:34 AMfun foo() {
viewModelScope.launch {
val inputList = listOf("a", "b", "c")
val deferredList = mutableListOf<Deferred<String>>()
repeat(3) { index ->
deferredList += async {
delay((1L..10000L).random())
inputList[index]
}
}
val outputList = deferredList.map { it.await() } // a, b, c
}
}
Florian
12/14/2021, 10:33 AMrunBlockingTest
because TICK_DELAY
is skipped so the CPU doesn't have time to execute something else? Normally, I leave this loop by collecting the value in another place and cancelling the coroutine around it.
while (true) {
delay(TICK_DELAY)
val totalElapsedTime = systemClockProvider.getElapsedRealTime() - startTime
tasksDataSource.increaseMillisCompletedToday(
selectedTask.id,
totalElapsedTime - lastReportedTime
)
lastReportedTime = totalElapsedTime
}
ursus
12/14/2021, 12:54 PMdimsuz
12/14/2021, 12:56 PMsuspend fun func() {
var counter = 0
someComplexFlow() // <-- potentially calls various withContext() inside its operators
.collect { counter++ }
}
Jonathan Olsson
12/14/2021, 1:45 PMsuspend
lambdas could be used just like normal lambdas. Why then is this not allowed?:
fun main() {
val f = suspend { i: Long ->
delay(i)
}
runBlocking {
f(1000L)
}
}
handstandsam
12/14/2021, 2:28 PMWilliam Reed
12/14/2021, 2:57 PMStateFlow
in my viewmodel that is backed by the Flow
based jetpack DataStore. Any recommendations on how to do this without using runBlocking
to get the initial value?
i was thinking of calling a suspend fun
to do this in my view model init
, but then the StateFlow
would not be available as soon as it is requested by the viewRobert Jaros
12/14/2021, 6:31 PM1.6.0-RC
, but in 1.6.0-RC2
it's not working anymore because the collect
extension function was removed with https://github.com/Kotlin/kotlinx.coroutines/pull/3047 :
val stateFlow = MutableStateFlow(State())
val actionFlow = MutableSharedFlow<Action>()
launch {
actionFlow.collect {
stateFlow.value = stateReducer(stateFlow.value, it)
}
}
What is the correct way to implement this?Colton Idle
12/15/2021, 4:30 AMviewModelScope.launch {
val thing: String = someSuspendFunctionThatReturnsAValue()
aMethodCallThatIsAFlow(thing).collect {
if (it.status == "SUCCESS"){
state.success = true
//should I call job.stop() here?
}
}
}
My problem is that I want the flowable to stop collecting. Can I just save a ref to the call to launch and then call stop()? Or is starting a flowable in a coroutine scope frowned upon in general?Pablo
12/15/2021, 3:23 PMinternal class MyPresenter @Inject constructor(...) {
private val _uiState = MutableStateFlow<Foo>(Foo.Loading)
override val uiState: StateFlow<Foo> = _uiState
private val _sideEffect = MutableSharedFlow<Foo2>()
override val sideEffect : SharedFlow<Foo2> = _sideEffect
}
init {
scope.launch {
usecase.getBar().fold(
onSuccess = { _uiState.emit(mapper(whatever))
if(...) _sideEffect.emit(whatever)
}
onFailure = { _uiState.emit(handleError(it)) }
}
}
I'm trying to handle these emit, but I'm not succeeding I'm trying with the library app.cash.turbine but is throwing me an error...
kotlinx.coroutines.TimeoutCancellationException: Timed out waiting for 1000 ms
(Coroutine boundary)What I'm doing in the test is :
@Test
fun test() {
//given with the usecase call and willReturn what I want
//given with the mapper call and willReturn what I want
//Init presenter so the init {} method gets fired
presenter.uiState.test {
assertEquals(WhatIExpect, awaitItem())
awaitComplete()
}
But it's not working, is there something I'm missing?nilTheDev
12/15/2021, 5:39 PMstateFlow
emit changes when done in a nested value.
For context, consider this example,
data class Person(var name: String)
class SomeViewModel : ViewModel() {
// made public for brevity
val state = MutableStateFlow( listOf(Person("John")) )
init {
// will this change be emitted?
state.value[0].name = "Bob"
}
}
Will the change in the init
block be emitted? I am pretty sure that liveData
wouldn't emit this change. But would stateFlow
do that?groostav
12/15/2021, 7:52 PMSangmin Lee
12/16/2021, 4:20 AMfrankelot
12/16/2021, 6:29 PMsuspending fun
that puts something in a queue and suspends until that “something” gets its turn and is finished being processedfrankelot
12/16/2021, 6:29 PMsuspending fun
that puts something in a queue and suspends until that “something” gets its turn and is finished being processed.send()
suspends until the channel has buffer capacity and starts processing… not when its done processingNick Allen
12/16/2021, 6:54 PMMutex
frankelot
12/16/2021, 6:57 PMCasey Brooks
12/16/2021, 6:57 PMFlow
might be a better mechanism than raw channels, since by design the emitter will suspend until the collector has finished processing the itemNick Allen
12/16/2021, 6:57 PMCompleteableJob
in the item to signal when done but it’s easy to mess up error situations and you lose natural structured concurrencyfrankelot
12/16/2021, 7:04 PMmutex
and flows
and report back, thanks! for the helpThe purpose of channels is to make a “pipeline” of async communication. Putting an item into a channel and then waiting for processing to complete isn’t how they’re intended to be used, which is why it’s not working for you.this makes sense.. I don’t want async communication
darkmoon_uk
12/16/2021, 11:38 PMsuspendCoroutine
function e.g. in pseudocode:
fun queuedProcess(someParam: Param) = suspendCoroutine { continuation ->
addToQueue(param, continuation)
ensureQueueIsRunning()
}
...and elsewhere, the queue processing calls queueItem.continuation(result
)suspendCoroutine
suspends the execution at that point and gives you a Continuation
object that you can invoke with a result when you want that point of execution to continue. You can then store this object as part of your queued operation, and invoke it when the queued process completes.K Merle
12/17/2021, 5:35 AMCasey Brooks
12/17/2021, 3:38 PMsuspendCoroutine
route works, but it might be easier and make the intent clearer to use a CompletableDeferred, which would effectively do the same thing