natario1
04/29/2021, 11:32 AM
natario1
04/29/2021, 4:09 PM
class Worker {
    fun doWork()
    fun cancelWork()
}
My attempt below (untested) doesn't look very good, I wonder if there's any better way.
coroutineScope {
    val job = launch {
        while (isActive) {
            delay(50)
        }
    }
    job.invokeOnCompletion {
        if (it is CancellationException) {
            worker.cancelWork()
        }
    }
    worker.doWork()
    job.cancel()
}
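A minimal sketch of one alternative, assuming doWork() blocks until the work finishes and cancelWork() is thread-safe and makes doWork() return (neither is stated in the thread). The Worker interface, SleepyWorker and doWorkCancellable below are hypothetical names used only for illustration. The idea is to wrap the blocking call in suspendCancellableCoroutine and register cancelWork() as the cancellation handler:

```kotlin
import kotlinx.coroutines.*
import kotlin.concurrent.thread
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException

// Hypothetical stand-in for the Worker class above.
interface Worker {
    fun doWork()     // assumed to block until done or cancelled
    fun cancelWork() // assumed to be thread-safe and to make doWork() return
}

suspend fun Worker.doWorkCancellable(): Unit = suspendCancellableCoroutine { cont ->
    // Invoked if the calling coroutine is cancelled while suspended here.
    cont.invokeOnCancellation { cancelWork() }
    thread {
        try {
            doWork()
            cont.resume(Unit) // ignored if the continuation was already cancelled
        } catch (e: Throwable) {
            cont.resumeWithException(e)
        }
    }
}

// Hypothetical worker that spins until it is told to stop.
class SleepyWorker : Worker {
    @Volatile var cancelled = false
    override fun doWork() { while (!cancelled) Thread.sleep(10) }
    override fun cancelWork() { cancelled = true }
}

fun demo(): Boolean = runBlocking {
    val worker = SleepyWorker()
    val job = launch { worker.doWorkCancellable() }
    delay(50)
    job.cancelAndJoin() // triggers cancelWork() through the cancellation handler
    worker.cancelled
}

fun main() = println(demo())
```

Unlike the attempt above, cancelWork() is only called when the coroutine is actually cancelled, not after every successful doWork().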
ursus
04/30/2021, 12:50 AM
class AppInForeground {
    val appInForeground: Observable<Boolean> by lazy {
        Lifecycle.wrapAsObservable()
            .distinctUntilChanged()
            .publish()
            .replay(1)
    }
}
naive Flow implementation
class AppInForeground {
    @Volatile
    private var _appInForeground: SharedFlow<Boolean>? = null

    fun appInForeground(scope: CoroutineScope): Flow<Boolean> {
        if (_appInForeground == null) {
            synchronized(this) {
                if (_appInForeground == null) {
                    _appInForeground = Lifecycle.wrapAsFlow()
                        .distinctUntilChanged()
                        .shareIn(scope, SharingStarted.Lazily, 1)
                }
            }
        }
        return _appInForeground!!
    }
}
just looks stupid, however the main issue is that callers should not determine the scope, right?
I think the idiomatic way is this:
class AppInForeground : HasLifecycle {
    private val scope = SupervisorScope(..)
    private val _appInForeground = MutableStateFlow(false)
    val appInForeground: Flow<Boolean> get() = _appInForeground
    override fun create() {
        scope.launch {
            Lifecycle.wrapAsFlow()
                .distinctUntilChanged()
                .collect {
                    _appInForeground.value = it
                }
        }
    }
    override fun destroy() {
        scope.cancel()
    }
}
Works, however it's a bit less efficient, i.e. it subscribes when the AppInForeground class is instantiated, and it's a whole lot more ceremony...
Is there a way closer to the Rx way? Or should I just deal with it?
Cody Mikol
04/30/2021, 2:08 PM
mkrussel
04/30/2021, 3:01 PM
Flow where a lot of updates can happen at once and I want the collector to only process the last one.
The producer and the consumer are both on the UI thread. The desired behavior is when the producer gets out of their loop and goes back to waiting for some event, then the consumer gets the latest state.
What I have now is a callbackFlow that is conflated.
val flow = callbackFlow<Any> {
    event.register { offer(someValue) }
    awaitClose { event.unregister() }
}.conflate()
val job = scope.launch {
    flow.collect { }
}
The scope is using Android's MainScope, and the event is sending events on the main thread.
When I was using a MutableStateFlow this behaved as I wanted, but with the conflated callbackFlow the caller is getting the first and last event, once all the events have been offered.
T
05/01/2021, 12:09 PM
val loginSequence = SharedMethod<String>()
suspend fun request() {
    try {
        loginSequence
            .awaitIfNeeded()
        return doRequest(...)
    } catch(e: LoginRequiredException) {
        return loginSequence.awaitOrBlock(
            onBlockReturn = {
                return@awaitOrBlock executeLoginSequence(e)
            },
            onAwaitReturn = {
                return@awaitOrBlock doRequest(
                    ...
                )
            })
    }
}
with the implementation of this class with mutexes:
class SharedMethod<T> {
    private val mutex: Mutex = Mutex()
    suspend fun awaitOrBlock(
        onBlockReturn: suspend () -> T,
        onAwaitReturn: suspend () -> T
    ): T = coroutineScope {
        val isLocked = mutex.tryLock()
        if (!isLocked) {
            awaitIfNeeded()
            return@coroutineScope onAwaitReturn()
        }
        try {
            return@coroutineScope onBlockReturn()
        } finally {
            mutex.unlock()
        }
    }
    suspend fun awaitIfNeeded() {
        while (mutex.isLocked) {
            delay(1000)
        }
    }
}
Is there a better way to implement this behaviour? Especially the "while (mutex.isLocked) {...}" seems pretty ugly.
Is this the right place for using mutexes? Happy to hear your thoughts. :)
Marcin Wisniowski
05/02/2021, 3:52 PM
BaseContinuationImpl.resumeWith and the rest of coroutines machinery, not the "actual" calling method.
Andrew Gazelka
05/02/2021, 8:57 PM
// a method ...
val (sharedFlow, job) = flow.shareIn(scope, SharingStarted.Eagerly)
jobs.add(job)
return sharedFlow
Brian Dilley
05/02/2021, 9:23 PM
Deferred<X> - what’s the best way to retrieve their results as they come in - and at any point bail after receiving a result that you’re looking for while canceling the others?
ursus
05/03/2021, 2:24 AM
private fun <T> Flow<T>.onStartDebounced(initialValue: T, debounceMillis: Long): Flow<T> {
    return onStart { emit(initialValue) }
        .withIndex()
        .debounce { if (it.index == 0) debounceMillis else 0L }
        .map { it.value }
}
Works, but I'm an Rx guy, is there something more idiomatic / performant (so I can actually learn something)?
Mark
05/03/2021, 3:53 AM
CompletableDeferred or something else?
// from this
val someProperty: Int by lazy {
    // refactoring to perform some suspendable logic
}
// to this
suspend fun someProperty(): Int {
    // only should be executed max once (and then cached)
}
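One possible shape for this, as a sketch rather than an established recipe: guard a cached value with a Mutex so the suspending initializer runs at most once on success. SuspendLazy and demo are hypothetical names, not part of kotlinx.coroutines.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock

// Hypothetical helper: a suspending counterpart to `by lazy`.
class SuspendLazy<T : Any>(private val initializer: suspend () -> T) {
    private val mutex = Mutex()

    @Volatile
    private var cached: T? = null

    suspend fun get(): T {
        cached?.let { return it } // fast path once initialized
        return mutex.withLock {
            // Re-check inside the lock: another caller may have finished first.
            cached ?: initializer().also { cached = it }
        }
    }
}

fun demo(): Pair<List<Int>, Int> = runBlocking {
    var calls = 0
    val someProperty = SuspendLazy {
        calls++
        delay(10) // stands in for the suspendable logic
        42
    }
    // Three concurrent callers share a single initialization.
    val results = List(3) { async { someProperty.get() } }.awaitAll()
    results to calls
}

fun main() = println(demo())
```

If the initializer throws, nothing is cached and the next caller retries. A CompletableDeferred or an `async(start = CoroutineStart.LAZY)` would also work, but both need a scope to run the initializer in, and a failed Deferred stays failed.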
julioromano
05/03/2021, 1:36 PM
CoroutineScope?
As if the CoroutineScope were a message queue and Jobs were messages in it: Would there be a way to track if the CoroutineScope is currently handling Jobs or is sitting idle?
What I’m trying to accomplish: I’d like to build an idle/busy signal (possibly as a Flow<Boolean>) that will tell me whether a CoroutineScope is currently running any coroutines or not.
Erik
05/03/2021, 1:50 PM
MutableSharedFlow() by default has no buffer (replay = 0, extraBufferCapacity = 0, i.e. buffer = replay + extraBufferCapacity = 0). Does that mean that if I have many emitters that call emit, that they would suspend (in order) until the subscribers of the shared flow process the emissions one by one? So, in other words: has the buffer moved away from the shared flow, that intrinsically has no buffer space, to the coroutine scope(s) that now contain various suspended child coroutines trying to emit?
Chris Grigg
05/03/2021, 2:59 PM
launch from your existing context and then move it elsewhere using withContext instead of calling launch and specifying dispatcher. scope.launch { withContext(Dispatchers.IO) { work() } } instead of scope.launch(Dispatchers.IO) { work() }. I remember this being described as preferable since there was some performance penalty when launching in a different context that did not occur when moving a running coroutine using withContext. I’m trying to find guidance on this again and I can’t find anything about it. Did I misunderstand something when I was first learning?
Erik
05/03/2021, 3:15 PM
MutableSharedFlow<Int>().emit(0) (from a suspending function), then the emit call does not suspend. The documentation says that on buffer overflow the call should suspend. To me it seems that there is no buffer, so emit(0) causes a buffer overflow, so the emit call should suspend. Instead, it returns quickly! Am I misinterpreting the following?
suspending on buffer overflow
Or can the documentation be improved?
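For what it's worth, the SharedFlow documentation also states that without subscribers emit returns immediately and the value is lost (unless replay is configured); the suspension only applies while there are subscribers that have not yet received the value. A small sketch that shows both cases, where demo and the log strings are made up for illustration:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun demo(): List<String> = runBlocking {
    val log = mutableListOf<String>()
    val flow = MutableSharedFlow<Int>() // defaults: replay = 0, extraBufferCapacity = 0

    // No subscribers: emit returns right away and the value is dropped.
    val returned = withTimeoutOrNull(1_000) { flow.emit(0) } != null
    log += "emit without subscribers returned: $returned"

    // A slow subscriber that takes two values and then completes.
    val subscriber = launch {
        flow.take(2).collect { value ->
            delay(50)
            log += "collected $value"
        }
    }
    flow.subscriptionCount.first { it > 0 } // wait until the subscriber is registered

    flow.emit(1) // handed over to the waiting subscriber
    flow.emit(2) // suspends until the subscriber comes back for the next value
    log += "second emit returned"

    subscriber.join()
    log
}

fun main() = demo().forEach(::println)
```

With a subscriber present, the second emit cannot return before the first value has been processed, so in this setup the emitters are effectively what queues up.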
Kshitij Patil
05/03/2021, 3:32 PM
MutableSharedFlow with replay=0? I'm using turbine for testing Kotlin Flows and I managed to write tests for MutableStateFlow, but the same emitted values don't work for SharedFlows.
ermac10k
05/06/2021, 11:59 AM
Pablo
05/06/2021, 2:08 PM
scope.launch {
    var foo = true
    while (foo) {
        delay(1_000)
        // do some actions here
        // with these actions I check if the time is correct, and if it goes inside the if I set foo to false so it leaves the while
        if (whatever) {
            foo = false
            // call one method here
        }
    }
}
Is this a correct way? Is there a better way with flow?
Pablo
05/06/2021, 2:31 PM
val timer = (0..MyInterval)
    .asSequence()
    .asFlow()
    .onEach { delay(1_000) }
timer.collect { /* do something */ }
But how do I stop the timer as I do with if(whatever) foo = false?
Rob
05/06/2021, 6:15 PM
Ben
05/06/2021, 7:38 PM
private val searchQueryStream = MutableStateFlow<String>("")
private val refreshChannel = Channel<Unit>(Channel.CONFLATED).apply { offer(Unit) }
val resultStream: Flow<PagingData<TenorGifTileData>> =
    searchQueryStream
        .combine(refreshChannel.receiveAsFlow()) { queryStream, _ -> queryStream }
        .filter { searchQuery ->
            searchQuery.query.isNotEmpty()
        }
        .flatMapLatest { searchQuery ->
            repository.resultsForSearchQuery(searchQuery)
        }
fun refresh() {
    if (!refreshChannel.isClosedForSend) {
        refreshChannel.offer(Unit)
    }
}
ursus
05/07/2021, 11:50 AM
mbonnin
05/07/2021, 3:07 PM
MutableSharedFlow before emitting something?
val mutableFlow = MutableSharedFlow<Int>()
val sharedFlow = mutableFlow.asSharedFlow()
// start listening to the sharedFlow
launch {
    sharedFlow
        .collect {
            // by the time we start collecting, the first item is gone already
            // and nothing is received
            println("$it")
        }
}
// emit something
mutableFlow.emit(1)
Adding a delay before emitting works but doesn't sound great and I don't see a way to enforce a before-after relationship there
ArcticLampyrid
05/08/2021, 8:48 AM
theapache64
05/09/2021, 9:30 AM
StateFlow vs asStateFlow? 🧵
Marian Schubert
05/09/2021, 9:51 AM
Yan Pujante
05/10/2021, 1:19 PM
class JobsMgr {
    private val _jobsToProcess = Channel<Job>(Channel.UNLIMITED)
    private val _jobsProcessed: Flow<Job> = _jobsToProcess.consumeAsFlow().map { job -> /* processing */ }.flowOn(Dispatchers.Default)
    init {
        // I find this very ugly.. is there a better way?
        Executors.newSingleThreadExecutor().execute {
            runBlocking {
                _jobsProcessed.collect {
                    delay(10 * 1000)
                    _lock.withLock { _jobs.remove(it.jobId) }
                }
            }
        }
    }
    fun enqueueJob(...) {
        //...
        runBlocking {
            _jobsToProcess.send(job)
        }
    }
}
It is running in a Spring Boot application so I do not have control over the main loop. This is why I created a process to collect and remove the completed jobs. Is this how Channel/Flow is supposed to be used? And is there a better way to "start" the collection? That seems like ugly code to me but I'm not sure how to do better.
carbaj0
05/11/2021, 4:42 PM
suspendCancellableCoroutine<Nothing> {} needed? To me it is just an empty call
class ProduceStateScopeImpl<T>(
    state: MutableState<T>,
    override val coroutineContext: CoroutineContext
) : ProduceStateScope<T>, MutableState<T> by state {
    override suspend fun awaitDispose(onDispose: () -> Unit): Nothing {
        try {
            suspendCancellableCoroutine<Nothing> { }
        } finally {
            onDispose()
        }
    }
}
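A small sketch of what that call does outside of Compose: a suspendCancellableCoroutine whose block never resumes the continuation suspends the caller until the coroutine is cancelled, and only then does the finally block run. The demo function and log strings are illustrative only.

```kotlin
import kotlinx.coroutines.*

fun demo(): List<String> = runBlocking {
    val log = mutableListOf<String>()
    val job = launch {
        try {
            log += "waiting"
            // Never resumed by anyone, so this suspends until cancellation.
            suspendCancellableCoroutine<Nothing> { }
        } finally {
            log += "onDispose" // runs when the coroutine is cancelled
        }
    }
    delay(50)
    log += "still active: ${job.isActive}"
    job.cancelAndJoin()
    log
}

fun main() = demo().forEach(::println)
```

This prints "waiting", "still active: true" and "onDispose", in that order. Without the empty call, awaitDispose would run onDispose() right away instead of waiting for the scope to be cancelled. kotlinx.coroutines also ships awaitCancellation(), which is defined as this same empty suspendCancellableCoroutine.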
frankelot
05/12/2021, 1:55 PM
taer
05/12/2021, 2:44 PM
taer
05/12/2021, 2:44 PM
suspend fun foo() {
    flow { emit(1) }
        .map { it * 2 }
        .collect { println(it) }
}
Casey Brooks
05/12/2021, 3:14 PM
https://www.youtube.com/watch?v=tYcqn48SMT8
taer
05/12/2021, 3:26 PM