diesieben07
11/17/2020, 2:36 PMFlow
between a fixed number of collectors. Currently I just do data.shareIn(this, SharingStarted.Lazily)
. Then I let all the collectors colllect that shared flow. However there is a race condition here. In theory the following can happen:
• first collector starts collecting, this starts the sharing
• the emitter emits things and they are immediately consumed by the already subscribed first collector.
• the rest of the collectors start collecting, but they missed out on the first values.
I could set replay
to some number, but that's just a stopgap. Is there a way to say "okay, sharing coroutine, you can start dropping things now, there won't be any further subscribers" after all the subscribers are there?Lulu
11/17/2020, 3:32 PMFlow
of requests that need to be processed by (many) different workers, and I was wondering which option is better from the following:
1. Have each worker apply filter { }
on the main Flow
to check if this request belongs to it.
2. Have a separate Flow
for each worker, and a general worker that checks all the main `Flow`'s requests and re-emit each request to the corresponding worker's Flow
.
3. (possibly something I haven't thought of?)diesieben07
11/17/2020, 6:21 PMshareIn
. However this does not support a finite flow, as shareIn
makes the flow endless ("Shared flow never completes"). How can the collectors of the shared flow ever complete? Even if the original flow completes, they will continue to "hang", expecting more elements, because the shared flow never completes.Aaron Todd
11/17/2020, 7:50 PM1 suspend fun processTasks(chan: ReceiveChannel<MyTask>) {
2 for(task in chan) {
3 doWork() // non-suspend function
4 }
5 }
is it correct to say that on Line 3 once the coroutine is running on a thread it will not suspend again and potentially switch threads until the next call to chan.receive()
on line 2?
More generically the only time a coroutine has a chance to suspend is at a suspension point which is defined by a suspend
function call? (i.e. no instruction level suspension possible)zsperske
11/18/2020, 2:56 AMcallbackFlow
and a Channel
? both seem to support "hot" streamsTolriq
11/18/2020, 7:40 AMAll methods of shared flow are *thread-safe* and can be safely invoked from concurrent coroutines without external synchronization.
But IDE on SharedFlow.emit says: Collects the value emitted by the upstream. This method is not thread-safe and should not be invoked concurrently.
Which one is right?solidogen
11/18/2020, 12:20 PMDavide Giuseppe Farella
11/18/2020, 1:42 PMFlow
to a StateFlow
on the fly? ( i.e. flow.asStateFlow()
)Rolbin
11/18/2020, 2:11 PMSlackbot
11/18/2020, 2:18 PMbbaldino
11/18/2020, 4:51 PMselect
expression in the 'parent' on a set of StateFlows
from the subcomponents, but I saw onReceive
isn't directly supported by StateFlow
. I tried using produceIn
on the StateFlow
so I could use onReceive
, but then I get the most recent state over and over. I looked at using distinctUntilChanged
on the StateFlow
, but then saw that has no effect...What should I go with here? I also can't 'miss' a value here, so maybe StateFlow
is the wrong tool?doodla
11/18/2020, 8:18 PMvar x: Int = 0
suspend fun modifyX() {
if (x > 10) {
x -= 1
} else {
x += 1
}
}
fun main() {
runBlocking {
withContext(<http://Dispatchers.IO|Dispatchers.IO>){
repeat(10) {
modifyX()
}
}
}
}
Does x
need to be @Volatile
?df
11/18/2020, 8:40 PMcoroutinedispatcher
11/18/2020, 10:31 PMSharedFlow
but I am getting an IllegalStateException: This job has not completed yet
. It’s basically pretty easy because I commented out the rest of the code for now:
fun registerNewUser(email: String, password: String, hint: String) {
viewModelScope.launch { _state.emit(State.Loading) }
// commented out for the moment just to make sure it's not the rest of the code that is failing the test
}
And the test:
@Test
fun `starting registration process will set the state immediately to loading`() =
runBlockingTest(testCoroutineDispatcher) {
registerViewModel.registerNewUser("anyEmail", "anyPassword", "anyHint")
~ assertEquals(registerViewModel.state.first(), RegisterViewModel.State.Loading)
}
Versions:
coroutines: "org.jetbrains.kotlinx:kotlinx-coroutines-android:1.4.1",
coroutines_android : "org.jetbrains.kotlinx:kotlinx-coroutines-android:1.4.1",
coroutines_test: "org.jetbrains.kotlinx:kotlinx-coroutines-test:1.4.1"
Probably doing something wrong, but not sure what. Any ideas/input?yschimke
11/19/2020, 10:03 AMcont.resume(Unit) { unlock(owner) }
vs
cont.resume(Unit, onCancellation = { unlock(owner) })
elizarov
11/19/2020, 3:00 PMkotlinx-coroutines
version 1.4.1-native-mt
released (with multithreading in Kotlin/Native)christophsturm
11/20/2020, 11:07 AMPublisher<T>.asFlow()
not support nullable values? https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactive/kotlinx.coroutines.reactive/org.reactivestreams.-publisher/as-flow.htmlMohamed Ibrahim
11/20/2020, 12:27 PMrepositoryContract.retrieveTopStories()
is a suspend function which returns List<Story>
why I can’t convert it to Flow by calling
repositoryContract.retrieveTopStories().asFlow()
why
11/20/2020, 1:26 PMchristophsturm
11/20/2020, 3:46 PM(0 until entries).map {
async {
repo.create(connection, user)
}
}.forEach { it.await() }
Nemanja Mladenović
11/20/2020, 5:15 PMLulu
11/20/2020, 7:20 PMJohn O'Reilly
11/21/2020, 11:22 AMStateFlow
in view model (along with use of stateIn
)....is there something equivalent to MediatorLiveData
or switchMap
when using that?Sivan
11/22/2020, 11:43 AMFlow
with suspend functions
to read a stream of data from my API. To get all the registered users, flow
seems like a cool option. But while creating a user which is a one-shot function, would flow really come in handy? I really like the idea of emitting streams by passing a Result wrapper around the data and this gives me more clarity and readability in code.
Just want to know if it is considered as best practice to use Flow for one shot functions like creating a user or does it become unnecessary?df
11/22/2020, 6:55 PMPaul Woitaschek
11/23/2020, 7:46 AMsamuel
11/23/2020, 12:41 PMsuspend inline fun <T>Flow<T>.fireAndForget(crossinline block: (T) -> Unit): Flow<T> {
coroutineScope {
launch {
runCatching {
collect {
block(it)
}
}
}
}
return this
}
Lulu
11/23/2020, 11:45 PMtateisu
11/24/2020, 2:22 AMtateisu
11/24/2020, 3:01 AMtateisu
11/24/2020, 3:01 AMgildor
11/24/2020, 4:19 AM