AdalPari
12/10/2020, 6:59 PM
suspend fun foo1() // function
suspend fun foo2() // function
First approach:
// sequential code
viewModelScope.launch { foo1() }
// more sequential code
viewModelScope.launch { foo2() }
// more sequential code
Second approach:
viewModelScope.launch {
    // sequential code
    foo1()
    // more sequential code
    foo2()
    // more sequential code
}
I've also seen this second approach with async calls inside the main launch one.
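A minimal sketch of how the two approaches differ, assuming foo1() and foo2() simply suspend for 200 ms (hypothetical stand-ins) and using coroutineScope in place of viewModelScope so that it runs outside Android. Separate launch calls start independent coroutines that overlap, while calls inside a single launch run one after the other.

```kotlin
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlin.system.measureTimeMillis

// Hypothetical stand-ins for foo1()/foo2(): each just suspends for a while.
suspend fun foo1() = delay(200)
suspend fun foo2() = delay(200)

// First approach: two coroutines, so foo1() and foo2() run concurrently.
suspend fun separateLaunches(): Long = measureTimeMillis {
    coroutineScope {
        launch { foo1() }
        launch { foo2() }
    } // coroutineScope returns once both children have finished
}

// Second approach: one coroutine, so foo2() starts only after foo1() returns.
suspend fun singleLaunch(): Long = measureTimeMillis {
    coroutineScope {
        launch {
            foo1()
            foo2()
        }
    }
}

fun main() = runBlocking {
    println("separate launches: ${separateLaunches()} ms") // roughly one delay
    println("single launch: ${singleLaunch()} ms")         // roughly two delays
}
```

Which one fits depends on whether foo2() needs anything that foo1() produces; if it does, the single launch keeps the ordering explicit.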
Thanks!
Animesh Sahu
12/11/2020, 4:54 AM
org.jetbrains.kotlinx:kotlinx-coroutines-core:1.4.2-native-mt
problem started yesterday... (Project builds fine)
Ryu Ishikawa
12/11/2020, 6:55 AM
suspendCoroutineUninterceptedOrReturn function. It looks like the lowest API I can reach (since it says throw NotImplementedError("Implementation of suspendCoroutineUninterceptedOrReturn is intrinsic")).
Is there any good document to understand how it works at the bytecode level or anything else?
Especially, I'd like to understand how suspendCoroutineUninterceptedOrReturn captures the current running coroutine.
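As a rough illustration rather than a description of the generated bytecode: every suspend function is compiled with a hidden Continuation parameter, and suspendCoroutineUninterceptedOrReturn hands that parameter to its block. The sketch below uses only the standard library; waitForValue, runDemo and the saved variable are hypothetical names introduced for the example.

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.intrinsics.COROUTINE_SUSPENDED
import kotlin.coroutines.intrinsics.suspendCoroutineUninterceptedOrReturn
import kotlin.coroutines.resume
import kotlin.coroutines.startCoroutine

// Holds the continuation captured by waitForValue() (hypothetical helper state).
var saved: Continuation<String>? = null

// The block receives the caller's continuation. Returning COROUTINE_SUSPENDED
// tells the caller that the result will be delivered later through resume().
suspend fun waitForValue(): String = suspendCoroutineUninterceptedOrReturn { cont ->
    saved = cont
    COROUTINE_SUSPENDED
}

fun runDemo(): List<String> {
    val log = mutableListOf<String>()
    val body: suspend () -> Unit = {
        log += "before"
        log += waitForValue() // suspends here; the rest runs after resume()
        log += "after"
    }
    // Start the coroutine with a completion callback and no dispatcher,
    // so everything runs synchronously on the calling thread.
    body.startCoroutine(Continuation(EmptyCoroutineContext) { result ->
        result.getOrThrow()
        log += "done"
    })
    log += "suspended"        // startCoroutine returned because the body suspended
    saved!!.resume("resumed") // continue the body from the suspension point
    return log
}

fun main() {
    println(runDemo()) // [before, suspended, resumed, after, done]
}
```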
Thank you!
(If it's not good to post that here, please let me know where to post 🙇 )
Dariusz Kuc
12/11/2020, 4:45 PM
suspend fun doWork() = coroutineScope {
    val result = ...
    // do some other processing that calculates/processes result
    publishData(result) // optional stuff that may be slow, e.g. publish analytics, debug info, etc
    result
}
Thanks to structured concurrency, if I launch another coroutine from doWork, all the child coroutines (including publishData from above) have to complete before returning from the scope. I could obviously just do GlobalScope.launch { publishData } but that seems “wrong”.
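A small sketch of the behaviour described above, next to one commonly suggested alternative to GlobalScope: launching the optional work in a scope that the caller owns and passes in. This is an assumption about what might fit here, not something stated in the thread; publishData is a hypothetical stand-in that just delays, and backgroundScope is a hypothetical name.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.cancel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlin.system.measureTimeMillis

// Hypothetical slow, optional side effect standing in for publishData().
suspend fun publishData(result: Int) {
    delay(300)
}

// As in the message: coroutineScope does not return until its child completes,
// so the caller also waits for publishData().
suspend fun doWorkStructured(): Int = coroutineScope {
    val result = 42 // placeholder for the real computation
    launch { publishData(result) }
    result
}

// Assumed alternative: the child belongs to a caller-provided scope,
// so doWorkDetached() returns without waiting for publishData().
suspend fun doWorkDetached(backgroundScope: CoroutineScope): Int {
    val result = 42 // placeholder for the real computation
    backgroundScope.launch { publishData(result) }
    return result
}

fun main() = runBlocking {
    val backgroundScope = CoroutineScope(SupervisorJob() + Dispatchers.Default)
    val structured = measureTimeMillis { doWorkStructured() }
    val detached = measureTimeMillis { doWorkDetached(backgroundScope) }
    println("structured: $structured ms, detached: $detached ms")
    backgroundScope.cancel() // the owner decides when background work must stop
}
```

Unlike GlobalScope, the injected scope has an owner that can cancel it, which keeps the lifetime of the publishing work explicit.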
Wondering whether instead I should be using something like a buffered Channel so publish is async and data is processed in another coroutine. Or maybe there is something simpler?
bitkid
12/11/2020, 7:55 PM
Azur Haljeta
12/11/2020, 8:18 PM
isActive. But how to make it cooperative when you're dealing with code which doesn't include loops, i.e. when you don't have a right place to put the isActive check?
Lilly
12/11/2020, 9:01 PM
private val packetChannel: Channel<ResponsePacket> = Channel(Channel.UNLIMITED)
suspend fun sendPacketWithBatchResponse(packet: RequestPacket): List<ResponsePacket> {
    sendPacketWithAcknowledgement(packet)
    val packets = mutableListOf<ResponsePacket>()
    for (item in packetChannel) {
        Timber.e("here")
        packets.add(item)
    }
    // Unreached code
    return packets
}
// Another non-suspending function which is decoupled from the consuming function
fun produce() {
    ...
    packetChannel.offer(packet)
}
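For context on the snippet above: a for loop over a channel suspends when the channel is empty and only ends once the channel is closed. The sketch below shows two ways a consumer can finish, assuming kotlinx.coroutines 1.5 or newer, where trySend and tryReceive replace the older offer and poll. ResponsePacket here is a hypothetical one-field stand-in, and drainAvailable is a hypothetical helper name.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for the real packet type.
data class ResponsePacket(val id: Int)

// Option 1: take whatever is buffered right now, without suspending.
fun drainAvailable(channel: Channel<ResponsePacket>): List<ResponsePacket> {
    val packets = mutableListOf<ResponsePacket>()
    while (true) {
        // tryReceive() fails immediately when the channel is empty.
        val item = channel.tryReceive().getOrNull() ?: break
        packets.add(item)
    }
    return packets
}

fun main() = runBlocking {
    val channel = Channel<ResponsePacket>(Channel.UNLIMITED)

    repeat(3) { channel.trySend(ResponsePacket(it)) }
    println(drainAvailable(channel).size) // 3

    // Option 2: the producer closes the channel, which ends the for loop
    // after the remaining buffered items have been received.
    repeat(2) { channel.trySend(ResponsePacket(it)) }
    channel.close()
    val rest = mutableListOf<ResponsePacket>()
    for (item in channel) rest.add(item)
    println(rest.size) // 2
}
```

Closing only works if the channel is not reused for a later batch; draining with tryReceive assumes all packets were already offered before the consumer starts reading.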
The goal is to offer multiple packets (~20) to the channel and, when finished, jump to sendPacketWithBatchResponse and consume these packets all at once, but sendPacketWithBatchResponse never returns and I don't know why.
Andrea Giuliano
12/12/2020, 5:31 PM
christophsturm
12/13/2020, 10:15 AM
Aness Iqbal
12/13/2020, 4:26 PM
Tymmm1
12/13/2020, 9:25 PM
collect just once from a StateFlow or SharedFlow? One can call cancel() on the job of the coroutine that is collecting but that seems a bit cumbersome...
Nikky
12/14/2020, 9:34 AM
Rob
12/14/2020, 7:41 PM
mng
12/14/2020, 10:41 PM
TimerFlow object.
I wanted to unit test this object but I stumbled across a problem: because I am exposing a SharedFlow for every “tick” of the Timer, I am having trouble testing it since the replay value is set to 0.
What are the best practices in regards to setting up a Timer like I am here? Should I even use a SharedFlow to expose every tick, or should I be using something else?
If the approach I am using is correct, what would be the proper way for me to test this?
Nikky
12/15/2020, 9:57 AM
brandonmcansh
12/15/2020, 3:56 PM
combine?
Rob
12/15/2020, 5:07 PM
private val events: MutableSharedFlow<Unit> =
    MutableSharedFlow()
fun foo(): Pair<String, String>? = null
private suspend fun bar(): String = events
    .transform {
        emit(
            foo()
                ?.let { it.first }
                ?: return@transform
        )
    }
    .first()
Tower Guidev2
12/16/2020, 12:48 PM
kotlinx.coroutines.channels.actor in my Android `androidx.work.CoroutineWorker`(s).
What I am trying to achieve is that my Worker makes multiple calls to a remote API endpoint, and each API response is sent to an actor channel to be processed on another thread so that I can immediately start the API call for subsequent pages of data.
I have these message types
sealed class Message {
    class Responsible(val value: Response<List<MyApiData>>) : Message()
    class GetValue(val deferred: CompletableDeferred<Int>) : Message()
}
This Actor
fun CoroutineScope.persistActor() = actor<Message>(context = Dispatchers.Unconfined, capacity = Channel.UNLIMITED) {
    for (msg in channel) {
        when (msg) {
            is Message.Responsible -> managePage(msg.value)
            is Message.GetValue -> msg.deferred.complete(1)
        }
    }
}
private lateinit var persister: SendChannel<Message>
each page returned from my API is processed by this recursively called function:-
private suspend fun managePages(accessToken: String, response: Response<List<MyApiData>>) {
    when {
        result != null -> return
        response.isSuccessful -> persister.send(Message.Responsible(response))
        else -> {
            manageError(response.errorBody())
            result = Result.failure()
            return
        }
    }
    response.headers().filter { it.first == HTTP_HEADER_LINK && it.second.contains(REL_NEXT) }.forEach {
        val parts = it.second.split(OPEN_ANGLE, CLOSE_ANGLE)
        if (parts.size >= 2) managePages(accessToken, service.documents(accessToken, parts[1]))
    }
}
Once all the pages have been retrieved from the remote end point I execute the following code to await the actor to complete persisting the returned data
val completed = CompletableDeferred<Int>()
persister.send(Message.GetValue(completed))
println("Counter = ${completed.await()}")
persister.close()
what I am concerned about
1). is this a "good" approach?
2). Does my worker wait for all data to be persisted before completing?
3). What improvements could I make?
Dariusz Kuc
12/16/2020, 5:33 PM
public fun <T : kotlin.Any> org.reactivestreams.Publisher<T>.asFlow(): kotlinx.coroutines.flow.Flow<T>
public fun <T : kotlin.Any> kotlinx.coroutines.flow.Flow<T>.asPublisher(): org.reactivestreams.Publisher<T>
This is valid
// inferred -> flow: Flow<Any?>?
val flow = when (val publisherOrFlow: Any? = fetchValue()) {
    is Publisher<*> -> publisherOrFlow.asFlow()
    is Flow<*> -> publisherOrFlow
    else -> null
}
Yet this is invalid
// inferred -> publisher: Publisher<*>?
val publisher = when (val publisherOrFlow: Any? = fetchValue()) {
    is Publisher<*> -> publisherOrFlow
    is Flow<*> -> publisherOrFlow.asPublisher() // <-- issue here -> inferred type Any? is not a subtype of Any
    else -> null
}
Florian
12/17/2020, 7:58 AM
flatMapLatest that I can trigger explicitly and only once? I want to switch a Flow to another Flow if a certain condition is met
Lauren Yew
12/17/2020, 4:18 PM
Dominaezzz
12/18/2020, 9:59 PM
scope + SupervisorJob(scope.coroutineContext.job)? I just want to create a child coroutine scope.
louiscad
12/18/2020, 10:02 PM
scope.launch { supervisorScope { ... } } which you can then extract as an extension function.
Florian
12/19/2020, 7:44 AM
continuation.resume requires a 2nd argument
Lilly
12/19/2020, 10:42 PM
override suspend fun connect(
    scope: CoroutineScope,
    device: BluetoothDevice,
    uuid: UUID,
    secure: Boolean
) {
    dataSource.connectToSocket(device, uuid, secure) { socket ->
        scope.launch(Dispatchers.IO) {
            try {
                dataSource.listenToSocket(socket)
            } catch (e: Throwable) {
                Timber.tag(TAG).e("Error: ${e.message}.")
                throw e
            }
        }
        startCommunication()
    }
}
How can I pass the exception to the caller of connect so that the caller can catch this exception?
My approach with a nested coroutine might be wrong: I need another coroutine here because dataSource.listenToSocket(socket) is a suspending function with a while (true) { ... } statement and therefore will never return, but I have to call startCommunication() directly after calling dataSource.listenToSocket(socket).
Ivan Pavlov
12/21/2020, 4:17 PM
private val eventFlow = MutableSharedFlow<BotStatusChangeEvent>()
override val events: Flow<Event> = eventFlow
override fun sendEvent(event: Event) {
    coroutineScope.launch {
        eventFlow.emit(event)
    }
}
What is the correct way to test that if I do
sendEvent(Event(1))
sendEvent(Event(2))
sendEvent(Event(3))
I receive all events in events flow?
christophsturm
12/22/2020, 11:45 AM
Marc Knaup
12/22/2020, 4:23 PM
kotlinx-coroutines-reactive-based coroutines using runBlockingTest?
The related issue (https://github.com/Kotlin/kotlinx.coroutines/issues/1204) and pull request (https://github.com/Kotlin/kotlinx.coroutines/pull/1206) seem to have been abandoned for a long time.
WukongRework.exe
12/22/2020, 11:17 PM
Shalom Halbert
12/23/2020, 10:29 AM
launch is used, we use dispatchers IO and Default to run code asynchronously. Why is async run asynchronously if you do not run on one of those dispatchers? It's possible I'm misunderstanding something.
Circusmagnus
12/23/2020, 11:53 AM
in different threads
It just means that they do not depend on each other and are not synchronized in any way. They may still run in parallel OR just be interleaved on a single thread.
Suspend functions may, well... suspend their execution while waiting for something. This does not block a thread. So, during this suspension, other functions / coroutines may run on this thread. And, when they finish, our suspend function may resume on the very same thread.
Consider this code:
suspend fun one() {
    delay(100)
}
suspend fun two() {
    callSomeOtherSuspendFun()
}
singleThreadScope.launch {
    doSth()
    one()
    doSomeOtherThing()
}
singleThreadScope.async { two() }
We are on a single thread. The launch will start first. But soon enough it will suspend for 100 milliseconds. From this moment our async may start executing. It too could get suspended, because it is calling a suspending function, making room for launch to resume its execution.
So conceptually, both launch and async are executing at the same time. We cannot know which one will finish first. They are asynchronous, even when run on a single thread.
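A runnable sketch of that interleaving, using runBlocking as the single thread instead of the singleThreadScope from the snippet above. The delays of 100 ms and 10 ms are arbitrary choices for the example, and interleavingDemo is a hypothetical name.

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Everything below runs on the one thread that calls runBlocking.
fun interleavingDemo(): List<String> = runBlocking {
    val log = mutableListOf<String>()

    val job = launch {
        log += "launch: start"
        delay(100) // suspends without blocking the thread
        log += "launch: resumed"
    }
    val deferred = async {
        log += "async: start" // runs while launch is suspended
        delay(10)
        log += "async: done"
        42
    }

    job.join()
    deferred.await()
    log
}

fun main() {
    // [launch: start, async: start, async: done, launch: resumed]
    println(interleavingDemo())
}
```

With these particular delays the async block finishes first; in real code the order depends on how long each suspension lasts.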
viewModelScope (which is run on Dispatchers.Main). They will soon enough suspend, waiting for network calls / db queries to complete. And while they are suspended, other data-fetching coroutines may run on the Main thread. What is more, the UI can run on the Main thread too, uninterrupted.
As long as you are not calling any blocking functions, you are free to stay on the Main thread and just forget about threading completely.
*blocking function - a function which will execute for a considerable time, blocking a thread. Suspend functions exposed by Room or Retrofit are not blocking, and can be called on the Main thread without a problem.
withContext()
• wrap non-blocking callback APIs in suspendCancellableCoroutine - which will make the coroutine suspend until called back by the callback
• launch and async without any Dispatcher specified. If execution needs to be switched to a different thread, assume that the suspend fun which you are calling has taken care of that (via withContext() or suspendCancellableCoroutine)
streetsofboston
12/23/2020, 1:53 PM