Pitel
05/16/2022, 10:43 AMsomeFlow.collect {
Log.d(TAG, ">")
// Some stuff
Log.d(TAG, "<")
}
So, I should in Android's logcat alternating arrows going left and right. And in most cases, I do. But on some rare occasions, I see 2 going right and 2 going left. Any idea why that might happen?!Brian Estrada
05/17/2022, 7:14 AMDeferred<Int>
and then call it and complete it later by another function (Will add an example in a 🧵)ghosalmartin
05/17/2022, 9:59 AMSanat Tripathi
05/17/2022, 8:45 PMsuspendCancellableCoroutine
and Mutex
suspend fun awaitCallback(): T = suspendCancellableCoroutine { continuation ->
val callback = object : Callback { // Implementation of some callback interface
override fun onCompleted(value: T) {
// Resume coroutine with a value provided by the callback
continuation.resume(value)
}
override fun onApiError(cause: Throwable) {
// Resume coroutine with an exception provided by the callback
continuation.resumeWithException(cause)
}
}
// Register callback with an API
api.register(callback)
// Remove callback on cancellation
continuation.invokeOnCancellation { api.unregister(callback) }
// At this point the coroutine is suspended by suspendCancellableCoroutine until callback fires
}
suspend fun execute() {
val mutex = Mutex(false)
mutex.lock()
try {
awaitCallback()
} catch(e: Throwable) {
throw e
} finally {
mutex.unlock()
}
}
In the above setup, the moment suspension happens at suspendCancellableCoroutine
mutex in execute
method gets unlocked part of the finally
blockNino
05/18/2022, 2:57 PMimport kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun main() = runBlocking {
val flowA : Flow<Int> = flowOf(1, 2, 3)
flowA.flatMapLatest { a: Int ->
doSomeMemoryIntensiveStuffForFlowB(a).map { b: String ->
doSomeCpuIntensiveStuff(b)
}
}.collect { c: Long ->
println(c)
}
flowA.flatMapLatest { a: Int ->
doSomeMemoryIntensiveStuffForFlowB(a)
}.map { b: String ->
doSomeCpuIntensiveStuff(b)
}.collect { c: Long ->
println(c)
}
}
fun doSomeMemoryIntensiveStuffForFlowB(a: Int): Flow<String> {
// Some Memory intensive stuff related to "a" (object creation, etc)
var memoryGreedyList = listOf<Int>()
repeat(10_000) { memoryGreedyList = memoryGreedyList + listOf(it) }
return flowOf(a.toString(), a.toString(), a.toString())
}
fun doSomeCpuIntensiveStuff(b: String): Long {
// Some CPU intensive stuff related to "b" (mapping, etc)
repeat(10_000) { b.toDouble() * it }
return b.toLong()
}
https://pl.kotl.in/sOl65cZgZJoffrey
05/19/2022, 12:44 PMsuspend fun main
is considered inactive?
suspend fun main() {
println(currentCoroutineContext().isActive) // prints false
}
https://pl.kotl.in/sVFnLz9mMBastian
05/19/2022, 3:55 PMAlexandru Hadăr
05/20/2022, 1:28 PMcurrentCoroutineContext()
function?
I want to cancel my listener when my coroutine is no longer active. I don’t have access to the scope, but I can use currentCoroutineContext()
since i’m inside a suspend function. Could this have any negative impact? Like memory leak or a false-positive?
private suspend fun doEffects(colorChangeCallback: OnColorChangedCallback, valueAnimator: ValueAnimator) {
val context: CoroutineContext = currentCoroutineContext()
valueAnimator.addUpdateListener {
if (context.isActive) {
// Do your work
} else {
// Cancel other work
valueAnimator.cancel()
}
}
}
kevin.cianfarini
05/20/2022, 5:36 PMjanvladimirmostert
05/21/2022, 10:29 AMGrigorii Yurkov
05/22/2022, 10:42 AM0
. Because i
can be modified from different threads and kotlin compiler doesn't mark it as volatile
under the hoodspierce7
05/22/2022, 6:35 PMcoroutineScope {}
, calling several async methods, and collecting the deferreds into a MutableList<Deferred<Any>>
. Before the coroutine is over, I call deferreds.awaitAll()
on the list. This isn’t actually necessary if I don’t care about the results, right? A parent suspend function won’t complete until all it’s children coroutines have completed, right?andylamax
05/22/2022, 6:53 PMdelay
what other functions actually suspends?
I am trying to get an example of a function that actually suspends (like delay), Not just marked as a suspend function and goes into IO operations like many I have seenJohn O'Reilly
05/23/2022, 5:41 PMStateFlow
using stateIn
(which is called/collected from Fragment)...seemingly it work but feels like there's an issue, or at least some redundancy here but can't put my finger on why right now? (reason that's it's a suspend function instead of property is that it needs to call suspendFunctionReturningFlowB
here but probably other ways around that....)
suspend fun someFunction() = flowA.combine(suspendFunctionReturningFlowB()) { valA, valB ->
<some logic>
}.stateIn(viewModelScope, SharingStarted.WhileSubscribed(5000), null)
uli
05/24/2022, 6:24 AMinline fun <reified T> Flow<T>.collectIn(scope: CoroutineScope, collector: FlowCollector<T>): Job {
return scope.launch {
collect(collector)
}
}
It is so obvious, that I was thinking there must be something fundamentally broken if it is not part of the coroutines library.napperley
05/24/2022, 10:45 PM// ...
if (args.isNotEmpty()) args.forEachIndexed { pos, item -> println("Arg ${pos + 1}: $item") }
println("Ping")
println("Pong")
sleep(4u)
println("Another item...")
while (true) {
usleep(500u)
}
The function for running the Serverless Function (on the Serverless Platform side):
private fun CoroutineScope.runFunction(filePath: String, vararg args: String) = launch(context = Dispatchers.Default) {
println("Running Serverless Function...")
val file = openProcess(filePath = filePath, args = args)
file.readLines(5u).forEach { println(it) }
val processName = extractProcessName(filePath)
val pid = fetchLatestPid(processName).toInt()
if (pid > 0) kill(pid, SIGINT)
file.closeProcess()
}
Tianyu Zhu
05/26/2022, 12:39 AMfun justSomeFunction() {
launchACoroutine(<http://Dispatchers.IO|Dispatchers.IO>) {
while (true) {
// just do some work
}
}
}
Exerosis
05/26/2022, 8:29 PMcoroutineScope {
println("Entered scope")
launch {
println("why not??: ${Thread.currentThread().name}")
}
}
How can I debug this printing nothing at all? I do have a non standard CorountineContext: https://srcb.in/y7TGICTNxq
However, it seems to work with everything else so I'm not sure what I'm doing wrong here.David Stibbe
05/27/2022, 12:13 AMfor( message in myChannel){ ... }
loop. I can see, via logging, that the events are being send and I see that they are read in the for loop. However, all except for one of the sending coroutines do not continue and keep being stuck in the suspended state.
Any idea what could cause this ?
Adding a buffer to the channel seems to help, but I would like to know what causes my issue ?Arjan van Wieringen
05/28/2022, 6:34 PMmap
, but if I want it to be a StateFlow again I need to use stateIn
but then I require a scope. Or is the mapped result still a stateflow?
The usecase is I have a class that I want to expose a StateFlow which is based on another one. But if I want to use stateIn
I need to do this from a coroutine. So it becomes something like this:
interface History {
val activeLocation: StateFlow<Location>
// ...
}
class Router<R>(
private val history: History,
private val routing: Routing<R>,
private val scope: CoroutineScope
) {
lateinit var activeRoute : StateFlow<R>
private set
init {
scope.launch {
activeRoute = history.activeLocation
.map { location -> routing.getRoute(location) }
.stateIn(scope)
}
}
}
I am not sure if this is the most optimal way of doing it... I have the feeling I am overcomplicating stuff.andylamax
05/29/2022, 1:19 AMOvsyannikov Alexey
05/29/2022, 3:08 AMExerosis
05/30/2022, 12:54 AMval ctx = currentCoroutineContext()
val test = object : CoroutineScope {
override val coroutineContext = ctx
}
task = test.launch(context) { block() }
Not the same as:
coroutineScope {
task = launch(context)
}
I know the first one is bad practice so are they both pretty bad?Arjan van Wieringen
05/30/2022, 8:22 AMEffectDispatcher
two ways:
// as a suspending function
interface EffectDispatcher<E> {
suspend fun dispatchEffect(effect: E)
}
// return a job
interface EffectDispatcher<E> {
val scope: CoroutineScope
fun dispatchEffect(effect: E): Job
}
As far as I understand:
In the suspending case (1st) it is the responsibility of the callee to provide a coroutinescope, and in the case of the 2nd the EffectDispatcher itself has a scope in which the effects launch and the callee gets a reference to the dispatched effect job, maybe to cancel it or something, or can cancel every running job with scope
.
Is this correct? As my preference goes I'd go for the 2nd option.
EDIT:
Option number 3 would be to have EffectDispatcher<E>
implement CoroutineScope
. I like that one best actually.Yoavya
05/30/2022, 1:52 PMConcurrentHashMap
with coroutines?
I see 2 potential issues, assuming that I want to compute a value computeIfAbsent
. I don’t think using getOrPut
is an option since it is not concurrent and the value may already exist:
1. If my compute function is suspending (can be runBlocking
with parallel computation to speed the process) then another coroutine (on the same thread) can run the same synchronized
code. I think that is a problem
2. If I am running on a dispatcher that runs multiple threads and my compute function is suspending again then I can find myself waking up in a different thread then the one that I started in (can that happen?). Will it break the synchronized block inside the ConcurrentHashMap
? I am not sure what happens here.
Is there anything that is coroutine safe equivalent to ConcurrentHashMap
or do you know if there is something planned to solve these issues? (if what I described are issues)Luke
05/31/2022, 2:04 PMcollectLatest
that can be followed by launchIn
instead of wrapping it in a launch
block? For now, I copied the implementation of collectLatest
without the collect
call but I was wondering if something already existed.Sam
05/31/2022, 3:25 PMsuspend fun get(key: K): V = supervisorScope {
asyncCache.get(key) { k: K, _ -> future { loader.load(k) } }.await()
}
The problem I'm having is with cancellation. If multiple consumers are simultaneously waiting for a given key, and one of them stops waiting (i.e. is cancelled), the current implementation will have all the consumers receive a cancellation exception.
I can change that by loading the key in a separate scope (e.g. a dedicated CoroutineScope belonging to the cache, rather than a scope belonging to one of the consumers) and replacing await()
with asDeferred().await()
as suggested by the docs. But then I have a problem where nobody can cancel the job. Even if all the consumers stop waiting, the cache entry will continue loading. To handle that problem I would need to introduce some lifecycle management into the cache, e.g. make it Closeable
and have it clean up its own coroutines on close.
Has anybody encountered this before and found a nice solution?Endre Deak
05/31/2022, 10:01 PMimport kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.cancelChildren
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.withContext
import sun.misc.Signal
suspend fun main() {
withContext(Dispatchers.Default) {
Signal.handle(Signal("INT")) {signal ->
println("RECEIVED ${signal.name}")
this.coroutineContext.cancelChildren()
}
launch {
withContext(Dispatchers.Default) {
launch {
while (true) {
try {
println("foo")
// delay(2000) // if I put delay here, it fails with an infinite loop on SIGINT
} catch (e: Exception) {
println("error: ${e.message}")
}
delay(2000) // if I put delay here outside try-catch, no problem on SIGINT
}
}
}
}
}
println("finished")
}
Exerosis
06/01/2022, 12:44 AMrunBlocking(MyContext) {
launch(<http://Dispatchers.IO|Dispatchers.IO>) {
//doesn't contain my context anymore
}
launch {
//still contains my context
}
}
I need to be an identifier that indicates that a call is part of the same sequence of calls. Worst case even if I could just get it to log some kind of warning that would be a good start.Eric Chee
06/01/2022, 2:54 PMMutableStateFlow
for updating and a public StateFlow
that uses .stateIn()
Example:
class ExampleViewModel: ViewModel() {
private val _state = MutableStateFlow<Int>(0) // Set initial value
val state: StateFlow<Int> = _state
.stateIn(
scope = ...,
started = ...,
initialValue = 0 // Have to duplicate initial value :(
)
fun onSomeAction() {
_state.getAndUpdate { lastState -> lastState + 1 }
}
}
Eric Chee
06/01/2022, 2:54 PMMutableStateFlow
for updating and a public StateFlow
that uses .stateIn()
Example:
class ExampleViewModel: ViewModel() {
private val _state = MutableStateFlow<Int>(0) // Set initial value
val state: StateFlow<Int> = _state
.stateIn(
scope = ...,
started = ...,
initialValue = 0 // Have to duplicate initial value :(
)
fun onSomeAction() {
_state.getAndUpdate { lastState -> lastState + 1 }
}
}
Joffrey
06/01/2022, 2:55 PMstateIn
here?Eric Chee
06/01/2022, 3:04 PMcombine(<ExternalFlows>) {
...
}.stateIn(
scope = viewModelScope,
started = SharingStarted.WhileSubscribed(5_000),
initialValue = AuthorScreenUiState(AuthorUiState.Loading, ...)
)
Joffrey
06/01/2022, 3:11 PMEric Chee
06/01/2022, 4:16 PM.stateIn()
for consuming cold flows into a StateFlow in the case i want those cold flows to stay active depending on the subscriber sharing behavior i defined?
• B) If the ViewModel is the creator of a MutableStateFlow, i dont need to use .stateIn() since that mutable stateFlow is already hot and will stay alive for the lifecycle of the ViewModel, regardless if theres no collectorsclass ExampleViewModel(externalIntFlow: Flow<Int>): ViewModel() {
data class State(val msg: String, val isLoading: Boolean = false)
private val _state = MutableSharedFlow<State>()
val state = externalIntFlow.map { action ->
when (action) {
1 -> State("One")
else -> State("?")
}
}.merge(_state)
.stateIn(
scope = viewModelScope,
started = SharingStarted.WhileSubscribed(5000),
initialValue = State("Empty")
)
fun someAction() = viewModelScope.launch {
/*
Want to update last value of `state` but dont get benefit of state.getAndUpdate
since state is not mutable from stateIn(), but instead have to use external sharedFlow + state.value.copy()
*/
_state.emit(state.value.copy(isLoading = false))
delay(1000)
_state.emit(State("Success"))
}
}
Joffrey
06/01/2022, 4:48 PMstateIn
needs a scope - to start this coroutine behind the scenes. And it's also why you define a policy to keep this coroutine alive or cancel it. If you create a mutable state flow directly, you just created some sort of observable state variable. It doesn't need any coroutine on its own, because the state will be updated by other pieces of code (they may use coroutines themselves, but they don't need to because state flows have a state
property that can be updated without suspending).
B) truestateIn
on the external cold flows, you could just manually launch a coroutine that collects them and updates your state flow. This way you can also update your state flow using _state.value=x
in someAction()
(it will not be read-only)Eric Chee
06/01/2022, 5:12 PMclass ExampleViewModel(externalIntFlow: Flow<Int>): ViewModel() {
data class State(val msg: String, val isLoading: Boolean = false)
private val _state = MutableStateFlow(State("Empty"))
val state = _state.asStateFlow()
init {
viewModelScope.launch {
externalIntFlow.map { action ->
when (action) {
1 -> State("One")
else -> State("?")
}
}.also { _state.emitAll(it) }
}
}
fun someAction() = viewModelScope.launch {
_state.getAndUpdate { it.copy(isLoading = false) }
delay(1000)
_state.update { State("Success") }
}
}
Joffrey
06/01/2022, 5:14 PMephemient
06/01/2022, 5:54 PMprivate val _state = MutableStateFlow<Int>(0)
val state: StateFlow<Int>
get() = _state.asStateFlow()
Joffrey
06/01/2022, 5:56 PM