marcinmoskala
01/27/2022, 5:18 PM
Dispatchers.IO.limitedParallelism(100) and Dispatchers.Default.limitedParallelism(100)?
dimsuz
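01/28/2022, 1:04 PM
class Feature {
    // Dispatchers.Default is an assumed choice; CoroutineScope() needs a context argument
    private val scope = CoroutineScope(Dispatchers.Default)
    private val _flow = MutableSharedFlow<Int>()
    fun start() {
        scope.launch { _flow.emit(1); _flow.emit(2) }
    }
    val flow: Flow<Int> = _flow
}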
test code:
should("do something") {
    val awaitFirstEmit = feature.flow.onStart { feature.start() }.first()
    awaitFirstEmit shouldBe 1
}
Is it OK to use onStart like this, or will this produce flakiness?
The important thing here is that I want to ensure that the first item is not lost.
trevjones
01/29/2022, 1:00 AM
runBlockingTest to runTest. For each test you get to pass, take a drink!
Arjan van Wieringen
01/30/2022, 10:03 AM
AKlam
01/31/2022, 2:36 PM
Ciprian Grigor
01/31/2022, 4:37 PM
Roudy Korkis Kanaan
02/01/2022, 4:43 AM
mapLatest operator. More details in the thread with code examples.
Marko Novakovic
02/01/2022, 4:54 PM
Flow that emits when either SharedFlow emits?
aballano
02/01/2022, 6:11 PM
MutableSharedFlow default constructor + tryEmit issue. I was reading through the issue, and it seems to me that the docs added here are not 100% accurate. They seem to indicate that the tryEmit call can succeed, but in reality it will always fail (i.e. not emit anything), while returning true or false depending on whether there are subscribers. Is that right? cc @elizarov Wondering if it would make sense to have a quick PR there to simplify the wording; I could volunteer for that 🙂
flygerian.eagle
02/02/2022, 4:11 AM
ubu
02/02/2022, 12:56 PM
brabo-hi
02/02/2022, 5:40 PM
*Warning:* Never collect a flow from the UI directly from *launch* or the *launchIn* extension function if the UI needs to be updated. These functions process events even when the view is not visible. This behavior can lead to app crashes. To avoid that, use the *repeatOnLifecycle* API as shown above.
What is the safest way to collect a StateFlow in Compose, given that we don't have access to *repeatOnLifecycle* and collecting with launch is not recommended?
dimsuz
02/03/2022, 11:38 AM
stateIn which requires a CoroutineScope?
val s1: StateFlow<Int>
val s2: StateFlow<Long>
val s3: StateFlow<Long> = combine(s1, s2) { v1, v2 ->
v1.toLong() + v2
}
// ^^^ can't do that, combine results in `Flow<Long>`,
// must do stateIn...
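A minimal sketch of the stateIn route mentioned in the comment above, assuming a scope is available to own the sharing coroutine. The function name combinedSum and the choice of SharingStarted.Eagerly are illustrative assumptions, not something from the thread; the initial value is computed from the current values of both inputs so the result is never out of sync at creation time.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.flow.SharingStarted
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.combine
import kotlinx.coroutines.flow.stateIn

// Hypothetical helper: combines two StateFlows and converts the result back to a StateFlow.
// combine(...) returns a cold Flow<Long>, so stateIn needs a scope to collect it in.
fun combinedSum(
    scope: CoroutineScope,
    s1: StateFlow<Int>,
    s2: StateFlow<Long>,
): StateFlow<Long> =
    combine(s1, s2) { v1, v2 -> v1.toLong() + v2 }
        .stateIn(
            scope = scope,
            started = SharingStarted.Eagerly, // assumed policy; WhileSubscribed() is another option
            initialValue = s1.value.toLong() + s2.value,
        )
```

The scope decides how long the combined flow keeps being collected; cancelling it stops the updates.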
dimsuz
02/03/2022, 1:46 PM
combine, but no top-level zip. I have a List<Flow<Int>> and want to zip them; what is my best option?
Lukasz Kalnik
02/03/2022, 2:56 PM
@Test
fun `when no gateway from scanning present then throw`() {
    every { state.gateway } returns null
    assertThatIllegalStateException().isThrownBy { runBlocking { useCase.execute(Params()) } }
}
This is what it looks like after the migration, according to the official migration guide:
val exceptions = mutableListOf<Throwable>()
val customCaptor = CoroutineExceptionHandler { _, throwable ->
    exceptions.add(throwable)
}
@Test
fun `when no gateway from scanning present then throw`() = runTest {
    every { state.gateway } returns null
    launch(customCaptor) {
        useCase.execute(Params())
    }
    advanceUntilIdle()
    assertThat(exceptions).hasSize(1)
        .first().isInstanceOf(IllegalStateException::class.java)
}
However, the exception is for some reason not caught by the customCaptor, but thrown in the test and the test fails.
Paul Woitaschek
02/03/2022, 3:36 PM
Deferred
For await, it says:
[…] If the Job of the current coroutine is cancelled or completed while this suspending function is waiting, this function immediately resumes with CancellationException. […]
Why would the await function resume with a CancellationException if the job completes while await is waiting?
George
02/04/2022, 8:42 AM
//@RestController
//@RequestMapping("${v1}/ping")
public class TestController {
    @GetMapping(value = ["", "/"])
    public suspend fun testAsync(): Deferred<ApiResponse> {
        coroutineScope {
            // add here launch, async etc
        }
    }
}
For every API, should I add a coroutineScope to achieve structured concurrency?
aballano
02/04/2022, 4:29 PM
onEachLatest sort of 🤔 Sharing here for feedback or in case I’m missing something 😬
@ExperimentalCoroutinesApi
public fun <T> Flow<T>.onEachLatest(action: suspend (T) -> Unit): Flow<T> = transformLatest { value ->
    action(value)
    emit(value)
}
The only real difference from onEach is transform -> transformLatest, but I guess that should work, right?
tomas-mrkvička
02/04/2022, 8:46 PM
runBlocking. The documentation says that it should be used in main, tests and libraries to bridge two worlds. I found zero information about structured concurrency within plain command-line applications. Let's say I have a tool which migrates data from one source to another, and I would like to use concurrent execution for some parts of the migration process. Now, should I define runBlocking somewhere "in the middle", or how should coroutines be used in a world where no lifecycle managers exist?
Thanks
George
02/06/2022, 3:21 PM
oday
02/06/2022, 7:04 PM
launch{} -> import javafx.application.Application.launch
Lukasz Kalnik
02/07/2022, 6:06 PM
@Test
fun `test flow`() = runTest {
    val eventUpdateService = mockk<EventUpdateService> {
        every { subscribeToEvents() } returns flowOf(SomeEvent).onStart { delay(1000) }
    }
    val presenter = Presenter(
        UnconfinedTestDispatcher(),
        eventUpdateService
    )
    presenter.attachView(view)
    advanceTimeBy(2000)
    verify { view.displayEvent() } // fails because the flow didn't emit
}
class Presenter(
    coroutineContext: CoroutineContext,
    val eventUpdateService: EventUpdateService
) {
    val coroutineScope = CoroutineScope(coroutineContext)
    fun attachView(view: View) {
        coroutineScope.launch {
            eventUpdateService.subscribeToEvents().collect { event ->
                view.displayEvent()
            }
        }
    }
}
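One possible cause, assuming kotlinx-coroutines-test 1.6 and that Dispatchers.Main has not been replaced with a test dispatcher: UnconfinedTestDispatcher() with no argument creates its own scheduler, so advanceTimeBy on the runTest scope moves a different virtual clock from the one the presenter's delay is waiting on. The sketch below passes testScheduler explicitly. SketchPresenter and deliveredEvents are hypothetical stand-ins that use a plain callback instead of mockk and View, so the snippet stays self-contained.

```kotlin
import kotlin.coroutines.CoroutineContext
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.cancel
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.onStart
import kotlinx.coroutines.launch
import kotlinx.coroutines.test.UnconfinedTestDispatcher
import kotlinx.coroutines.test.advanceTimeBy
import kotlinx.coroutines.test.runTest

// Hypothetical stand-in for the Presenter above: collects events in its own scope.
class SketchPresenter(context: CoroutineContext, private val events: Flow<String>) {
    val scope = CoroutineScope(context)

    fun attach(onEvent: (String) -> Unit) {
        scope.launch { events.collect { onEvent(it) } }
    }
}

// Runs the scenario under runTest and returns whatever the callback received.
@OptIn(ExperimentalCoroutinesApi::class)
fun deliveredEvents(): List<String> {
    val received = mutableListOf<String>()
    runTest {
        val events = flowOf("SomeEvent").onStart { delay(1000) }
        // Sharing testScheduler ties the presenter's delay to the clock that advanceTimeBy moves.
        val presenter = SketchPresenter(UnconfinedTestDispatcher(testScheduler), events)
        presenter.attach { received += it }
        advanceTimeBy(2000)
        presenter.scope.cancel()
    }
    return received
}
```

If the test class already installs a test dispatcher through Dispatchers.setMain, the no-argument constructor may pick up that scheduler instead, in which case this would not be the explanation.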
Lukasz Kalnik
02/08/2022, 12:28 PM
StandardTestDispatcher and UnconfinedTestDispatcher in a case where you always inject a dispatcher in the system under test?
Let's say we have a presenter, like in my post above, which always uses one dispatcher for all coroutine calls (e.g. Dispatchers.Default). For tests it gets injected a test dispatcher, so during tests every coroutine launched inside the presenter uses this injected dispatcher.
Is there a difference which dispatcher is used then?
Mini
02/08/2022, 7:11 PM
Tomer Abramovich
02/09/2022, 6:54 AM
suspend fun myLibraryFunction(query: Query): QueryResult {
    val requestContext: RequestContext = currentCoroutineContext()[RequestContext]!! // expecting a dedicated context element
    val tenantId = requestContext.tenantId
    // do something with tenantId
}
suspend fun callerFunction() {
    val requestContext = RequestContext(tenantId)
    CoroutineScope(requestContext).async { myLibraryFunction(query) }.await()
}
^^^
This is what I came up with, but it seems redundant to create a new coroutine (i.e. async).
I just want the user to be able to set the requestContext on the current context so that the called function inherits it,
something like
suspend fun callerFunction(): QueryResult {
    val requestContext = RequestContext(tenantId)
    currentCoroutineContext().plus(requestContext) // I know it's immutable so won't work
    return myLibraryFunction(query)
}
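A minimal sketch of one way to do this with withContext, which runs a block with the current context plus the given element and then returns the block's result. The RequestContext definition here is an assumption (the thread does not show it), and String stands in for Query and QueryResult to keep the snippet self-contained.

```kotlin
import kotlin.coroutines.AbstractCoroutineContextElement
import kotlin.coroutines.CoroutineContext
import kotlinx.coroutines.currentCoroutineContext
import kotlinx.coroutines.withContext

// Hypothetical context element carrying the tenant id; the companion object serves as its key.
class RequestContext(val tenantId: String) : AbstractCoroutineContextElement(RequestContext) {
    companion object Key : CoroutineContext.Key<RequestContext>
}

// Library side: reads the element from whatever context the caller is running in.
suspend fun myLibraryFunction(query: String): String {
    val requestContext = checkNotNull(currentCoroutineContext()[RequestContext]) {
        "RequestContext is missing from the coroutine context"
    }
    return "${requestContext.tenantId}:$query"
}

// Caller side: withContext adds the element for the duration of the block,
// without building a separate CoroutineScope or calling async.
suspend fun callerFunction(tenantId: String, query: String): String =
    withContext(RequestContext(tenantId)) {
        myLibraryFunction(query)
    }
```

Because withContext keeps the caller's Job as parent, cancellation still propagates, which the CoroutineScope(requestContext).async variant does not give you.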
brabo-hi
02/09/2022, 6:07 PM
onStopped since on CoroutineWorker it is defined public final override fun onStopped() {}
Tim Malseed
02/09/2022, 10:38 PM
StateFlow, I can call MutableStateFlow(). But, if I want to convert a Flow into a StateFlow, I call stateIn().
stateIn() requires a CoroutineScope. But MutableStateFlow() doesn’t. Why is that?
K Merle
02/10/2022, 5:57 AM
val x = combine(y.invoke(), z.invoke()){ y+z }
Both y and z are flows of the same type, and they do not seem to throw any errors.
stojan
02/10/2022, 10:12 AM
List<Flow<T>> to Flow<List<T>>?
Shaheer Muzammil
02/10/2022, 11:01 AM
Shaheer Muzammil
02/10/2022, 11:01 AM
ephemient
02/10/2022, 11:04 AM
map[key] in a single-threaded dispatcher, sure that's safe
map[key] = map[key] + foo() where foo() is a suspending function, that's potentially problematic (just as in threaded code)
Oliver.O
02/10/2022, 11:07 AM