Mukilesh Sethu
08/09/2022, 6:09 AM
`launch`) which polls data from a stream, and the reader is closed on `job.invokeOnCompletion`. We use `suspendCancellableCoroutine` to register a callback with the reader. One data point is polled at a time. We started seeing a potential memory leak with the `CancellableContinuationImpl`/`ChildContinuation` created as part of `suspendCancellableCoroutine`. We expected only one active `CancellableContinuationImpl`/`ChildContinuation` per job, and that is the case, but the job holds a reference to an old `ChildContinuation` from earlier (in the `nodeList`), which prevents it from getting GC'd.
My understanding is that it is detached from the job on successful invocation of `resume`, which is what we expect from our service, and even in the heap dump the `state` of `ChildContinuation` denotes it. I am wondering if there is any other scenario where `CancellableContinuationImpl`/`ChildContinuation` is not removed from the `nodeList`, or if there is a delay in removing it.
rednifre
08/09/2022, 1:53 PM
fun suspendFoo(scope: CoroutineScope): Deferred<Int> =
    scope.async { 4 }
suspend fun foo(): Int {
    delay(0)
    return 4
}
Kirill Grouchnikov
08/09/2022, 3:23 PM
Sam
08/10/2022, 12:51 PM
rocketraman
08/10/2022, 7:24 PM
Nino
08/11/2022, 1:46 PM
`initializeStuffAndGetFoo()` function between my coroutines without this dangerous `first() as String`?
https://pl.kotl.in/7fVqO3EXP
import kotlinx.coroutines.*
fun main() = runBlocking {
    println(initializeStuffAndGetFoo())
}
suspend fun initializeStuffAndGetFoo() = withContext(Dispatchers.IO) {
    listOf(
        async { getFooValue() },
        async { initA() },
        async { initB() },
    ).awaitAll().first() as String
}
suspend fun getFooValue(): String {
    delay(2_000)
    return "Foo"
}
suspend fun initA() = delay(1_000)
suspend fun initB() = delay(3_000)
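One possible way to avoid the cast, offered as a sketch rather than the answer given in the thread: keep the typed `Deferred<String>` in its own variable and start the two initializers with `launch`, since their results are not needed. `withContext` does not return until all children started inside it have completed, so both initializers still finish before the value is handed back. The delays are shortened here only to keep the example quick.

```kotlin
import kotlinx.coroutines.*

suspend fun getFooValue(): String {
    delay(200)
    return "Foo"
}

suspend fun initA() = delay(100)
suspend fun initB() = delay(300)

suspend fun initializeStuffAndGetFoo(): String = withContext(Dispatchers.IO) {
    // Keep the typed Deferred instead of mixing it into a List<Deferred<Any>>
    val foo: Deferred<String> = async { getFooValue() }
    // Results of the initializers are not needed, so launch is enough
    launch { initA() }
    launch { initB() }
    // withContext itself waits for the launched children before returning
    foo.await()
}

fun main() = runBlocking {
    println(initializeStuffAndGetFoo())
}
```

Because the three coroutines are no longer collected into one heterogeneous list, the compiler keeps the `String` type and no `first() as String` is required.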
Lukas Lechner
08/12/2022, 10:00 AM
oday
08/15/2022, 6:47 PM
if (marketingConsentTypes != null && marketingConsentTypes.all { it in validEnumValues }) {
    viewModelScope.launch(coroutineDispatchers.main) {
        marketingConsentTypes.forEach { consentType ->
            giveConsent.execute(
                GiveConsent.Input(
                    consentType = GiveConsent.ConsentType.valueOf(consentType),
                    hasGivenConsent = true
                )
            ).await()
        }
        isLoading.postValue(false)
    }
} else {
    error.postValue(Event(Throwable()))
}
The issue is not that I am running this use case in a loop; that's what I want.
The issue is that I would like to get back the Throwable that this use case already returns for me. With Rx I'd usually get it in the error block of the subscribe. Where can I get it here, so that my mutable property knows that the error happened?
fitermay
08/15/2022, 9:45 PM
fitermay
08/15/2022, 9:49 PM
fitermay
08/15/2022, 9:50 PM
dimsuz
08/16/2022, 11:05 AM
`Flow<Int>`, say 1, 20, 30, 35, how could I produce a flow which "samples" this flow and computes the difference between the last elements emitted in chunked time windows?
For example, with sampleTime=20ms, if `1, 20, 30` got emitted in the first 20 ms since start and then only one item `35` got emitted in the next 20 ms, I would get `29, 5` (`30-1`, `35-30`) out.
Should I combine sample with map with timestamps? Would be grateful for direction.
Or should I write a fully custom "transform" function instead of trying to combine existing ones?
Curtis Ullerich
08/16/2022, 5:08 PM
Zoltan Demant
08/17/2022, 6:30 AM
`GlobalScope.launch {}` or `scope.launch(context = NonCancellable) {}`; or are they basically the same thing? My operation is just a few millis long.
Christopher Porto
08/18/2022, 7:37 AM
suspend fun test() {
    val response = bar()
    println("I want this to be on same thread as it was before doing bar call")
}
suspend fun bar() {
    val response = someOtherCallThatSwitchesThread()
    return response
}
I figured that calling `suspendCoroutine` inside bar and then launching a new coroutine that resumes the continuation with the response would work, and while it does change the thread, it doesn't go back to the original.
Zoltan Demant
08/18/2022, 7:54 AM
`scope.launch {}` from MainScope? Generally, the task itself will run with the "correct" dispatcher, for example: loading something from a database is always executed using `withContext(Dispatchers.IO)`.
But to me it seems that I might as well specify something like `Dispatchers.Default` in the launch block, so that all tasks are at least off-loaded from the main thread by default? As a real-life scenario, I was just working on an issue where the UI froze after the user had selected a file to import from; I had failed to realize that `contentResolver.openInputStream(uri)` was blocking, rather than reading from the inputStream itself. If I had been using `Dispatchers.Default`, this wouldn't have been a problem. Now, instead, I have to wrap each individual call like this (there aren't many, but still) in a `withContext(Dispatchers.Default/IO)`.
ddimitrov
08/18/2022, 11:00 AM
itnoles
08/19/2022, 1:43 AM
rrva
08/19/2022, 10:07 AM
fun <A, B> Iterable<A>.parallelMap(context: CoroutineContext, f: suspend (A) -> B): List<B> = runBlocking {
    map { async(context) { f(it) } }.map { it.await() }
}
@OptIn(ExperimentalCoroutinesApi::class)
private val apiPool = Dispatchers.IO.limitedParallelism(10)
val results = foo.parallelMap(apiPool) { fetchStuffFromRestAPI(it.id) }
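For comparison, here is a sketch of a suspending variant of the same helper. The name `parallelMapSuspending` and the sample data are made up for illustration and are not from the thread. It swaps `runBlocking` for `coroutineScope`, so the caller decides whether anything blocks, and uses `awaitAll()`, which fails as soon as any of the deferred values fails instead of awaiting them one by one.

```kotlin
import kotlin.coroutines.CoroutineContext
import kotlinx.coroutines.*

// Hypothetical suspending counterpart of parallelMap
suspend fun <A, B> Iterable<A>.parallelMapSuspending(
    context: CoroutineContext,
    f: suspend (A) -> B,
): List<B> = coroutineScope {
    // Start one coroutine per element, then wait for all of them
    map { async(context) { f(it) } }.awaitAll()
}

@OptIn(ExperimentalCoroutinesApi::class)
val limitedPool = Dispatchers.IO.limitedParallelism(2)

fun main() = runBlocking {
    val doubled = listOf(1, 2, 3).parallelMapSuspending(limitedPool) {
        delay(50) // stands in for a suspending API call
        it * 2
    }
    println(doubled) // [2, 4, 6]
}
```

Note that `limitedParallelism` bounds how many coroutines execute on the dispatcher at the same time; a coroutine that is suspended (as in `delay` above) does not occupy a slot, so it is not a cap on in-flight requests for non-blocking calls.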
Vsevolod Kaganovych
08/19/2022, 2:49 PM
`MutableSharedFlow` which uses debounce?
Example: I’m using this flow to track the input text in a text field, debounce and distinct it, and if it passes I make some calculations.
But if I emit the value in a unit test, it never enters the `collectLatest {}` lambda.
wrongwrong
08/20/2022, 6:11 AM
`future` function in `kotlinx-coroutines-jdk8` available as per the sample code?
https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-jdk8/#:~:text=to%20the%20future-,Example,-Given%20the%20following
The definition shows that a `CoroutineScope` is required, and actually reproducing the sample code results in a compile error.
https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-jdk8/kotlinx.coroutines.future/future.html
Also, if there is a way to achieve the same behavior as in the sample code, I would appreciate it if you could let me know.
Zoltan Demant
08/20/2022, 6:13 AM
`CoroutineScope`, but I need to do it inside a suspending block of my scope. This feels like a really bad idea, is there a better approach?
scope.launch {
    // Do some suspending work
    scope.coroutineContext.cancelChildren()
    // Ordering becomes very important - any code from here on is never executed.
    // This isn't a problem by itself as I don't have any more code here, but it has introduced very subtle
    // bugs in the past
}
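One option that avoids the ordering problem, given as a sketch under assumptions rather than a recommendation from the thread: cancel every child of the scope except the coroutine doing the cancelling. The helper name `cancelSiblings` is hypothetical. It assumes the caller is a direct child of `scope`; if it is called from inside a nested `withContext` or `coroutineScope`, the current job is a grandchild and the surrounding child would still be cancelled.

```kotlin
import kotlinx.coroutines.*

// Hypothetical helper: cancels all children of `scope` except the calling coroutine.
suspend fun cancelSiblings(scope: CoroutineScope) {
    val self = currentCoroutineContext()[Job]
    scope.coroutineContext[Job]?.children
        ?.filter { it !== self }
        ?.forEach { it.cancel() }
}

fun main() = runBlocking {
    val scope = CoroutineScope(SupervisorJob() + Dispatchers.Default)
    val sibling = scope.launch { delay(Long.MAX_VALUE) }
    val worker = scope.launch {
        cancelSiblings(scope)
        // The worker itself was not cancelled, so code after the call still runs
        println("still running after cancelling siblings")
    }
    worker.join()
    sibling.join()
    println("sibling cancelled: ${sibling.isCancelled}, worker cancelled: ${worker.isCancelled}")
    scope.cancel()
}
```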
Joe Altidore
08/20/2022, 10:23 PM
suspend fun isFinished(token: String): Boolean {
    return coroutineScope {
        val a = async {
            updateProfile(token = token)
        }
        val b = async {
            updateStatus(token = token)
        }
        (a.await()) && (b.await())
    }
}
Methods `updateProfile(token: String)` and `updateStatus(token: String)` are the API methods.
Lukas Lechner
08/21/2022, 8:22 AM
dan.the.man
08/21/2022, 9:38 PM
rishabhsinghbisht
08/22/2022, 5:25 AM
`kotlinx.coroutines.JobCancellationException`, `kotlinx.coroutines.TimeoutCancellationException` in Firebase Crashlytics.
We are not able to reproduce this ourselves. Any help appreciated.
Amir hossein Abdolmotallebi
08/22/2022, 9:10 AM
Sergio C.
08/22/2022, 1:56 PM
`suspendCoroutine` block?
suspendCoroutine { continuation -> }
I need to call the continuation more than once, but it's throwing an error:
java.lang.IllegalStateException: Already resumed
    at kotlin.coroutines.SafeContinuation.resumeWith(SafeContinuationJvm.kt:44)
Any way to bypass this?
Lukas Lechner
08/23/2022, 8:54 AM
Lilly
08/23/2022, 8:48 PM
`combine` the first time trying to combine multiple flows into a single object. One of the flows returns items which are part of a `List`. How can I get access to the previous combined object to accumulate the items of the list?
ephemient
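08/23/2022, 9:08 PM
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
fun <T, R> Flow<T>.withHistory(
    maxSize: Int,
    transform: suspend (List<T>) -> R,
): Flow<R> {
    require(maxSize > 0)
    return flow {
        // Sliding window holding at most maxSize of the latest elements
        val window = ArrayDeque<T>(maxSize)
        collect {
            if (window.size == maxSize) window.removeFirst()
            window.addLast(it)
            // Pass a snapshot of the window through transform so the flow emits R
            emit(transform(window.toList()))
        }
    }
}
fun <T, R> Flow<T>.zipWithPrevious(
    transform: suspend (T?, T) -> R,
): Flow<R> = flow {
    // prev is null for the first element
    var prev: T? = null
    collect { cur ->
        emit(transform(prev, cur))
        prev = cur
    }
}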
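For the accumulation case asked about above, the built-in `scan` operator may already be enough. The sketch below is a guess at the intended shape and is not code from the thread: `UiState` and its fields are made up for illustration. It folds the single items into a growing list first and only then combines that list with the other flow, so the combine lambda never needs access to its own previous result.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical combined object; the fields are assumptions for illustration.
data class UiState(val devices: List<String>, val scanning: Boolean)

fun uiState(device: Flow<String>, scanning: Flow<Boolean>): Flow<UiState> {
    // scan emits the initial empty list, then one longer list per incoming item
    val devices = device.scan(emptyList<String>()) { acc, item -> acc + item }
    return combine(devices, scanning) { list, isScanning -> UiState(list, isScanning) }
}

fun main() = runBlocking {
    // The last combined value carries every item seen so far
    val last = uiState(flowOf("a", "b", "c"), flowOf(true)).last()
    println(last)
}
```

Because `scan` emits its initial value immediately, the accumulated flow also gives `combine` a first value without waiting for the first real item.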
Lilly
08/24/2022, 5:56 PM
`BroadcastReceivers` so they do not emit that frequently. In my logs, I can see that one of the Flows emits, but the `transform` block of combine is not triggered. Does combine have some hidden behavior?
fun startListening() {
    combine(
        bluetoothLocal.discoverDevices(),
        bluetoothLocal.discoveryState(), // emits a value but transform block is not triggered
        bluetoothLocal.bluetoothAdapterState(),
        protocolState,
    ) { discoveredDevice, isScanning, isBluetoothEnabled, protocolState ->
        ....
    }.launchIn(scope)
}
I would expect that the transform block is called every time one of the flows emits a value, isn't that the case?
`bluetoothLocal.bluetoothAdapterState()` does not have an initial value. When I emit an initial value like `bluetoothLocal.bluetoothAdapterState().onStart { emit(bluetoothLocal.isEnabled) }` it works as expected.
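The behavior described above can be reproduced in isolation: `combine` only calls its transform once every source flow has emitted at least one value. The following is a minimal sketch with made-up stand-in flows (`scanning`, `adapterEnabled`) rather than the Bluetooth flows from the message; `emptyFlow()` plays the role of a receiver-backed flow that has not delivered anything yet.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Collects every value that combine produces for the two given flows.
suspend fun combined(
    scanning: Flow<Boolean>,
    adapterEnabled: Flow<Boolean>,
): List<Pair<Boolean, Boolean>> =
    combine(scanning, adapterEnabled) { s, a -> s to a }.toList()

fun main() = runBlocking {
    val scanning = flowOf(true)
    // Stand-in for a flow that has not emitted anything yet
    val adapterEnabled = emptyFlow<Boolean>()

    // No output: one source never emitted, so the transform never runs
    println(combined(scanning, adapterEnabled)) // []
    // Seeding the silent flow with onStart lets combine produce a value
    println(combined(scanning, adapterEnabled.onStart { emit(false) })) // [(true, false)]
}
```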