Mayur
02/25/2021, 10:09 AM
zygote: Explicit concurrent copying GC freed 35008(4MB) AllocSpace objects, 5(164KB) LOS objects, 89% free, 2MB/20MB, paused 211us total 67.413ms
zygote: Increasing code cache capacity to 512KB
Choreographer: Skipped 48 frames! The application may be doing too much work on its main thread.
zygote: Do partial code cache collection, code=53KB, data=54KB
zygote: After code cache collection, code=53KB, data=54KB
zygote: Increasing code cache capacity to 256KB
(After some optimization the numbers went down, but I still have the issue.)
I need help selecting the right approach, and some insight into the issue so that I can resolve it and use that approach correctly.
ribesg
02/25/2021, 10:10 AM
CoroutineScope(Dispatchers.Default + CoroutineExceptionHandler { _, e ->
    log.error("Uncaught error in scope", e)
})
But it looks like when an error occurs, or rather after a few errors, coroutines basically stop, as if the scope were dead or something. Everything seems stuck. Does calling the CoroutineExceptionHandler cancel the scope’s job or something? Do I need to specify a + SupervisorJob() when building the scope? I don’t really understand what’s happening.
dan.the.man
02/25/2021, 8:16 PM
collect or onEach to subscribe to the updates on StateFlow
SecretX
02/26/2021, 6:55 AM
open class Listener<T>(element: T) {
    var value: T = element
        protected set(value) {
            field = value
            notifyObservers(field)
        }
    private val observers: ConcurrentHashMap<UUID, Observer<T>> = ConcurrentHashMap()
    private fun notifyObservers(newValue: T) {
        observers.forEach { it.value.onChange(newValue) }
    }
    fun addListener(observer: Observer<T>): Subscription {
        var uuid = UUID.randomUUID()
        while (observers.contains(uuid)) {
            Thread.sleep(1)
            uuid = UUID.randomUUID()
        }
        val subscription = Subscription(uuid, this)
        observers[uuid] = observer
        return subscription
    }
    fun unsubscribeListener(uuid: UUID) = observers.remove(uuid)
    fun unsubscribeAll(): Int {
        val num = observers.size
        observers.clear()
        return num
    }
}
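A minimal sketch, not SecretX's code: the check-then-insert in addListener above could be collapsed into one atomic step with ConcurrentHashMap.putIfAbsent. On the JVM, ConcurrentHashMap also has a legacy contains(value) member that tests values rather than keys, so containsKey or putIfAbsent avoids any ambiguity. The Observer interface and the ObserverRegistry class below are hypothetical stand-ins for illustration.

```kotlin
import java.util.UUID
import java.util.concurrent.ConcurrentHashMap

// Hypothetical stand-in for the Observer type used in the message above.
fun interface Observer<T> {
    fun onChange(newValue: T)
}

// Hypothetical registry showing only the registration part of the idea.
class ObserverRegistry<T> {
    private val observers = ConcurrentHashMap<UUID, Observer<T>>()

    // putIfAbsent checks for the key and inserts in a single atomic step,
    // so no separate lookup or Thread.sleep is needed; retry only if the
    // freshly generated key was already taken.
    fun add(observer: Observer<T>): UUID {
        while (true) {
            val uuid = UUID.randomUUID()
            if (observers.putIfAbsent(uuid, observer) == null) return uuid
        }
    }

    // Returns true if an observer was registered under this key.
    fun remove(uuid: UUID): Boolean = observers.remove(uuid) != null

    fun publish(newValue: T) = observers.values.forEach { it.onChange(newValue) }

    val size: Int get() = observers.size
}

fun main() {
    val registry = ObserverRegistry<Int>()
    val received = mutableListOf<Int>()
    val id = registry.add { received += it }
    registry.publish(42)   // delivered to the registered observer
    registry.remove(id)
    registry.publish(7)    // nobody is registered any more
    println(received)      // prints [42]
}
```

Returning the generated key from add keeps the sketch small; a Subscription wrapper like the one in the original class could hold that key instead.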
SecretX
02/26/2021, 6:56 AM
(DefaultDispatcher-worker-5 @coroutine#5) Subscribing 10000 listeners took 177ms
(DefaultDispatcher-worker-4 @coroutine#4) Subscribing 10000 listeners took 178ms
(DefaultDispatcher-worker-1 @coroutine#3) Subscribing 10000 listeners took 178ms
(DefaultDispatcher-worker-3 @coroutine#2) Subscribing 10000 listeners took 178ms
(DefaultDispatcher-worker-6 @coroutine#6) Subscribing 10000 listeners took 178ms
(main @coroutine#1) Unsubscribing all 49962 listeners took 0ms
Rob
02/26/2021, 3:14 PM
MutableStateFlow<T : Any> where the initial value is not set immediately? I am migrating from RxJava2 BehaviorSubject<T : Any>. The value is not initialized until some process finishes. Observers of the value are transient. I would rather it not be a T? where I filter out the initial null value. Thanks!
voben
02/26/2021, 3:31 PM
jeff
02/26/2021, 3:51 PM
@Test
fun `emitting too fast`() {
    val scope = CoroutineScope(Dispatchers.Default)
    val actionFlow = MutableSharedFlow<String>(extraBufferCapacity = 10)
    var success = false
    actionFlow
        .onEach { success = true }
        .launchIn(scope) // 1
    actionFlow.tryEmit("HELLO") // 2
    runBlocking {
        withTimeout(1000) {
            while (!success) {
                yield()
            }
        }
        success shouldBe true
    }
}
I understand why, I think: it's because the 'subscription' to actionFlow is launched (1) and therefore hasn't happened yet by the time I try to emit (2). Therefore, because `SharedFlow`s don't buffer or anything by default, the emission of "HELLO" is just dropped.
My question is, what's the best way to make sure 1 is ready before I do 2? I can imagine there's a way to use SharedFlow.subscriptionCount, but that seems inelegant. Any better way?
Ahmed Ibrahim
02/26/2021, 3:53 PM
onEach variant but with onEachLatest instead? I have a flow that has an onEach operator in its chain, and in its body there is a suspendCancellableCoroutine that I want to be cancelled when a new value is emitted. I tried looking around but couldn't find any.
Erik
02/26/2021, 6:47 PM
object with private construct-once properties (e.g. private val x = SomeConstructor()) and public suspend functions that access these properties. The suspending funs all use withContext(myContext) {} to ensure they access the properties from the correct thread. How can I ensure that the properties are instantiated on that same thread? Would it be enough to use private val x by lazy { SomeConstructor() } as long as x is only accessed from within withContext(myContext)? Will that guarantee that x is constructed on the thread(s) backing myContext?
Big Chungus
02/27/2021, 10:14 AM
Bee
02/27/2021, 3:21 PM
runBlocking and coroutineScope, which I put into this question. Could someone help me with it, please?
william
02/27/2021, 3:35 PM
// ignore these warnings
kotlin.target.compilations.all {
    allKotlinSourceSets.forEach { sourceSet ->
        sourceSet.languageSettings.apply {
            useExperimentalAnnotation("kotlinx.coroutines.FlowPreview")
            useExperimentalAnnotation("kotlinx.coroutines.ExperimentalCoroutinesApi")
            useExperimentalAnnotation("io.ktor.util.KtorExperimentalAPI")
        }
    }
}
Big Chungus
02/27/2021, 7:25 PM
spierce7
02/28/2021, 1:09 AM
theapache64
03/01/2021, 1:15 PM
Orhan Tozan
03/01/2021, 6:16 PM
flatMapConcat { }
2️⃣ flatMapMerge { }
3️⃣ I don't remember using any of them
Rechee Jozil
03/01/2021, 10:44 PM
spierce7
03/02/2021, 12:57 AM
delay in them?
Aslam Hossin
03/02/2021, 8:48 AM
suspend fun <T> wrapStatusListener(value: T, block: (StatusListener) -> Unit): T {
    return suspendCancellableCoroutine { continuation: CancellableContinuation<T> ->
        block(object : StatusListener() {
            override fun onSuccess() {
                continuation.resume(value)
            }
            override fun onError(errorInfo: ErrorInfo) {
                continuation.resumeWithException(
                    Exception(
                        code = errorInfo.code,
                        status = errorInfo.status,
                        errorMessage = errorInfo.message
                    )
                )
            }
        })
    }
}
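For readers following along, here is a rough sketch of how a wrapper like the one above might be called. It is only an illustration under stated assumptions: StatusListener, ErrorInfo, StatusException and subscribe are hypothetical stand-ins (the real SDK types are not shown in the message), and the wrapper is repeated so that the snippet is self-contained. It needs kotlinx.coroutines on the classpath.

```kotlin
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.suspendCancellableCoroutine
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException

// Hypothetical stand-ins for the SDK types referenced in the message above.
class ErrorInfo(val code: Int, val status: String, val message: String)

abstract class StatusListener {
    abstract fun onSuccess()
    abstract fun onError(errorInfo: ErrorInfo)
}

// Hypothetical exception type carrying the same three fields.
class StatusException(val code: Int, val status: String, errorMessage: String) :
    Exception(errorMessage)

// Same shape as the wrapper above: suspend until the callback fires.
suspend fun <T> wrapStatusListener(value: T, block: (StatusListener) -> Unit): T =
    suspendCancellableCoroutine { continuation ->
        block(object : StatusListener() {
            override fun onSuccess() = continuation.resume(value)
            override fun onError(errorInfo: ErrorInfo) =
                continuation.resumeWithException(
                    StatusException(errorInfo.code, errorInfo.status, errorInfo.message)
                )
        })
    }

// Hypothetical callback-based API: succeeds for a non-empty topic,
// reports an error otherwise.
fun subscribe(topic: String, listener: StatusListener) {
    if (topic.isNotEmpty()) {
        listener.onSuccess()
    } else {
        listener.onError(ErrorInfo(400, "BAD_REQUEST", "empty topic"))
    }
}

fun main() = runBlocking {
    // Success path: the wrapper returns the value passed in.
    val topic = wrapStatusListener("news") { listener -> subscribe("news", listener) }
    println("subscribed to $topic")

    // Error path: the callback error surfaces as a regular exception.
    try {
        wrapStatusListener(Unit) { listener -> subscribe("", listener) }
    } catch (e: StatusException) {
        println("failed with ${e.code} ${e.status}: ${e.message}")
    }
}
```

The sketch does not unregister the listener on cancellation; if the underlying API offers a way to do that, continuation.invokeOnCancellation would be the usual place for it.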
Bradleycorn
03/02/2021, 3:58 PM
class MyViewModel(): ViewModel() {
    private var job: Job = viewModelScope.launch {
        // Some long running async task, like polling a server or something
    }
    fun onSomeUIEvent() {
        job.cancel()
    }
}
Patrick Ramsey
03/03/2021, 3:04 AM
deferToThread()? I.e., a neat, packaged way to spawn a thread or run some blocking work on a thread pool, and monitor it from a coroutine that resumes the caller when the blocking code finishes?
Vsevolod Tolstopyatov [JB]
03/03/2021, 3:50 PM
kotlinx.coroutines 1.4.3 and 1.4.3-native-mt are here!
• CoroutineStart.UNDISPATCHED promoted to stable API
• Better support of ThreadContextElement and its integrations
• Proper support of onUndeliveredElement in unlimited channels
• Various performance improvements and bug fixes
Full changelog: https://github.com/Kotlin/kotlinx.coroutines/releases/tag/1.4.3
mikehearn
03/03/2021, 4:42 PM
onReceiveOrNull is already imported but not being used.
Brian Carr
03/03/2021, 8:15 PM
Manu Eder
03/04/2021, 4:50 PM
// f1 is a flow of somewhat random numbers, no deeper meaning
val f1: Flow<Int> = flowOf(1, 1, 2, 3, 5, 8, 3, 1, 4, 5, 9, 4, 3, 7, 0, 7, 7, 4, 1, 5)
/*
  Let's imagine that we are told f1 consists of "packets", which start with a length field followed by as many numbers
  as indicated in the length field. We want to transform f1 into a flow where each element is the sum of the numbers
  in the "payload" of one packet.
*/
runBlocking {
    println("package sums:")
    f1.transformImperatively<Int, Int> {
        while (true) {
            if (hasNext()) {
                emit(sum(next()))
            } else {
                break
            }
        }
    }.collect(::println)
}
code for sum:
suspend fun FlowIterator<Int>.sum(count: Int): Int {
    var s: Int = 0
    for (i in 1..count) {
        if (hasNext()) {
            s += next()
        } else {
            break
        }
    }
    return s
}
It's not finished, more a proof-of-concept.
(In particular, I haven't had time yet to try and understand flow cancellation, or the context preservation properties expected of flows, so don't expect it to behave correctly in that regard yet. Though I would very much expect it to be possible to make it behave correctly in that sense as well.)
I would be grateful for any pointers to discussions about similar things, reasons why something like this isn't or shouldn't be in kotlinx.coroutines, reasons why it might break down later on, etc.
streetsofboston
03/04/2021, 6:14 PM
private val _myFlow = MutableStateFlow<String>("")
val myFlow: SharedFlow<String> = _myFlow.asStateFlow()
over
private val _myFlow = MutableStateFlow<String>("")
val myFlow: SharedFlow<String> = _myFlow
?
A MutableStateFlow is a StateFlow; why the extra call to asStateFlow()?
Kris Wong
03/04/2021, 7:56 PM
Ellen Spertus
03/04/2021, 9:26 PM
launch (which presumably puts something on another thread) and suspend functions, which enable that thread to be used for another coroutine?
I'm working in the context of Android, where it's vital that the main thread not get blocked. Why isn't launching a database access, for example, sufficient to guarantee main thread responsiveness? Is the issue that there are a small number of worker threads so no single database access should monopolize one?
william
03/05/2021, 12:20 AM
william
03/05/2021, 12:20 AM
.consumeAsFlow().onCompletion { ... }?
mng
03/05/2021, 1:49 AM