Davide Giuseppe Farella
01/15/2020, 8:40 PM
jeggy
01/16/2020, 12:30 AM
launch scope?
Example code: https://pl.kotl.in/g3Gz4nH43
Aaron Stacy
01/16/2020, 3:01 AM
scopeToBeObserved.coroutineContext.launch {
    try {
        delay(FOREVER)
    } catch (ignored: CancellationException) {
        // ... unbind the service connection ...
    }
}
Animesh Sahu
01/16/2020, 6:44 AM
Luis Munoz
01/16/2020, 4:20 PM
v0ldem0rt
01/17/2020, 3:51 AM
suspend function. How can I get the return Class<*> of a suspend function? So far I know that the last parameter of a suspend function is a Continuation<T>. How can I get Class<T> from Continuation<T>?
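A minimal sketch of one way to read that type on the JVM with plain Java reflection. It assumes the compiled method keeps its generic signature, where the trailing parameter typically appears as Continuation<? super T>. The names Sample, greet and suspendReturnType are hypothetical and exist only for this illustration. The helper returns a java.lang.reflect.Type rather than a Class because T may itself be parameterized.

```kotlin
import java.lang.reflect.Method
import java.lang.reflect.ParameterizedType
import java.lang.reflect.Type
import java.lang.reflect.WildcardType
import kotlin.coroutines.Continuation

// Hypothetical class used only to have a suspend function to inspect.
class Sample {
    suspend fun greet(name: String): String = "hi $name"
}

// Hypothetical helper: returns the T of the trailing Continuation<T> parameter,
// or null when the method does not end with a Continuation parameter.
fun suspendReturnType(method: Method): Type? {
    val last = method.genericParameterTypes.lastOrNull() as? ParameterizedType ?: return null
    if (last.rawType != Continuation::class.java) return null
    val argument = last.actualTypeArguments[0]
    // The type argument is usually a "? super T" wildcard, so T is its lower bound.
    return if (argument is WildcardType) {
        argument.lowerBounds.firstOrNull() ?: argument.upperBounds.first()
    } else {
        argument
    }
}

fun main() {
    val method = Sample::class.java.declaredMethods.first { it.name == "greet" }
    println(suspendReturnType(method))
}
```

When the result is a plain Class (as for String here) it can be cast to Class<*>. For a generic return type it will be a ParameterizedType whose rawType is the class.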
Esa
01/17/2020, 2:55 PM
CoroutineScope.produce(), ReceiveChannel<E>.consumeEach {} and ProducerScope.send()? Is there any way to know? I've got a working implementation now, but knowing that it may break without warning is a bit scary. Also, is this the wrong place to ask this question?
Are the discussion boards a better place?
Thiyagu
01/17/2020, 6:27 PM
fun pollMessages(): Flow<Message> {
    return channelFlow {
        repeat(noOfReceivers) {
            launch {
                val messages = sqsClient.fetchMessages()
                messages.forEach {
                    send(it)
                }
            }
        }
    }
}
Thiyagu
01/17/2020, 8:34 PM
Antanas A.
01/18/2020, 11:02 AM
Jérôme Gully
01/18/2020, 11:32 AM
benny.huo
01/18/2020, 2:13 PM
georgiy.shur
01/18/2020, 4:13 PM
Flow in my production code, but it seems that my understanding of it and coroutines in general isn't sufficient.
I reduced my project code to a simple reproducible test example.
I'm using a TestCollector class to collect the values emitted by the Flow:
class TestCollector<T>(scope: CoroutineScope, flow: Flow<T>) {
    private val collectedValues = mutableListOf<T>()
    private val job = scope.launch { flow.collect { collectedValues.add(it) } }
    fun assertValues(vararg values: T) = run {
        val valuesList = values.toList()
        if (valuesList != collectedValues) {
            fail("\nExpected values: $valuesList\nCollected values:$collectedValues")
        }
        this
    }
    fun dispose() = job.cancel()
}
fun <T> Flow<T>.test(scope: CoroutineScope) = TestCollector(scope, this)
This is my test itself:
@Before
fun setup() {
    Dispatchers.setMain(Dispatchers.Unconfined)
}
@Test
fun testFlowCollector() = runBlockingTest {
    var firstEmit = true
    val channel = ConflatedBroadcastChannel(0)
    val testCollector = channel.asFlow().onEach {
        if (firstEmit) {
            launch {
                channel.send(1)
                channel.send(2)
            }
            firstEmit = false
        }
    }.test(this)
    testCollector.assertValues(0, 1, 2)
    testCollector.dispose()
}
So basically I'm using a blocking test, and what I'm trying to do is launch those sends only on the first emit. What I'm expecting to collect are all three numbers 0, 1, 2 sequentially. But the test fails:
java.lang.AssertionError:
Expected values: [0, 1, 2]
Collected values:[0, 2]
For some reason, the emission of 1 is getting lost.
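One possible reading of the lost 1, offered as an assumption rather than a confirmed diagnosis: if both send calls run before the collector gets a chance to receive again, a conflated channel keeps only the most recent value, so 1 is overwritten by 2. The sketch below shows that conflation on a plain Channel(Channel.CONFLATED), which is a stand-in for illustration and not the ConflatedBroadcastChannel from the test. The function name lastValueSeen is hypothetical.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.runBlocking

// Hypothetical helper: sends two values back to back, then receives once.
fun lastValueSeen(): Int = runBlocking {
    val channel = Channel<Int>(Channel.CONFLATED)
    channel.send(1) // never suspends on a conflated channel
    channel.send(2) // replaces the 1 that nobody has received yet
    channel.receive()
}

fun main() {
    println(lastValueSeen()) // prints 2
}
```

If every intermediate value matters, a non-conflated buffer between producer and collector avoids the overwrite, at the cost of the producer possibly suspending.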
I'm trying to understand what's going on. Most probably I'm just misunderstanding/misusing the flow and the channel. Maybe there are coroutine experts who can explain it to me. 🙂
addamsson
01/18/2020, 5:55 PM
.join() a Job? What I have right now looks like this:
@Test
fun someTest() {
    myObj.doSomething().join()
}
My problem is that I can't call join() if I want to wait for the coroutine to complete, because I have no CoroutineScope, and there is no runBlocking in a common project, which is what I usually use in my JVM tests. What can I do instead to solve this problem?
Gabriel Feo
01/18/2020, 7:30 PM
Babacar Tall
01/18/2020, 8:19 PM
Mauricio Barbosa
01/19/2020, 4:52 PM
Doing first request
retrieving first request
Exception in thread "main" kotlinx.coroutines.JobCancellationException: Parent job is Completed; job=ScopeCoroutine{Completed}@383534aa
Does anyone have any idea what I am doing wrong?
svenjacobs
01/20/2020, 7:36 AM
Flow that achieves the following: If the Flow has not produced at least one value in a given timeframe (x milliseconds), produce a default value followed by any values the Flow might produce afterwards. In contrast to a solution with debounce, however, if the Flow has produced a value before the timeframe elapses, the default value will just be ignored and the value will be emitted immediately.
hmole
01/20/2020, 12:53 PM
Drew Hamilton
01/20/2020, 4:18 PM
internal suspend fun <E> ReceiveChannel<E>.forEach(action: (E) -> Unit) {
    val iterator = iterator()
    while (iterator.hasNext())
        action.invoke(iterator.next())
}
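The extension above can also be sketched with a plain for loop, since ReceiveChannel provides a suspending iterator operator: the loop suspends while the channel is empty and ends once the channel is closed. For comparison, consumeEach in kotlinx.coroutines also iterates until the channel is closed and additionally cancels the channel when it finishes. The function name collectAll and the producer below are hypothetical, for illustration only.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical demo: a producer sends three values and closes the channel,
// while the for loop receives each element as it arrives.
fun collectAll(): List<Int> = runBlocking {
    val channel = Channel<Int>()
    val received = mutableListOf<Int>()
    launch {
        for (i in 1..3) channel.send(i)
        channel.close() // without close() the loop below would suspend forever
    }
    for (element in channel) {
        received += element
    }
    received
}

fun main() {
    println(collectAll()) // prints [1, 2, 3]
}
```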
I was surprised that such an extension didn't already exist; things like consumeEach only consume existing items in the Channel and then cancel. Am I either missing a function that does what I want, or missing a reason that I shouldn't do this?
diesieben07
01/20/2020, 7:08 PM
fun foo(): Flow<ResultA> and each ResultA has a fun getElements(): Flow<SubResult>. The getElements is tied to the same execution context as the containing flow and has to be cancelled with it.
eygraber
01/20/2020, 9:00 PM
Fatal Exception: JobCancellationException: Job was cancelled
Is there any sort of global debugging I can turn on to try and track this down? If so, are any of them safe for a production environment?
svenjacobs
01/21/2020, 8:33 AM
Flow 🙂 I have a Flow<State> that represents an application (UI) state. Now what I want to do is convert this into a flow with certain "side effects" on parts of the state. Here's example code:
data class State(
    val string: String? = null,
    val number: Long = 0
)
val flow = flowOf(
    State(
        string = "Hello world",
        number = 0
    ),
    State(
        string = "Hello world",
        number = 1
    ),
    State(
        string = null,
        number = 1
    ),
    State(
        string = "Hello world 2",
        number = 1
    )
)
val flow2 = flowOf(
    flow.mapNotNull { it.string }
        .onEach { println("string $it") },
    flow.map { it.number }
        .onEach { println("number $it") }
).flattenMerge()
val job = flow2.launchIn(GlobalScope)
The thing is, in the isolated example this works; however, in my real application I have the following behaviour:
1. The first State object triggers both onEach, so the output is string and number.
2. Any subsequent changes to State will always only trigger the second onEach, which is number. (It is always the last Flow's onEach that is triggered here. So if there are three inner Flows, then only the third will be called.)
3. If I use flattenConcat() instead of flattenMerge(), then only the first onEach will be called for every State change.
I'm lost here. Any ideas? Why am I doing this? Imagine that later a distinctUntilChanged() is added before each onEach so that the side effect is only triggered when the selected value of the state has changed.
But first of all, this should work even without distinctUntilChanged(). Any ideas?
orafaaraujo
01/21/2020, 1:07 PM
.catch { } operator of Flow should not get an NPE that might occur inside it?
dave08
01/21/2020, 5:18 PM
flowOn here:
inline fun <reified T : Any, reified R : Any> Flow<T>.concurrentMap(
    dispatcher: CoroutineDispatcher = Dispatchers.IO,
    crossinline mapFun: suspend (T) -> R
): Flow<R> = flatMapMerge {
    flow { emit(mapFun(it)) }.flowOn(dispatcher)
}
?
RoqueRueda
01/21/2020, 6:10 PM
suspend () -> String however when I try to compile it says: cannot provide dependency without provides annotation
Christophe Smet
01/21/2020, 6:34 PM
Paul Woitaschek
01/22/2020, 11:21 AM
myanmarking
01/22/2020, 4:44 PM
paulblessing
01/22/2020, 9:12 PM
suspendCancellableCoroutine to cause .join() after cancellation to suspend until the Continuation is resumed / the background work is finished, similar to how things work when using a withContext to switch threads?
I'm curious if I can get these to behave the same:
private suspend fun runViaThread(): String {
    return suspendCancellableCoroutine { continuation ->
        thread {
            println("Starting background work")
            Thread.sleep(2000)
            println("Finished background work")
            continuation.resume("done")
        }
    }
}
private suspend fun runViaWithContext(): String {
    return withContext(Dispatchers.IO) {
        println("Starting background work")
        Thread.sleep(2000)
        println("Finished background work")
        "done"
    }
}
@Test fun testViaThread() {
    /*
    Prints:
    Starting background work
    Before join "coroutine#2":StandaloneCoroutine{Cancelling}@5c7fa833
    After join "coroutine#2":StandaloneCoroutine{Cancelled}@5c7fa833
    Finished background work
    */
    runBlocking {
        val job = launch(Dispatchers.Default) { runViaThread() }
        delay(1000)
        job.cancel()
        println("Before join $job")
        job.join()
        println("After join $job")
        delay(3000)
    }
}
@Test fun testViaWithContext() {
    /*
    Prints:
    Starting background work
    Before join "coroutine#4":StandaloneCoroutine{Cancelling}@2b98378d
    Finished background work
    After join "coroutine#4":StandaloneCoroutine{Cancelled}@2b98378d
    */
    runBlocking {
        val job = launch(Dispatchers.Default) { runViaWithContext() }
        delay(1000)
        job.cancel()
        println("Before join $job")
        job.join()
        println("After join $job")
        delay(3000)
    }
}
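One way the two variants might be brought closer together, sketched under the assumption that losing prompt cancellation is acceptable: suspend with the non-cancellable suspendCoroutine from kotlin.coroutines instead of suspendCancellableCoroutine. The coroutine then cannot complete while the thread is still working, so join() after cancel() should return only once the continuation has been resumed. The names runViaThreadNonCancellable and joinAfterCancelDemo are hypothetical, and the sleep times are shortened for the demo.

```kotlin
import java.util.Collections
import kotlin.concurrent.thread
import kotlin.coroutines.resume
import kotlin.coroutines.suspendCoroutine
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical variant of runViaThread: this suspension point ignores cancellation,
// so the coroutine stays suspended until the thread resumes the continuation.
suspend fun runViaThreadNonCancellable(log: MutableList<String>): String =
    suspendCoroutine { continuation ->
        thread {
            log += "Starting background work"
            Thread.sleep(500)
            log += "Finished background work"
            continuation.resume("done")
        }
    }

// Hypothetical demo: cancels the job mid-work and records when join() returns.
fun joinAfterCancelDemo(): List<String> = runBlocking {
    val log = Collections.synchronizedList(mutableListOf<String>())
    val job = launch(Dispatchers.Default) { runViaThreadNonCancellable(log) }
    delay(100)   // give the background thread time to start
    job.cancel() // the job moves to Cancelling but cannot finish yet
    job.join()   // waits for the continuation to be resumed
    log += "After join"
    log.toList()
}

fun main() {
    joinAfterCancelDemo().forEach(::println)
}
```

The trade-off is that cancellation no longer interrupts the wait at all. If that matters, keeping suspendCancellableCoroutine and having the caller wait on a separate completion signal may be a better fit.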