esdrasdl
04/29/2022, 8:50 PM
class SampleViewModel : ViewModel() {
    private val interactions = Channel<Any>()
    private val state = MutableSharedFlow<Any>(
        onBufferOverflow = BufferOverflow.DROP_LATEST,
        extraBufferCapacity = 1
    )
    init {
        viewModelScope.launch {
            interactions.receiveAsFlow().collect { action ->
                processAction(action)
            }
        }
    }
    fun flow() = state.asSharedFlow()
    fun handleActions(action: Any) {
        viewModelScope.launch {
            interactions.send(action)
        }
    }
    suspend fun processAction(action: Any) {
        state.emit("Show loading state")
        // process something...
        state.emit("Show success state")
    }
}
R
04/30/2022, 8:23 AM
Controller.kt
when I attempt to call this from main()
I get an IDE error (see below), can anyone help me resolve this?
fun <T> awsrun(f: (Ec2Client) -> T): List<T> {
    return clients.values.map { f(it) }
}
Muhammad Talha
04/30/2022, 6:10 PM
Pablo
05/04/2022, 8:41 AM
Flow<T> as a parameter and then inside has a collect { }?
Example
fun doStuff(flow: Flow<String>) {
    scope.launch {
        flow.collect {
            if (it !in myList) {
                dataSource.add(it)
            }
        }
    }
}
How would I verify that dataSource.add(it) is called, considering that myList is something I can mock too?
André Martins
05/04/2022, 2:57 PM
coroutineContext[ReactorContext.Key] although I was expecting that this context would be restored after calling chain.filter on my WebFilter, meaning that whenever I put something in coroutineContext[ReactorContext.Key]?.context it would be visible in Reactor context, but it doesn’t seem to be the case. Is there any way to do this?
Remy Benza
05/05/2022, 7:27 AM
Nino
05/05/2022, 10:24 AM
yield() but I'm lost. I'd expect this code to print [1, 2, 3] \n [1, 2, 3, 4] \n End but it doesn't... Why?
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun main() = runBlocking {
    val mutableStateFlow = MutableStateFlow(emptyList<Int>())
    val collectJob = launch {
        combine(
            flowOf(listOf(1, 2, 3)),
            mutableStateFlow
        ) { list1, list2 ->
            list1 + list2
        }.collect { list ->
            println(list)
        }
    }
    yield()
    mutableStateFlow.value = listOf(4)
    yield()
    collectJob.cancel()
    println("End")
}
Playground : https://pl.kotl.in/EpSQVK30I
PS: with delay(100) instead of yield(), it works. With lower values, I get random results.
rrva
05/05/2022, 7:32 PM
class ChannelWorker() {
    private val requestChannel = Channel<FooRequest>()
    private val responseChannel = Channel<FooResult>()
    init {
        GlobalScope.launch {
            while (true) {
                val receive = requestChannel.receive()
                val resp = fetchStuff(receive.id)
                responseChannel.send(FooResult(resp))
            }
        }
    }
    private suspend fun fetchStuff(id: String): Int {
        delay(1000)
        return id.toInt()
    }
}
version 2:
class ChannelWorker {
    private val requestChannel = Channel<FooRequest>()
    private val responseChannel = Channel<FooResult>()
    init {
        CoroutineScope(EmptyCoroutineContext).launch {
            for (message in requestChannel) {
                val resp = fetchStuff(message.id)
                responseChannel.send(FooResult(resp))
            }
        }
    }
    private suspend fun fetchStuff(id: String): Int {
        delay(1000)
        return id.toInt()
    }
}
Still, I am creating a coroutine in an init block; is that really a good thing? How will the lifecycle of that coroutine be managed? What happens when a ChannelWorker object goes out of scope, and how would I stop the channel receiver coroutine? Or is it cleaner if ChannelWorker has start() and stop() methods?
Nandu
05/05/2022, 8:39 PM
Paul Woitaschek
05/06/2022, 5:42 PM
Lilly
05/06/2022, 11:19 PM
stateIn.
1. Is this a bad approach:
// presenter/viewmodel:
val myState: StateFlow<MyState> = api.subscribeToFlow()
    .map {..}
    .stateIn(scope = mainScope + Dispatchers.Default, started = SharingStarted.Lazily, initialValue = MyState.Initial)
// somewhere else:
fun subscribeToFlow(): Flow<Int> {
    println("thread: ${Thread.currentThread()}.") // prints main thread
    ...
    return someFlow()
}
I'm asking because the call to api.subscribeToFlow() runs on the main thread and I don't know how to make it switch to a default thread. I could wrap everything in withContext() in my subscribeToFlow() function, but is there an alternative? This brings me to my 2nd question.
2. What is a use case for the overloaded suspend stateIn(scope: CoroutineScope) function? It has to be called from a suspending function, while the simple stateIn function does not.
juliocbcotta
05/07/2022, 10:18 AM
import kotlinx.coroutines.launch
import kotlinx.coroutines.delay
import kotlinx.coroutines.Deferred
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.combine
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.runBlocking
suspend fun flowsWithCombine() {
    val numbersFlow = flowOf(1, 2, 3)
    val lettersFlow = flowOf("A")
    combine(numbersFlow, lettersFlow) { number, letter ->
        "$number$letter"
    }.collect {
        // delay(100)
        println(it)
    }
}
fun main() = runBlocking {
    val deferred: Deferred<Unit> = async {
        println("waiting...")
        flowsWithCombine()
    }
    println(deferred.await())
}
If I run the code as above, I get
waiting...
1A
2A
3A
kotlin.Unit
If I uncomment the delay, the result changes:
waiting...
1A
3A
kotlin.Unit
Why is that?
xxfast
05/09/2022, 1:22 AM
mutableStateFlow.value = newValue
mutableStateFlow.emit(newValue)
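For reference, a minimal sketch of how the two calls above behave, assuming the question (its text did not survive in this log) was how they differ. On a `MutableStateFlow`, `emit` ends up assigning `value` and does not suspend, so the practical difference is that `value = ...` can also be called from non-suspending code. The helper name `updateBothWays` is made up for this illustration.

```kotlin
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.runBlocking

// Hypothetical helper: applies both kinds of update and records what the flow holds afterwards.
fun updateBothWays(): List<Int> {
    val state = MutableStateFlow(0)
    val seen = mutableListOf<Int>()

    state.value = 1      // plain property assignment, callable from any code
    seen += state.value

    runBlocking {
        state.emit(2)    // suspend function, so it needs a coroutine; same effect on the value
    }
    seen += state.value
    return seen
}

fun main() {
    println(updateBothWays()) // [1, 2]
}
```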
Alexandre Brown
05/09/2022, 12:01 PM
runBlocking(Dispatchers.IO) instead of runBlocking { }, but I'm not sure what the immediate difference is. My app is a backend so there is no main thread, but I still want to know the difference.
Thanks
Florent Dambreville
05/09/2022, 12:45 PM
1.6.20 and I get a crash when calling a suspendCoroutine function:
var topupCountries: Deferred<Set<String>> = buildCountriesAsync()
    private set
private fun buildCountriesAsync() = CoroutineScope(backgroundDispatcher).async(start = CoroutineStart.LAZY) {
    getCountries()
}
private suspend fun getCountries(): Set<String> {
    return try {
        topupCountries.getCompleted()
    } catch (exception: IllegalStateException) {
        // Crash here
        fetchRestrictedTopupCountries()
    }
}
private suspend fun fetchRestrictedTopupCountries(): Set<String> {
    return suspendCoroutine { continuation ->
        val request = ApiTopupRequestFactory.getTopupCountries(
            listener = {
                val setResult = it.countries.toSet()
                continuation.resume(setResult)
            },
            errorListener = {
                continuation.resume(setOf())
            }
        )
        execute(request)
    }
}
The error is: java.lang.ClassCastException: kotlin.coroutines.intrinsics.CoroutineSingletons cannot be cast to java.util.Set
Am I doing something wrong here? Anyone else with this issue?
William Reed
05/10/2022, 5:12 PM
StateFlow<NodeState> property. I also have a NodeManager which in turn has a MutableList<Node> (which is just exposed as a List<Node>). At my UI level I want to display something based on each `Node`’s state.
It starts to get a little weird since I need to observe each node's state flow as well as account for the `NodeManager`’s collection of nodes possibly changing. Any suggestions for how to approach this such that in my UI level I can end up with a List<NodeState> so I can render views from it?
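One possible shape for the goal described above, as a sketch only: if the manager exposes its node list as a flow, `flatMapLatest` can re-subscribe whenever the list changes and `combine` can merge the per-node state flows into one list. `NodeState`, `Node`, `NodeManager` and `nodeStates()` below are hypothetical stand-ins, not the real classes from the message, and `flatMapLatest` requires an opt-in.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.combine
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.flatMapLatest
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.runBlocking

// Hypothetical stand-ins for the types named in the message.
enum class NodeState { IDLE, RUNNING }

class Node(initial: NodeState) {
    val state: StateFlow<NodeState> = MutableStateFlow(initial)
}

class NodeManager {
    private val _nodes = MutableStateFlow<List<Node>>(emptyList())
    val nodes: StateFlow<List<Node>> = _nodes
    fun add(node: Node) { _nodes.value = _nodes.value + node }
}

// Re-subscribes to every node's state each time the node list changes.
@OptIn(ExperimentalCoroutinesApi::class)
fun NodeManager.nodeStates(): Flow<List<NodeState>> =
    nodes.flatMapLatest { current ->
        // combine over zero flows emits nothing, so the empty list is handled explicitly
        if (current.isEmpty()) flowOf(emptyList<NodeState>())
        else combine(current.map { it.state }) { states -> states.toList() }
    }

fun main() = runBlocking {
    val manager = NodeManager()
    manager.add(Node(NodeState.IDLE))
    manager.add(Node(NodeState.RUNNING))
    println(manager.nodeStates().first()) // [IDLE, RUNNING]
}
```

With this shape the UI collects a single flow and does not track individual nodes; the cost is that every list change restarts the collection of all node states.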
I’ve tried exposing a StateFlow<List<Node>> from the NodeManager, but then from the UI level I would need to somehow manage the collection of each node and the nodes possibly getting removed or added.
Colton Idle
05/11/2022, 1:23 AM
override suspend fun requestBooks(): Flow<List<Book>> {
    return callbackFlow {
        val listener =
            FirebaseFirestore.getInstance().collection("books").addSnapshotListener { value, error ->
                trySend(value!!.toObjects())
            }
        awaitClose { listener.remove() }
    }
}
How do I move the trySend(value!!.toObjects()) to a background thread? My issue is that value!!.toObjects() is a deserialization operation that can take a long time.
Wrapping trySend in a launch with Dispatchers.IO seems to help my lag issue in my app, but is it really that easy? Or am I shooting myself in the foot?
launch(Dispatchers.IO) {
    trySend(value!!.toObjects())
}
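An alternative worth considering, sketched under assumptions: launching a new coroutine per snapshot does move the work off the callback thread, but separate coroutines on a multi-threaded dispatcher may finish out of order. Forwarding the raw value from the listener and doing the slow conversion downstream with `map` plus `flowOn` keeps the order. `FakeSource` and `parse` are hypothetical stand-ins (this is not the Firestore API), and `Dispatchers.Default` is an assumed choice for CPU-bound parsing. Note that `flowOn` also moves the listener registration itself onto that dispatcher.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.callbackFlow
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for a snapshot-listener API; not the Firestore SDK.
class FakeSource {
    var listenerCount = 0
        private set

    // Delivers two raw snapshots right away and returns a handle that removes the listener.
    fun addListener(listener: (String) -> Unit): () -> Unit {
        listenerCount++
        listener("1,2,3")
        listener("4,5")
        return { listenerCount-- }
    }
}

// Stand-in for the slow deserialization step (value.toObjects() in the message).
fun parse(raw: String): List<Int> = raw.split(",").map { it.toInt() }

fun requestItems(source: FakeSource): Flow<List<Int>> =
    callbackFlow {
        val remove = source.addListener { raw -> trySend(raw) } // forward raw data only
        awaitClose { remove() }
    }
        .map { parse(it) }            // slow work runs here, one item at a time, in order
        .flowOn(Dispatchers.Default)  // everything above this line runs on Dispatchers.Default

fun main() = runBlocking {
    println(requestItems(FakeSource()).take(2).toList()) // [[1, 2, 3], [4, 5]]
}
```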
Pablo
05/11/2022, 9:49 AM
viewModel have the call in the init{}? Because the way to test in Turbine is to create the viewModel, then get the sharedStateFlow and do the calls of the viewModel. But what if the code is in init? When the viewModel is created, init has already been called, so I cannot test the first state of this sharedFlow, right?
Example:
@Test
fun test() = runTest {
    val viewModel = createViewModel() // <-- init has already been called here
    viewModel.stateFlow.test {
        // What should I call here? If it's in init...
    }
}
rrva
05/11/2022, 2:27 PM
louiscad
05/11/2022, 4:37 PM
StateFlow to another StateFlow with a synchronous operation (it's quite cheap, it's only one object allocation), and I didn't want to have to link it to a CoroutineScope this time, so I wrote this mapSync extension function, which you can criticize, and copy at will.
I'm using it in a single-thread context, but I think it'd work just fine in multi-thread so long as the mapping operation is cheap and doesn't cause heavy recomputations on concurrent accesses.
import kotlinx.coroutines.flow.*
fun <T, R> StateFlow<T>.mapSync(transform: (T) -> R): StateFlow<R> = object : StateFlow<R> {
    override val replayCache: List<R> get() = listOf(value)
    override suspend fun collect(collector: FlowCollector<R>): Nothing {
        this@mapSync.collect {
            collector.emit(transform(it))
        }
    }
    private var lastUpstreamValue = this@mapSync.value
    override var value: R = transform(lastUpstreamValue)
        private set
        get() {
            val currentUpstreamValue: T = this@mapSync.value
            if (currentUpstreamValue == lastUpstreamValue) return field
            field = transform(currentUpstreamValue)
            lastUpstreamValue = currentUpstreamValue
            return field
        }
}
I hope it's helpful to some of you!
allan.conda
05/11/2022, 5:38 PM
runTest following this migration guide.
I’m struggling to migrate my test code which expects the dispatcher to be unpaused throughout the whole test.
Is there a way to do this or do I really have to call runCurrent() for every step I want to assert?
Muhammad Talha
05/12/2022, 1:00 AM
suspend fun getData(): Data = withContext(Dispatchers.IO) {
    val a = async(Dispatchers.IO) {
        delay(1000)
        Data("some data...")
    }
    a.await()
}
therealbluepandabear
05/13/2022, 8:19 AM
var toReturn: PixelArt? = null
CoroutineScope(Dispatchers.IO).launch {
    AppData.pixelArtDB.pixelArtCreationsDao().getAllPixelArtCreations().observe(this@extendedGetCurrentPixelArtObj.findViewTreeLifecycleOwner()!!) {
        toReturn = it[currentIndex]
    }
}
return toReturn!!
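A note on the snippet above: `launch` returns immediately, so `return toReturn!!` is reached before the observer has had a chance to run and `toReturn` is still null. One common way around this is to make the function `suspend` and wait for the callback. The sketch below uses a hypothetical `ArtStore` with a plain callback in place of the DAO from the message; the names and types are assumptions for illustration only.

```kotlin
import kotlin.coroutines.resume
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.suspendCancellableCoroutine

// Hypothetical callback-style data source standing in for the DAO in the message.
class ArtStore(private val items: List<String>) {
    fun loadAll(callback: (List<String>) -> Unit) = callback(items)
}

// Suspends until the callback delivers the list, then returns the requested element.
suspend fun ArtStore.itemAt(index: Int): String =
    suspendCancellableCoroutine { continuation ->
        loadAll { all -> continuation.resume(all[index]) }
    }

fun main() = runBlocking {
    println(ArtStore(listOf("a", "b", "c")).itemAt(1)) // b
}
```

The caller then has to be a coroutine or another suspending function, which is the price of not returning a value that does not exist yet.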
therealbluepandabear
05/13/2022, 8:19 AM
.await() in this context
azabost
05/13/2022, 12:12 PM
SharedFlow collection?
Let’s say:
class MyClass(coroutineScope: CoroutineScope, flowToCollect: Flow<Int>) {
    var lastObservedResult: Int? = null
    init {
        coroutineScope.launch {
            flowToCollect.collect { lastObservedResult = it }
        }
    }
}
If I use runTest and pass the created TestScope then the test is going to fail after some time because there is a running coroutine.
@Test
fun testMyClass() = runTest {
    MyClass(this, flow)
    // do something here, make some assertions etc.
    // at the end, the test is going to fail because of the running coroutine
}
After waiting for 60000 ms, the test coroutine is not completing, there were active child jobs (...)
So should I create another scope instead? Like this, for example?
val dispatcher = UnconfinedTestDispatcher()
@Test
fun testMyClass() = runTest(dispatcher) {
    val additionalScope = CoroutineScope(dispatcher)
    MyClass(additionalScope, flow)
    // do something here, make some assertions etc.
    additionalScope.cancel() // Is this necessary, btw? Is the launched coroutine going to leak after the test is finished or something?
    // now the test won't fail, but I must remember to cancel the additional scope manually
}
}
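A variation on the second approach, as a sketch under assumptions: the extra scope can reuse the test's own context (so it shares the test dispatcher and scheduler) with a fresh `Job` that is not a child of the test, and be cancelled in `finally` so it is cleaned up even when an assertion fails. The `MutableSharedFlow(replay = 1)` is a made-up input for this illustration. Later releases of kotlinx-coroutines-test (1.7.0 and up) also provide `TestScope.backgroundScope`, which is cancelled automatically at the end of the test.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.Job
import kotlinx.coroutines.cancel
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.launch
import kotlinx.coroutines.test.runCurrent
import kotlinx.coroutines.test.runTest

class MyClass(coroutineScope: CoroutineScope, flowToCollect: Flow<Int>) {
    var lastObservedResult: Int? = null
    init {
        coroutineScope.launch {
            flowToCollect.collect { lastObservedResult = it }
        }
    }
}

@OptIn(ExperimentalCoroutinesApi::class)
fun testMyClass() = runTest {
    val flow = MutableSharedFlow<Int>(replay = 1) // hypothetical input flow
    // Same test dispatcher, but a Job that is not a child of the test body,
    // so runTest does not wait for the never-ending collector.
    val collectorScope = CoroutineScope(coroutineContext + Job())
    try {
        val subject = MyClass(collectorScope, flow)
        flow.emit(42)   // stored in the replay cache, no subscriber yet
        runCurrent()    // runs the collector, which receives the replayed value
        check(subject.lastObservedResult == 42)
    } finally {
        collectorScope.cancel() // stop the collector whether or not the assertions passed
    }
}
```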
azabost
05/13/2022, 7:48 PM
fun asyncTest(
    context: CoroutineContext = EmptyCoroutineContext, // in practice, it was never changed
    timeoutInMillis: Long = 15000,
    block: suspend CoroutineScope.() -> Unit
) {
    runBlocking(context) {
        withTimeout(timeoutInMillis, block)
    }
}
It was supposed to fail the test if it took unexpectedly long to execute (real time, not virtual time) to avoid blocking the build agent in the CI system.
Now, after the migration to coroutines 1.6.x, I wanted to additionally use the new runTest function, which also checks that there are no active coroutines when the test is finished. Therefore I’m wondering if something like this is a good enough replacement for my previous utility:
fun asyncTest(
    context: CoroutineContext = UnconfinedTestDispatcher(),
    timeoutInMillis: Long = 15000,
    block: suspend TestScope.() -> Unit
) = runBlocking {
    withTimeout(timeoutInMillis) {
        runTest(context, timeoutInMillis, block)
    }
}
// EDIT: Nope, it doesn’t work the way I expected. It doesn’t work in the same way as my previous utility, e.g. when there is a coroutine with an endless while-loop with a delay inside it.
Any ideas?
azabost
05/14/2022, 12:49 AM
runTest behavior that makes it run all the delays at the end of the test body. I think I saw it mentioned somewhere, maybe in some GitHub issue, but I can’t find any mentions in the documentation regarding that behavior.
Example:
private val testCoroutineScheduler = TestCoroutineScheduler()
private val standardTestDispatcher = StandardTestDispatcher(testCoroutineScheduler)
@Test
fun `should execute the coroutine with delays`() = runTest(standardTestDispatcher) {
    var iterations = 0
    val job = launch {
        while (isActive) {
            println("iteration")
            iterations++
            delay(10)
        }
    }
    iterations.shouldEqual(0) // this is an assertion
    runCurrent()
    iterations.shouldEqual(1)
    advanceTimeBy(5)
    iterations.shouldEqual(1)
    advanceTimeBy(5)
    iterations.shouldEqual(1)
    runCurrent()
    iterations.shouldEqual(2)
    advanceTimeBy(10)
    runCurrent()
    iterations.shouldEqual(3)
    job.cancel()
}
If I remove job.cancel() at the end, the test is never going to finish and I’m going to be flooded by the println invocations.
That behavior is quite problematic for me sometimes and that’s why I’m trying to understand it better.
There are some cases where I:
• want to control the scheduler to see how many times something periodical happened
• want the test to finish despite the lack of direct access for the launched coroutine containing the delay
For example:
private val testCoroutineScheduler = TestCoroutineScheduler()
private val standardTestDispatcher = StandardTestDispatcher(testCoroutineScheduler)
class Refresher(scope: CoroutineScope, refreshingDispatcher: CoroutineDispatcher) {
    var refreshCount = 0
    init {
        scope.launch(refreshingDispatcher) {
            while (isActive) {
                refreshCount++
                delay(10)
            }
        }
    }
}
@Test
fun `refreshing should work`() = runTest(standardTestDispatcher) {
    val refresher = Refresher(this, standardTestDispatcher)
    refresher.refreshCount.shouldEqual(0)
    runCurrent()
    refresher.refreshCount.shouldEqual(1)
    advanceTimeBy(15)
    refresher.refreshCount.shouldEqual(2)
    // this test never ends and I can't cancel the launched coroutine
}
I can make it work by removing runTest but I’m simply not sure if this is what I should do in this scenario.
@Test
fun `refreshing should work`() {
    val refresher = Refresher(CoroutineScope(standardTestDispatcher), standardTestDispatcher)
    refresher.refreshCount.shouldEqual(0)
    testCoroutineScheduler.runCurrent()
    refresher.refreshCount.shouldEqual(1)
    testCoroutineScheduler.advanceTimeBy(15)
    refresher.refreshCount.shouldEqual(2)
}
Any advice?
juliocbcotta
05/14/2022, 8:43 PM
.test ... something like
val flow = MutableStateFlow(1)
flow.test {
    assertEquals(awaitItem(), 1)
    assertEquals(awaitItem(), 2) // <-- waits forever
}
flow.emit(2)
Would anyone have a tip on how to properly test it?
K Merle
05/15/2022, 8:51 AM
callbackFlow but I do not need awaitClose() callback. What would be a good candidate for it in flow?
Robert Williams
05/15/2022, 11:04 AM
Robert Williams
05/15/2022, 11:04 AM
Wrapper(value=3)
map2 which works correctly on all versions
delay it obviously works because it's not suspending
Dan Fingal-Surma
05/16/2022, 7:19 AM
inline fun <OldT, NewT> Wrapper<OldT>.map(mapFunction : (OldT) -> NewT) : Wrapper<NewT> =
    value?.let { wrap(mapFunction(it)) } ?: Wrapper(null)
But this fails:
inline fun <OldT, NewT> Wrapper<OldT>.map(mapFunction : (OldT) -> NewT) : Wrapper<NewT> =
    value?.let(mapFunction.andThen(::wrap)) ?: Wrapper(null)
If I try to change andThen to not be an extension function, I get a compiler error:
inline fun <OldT, NewT> Wrapper<OldT>.map(crossinline mapFunction : (OldT) -> NewT) : Wrapper<NewT> =
    value?.let(andThen(mapFunction, ::wrap)) ?: Wrapper(null)
inline fun <A, B, C> andThen(crossinline g: ((A) -> B), crossinline f: (B) -> C): (A) -> C {
    return { it: A -> f(g(it)) }
}
Suspension functions can be called only within coroutine body
I get the same error if I manually inline the map function:
wrapped.value?.let(andThen({ oldVal ->
    delay(1000)
    oldVal.toString()
}, ::wrap)) ?: Wrapper(null)
If I take your original code and annotate everything with suspend everywhere I can, it works: https://pl.kotl.in/zrgS-jyXi
So it seems like something is getting mixed up in the loose way that inline allows accepting suspend funs. Note that the error is on the T from Wrapper, which indicates to me that the argument it to mapperFun in mapOrDefault is getting put into the wrong context somehow. It seems like a bug.
Robert Williams
05/16/2022, 9:22 AM
Dan Fingal-Surma
05/17/2022, 12:57 AM
Robert Williams
05/17/2022, 8:55 AM