oday
11/15/2021, 1:18 PM
andThen()
viewModelScope.launch(Dispatchers.IO) {
    petsRepository.getPets(this).collect { pets ->
        deviceRepository.getDevices(this).collect { devices ->
SecretX
11/15/2021, 2:38 PM
Job#invokeOnCompletion always invoked independent of the result of that Job (e.g. it gracefully completed, or it was cancelled while suspended)?
AmrJyniat
11/15/2021, 3:44 PM
LiveData builder in flow?
dimsuz
11/16/2021, 11:39 AM
val flow1 = flow { emit(1); delay(3000); emit(2) }
launch {
    flow1.collect()
    flow2.collect()
}
Justin Tullgren
11/16/2021, 4:32 PM
dimsuz
11/16/2021, 4:58 PM
supervisorScope in which I want to do multiple launch-es and allow each one to fail individually without cancelling the other running ones. At the same time I'd want to install an exception handler on the supervisorScope itself to centrally handle everything.
But it seems that this can't be done: according to the official documentation, I'll need to do launch(handler) every time. Do I understand this right?
It's not problematic, but I'd rather avoid "forgetting" to pass "handler" to launch and have it automatically provided...
Rak
11/16/2021, 11:28 PM
org.jetbrains.kotlinx:kotlinx-coroutines-rx2:1.5.2 and tried to use GlobalScope.rxSingle but that method is not available on GlobalScope.
Richard Gomez
11/17/2021, 1:19 AM
@GET
suspend fun get(): Uni<String> = uni { getFoo() } // run coroutine and return the result as a "Uni", which is similar to Reactor's Mono.
The existing 3rd-party helpers in kotlinx.coroutines rely on AbstractCoroutine, which warns that it shouldn't be used outside of the core lib (e.g. the mono builder).
ursus
11/17/2021, 2:07 AM
someOtherPlace {
    someUserScope.launch {
        val userFlow: Flow<User?> = userDao.user()
        userFlow
            .collect {
                // can 'it' be null here?
            }
    }
}
fun logout() {
    allUserCoroutineScopes.forEach { it.cancel() }
    userDao.deleteUser()
}
Let's assume someUserScope is in the allUserCoroutineScopes list, and I'll only delete the user after running the cancellation forEach.
Is there a way the flow at the top will emit null? Should never happen, right? Is it safe to !! there? (I'm looking at the sources but it's not obvious to me)
tursunov abdulbois
11/17/2021, 2:18 PM
@Singleton
class AuthenticationInterceptor @Inject constructor(profileRepository: ProfileRepository, @ApplicationScope coroutineScope: CoroutineScope) :
    Interceptor {
    private val tokenFlow: Flow<String?> = profileRepository.getProfileToken()
        .stateIn(coroutineScope, SharingStarted.Eagerly, null)
    override fun intercept(chain: Interceptor.Chain): Response {
        val requestBuilder = chain.request().newBuilder()
        val token: String? = runBlocking { // this line should be changed
            tokenFlow.firstOrNull()
        }
        token?.let { requestBuilder.addHeader("Authorization", it) }
        return chain.proceed(requestBuilder.build())
    }
}
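One possible way to drop the runBlocking call in the interceptor above, sketched under the assumption that reading the most recently cached token (possibly still null) is acceptable: stateIn already returns a StateFlow, so keeping that type allows a plain, non-suspending read of value. TokenCache and latestToken are hypothetical names, and the tokens parameter stands in for profileRepository.getProfileToken().

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical holder; `tokens` stands in for profileRepository.getProfileToken().
class TokenCache(tokens: Flow<String?>, scope: CoroutineScope) {
    // Keep the StateFlow type returned by stateIn instead of widening it to Flow.
    private val tokenState: StateFlow<String?> =
        tokens.stateIn(scope, SharingStarted.Eagerly, null)

    // Non-suspending read; returns null until the upstream has emitted a token.
    fun latestToken(): String? = tokenState.value
}
```

Inside intercept, the token lookup would then reduce to a call such as cache.latestToken(), with no blocking of the OkHttp thread. Whether a null token on a cold start is acceptable depends on the application.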
elye
11/18/2021, 4:06 AM
override fun onDraw(canvas: Canvas) {
    super.onDraw(canvas)
    CoroutineScope(Dispatchers.Default).launch { // <-- add coroutine
        canvas.drawLine(
            0f, 0f,
            width.toFloat(), height.toFloat(),
            strokePaint
        )
    }
}
More detail (with image) here
loloof64
11/18/2021, 12:28 PM
coroutineScope.launch { in order to start a coroutine in my code. But is there a variant of launch that runs the coroutine for a given amount of time instead of indefinitely? Otherwise, I'm afraid my code, which is already somewhat spaghetti-like, will get worse.
Roeniss Moon
11/19/2021, 3:12 AM
Colton Idle
11/19/2021, 8:00 AM
suspend fun getThingFlow(myId: String): Flow<Thing?> {
    val thingFlow = MutableStateFlow<Thing?>(null)
    FirebaseFirestore.getInstance()
        .document("things/$myId")
        .addSnapshotListener { value, error ->
            thingFlow.emit(value!!.toObject<Thing>())
        }
    return thingFlow
}
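A listener-based API like the one above is commonly bridged with callbackFlow, which registers the listener on collection and unregisters it when the collector goes away. The following is only a sketch of that pattern: ThingSource, addListener, publish, and thingFlow are hypothetical stand-ins (not Firestore APIs), used so the example is self-contained.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.*

// Hypothetical stand-in for a listener-based source such as a snapshot listener.
class ThingSource {
    private val listeners = mutableListOf<(String?) -> Unit>()
    val listenerCount: Int get() = listeners.size

    // Registers a listener and returns a function that removes it again.
    fun addListener(listener: (String?) -> Unit): () -> Unit {
        listeners.add(listener)
        return { listeners.remove(listener) }
    }

    fun publish(value: String?) = listeners.toList().forEach { it(value) }
}

// Not a suspend function: nothing is registered until the flow is collected.
fun ThingSource.thingFlow(): Flow<String?> = callbackFlow {
    // trySend does not suspend, so it can be called from a plain callback.
    val remove = addListener { value -> trySend(value) }
    // Suspends until the collector is cancelled, then unregisters the listener.
    awaitClose { remove() }
}
```

Note that the callbackFlow block only runs once something collects the flow, so values published before collection starts are not observed by this sketch.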
hfhbd
11/19/2021, 10:34 AM
1.6.0 support calling suspend functions (and flow collect) from other iOS threads?
Ayfri
11/20/2021, 8:37 AM
Tolriq
11/20/2021, 10:45 AM
flowX.drop(1)
And would like to have a function that collects them all in a scope and calls one function when any value is received (it would be nice if the result was a flow so I can debounce too).
So some kind of combine, but without types and a transform, and supporting any number of input flows.
Vsevolod Tolstopyatov [JB]
11/22/2021, 11:54 AM
limitedParallelism along with Dispatchers.IO being unbound for limited parallelism views.
• K/N new memory model is now part of the release and all the coroutine primitives can be shared across multiple threads if the new memory model is enabled.
• A lot of improvements and bug fixes.
Full changelog: https://github.com/Kotlin/kotlinx.coroutines/releases/tag/1.6.0-RC
Thomas
11/22/2021, 9:28 PM
native-mt release for 1.6.0?
ursus
11/23/2021, 1:44 AM
buffer equivalent in Flow? flow.buffer does not have the same behavior from what I see, i.e. I want to collect upstream emissions into some buffer, and only emit the buffer when it's full, a time limit elapses, etc.
uli
11/23/2021, 9:02 AM
limitedParallelism in 1.6.0 looks very promising. One question that jumped immediately to my mind: Is this feature recursive? I.e. let’s say I have 2 endpoints I want to serve and I want to limit thread usage to 64 overall, but only allow 50 threads per endpoint. Would the following code do that?
val overallIODispatcher = Dispatchers.IO.limitedParallelism(64)
val endpoint1Dispatcher = overallIODispatcher.limitedParallelism(50)
val endpoint2Dispatcher = overallIODispatcher.limitedParallelism(50)
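One way to probe this empirically is to count how many blocking tasks run at once on each view. The sketch below uses deliberately small limits (3 overall, 2 per endpoint) instead of 64 and 50; Peak and nestedLimitDemo are hypothetical helper names, and the measurement is only an observation on whatever kotlinx.coroutines version (1.6.0 or later) it is run with, not a statement of documented guarantees.

```kotlin
import kotlinx.coroutines.*
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical helper: tracks the highest number of concurrently running blocks.
class Peak {
    private val running = AtomicInteger(0)
    private val peak = AtomicInteger(0)
    val max: Int get() = peak.get()

    fun track(block: () -> Unit) {
        val now = running.incrementAndGet()
        peak.updateAndGet { maxOf(it, now) }
        try { block() } finally { running.decrementAndGet() }
    }
}

// Returns the observed peaks as (overall, endpoint1, endpoint2).
@OptIn(ExperimentalCoroutinesApi::class)
suspend fun nestedLimitDemo(): Triple<Int, Int, Int> {
    val overall = Dispatchers.IO.limitedParallelism(3)
    val endpoint1 = overall.limitedParallelism(2) // a view of a view
    val endpoint2 = overall.limitedParallelism(2)
    val total = Peak()
    val p1 = Peak()
    val p2 = Peak()
    coroutineScope {
        repeat(12) {
            // Thread.sleep blocks on purpose so each task occupies a thread.
            launch(endpoint1) { total.track { p1.track { Thread.sleep(50) } } }
            launch(endpoint2) { total.track { p2.track { Thread.sleep(50) } } }
        }
    }
    return Triple(total.max, p1.max, p2.max)
}
```

If the nested views compose as the question hopes, the first value should never exceed 3 and the other two should never exceed 2.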
rook
11/24/2021, 3:20 PM
callbackFlow. I want to ensure that the value emitted from the flow matches the value that the callback receives. My problem appears to be that the ProducerScope block doesn’t get called until something subscribes to the flow. I thought that using shareIn would fix my problem, but it still doesn’t appear to kick off the ProducerScope in time
fun someTest() = runBlocking {
    val expected = "someMessage"
    val resultFlow = getCallbackFlow().shareIn(this, SharingStarted.Eagerly, replay = 1)
    sendMessageToTriggerCallback(expected)
    val actual = resultFlow.first()
    assertEquals(expected, actual)
}
After adding some logging, I find that what happens is that first, sendMessageToTriggerCallback is invoked, then the callback is registered in the ProducerScope, then it suspends indefinitely awaiting the first signal from the resultFlow. I’ve tried a lot of variations on this pattern, and I can’t seem to make the ProducerScope run without blocking my ability to subsequently trigger sending the message. I’ve tried launching callback flow collection in its own job and sending the message in a separate job and yielding between them. I’ve tried leveraging onSubscription, but that doesn’t fix the issue either; I get the same execution order as the example above. I’m at a bit of a loss as to how to enforce a deterministic execution order in this test case.
Aaron Stacy
11/24/2021, 11:12 PM
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import java.util.concurrent.Executors
fun main(args: Array<String>) = runBlocking {
    val requests = flow {
        var i = 0
        val serverDispatcher = Executors.newSingleThreadExecutor().asCoroutineDispatcher()
        val handlerDispatcher = Executors.newFixedThreadPool(4).asCoroutineDispatcher()
        launch(serverDispatcher) {
            while (true) {
                i += 1
                val result = i
                launch(handlerDispatcher) {
                    delay(500)
                    launch(serverDispatcher) {
                        emit(result)
                    }
                }
            }
        }
    }
    // I want this to:
    //
    // 1. Suspend while collecting each emitted value.
    // 2. Continue to collect until seeing a value >= 4
    // 3. At that point continue.
    // 4. Cancel the flow (thereby closing the server's socket).
    //
    // But instead it suspends, the while loop keeps running and indicating it's emitting values, and the onEach print
    // statement never executes.
    requests.takeWhile { it < 4 }.onEach { println("got $it") }.collect()
}
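For comparison, here is a sketch of the same idea built on channelFlow, which (unlike flow) provides its own scope and allows values to be sent from child coroutines. It is a simplified variant, not a drop-in replacement: the custom executors are left out, the delays are shortened, and requests and firstRequestsBelow are hypothetical names.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// channelFlow gives the block a scope, so launch creates children of the flow itself.
fun requests(): Flow<Int> = channelFlow {
    var i = 0
    while (true) {
        i += 1
        val result = i
        launch {
            delay(50)    // simulated handler work
            send(result) // send may be called from a child coroutine
        }
        delay(10)        // suspension point, so the loop can be cancelled
    }
}

// Collects values until the first one that is >= limit, then cancels the upstream.
suspend fun firstRequestsBelow(limit: Int): List<Int> =
    requests().takeWhile { it < limit }.toList()
```

When takeWhile sees a value that fails the predicate, collection is aborted, which cancels the channelFlow block together with every handler coroutine launched inside it.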
lesincs
11/25/2021, 12:22 AM
message posted by Dispatchers.Main that is already in the message queue but has not yet been picked up by the Looper will be removed from the message queue if the coroutine is canceled?
Tran Thang
11/25/2021, 6:41 AM
val coroutineContext: CoroutineContext get() = Job() + Dispatchers.Main
Alexandru Hadăr
11/25/2021, 3:19 PM
class Launcher(dispatcher: CoroutineDispatcher) {
    private val scope = CoroutineScope(SupervisorJob() + dispatcher)
    fun launchItem() {
        scope.launch { /* Do my thing here */ }
    }
}
I suppose the correct way to test this is to use Dispatchers.setMain(myDispatchers) in the @Before function and then use runBlocking in my test function, where I call launcher.launchItem, right?
Or how should I test a method that’s not suspend but launches a coroutine inside?
Tgo1014
11/26/2021, 9:18 AM
delay()s inside a runTest{}? In my mind the delays would wait until I call advanceTimeBy(), but in practice the test just runs and ignores all the delays. What’s the point of advanceTimeBy() then?
knthmn
11/26/2021, 2:59 PM
interface Worker {
    // does the work, throws Exception promptly if cancel() is called
    fun doWork()
    fun cancel()
}
What is the best way to convert it into suspend fun Worker.doWorkCancellable()? Preferably something that works with a single-threaded executor.
Clament John
11/27/2021, 10:09 AM
Dennis Schröder
11/28/2021, 4:47 PM
val sharedFlow: MutableSharedFlow<String> = MutableSharedFlow(replay = Int.MAX_VALUE)
Dominaezzz
11/28/2021, 4:55 PM
Joffrey
11/28/2021, 4:58 PM
gildor
11/29/2021, 4:37 AM
Dennis Schröder
11/29/2021, 8:11 AM
gildor
11/29/2021, 8:23 AM
Joffrey
11/29/2021, 8:25 AM
Dominaezzz
11/29/2021, 8:34 AM
Dennis Schröder
11/29/2021, 11:43 AM