Kris Wong
02/05/2021, 4:37 PM
suspend function within runBlocking, the following code should not be executed until the function completes, correct? I am seeing some flakiness in a test that doesn't make sense to me.
Jason Ankers
02/05/2021, 4:43 PM
launch {
    val result = getSomething().apply {
        doSomethingSuspending()
    }
    // code at this point never runs
}
Could anyone explain why this happens?
Gilles Barbier
02/05/2021, 7:56 PM
MutableSharedFlow with a collect lambda.
val responseFlow = MutableSharedFlow<Message>(replay = 0)
...
responseFlow.collect {
    ...
}
When I collect a message, I run a test, and for some values I would like to leave the collect function and return a value. I'm not sure how to do that. Any ideas?
Gilles Barbier
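One possible approach, sketched under assumptions: the terminal operator first { predicate } suspends until a matching element arrives, returns it, and cancels the collection, so no manual exit from collect is needed. The Message fields and the awaitResponse helper below are hypothetical, and a cold flowOf stands in for the shared flow.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.runBlocking

// Hypothetical message type; the original Message class is not shown.
data class Message(val id: Int, val payload: String)

// Suspends until a message with the given id arrives, then stops collecting.
suspend fun awaitResponse(responses: Flow<Message>, id: Int): String =
    responses.first { it.id == id }.payload

fun main() = runBlocking {
    val responses = flowOf(Message(1, "a"), Message(2, "b"), Message(3, "c"))
    println(awaitResponse(responses, 2)) // prints b
}
```

If no element ever matches and the flow completes, first throws NoSuchElementException; firstOrNull is the variant that returns null instead.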
02/05/2021, 9:48 PM
suspend fun test() {
    o.method()
}
Here, o is a proxy obtained by Proxy.newProxyInstance(..., proxyHandler), so this will call the invoke method of proxyHandler. My question is: how can I propagate the suspension up to proxyHandler and use a suspend function from its invoke method (without having to start a runBlocking, of course)?
Florian
02/06/2021, 12:21 PM
delay or any other blocking operation inside the collect block of a Flow, the next value will not be collected until that delay is over, right? So I can't get race conditions between successive emissions of the Flow?
Brais Gabin
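A small sketch of the behaviour asked about, assuming a plain cold flow with no buffer, conflate, or collectLatest in the chain: emit suspends until the collect block has finished with the previous value, so the log interleaves strictly. The function name sequentialLog is made up for illustration.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking

suspend fun sequentialLog(): List<String> {
    val log = mutableListOf<String>()
    flow {
        for (i in 1..3) {
            log += "emit $i"
            emit(i) // suspends until the collector has processed i
        }
    }.collect { value ->
        delay(50) // simulated slow work inside collect
        log += "done $value"
    }
    return log
}

fun main() = runBlocking {
    println(sequentialLog())
    // prints [emit 1, done 1, emit 2, done 2, emit 3, done 3]
}
```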
02/06/2021, 12:34 PM
join in RxJava? I have a flow of files and a flow of processors and I need to run all the processors on all the files. I'm looking for something like: filesFlow.join(processorsFlow) { file, processor -> processor.invoke(file) } (processor.invoke returns the result of the processor, so I get a new flow of type result)
Gilles Barbier
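A hedged sketch: RxJava's join is window-based, whereas what is described here reads like a plain cross product, which nested collection can express. The cross name is an invention for this example, not a library operator, and it assumes the second flow is cold (or otherwise safe to collect once per file).

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical helper: pairs every element of this flow with every element of other.
fun <A, B, R> Flow<A>.cross(other: Flow<B>, transform: suspend (A, B) -> R): Flow<R> = flow {
    this@cross.collect { a ->
        other.collect { b -> emit(transform(a, b)) }
    }
}

fun main() = runBlocking {
    val files = flowOf("a.txt", "b.txt")
    val processors = flowOf<(String) -> String>({ it.uppercase() }, { it.reversed() })
    val results = files.cross(processors) { file, processor -> processor(file) }.toList()
    println(results) // prints [A.TXT, txt.a, B.TXT, txt.b]
}
```

The processors flow is re-collected once per file, which is cheap for flowOf but may matter if producing the processors is expensive.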
02/06/2021, 10:39 PM
fun main() {
    val flow = MutableSharedFlow<Int>(replay = 0)
    runBlocking {
        launch {
            println(flow.first { it >= 90 })
        }
        launch(Dispatchers.IO) {
            repeat(1000) {
                flow.emit(Random.nextInt(0, 100))
            }
        }
    }
}
This code does not print anything. It prints when I remove Dispatchers.IO from the second launch, or (sometimes) if I add Dispatchers.IO to the first launch... Any explanation?
Florian
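One likely explanation, offered as a sketch rather than a diagnosis: with replay = 0, values emitted while there are no subscribers are dropped, so if the emitter on Dispatchers.IO runs before first has subscribed, all 1000 values are lost and first suspends forever. Assuming that is the cause, waiting on subscriptionCount before emitting removes the race. Sequential values replace the random ones here so the result is reproducible; firstAtLeast is a made-up name.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

suspend fun firstAtLeast(threshold: Int): Int = coroutineScope {
    val flow = MutableSharedFlow<Int>(replay = 0)
    launch(Dispatchers.IO) {
        // Do not emit until the collector below has actually subscribed.
        flow.subscriptionCount.first { it > 0 }
        repeat(100) { flow.emit(it) }
    }
    // Subscribes, then suspends until a matching value is emitted.
    flow.first { it >= threshold }
}

fun main() = runBlocking {
    println(firstAtLeast(90)) // prints 90
}
```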
02/07/2021, 1:35 AM
MutableStateFlow as my flatMapLatest trigger when I need to allow emitting the same value in succession? Channel?
Remy Benza
02/07/2021, 10:05 AM
Dispatchers.IO from an Android Room database. What pattern is recommended?
jean
02/08/2021, 10:22 AM
private val supervisor = SupervisorJob()
private val coroutineScope = CoroutineScope(coroutineContext + supervisor)
...
fun onEvent(event: MyEvent) {
    coroutineScope.launch {
        val data = findHeavyWorkToExecute()?.invoke()
            ?: throw Exception(currentState, event)
        someMoreWork(data)
    }
}
I want someMoreWork() to be executed only if the lambda returned by findHeavyWorkToExecute() is not null and its execution does not fail. But when I test this, someMoreWork() is called even though the throw statement is reached. What am I supposed to change here?
james
02/08/2021, 12:02 PM
# MainActivity.kt
override fun onStart() {
    super.onStart()
    var applicationID = runBlocking { Application.registerDevice() }
    Log.d("AppID on-start", applicationID)
}
# Application.kt
object Application {
    suspend fun registerDevice(): String {
        return getTokenAsync().await()
    }
    private suspend fun getTokenAsync(): kotlinx.coroutines.Deferred<String> = coroutineScope {
        async {
            return@async FirebaseInstallations.getInstance().id.result.toString()
        }
    }
}
E/AndroidRuntime: FATAL EXCEPTION: main
Process: com.example.application, PID: 20890
java.lang.IllegalStateException: Task is not yet complete
If anyone can see where I'm going wrong and point me in the right direction on how to resolve this, I would really appreciate it 🙂
Thanks in advance!
Niklas Gürtler
02/08/2021, 12:02 PM
CancelledException from a Coroutine, can I find out where the cancel originated from? Some of my coroutines are spuriously cancelled and I'm trying to find out why 🤔
Vivek Sharma
02/08/2021, 1:32 PM
natario1
02/08/2021, 2:23 PM
class Client {
    private suspend fun connectInternal(url: String) { /* does connection stuff */ }
    private suspend fun disconnectInternal() { /* does disconnection stuff */ }
    suspend fun connect(url: String) {
        TODO("Should call connectInternal")
    }
    suspend fun disconnect() {
        TODO("Should call disconnectInternal")
    }
}
I'd like to implement connect and disconnect so that they call the internal APIs in a thread-safe fashion. Client can only be connected to one url at a time so, for example:
• disconnect should cancel any in-progress connection
• when connect is called from multiple threads, only the last succeeds, the others will be canceled
I can implement this in a thousand ways, but I always end up using some Java threading primitive in at least one place (like a synchronized block or a lock). Is there a way to achieve the same with coroutine primitives only?
One simple option is to do everything in something like a newSingleThreadContext, but that's more of a bad workaround.
taer
02/08/2021, 8:19 PM
val pool = Executors.newFixedThreadPool(N)
val dispatcher1 = pool.asCoroutineDispatcher()
val dispatcher2 = pool.asCoroutineDispatcher()
pablisco
02/08/2021, 10:45 PM
Vivek Sharma
02/09/2021, 10:05 AM
2 flows and I want to observe their changes with flatMapLatest. If either of them emits a new value, I want to trigger some action, but how can I observe 2 flows? Should I combine them? How can I do so?
efemoney
02/09/2021, 11:01 AM
close a channel if I only want to receive a single value from it? Something like this inside some suspend function:
val whatIWant = myChannel.receive()
myChannel.close()
return whatIWant
// VS
return myChannel.receive()
william
02/10/2021, 1:15 AM
Colton Idle
02/10/2021, 2:25 AM
MaterialTheme.colors.* I get an error of "@Composable invocations can only happen from the context of...", but if I just use a string there instead then it works fine, which shows that my @Composable is actually set up correctly. Code snippets in thread.
natario1
02/10/2021, 11:17 AM
flowOn would help, but the scope there can't have a Job, which makes sense in the end. So I'm thinking of
flow.onEach { myScope.ensureActive() }
Is this the only way to do it? Are there any concerns other than performance?
Slackbot
02/11/2021, 7:43 AM
jean
02/11/2021, 7:56 AM
Channel to cast it to a type allowing me only to send/offer? The same way there is receiveAsFlow to only be able to collect it, how can I restrict my code to only sending?
appasni
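A possible answer, as a sketch: Channel<E> implements both SendChannel<E> and ReceiveChannel<E>, so exposing the channel through a property typed as SendChannel should be enough to restrict callers to sending. The EventBus class and its member names are hypothetical.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.SendChannel
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.receiveAsFlow
import kotlinx.coroutines.runBlocking

class EventBus {
    private val channel = Channel<String>(Channel.BUFFERED)

    // Write-only view: callers can send or close, but not receive.
    val input: SendChannel<String> get() = channel

    // Read-only view for collectors.
    val events: Flow<String> = channel.receiveAsFlow()
}

fun main() = runBlocking {
    val bus = EventBus()
    bus.input.send("hello")
    println(bus.events.first()) // prints hello
}
```

A caller could still downcast input back to Channel, so this is a compile-time convenience rather than real encapsulation.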
02/11/2021, 1:59 PM
fun main(): Unit = runBlocking {
    println(1)
    launch {
        println(2)
        launch {
            println(3)
        }
    }
    launch {
        println(4)
    }
    println(5)
}
Why does the above code print
1
5
2
4
3
instead of
1
2
3
4
5
if everything is running in the main thread?
Mohamed
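A sketch of why the order comes out that way, as far as I understand it: launch only schedules the new coroutine on the runBlocking event loop, and queued coroutines get to run once the current one suspends or finishes, hence 1, 5, then 2, 4, and finally the nested 3. Assuming the sequential order is the goal, joining each job right after launching it (or simply not using launch) gives 1 to 5. The function name orderedRun is made up.

```kotlin
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

suspend fun orderedRun(): List<Int> = coroutineScope {
    val out = mutableListOf<Int>()
    out += 1
    launch {
        out += 2
        launch { out += 3 }.join() // wait for the nested coroutine
    }.join() // suspend here until the whole first block is done
    launch { out += 4 }.join()
    out += 5
    out
}

fun main() = runBlocking {
    println(orderedRun()) // prints [1, 2, 3, 4, 5]
}
```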
02/11/2021, 4:10 PM
withTimeout inside suspendCoroutine? Something like:
suspendCoroutine { cont ->
    withTimeout(1000) {
        // DO something
        cont.resumeWithException(Exception("Timeout"))
    }
}
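As written, the snippet above would not compile, since withTimeout is a suspend function and the suspendCoroutine block is not a suspending lambda. A common arrangement, sketched here under assumptions, is to invert the nesting: put suspendCancellableCoroutine inside withTimeout so the timeout cancels the waiting continuation. The awaitCallback helper and its register parameter are hypothetical stand-ins for whatever callback API is being wrapped.

```kotlin
import kotlin.coroutines.resume
import kotlinx.coroutines.TimeoutCancellationException
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.suspendCancellableCoroutine
import kotlinx.coroutines.withTimeout

// Hypothetical wrapper: register receives a callback to invoke with the result.
suspend fun awaitCallback(timeoutMs: Long, register: ((String) -> Unit) -> Unit): String =
    withTimeout(timeoutMs) {
        suspendCancellableCoroutine<String> { cont ->
            register { value -> cont.resume(value) }
        }
    }

fun main() = runBlocking {
    // The callback fires immediately, so the value is returned.
    println(awaitCallback(1000) { callback -> callback("done") }) // prints done

    // The callback never fires, so withTimeout throws after 50 ms.
    try {
        awaitCallback(50) { }
    } catch (e: TimeoutCancellationException) {
        println("timed out") // prints timed out
    }
}
```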
Kshitij Patil
02/12/2021, 9:07 AM
Lena Stepanova
02/12/2021, 10:40 AM
viewModelScope.launch {
    validateEmail(user)?.let {
        _errorLiveData.postValue(it)
        stopLoading()
        return@launch
    }
}
And I want it to look something like
evaluate(validateEmail(user), launch)
with
fun evaluate(result: String?, job: (
    context: CoroutineContext,
    start: CoroutineStart,
    block: suspend CoroutineScope.() -> Unit
) -> Job) {
    result?.let {
        _errorLiveData.postValue(it)
        stopLoading()
        return@job
    }
}
I get "job is unresolved", so what am I doing wrong and how do I make it right?
Niklas Gürtler
02/12/2021, 3:42 PM
fun fetchData(scope: CoroutineScope): Pair<Deferred<Int>, Channel<Float>> {
    // Start first operation
    val r1: Deferred<Int> = scope.async {
        delay(200) // Pretend to do some work
        42 // Return a result
    }
    val r2 = Channel<Float>()
    // Start second operation
    scope.launch {
        repeat(10) {
            delay(it * 100L) // Pretend to do some work
            r2.send(it.toFloat()) // Return one result
        }
        // Second operation done
        r2.close()
    }
    return Pair(r1, r2)
}
fun main() {
    val results = fetchData(GlobalScope)
    runBlocking {
        launch {
            val res1 = results.first.await()
            println("Result 1 : $res1")
        }
        launch {
            for (elem in results.second) {
                println("Result 2 elem: $elem")
            }
        }
    }
}
Without coroutines, I'd use a callback interface with functions onResult1(x: Int) and onResult2(x: Float) and call the first one once and the second one multiple times.
natario1
02/12/2021, 4:49 PM
Flow<Flow<T>> like flatten* which can cancel child flow collection when a new parent value is emitted? Say you have
fun parentFlow(): Flow<String> = ...
fun childFlow(id: String): Flow<Result> = ... // id comes from parentFlow
My goal is to have a single Flow<Result>, taking the id from the parent flow. One way to do this is:
val merged: Flow<Flow<Result>> = parentFlow().map { childFlow(it) }
val flat: Flow<Result> = merged.flattenConcat()
However, in my case the parent id represents a kind of state, so when a new id is emitted, the flow should stop collecting the previous childFlow. I'm struggling to find an elegant way to do this. I could collect the child inside the parent body (inside flow { }, for example), but then I'm not sure how to cancel the previous one.
spierce7
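One candidate, sketched with assumptions: flatMapLatest cancels collection of the previous inner flow whenever the upstream emits a new value, which sounds like the behaviour described. It is still marked experimental in kotlinx.coroutines as far as I know, hence the opt-in. The flow contents and timings below are invented for the demonstration, and String stands in for the Result type.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flatMapLatest
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical parent: emits a new id while the first child is still running.
fun parentFlow(): Flow<String> = flow {
    emit("a")
    delay(300)
    emit("b")
}

// Hypothetical child: emits once at start and once after a long delay.
fun childFlow(id: String): Flow<String> = flow {
    emit("$id:start")
    delay(1000)
    emit("$id:late")
}

@OptIn(ExperimentalCoroutinesApi::class)
fun latestResults(): Flow<String> = parentFlow().flatMapLatest { childFlow(it) }

fun main() = runBlocking {
    // "a:late" never appears because child "a" is cancelled when "b" arrives.
    println(latestResults().toList()) // prints [a:start, b:start, b:late]
}
```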
02/13/2021, 12:07 AM
MutableStateFlow emits the last known value when its collection is started, and it also doesn’t publish a value if it equals
the previous event. If I wanted something that just receives events as they come in, and also publishes EVERY event regardless of whether it’s the exact same event as the last one, what are my options?
Javier
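A possible option, sketched here: a MutableSharedFlow with replay = 0 does not replay the last value to new collectors and does not drop values that equal the previous one. The buffer size and the collectThree helper are arbitrary choices for the demonstration.

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

suspend fun collectThree(): List<String> = coroutineScope {
    val events = MutableSharedFlow<String>(replay = 0, extraBufferCapacity = 16)
    val received = async { events.take(3).toList() }
    // Wait for the collector to subscribe; earlier emissions would be dropped.
    events.subscriptionCount.first { it > 0 }
    events.emit("refresh")
    events.emit("refresh") // equal to the previous event, still delivered
    events.emit("done")
    received.await()
}

fun main() = runBlocking {
    println(collectThree()) // prints [refresh, refresh, done]
}
```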
02/13/2021, 12:08 AM
spierce7
02/13/2021, 12:16 AM
Ian Lake
02/13/2021, 1:09 AM
spierce7
02/13/2021, 2:24 AM