oday
11/15/2022, 12:32 PMprivate val categoryFilter = BrowseEventsFilter.CategoryFilter(category = MutableStateFlow(Category.All))
private val locationFilter = BrowseEventsFilter.LocationFilter(location = MutableStateFlow(null))
private val periodFilter = BrowseEventsFilter.PeriodFilter(
period = MutableStateFlow(Period.Any),
dateRange = MutableStateFlow(null)
)
and then you change them
var filtersState = MutableStateFlow(listOf(locationFilter, periodFilter, categoryFilter)) // collectAsState in the activity
// how they change
filterState?.enabled?.value = ChipStatus.ENABLED
filterState?.period?.value = period
filterState?.dateRange?.value = null
and then observe them
flowOf(periodFilter.period?.value,
periodFilter.dateRange?.value,
categoryFilter.category.value,
locationFilter.location.value
).collect {
// do something everytime their values change
}
czuckie
11/16/2022, 9:44 AMBrendan Weinstein
11/16/2022, 2:02 PM0 libsystem_kernel.dylib ___pthread_kill
1 libsystem_pthread.dylib _pthread_kill
2 libsystem_c.dylib _abort
3 BaseBetaiOS konan::abort() (BaseBetaiOS)
4 BaseBetaiOS (anonymous namespace)::terminateWithUnhandledException(ObjHeader*)::$_1::operator()() const (BaseBetaiOS)
5 BaseBetaiOS void (anonymous namespace)::$_0::operator()<(anonymous namespace)::terminateWithUnhandledException(ObjHeader*)::$_1>((anonymous namespace)::terminateWithUnhandledException(ObjHeader*)::$_1) (BaseBetaiOS)
6 BaseBetaiOS (anonymous namespace)::terminateWithUnhandledException(ObjHeader*) (BaseBetaiOS)
7 BaseBetaiOS (anonymous namespace)::processUnhandledException(ObjHeader*) (BaseBetaiOS)
8 BaseBetaiOS kfun:kotlinx.coroutines#handleCoroutineException(kotlin.coroutines.CoroutineContext;kotlin.Throwable){} (BaseBetaiOS)
9 BaseBetaiOS kfun:kotlinx.coroutines.StandaloneCoroutine.handleJobException#internal (BaseBetaiOS)
10 BaseBetaiOS kfun:kotlinx.coroutines.JobSupport.finalizeFinishingState#internal (BaseBetaiOS)
11 BaseBetaiOS kfun:kotlinx.coroutines.JobSupport.tryMakeCompleting#internal (BaseBetaiOS)
12 BaseBetaiOS kfun:kotlinx.coroutines.JobSupport#makeCompletingOnce(kotlin.Any?){}kotlin.Any? (BaseBetaiOS)
13 BaseBetaiOS kfun:kotlinx.coroutines.AbstractCoroutine#resumeWith(kotlin.Result<1:0>){} (BaseBetaiOS)
14 BaseBetaiOS kfun:kotlin.coroutines.native.internal.BaseContinuationImpl#resumeWith(kotlin.Result<kotlin.Any?>){} (BaseBetaiOS)
15 BaseBetaiOS kfun:kotlinx.coroutines.DispatchedTask#run(){} (BaseBetaiOS)
16 BaseBetaiOS kfun:kotlinx.coroutines.DarwinGlobalQueueDispatcher.$dispatch$lambda$0$FUNCTION_REFERENCE$693.$<bridge-UNN>invoke(){}#internal (BaseBetaiOS)
17 BaseBetaiOS ___6f72672e6a6574627261696e732e6b6f746c696e783a6b6f746c696e782d636f726f7574696e65732d636f7265_knbridge762_block_invoke (BaseBetaiOS)
18 libdispatch.dylib __dispatch_call_block_and_release
19 libdispatch.dylib __dispatch_client_callout
20 libdispatch.dylib __dispatch_root_queue_drain
21 libdispatch.dylib __dispatch_worker_thread2
22 libsystem_pthread.dylib __pthread_wqthread
Panos
11/16/2022, 7:09 PMExecutors.newFixedThreadPool(8)
to create a pool of threads that are meant to indefinitely process incoming SQS messages. It seems from my tests that tasks are submitted constantly to the pool without waiting for one or more of the 8 allocated threads to finish. I’m not using shutdown as I do not want to shutdown this pool but instead use it indefinitely but wait when it is full. Any ideas?escodro
11/16/2022, 7:47 PMfun addItem(item: String) {
viewModelScope.launch {
repository.saveDataInRoom(item)
interactor.sendNotification(item)
interactor2.updateWidget(item)
}
}
The “addItem” screen is a Bottom Sheet that is dismissed after the user adds an item.
Since the saveDataInRoom()
is a heavy operation, by the time it ends the execution the coroutine was already cancelled by passing on ViewModel#onCleared()
. Then no code after that is executed after that command.
Any suggestions on how to ensure that everything is executed?
Thanks!Anders Sveen
11/17/2022, 6:43 AM@Test
fun shouldExecuteOnDifferentThreads() {
runBlocking {
// Some blocking DB operation
val firstThread = withContext(<http://Dispatchers.IO|Dispatchers.IO>) {
Thread.currentThread()
}
// Another blocking DB operation
val secondThread = withContext(<http://Dispatchers.IO|Dispatchers.IO>) {
Thread.currentThread()
}
assertThat(firstThread).isNotSameAs(secondThread)
}
}
jean
11/17/2022, 10:19 PMsuspend fun loadData(
scope: CoroutineScope,
state: MutableStateFlow<ViewState>,
) {
state.update {
it.copy(
data1 = it.data1.copy(status = AsyncDataStatus.LOADING),
data2 = it.data2.copy(status = AsyncDataStatus.LOADING),
data3 = it.data3.copy(status = AsyncDataStatus.LOADING),
)
}
scope.launch { loadData1FromRepository(state) }
scope.launch { loadData2FromRepository(state) }
scope.launch { loadData3FromRepository(state) }
}
private suspend fun loadData1(
state: MutableStateFlow<ViewState>
) = loadData1FromRepository().collect { result -> // this is a flow of 2 either values coming from a database and network
state.update { it.copy(data1 = newAsyncValue(result)) }
}
when running tests, I get a time out exception
state.test {
val loading = awaitItem()
assertEquals(
expected = initial.copy(
data1 = initial.data1.copy(status = AsyncDataStatus.LOADING),
data2 = initial.data2.copy(status = AsyncDataStatus.LOADING),
data3 = initial.data3.copy(status = AsyncDataStatus.LOADING),
),
actual = loading,
) // this passes correctly
val data1Loaded = awaitItem()
assertEquals(
expected = loading.copy(
data1 = initial.data1.copy(
status = AsyncDataStatus.SUCCESS,
data = someData1,
),
),
actual = data1Loaded,
) // here I get the time out
What am I supposed to do inorder to make the coroutines complete and return a value before the timeout?TW
11/18/2022, 12:35 PMTran Thang
11/19/2022, 11:00 AMdephinera
11/21/2022, 6:04 AMnatario1
11/22/2022, 11:07 AMcurrentCoroutineContext()
? I imagine it doesn’t suspend despite being suspend
and should return immediately, just fetching some compiler generated field, no extra allocations. Is this right?dead.fish
11/22/2022, 12:39 PMsuspend
function that feeds a StateFlow
with one or two different values, in quick succession. How would I test this with Coroutines 1.6? Issue is that during the test I only see the second emitted value and the first is already gone. I tried different things using different dispatches, but nothing worked out unfortunately.Bruno G Tavares
11/22/2022, 6:11 PMtry
would we have to account for the coroutine or it would catches that?Jhonatan Sabadi
11/22/2022, 8:27 PMJeff Lockhart
11/23/2022, 1:30 AMkotlinx-coroutines-test
. Each test is run within a runTest()
coroutine. All the tests execute and pass on JVM. On iOS they usually pass as well, but often (~30% of the time) one of the tests hangs, blocking the test suite's completion. I've seen almost all the tests cause this, from the first to the last, so it's not caused by a specific test.
If I add a print statement to the bottom of each of the runTest()
calls, the print statement is always executed, so it's hanging after the test code completes, but runtTest()
apparently isn't returning for some reason.
Any idea what could be the cause of this? The code is based on this test suite from SQLDelight, but using multiplatform paging and a different database.Asad Mehmood
11/23/2022, 9:57 AMnative suspend fun
) in rust via JNI. I've managed to create a function which starts an async rust fn in another thread and immediately returns COROUTINE_SUSPENDED
to kotlin.
The question is: Does resumeWith
run in the native thread (Does is treat the native thread as a dispatcher) or dispatch back to the kotlin dispatcher? I'm using a single thread to call resumeWith as setting up and tearing down JNI Threads are expensive and I would like to know whether its cheap or if it could end up starving other calls to resumeWith? Should I be using a dynamic blocking threadpool to call resumeWith?
See for rust code example: https://github.com/mehmooda/suspendlib/blob/main/src/lib.rsLukas Lechner
11/23/2022, 3:07 PMhttps://youtu.be/TvdF566t2Y8▾
Lukas Lechner
11/24/2022, 11:17 AMsuspend fun main(): Unit = coroutineScope {
val sharedFlow = MutableSharedFlow<Int>()
launch {
sharedFlow
.collect {
println("Process $it")
delay(1000)
println("$it processed")
}
}
launch {
// 1 should be processed
sharedFlow.emit(1)
println("sharedFlow emits 1")
// 2 should not be processed since downstream is busy
sharedFlow.emit(2)
println("sharedFlow emits 2")
// 3 should be processed again
delay(2000)
sharedFlow.emit(3)
println("sharedFlow emits 3")
}
}
I tried BufferOverflow.DROP_LATEST, but there we need to have a buffer value > 0George
11/24/2022, 5:19 PMopen class WrapperFlow {
private val hotFlow = MutableSharedFlow<Int>()
fun getFlow(): Flow<Int> = hotFlow
@Transactional
open suspend fun emit() {
hotFlow.emit(10)
}
}
suspend fun main() = coroutineScope {
println("hi")
val wrapperFlow = WrapperFlow()
val job1 = launch {
wrapperFlow.getFlow().collect {
println("job1: $it")
}
}
wrapperFlow.emit() // does not print job1: $it
job1.cancelAndJoin()
}
Thanks in advance for any answers !Slackbot
11/27/2022, 12:10 AMStylianos Gakis
11/27/2022, 1:31 AMdimsuz
11/28/2022, 1:24 PMJob
's ?
fun allJobs(): Job {
val job1: Job
val job2: Job
val job3: Job
return combine(job1, job2, job3) // HOW?
}
fun main() {
allJobs().cancel() // cancels all 3 jobs
}
Is it possible or do I have to use some pattern which is similar to this?dead.fish
11/29/2022, 4:58 PMval flow: StateFlow<T>
to consumers, while individual implementations can fill in the initial state via some protected abstract suspend fun initialState(): T
. But I can’t get lazy initialization working, i.e. my backing MutableSharedFlow<T>(replay = 1)
needs suspending to be converted into a StateFlow
or needs the initial value being given again (repeated), which defeats the purpose, i.e. val flow: StateFlow<T> get() = backingFlow.onStart { emit(initialState()) }.stateIn(someScope)
does not work, because stateIn(someScope)
is suspending. What am I doing wrong?ursus
11/30/2022, 12:16 AMtry catch
which doesn’t catch CancellationException
? or should I not care to rethrow that?czuckie
11/30/2022, 12:52 AMval executor = Executors.singleThreadExecutor()
fun oneAtATime(workToDo: Boop) {
return executor.submit {
doSomethingWIthThe(workToDo)
}.get()
}
Where oneAtATime(...)
may be invoked by multiple threads, but will only allow one operation at a time due to the single thread executor.czuckie
11/30/2022, 1:37 PMChannel
for an amount of time before abandoning the operation? None of the receive*
functions seem to have what I want.ursus
12/01/2022, 11:57 PMdelay
? Atleast that it won't resume sooner than the deadline?ursus
12/02/2022, 2:48 AMbeginsAt: Timestamp
on it
I need to render a list of those meetings, and color red those which are say, 15 minutes before beginning.
Now, I’d best like to use a combineLatest, however that requires a “time” stream to combine with.
I could have a stream which emits every say seconds, but that feels like a waste
So my attempt, I observe the stream, and calculate the delta time from now and “15min before beginning”, and timer
to emit then.
Now, the question is, should I use this only as a “tickle” into the combineLatest, and let it’s combiner to reevaluate the “15min before beginning” condition in it
combine(meetings, timer, ::combiner)
Or,
should I treat timer
as the trusted, and then just set the meeting’s color directly, i.e. don’t reevaluate the condition?
--
I like the former, however it requires for the timer
not emit before the given deadline.
Is it to be trusted to not do so?Tower Guidev2
12/02/2022, 1:38 PMval uiState by viewModel.uiState.collectAsStateWithLifecycle()
Marc Knaup
12/02/2022, 5:45 PMTestCoroutineScheduler.advanceTimeBy()
and get a callback whenever the currentTime
changes in order to execute side-effects? Or at least query the next event time?Marc Knaup
12/02/2022, 5:45 PMTestCoroutineScheduler.advanceTimeBy()
and get a callback whenever the currentTime
changes in order to execute side-effects? Or at least query the next event time?Dmitry Khalanskiy [JB]
12/03/2022, 11:01 AMMarc Knaup
12/03/2022, 10:30 PMDmitry Khalanskiy [JB]
12/04/2022, 10:52 AMX
, Y
, and Z
) that need to have virtual time. Also, let's, as you propose, designate the time as seen by X
as the real time.
Y
is registered as the first thing to get a callback for a time change from X
, Z
is registered as the second thing.
• X
has a task scheduled in 10 seconds,
• Y
has a task scheduled in 9 seconds,
• Z
has a task scheduled in 2 seconds.
X
proceeds to grab another task from the queue and announces: "The virtual time is now 10". Y
urgently runs the task, and then, Z
starts running its task. We're left with an insane and completely unrealistic interleaving.
The Proper™ thing to do here would be to have a queue of tasks shared by all three things and advance the virtual time for all of them. Here, the knowledge of when the next task is planned would help you, that's a nice use case for that.
The Proper™ thing may be too difficult to do though, depending on the provided APIs of X
, Y
, and Z
, so maybe something like this could be the best way instead:
val timerResolution = 50.milliseconds
while (testNotFinished) {
X.advanceTimeBy(timerResolution)
Y.advanceTimeBy(timerResolution)
Z.advanceTimeBy(timerResolution)
}
Marc Knaup
12/04/2022, 12:16 PMfun advanceTimeBy(delay: Long) {
var remainingDelay = delay
while (true) {
val delayToNextEvent = listOfNotNull(X.delayToNextEvent, Y.delayToNextEvent, Z.delayToNextEvent)
.minOrNull()
?.takeIf { it <= remainingDelay }
?: break
X.advanceTimeBy(delayToNextEvent)
Y.advanceTimeBy(delayToNextEvent)
Z.advanceTimeBy(delayToNextEvent)
remainingDelay -= delayToNextEvent
}
}
Dmitry Khalanskiy [JB]
12/04/2022, 1:34 PMX.advanceTimeBy(remainingDelay)
and such at the end so the virtual time is properly updated, but other than that, yeah, I agree that this would also work… unless someone is adding new tasks to X
, Y
, and Z
in parallel to advanceTimeBy
. So, the tricky thing with this API is the existence of code that runs outside the test dispatchers.
I think this issue relates to https://github.com/Kotlin/kotlinx.coroutines/issues/3189, which asks for notifications about the scheduler being idle, but this could be extended to also providing the information about the next pending task when not idle. It seems that, if we decide how to treat the code in other threads concerning idleness, both problems can be solved.