Felix
11/02/2021, 9:20 PMrunInterruptible
allows the cancellation of blocking operations that are sensitive to thread interrupts (https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/run-interruptible.html). However in the java ecosystem there are several blocking operations that are not sensitive to interrupts, such as some sockets methods (IINM). In those cases, sometimes there is an alternative way to cancel the blocking operation, other than by interrupts. For instance, on sockets we can cancel a pending operation by closing the socket (from a different thread). In this context
1. Is there anything similar to runInterruptible
, where the cancelling action is not to send an interrupt but instead to perform a custom action, such as calling a close
on some object?
2. If there isn't, we could eventually try to build one. For that we probably need to register an invokeOnCompletion
that triggers on the cancelling
state and not only on the cancelled
state. This would allow us to perform an action when the coroutine transitions to cancelling
. There is an invokeOnCompletion
that allows that, however it is marked as @InternalCoroutinesApi
. Is there any alternative public and stable API to do that, i.e., trigger a callback when a coroutine transitions into the cancelling
state?
Thanks.rudolf.hladik
11/03/2021, 1:22 PM1.5.1-native-mt
and forces it by strictly
config. I use this library as api
gradle dependency in shared/commonMain
module. The problem is that I also use some dependencies in Android app that also use coroutines but not the 1.5.1-native-mt
branch.Endre Deak
11/03/2021, 7:11 PMsuspend fun callback() { ... }
suspend fun execute(request: Request): Response {
val response = ...
callback()
return response
}
Norbi
11/04/2021, 8:15 AMfun main() {
runBlocking {
suspendFun()
}
}
suspend fun suspendFun() {
normalFun()
}
fun normalFun() {
val runningInCoroutine = ???
check(!runningInCoroutine)
runBlocking {
...
}
}
Clament John
11/06/2021, 11:17 AMList<Message>
in the UI
2. messages = mutableStateListOf<Message>
in viewModel
3. messages depends on two actions getAllPaginated
or search
4. In both cases we observe SQLite, but only one flow should be active at a single instant (user is either viewing all or is searching)
5. Flow is used at SQLite because new messages can come from the network
Questions:
1. How to switch between getAll
and search
flow
2. When a new search happens the old search
flow should be cancelled and a new one should startGerard Klijs
11/07/2021, 10:14 AMStefan Oltmann
11/08/2021, 9:03 AM_Thread_._isMainThread_
which allows me to check that the current code is actually running in a Dispatchers.IO context for example?K Merle
11/08/2021, 11:59 AM.stateIn
extension?Clament John
11/08/2021, 12:14 PMcoroutine compiled as continuations
and more. It was a markdown
file in one of the main kotlin github repos (JetBrains/kotlin or Kotlin/kotlinx.coroutines).
Does anyone know where it is? I wanted to do some reading on how coroutines work and how they are compiledLilly
11/09/2021, 11:07 AMflow
operator for functions returning a single value. What's the purpose of doing this? Are there any benefits by doing this?Slackbot
11/09/2021, 2:08 PMMichael Clancy
11/09/2021, 3:22 PMAnsh Tyagi
11/09/2021, 5:58 PMmcpiroman
11/09/2021, 6:33 PMval foo = MutableStateFlow(10)
val baz = MutableStateFlow(10)
foo.collect { baz.value = it }
// Somewhere else
foo.value = 20
assertEquals(20, baz.value) // is this guaranteed? i.e. (how) can I expect value to be immediately updated?
Erik
11/10/2021, 9:18 AMOn Android,Where can I find that handler, as I'm interested to learn more about it? I can't find it on GitHub: https://github.com/search?l=Kotlin&q=uncaughtExceptionPreHandler&type=Code. I also cannot find it in the Android source: https://duckduckgo.com/?q=uncaughtExceptionPreHandler+site%3Ahttps%3A%2F%2Fandroid.googlesource.com.is installed as a global coroutine exception handler.uncaughtExceptionPreHandler
Clament John
11/10/2021, 10:21 AMmessages
from two flows (using flatMapLatest)
private val searchString = MutableStateFlow<String>("")
private val messages = searchString.flatMapLatest {
if (it == "") {
getAll()
} else {
search(it)
}
}.stateIn(viewModelScope, SharingStarted.Eagerly, null)
suspend fun requestSearch(search: String) = searchString.emit(search)
suspend fun requestAll() = searchString.emit("")
fun getAll(): Flow<List<Message>> = TODO()
fun search(search: String): Flow<List<Message>> = TODO()
But when I try emit
to update searchString
it doesn't work. Note: The initial condition of ""
was successfully executed.Olivier Patry
11/10/2021, 5:01 PMFragmentScenario
but I think it's something that could be discussed without considering such specific setup (let me know if it's not the case).
In a JUnit Android instrumentation test, I launch an Android fragment using FragmentScenario
, and my fragment impl uses view.doOnLayout{}
(would be the same with <http://view.post|view.post>{}
).
To make my test pass, I need the code being executed in such postponed code.
Here comes the link to coroutines:
My test is launched on the main thread, once fragment.onCreate
is called, I'd need to let "app logic" (here Android, but I guess it could be applicable to another use case) work and in particular get access to the main thread.
My idea was to suspend the execution of the test to give back the access to main thread to whoever needs it but I can't make it work.
@Test
fun testMyState() {
launchMyFragmentFragment(Lifecycle.State.RESUMED) {
mainCoroutineRule.runBlocking {
delay(100)
assertEquals(3, myState.count())
}
}
}
I would have expected that triggering a suspension using delay
would give back access to main thread to my "app logic" (here Fragment.view.doOnLayout
) but it doesn't work.
Is it, by chance, a use case some know how to address?Joseph Hawkes-Cates
11/10/2021, 7:39 PMClament John
11/11/2021, 7:36 AMMutableStateFlow<List<Item>>
does not trigger flow emission. Add / remove of an item triggers an emission.
I'm listening to this flow (cartItems) with a transform
which is never triggered when I modify the quantity of a product within the list. But the value does change in the list (Can confirm because if I click +
n times and then -
n times recomposition happens.
Question
How should I update an item MutableStateFlow<List<Item>>
such that emission
is triggered and transform
is triggered?Vivek Modi
11/11/2021, 10:21 AMsuspend fun fetchTwoDocs() =
coroutineScope {
val deferredOne = async { fetchDoc(1) }
val deferredTwo = async { fetchDoc(2) }
deferredOne.await()
deferredTwo.await()
}
and what the difference of
viewModelScope.launch { }
or
coroutineScope { }
Marko Novakovic
11/11/2021, 1:32 PMStateFlow
. I would like to observe data changes but my concern is collecting that StateFlow
. it never completes so how will it behave if I collect
it from ViewModel
? but using flow.value
does not give me anything over having a property. what am I missing here?martmists
11/11/2021, 4:59 PMUlrik Rasmussen
11/12/2021, 2:20 PMd
that I have obtained by calling .asCoroutineDispatcher()
on a single-threaded executor service. I want to launch long-running coroutines on this dispatcher from synchronous code. The obvious choice seems to be runBlocking(d) { launch { ... } }
, but that blocks the thread until the job launched by launch { }
completes. Can I submit a long-running job to d
from synchronous code without blocking the current thread?TwoClocks
11/12/2021, 6:00 PMnerses
11/12/2021, 10:52 PM.await()
vs .asDeffered().await()
.dimsuz
11/13/2021, 12:44 PMscan
operator:
Observable.just(1)
.doOnSubscribe { println("Hello") }
.scan(0) { v1, v2 -> v1 + v2 }
.blockingSubscribe { println("got $it") }
println("===")
flowOf(1)
.onStart { println("Hello") }
.scan(0) { v1, v2 -> v1 + v2 }
.collect { println("got $it") }
results and question in the thread →marcinmoskala
11/13/2021, 4:48 PMTestCoroutineDispatcher
is waiting for cancelled tasks.
@Test
fun test() = runBlockingTest {
val job = launch {
launch { delay(1000) }
launch { delay(1000) }
}
delay(100)
job.cancel()
// then
advanceUntilIdle()
assertEquals(100, currentTime) // Should be true, because after 100 everything is cancelled,
// but it says expected:<100> but was:<1000>
}
I've created an issue here.zsperske
11/15/2021, 2:05 AMlhwdev
11/15/2021, 6:09 AMcoroutineScope {}
will return when the provided suspend block and child coroutines end.
Instead, is there anything that returns immediately when the block returns, cancelling child coroutines? I thought there would be prebuilt builder.
This would be extremely useful on situations like:
coroutineScope { // this suspends until someFlow is finished; needs alternative one
val collected = someFlow.shareIn(this, ...)
work(collected)
}
Phil Richardson
11/15/2021, 12:36 PM<http://Dispatchers.IO|Dispatchers.IO>
, it does not fit into the use case in the slightest, as you still have a thread pool backing it
This is also a point at which coroutines start to confuse me some more.
Even if this IO dispatcher was single threaded, any suspending operations that happen can do just that, which could result in improper interleaving of two coroutine instances wanting to create an entry in this TarArchiveOutputStream
On the face, it would imply I should use runBlocking to ensure suspending operations don't suspect but get run to completion
But that still leaves the fact there can be more than one thread in the IO dispatcher don't this at the same time.
That then also implies I could create my own dedicated single thread executor service as a dispatcher
Then with any coroutine launched inside of it, use runBlocking here to complete the create entry, write stream close entry elements
But then this still tells me I am running afoul of the runBlocking rule that I should not use runBlocking from within a coroutine
So overall, are there any good examples of using something like the IO dispatcher for i/o on something that is not thread safe?Phil Richardson
11/15/2021, 12:36 PM<http://Dispatchers.IO|Dispatchers.IO>
, it does not fit into the use case in the slightest, as you still have a thread pool backing it
This is also a point at which coroutines start to confuse me some more.
Even if this IO dispatcher was single threaded, any suspending operations that happen can do just that, which could result in improper interleaving of two coroutine instances wanting to create an entry in this TarArchiveOutputStream
On the face, it would imply I should use runBlocking to ensure suspending operations don't suspect but get run to completion
But that still leaves the fact there can be more than one thread in the IO dispatcher don't this at the same time.
That then also implies I could create my own dedicated single thread executor service as a dispatcher
Then with any coroutine launched inside of it, use runBlocking here to complete the create entry, write stream close entry elements
But then this still tells me I am running afoul of the runBlocking rule that I should not use runBlocking from within a coroutine
So overall, are there any good examples of using something like the IO dispatcher for i/o on something that is not thread safe?Joffrey
11/15/2021, 12:48 PMTarArchiveOutputStream
is not thread safe isn't related to the fact that it's based on blocking IO.
any suspending operations that happen can do just thatDo what? Suspend? Yes, but if you're dealing with a blocking API, there is likely no suspend operation here. But in any case, I think what you're looking for is a coroutine Mutex. You can just do your operations under a mutex associated with your output stream (from any coroutine), and you'll be fine. You can also nest this under a
withContext(<http://Dispatchers.IO|Dispatchers.IO>)
to solve the blocking-IO-in-coroutine problem (but these are really 2 different problems)val tarArchiveStream = TODO("whatever constructor or method gives you the tar archive stream")
val tarStreamMutex = Mutex()
withContext(<http://Dispatchers.IO|Dispatchers.IO>) {
tarStreamMutex.withLock {
// create an entry, add it to the stream, close the entry
}
}
TarArchiveOutputStream
that contains the mutex inside and provides atomic operations as methodsThat then also implies I could create my own dedicated single thread executor service as a dispatcher. Then with any coroutine launched inside of it, use runBlocking here to complete the create entry, write stream close entry elements. But then this still tells me I am running afoul of the runBlocking rule that I should not use runBlocking from within a coroutineIndeed, you should not use
runBlocking
inside a coroutine. It's true that on a single-threaded dispatcher, it would kind of have the effect of locking, but it's not supposed to be used as a synchronization primitive. Mutex is meant for synchronization, so you should rather use this.ephemient
11/15/2021, 6:35 PMJoffrey
11/15/2021, 6:38 PMuli
11/15/2021, 8:36 PMJoffrey
11/15/2021, 8:37 PMephemient
11/15/2021, 8:39 PMTarArchiveOutputStream
case, but it might for other APIs use their own recursive locks or thread local variables, for example), then you either need to switch to a single-threaded dispatcher or ensure that there are no suspend points within your "critical section" (can be guaranteed by writing it in a non-suspend fun, for example)