Stefan Oltmann
02/10/2022, 1:36 PM
cancel() throw a JobCancellationException to the calling thread?
kotlinx.coroutines.JobCancellationException: StandaloneCoroutine was cancelled; job="coroutine#2444":StandaloneCoroutine{Cancelling}@6b6879d8
at kotlinx.coroutines.JobSupport.cancel(JobSupport.kt:1605)
at kotlinx.coroutines.Job$DefaultImpls.cancel$default(Job.kt:189)
Marcelo Hernandez
02/10/2022, 9:36 PM
1.5.2 to use a 3rd party library that is built with Coroutines 1.6.0. I see in the changelog that there are some breaking changes, which is why I ask.
Matthias Geisler
02/10/2022, 10:08 PM
fun randomFunction() {
    val channel = Channel<String>()
    val lambda = suspend {
        channel.send("abc")
    }
    runBlocking(randomContext) {
        lambda()
        println(channel.receive())
    }
}
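A minimal sketch of why a function shaped like the one above can hang, assuming the default Channel<String>() (a rendezvous channel with no buffer): send suspends until a receiver arrives, so calling it on the same coroutine before receive never gets past the send. Launching the sender in its own coroutine lets the two sides meet. The name sendThenReceive is hypothetical.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical helper: sends from a child coroutine so the receiver can run.
fun sendThenReceive(): String = runBlocking {
    val channel = Channel<String>() // rendezvous: send waits for a receiver
    launch { channel.send("abc") }  // the sender suspends in its own coroutine
    channel.receive()               // the receiver resumes the pending send
}

fun main() {
    println(sendThenReceive())
}
```

Another option under the same assumption would be a buffered channel, for example Channel<String>(1), where the first send completes without a waiting receiver.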
Ahmad Dudayef
02/12/2022, 1:46 AM
Shreyas Patil
02/12/2022, 8:34 AM
inline fun <T, R> Flow<T>.concurrentMap(
    concurrency: Int,
    @BuilderInference crossinline block: suspend (T) -> R
) = flow<R> {
    val scope = CoroutineScope(Executors.newFixedThreadPool(concurrency).asCoroutineDispatcher())
    val deferredResults = mutableListOf<Deferred<R>>()
    collect { value ->
        deferredResults.add(scope.async { block(value) })
    }
    deferredResults.awaitAll().forEach { newValue -> emit(newValue) }
    scope.cancel()
}
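A sketch of one alternative for bounded concurrency, assuming the order of results does not need to match the order of inputs: flatMapMerge takes a concurrency limit and runs the inner flows concurrently, without creating a dedicated thread pool. The name mergeMap is hypothetical, and flatMapMerge is an opt-in API whose marker annotation differs between kotlinx.coroutines versions, so both markers are opted into here.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.flatMapMerge
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical operator: at most `concurrency` invocations of block are in flight at once.
@OptIn(FlowPreview::class, ExperimentalCoroutinesApi::class)
fun <T, R> Flow<T>.mergeMap(concurrency: Int, block: suspend (T) -> R): Flow<R> =
    flatMapMerge(concurrency) { value -> flow { emit(block(value)) } }

fun main() = runBlocking {
    val doubled = (1..5).asFlow()
        .mergeMap(concurrency = 3) { delay(10); it * 2 }
        .toList()
    // Emission order is not guaranteed, so sort before printing.
    println(doubled.sorted())
}
```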
martmists
02/13/2022, 3:01 PM
Tristan
02/13/2022, 11:37 PM
class UserRepository(
    private val userDataSource: UserDataSource,
    private val coroutineScope: CoroutineScope = CoroutineScope(Dispatchers.Default)
) {
    private val userMutex = Mutex()
    private var user: User? = null
    suspend fun fetch(userId: String): User? {
        println("Fetch user")
        println("context = " + coroutineContext) // prints Main
        if (user === null) {
            val result = withContext(coroutineScope.coroutineContext) {
                println("Switch context")
                println("context = " + kotlin.coroutines.coroutineContext) // prints Dispatched
                userDataSource.getUser(userId)
            }
            userMutex.withLock {
                println("Mutation")
                println("context = " + coroutineContext) // prints Main
                user = result
            }
        }
        return userMutex.withLock {
            user
        }
    }
}
My logs look like
Instantiate UserRepository
context = [StandaloneCoroutine{Active}@2272df8, MainDispatcher]
Is UserRepository frozen? false
Fetch user
context = [StandaloneCoroutine{Active}@2272df8, MainDispatcher]
Switch context
context = [DispatchedCoroutine{Active}@3921848, WorkerCoroutineDispatcherImpl@2225538]
Mutation
context = [StandaloneCoroutine{Active}@2272df8, MainDispatcher]
Uncaught Kotlin exception: kotlin.native.concurrent.InvalidMutabilityException: mutation attempt of frozen com.me.UserRepository@227a228
Why, after coming back to the context that created the UserRepository, can I not perform a mutation?
Can Korkmaz
02/14/2022, 8:50 AM
juliocbcotta
02/14/2022, 10:21 AM
override fun sumSelectedFees(): Single<Float> = rxSingle(workDispatcher) {
    dao.findDeliveryModeShippingOptions(true)
        .fold(0f) { total, shippingOption -> total + shippingOption.feeValue }
}
and I am trying to call this in my test
@Test
fun `should sum all inserted fees for delivery`() = runTest {
    val dataSource = DeliveryModesDataSourceImpl(dao, UnconfinedTestDispatcher())
    dataSource.sumSelectedFees()
        .test()
        .assertValue(0f)
}
but my test is failing since the test is not waiting for the rx call .test() to complete. I used UnconfinedTestDispatcher here, but I am not sure what dispatcher I should be using in this case.
kschlesselmann
02/14/2022, 11:36 AM
Flux.just(…).flatMap { someAsyncStuff(it) }.subscribe() would be with a Flow? It seems that myFlow.flatMapMerge { someSuspendingStuff(it) }.collect() does not result in the same concurrency/throughput 😕
Sean Proctor
02/14/2022, 2:17 PM
flow { getSubscriptionFlow().collect { emit(it) } } would work, but it looks weird. Is there a simpler way to write that?
Lost Illusion
02/14/2022, 9:51 PM
incoming.filterIsInstance<WampRawTransportPacket.Handshake>().first()
However, I also want a timeout around it so that it does not hang forever if the remote server does something weird. So I used the withTimeoutOrNull function, but I encountered some odd behavior.
withTimeoutOrNull(connectionConfig.handshakeTimeout) {
    incoming.filterIsInstance<WampRawTransportPacket.Handshake>().first()
} ?: throw WampTimeout(
    remoteAddress,
    "Timed out waiting ${connectionConfig.handshakeTimeout}ms for a handshake back!"
)
It will always time out after 5 seconds (that is what handshakeTimeout is currently configured at). If I take it out of the timeout, it works flawlessly and returns nearly instantly. Is there some side effect of the coroutine that the timeout provides that I don't know about?
expensivebelly
02/15/2022, 8:25 AM
Callable in a coroutine?
kschlesselmann
02/15/2022, 2:23 PM
and/or .flowOn would result in parallelism, so in my mind it'd take 300 ms for simple to complete and then again 300 ms to process the last element of simple => ~600 ms to complete everything.
Anshulupadhyay03
02/15/2022, 3:18 PM
suspend fun flowsWithCombine() {
    val myFlow = flowOf("some value")
    val lettersFlow = flowOf("A", "D1", "C", "E", "F", "G", "H").onEach { delay(500) }
    myFlow.map { value ->
        val newValue = lettersFlow.first()
        println("before $newValue")
        if (newValue == "D1") {
            return@map "D1"
        } else {
            delay(1000)
            val newValue = lettersFlow.first() // here I still get A, though I expect newValue to be D1
            println("after $newValue")
            if (newValue == "D1") return@map "D1" else return@map value
        }
    }.collect {
        println(it) // This should print D1
    }
}
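A minimal sketch of the behavior in the snippet above: flowOf builds a cold flow, so every terminal call such as first() starts a fresh collection from the first element, no matter how much time has passed since the previous call. The helper names firstTwice and secondLetter are hypothetical.

```kotlin
import kotlinx.coroutines.flow.drop
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.runBlocking

// Each first() restarts the cold flow, so both calls see the same element.
fun firstTwice(): Pair<String, String> = runBlocking {
    val letters = flowOf("A", "D1", "C")
    letters.first() to letters.first()
}

// Skipping elements has to be expressed explicitly, for example with drop.
fun secondLetter(): String = runBlocking {
    flowOf("A", "D1", "C").drop(1).first()
}

fun main() {
    println(firstTwice())   // (A, A)
    println(secondLetter()) // D1
}
```

If the intent is to observe a value that advances over time, a hot stream (for example a SharedFlow or StateFlow fed by a separate coroutine) would be the usual fit, though that is a design choice beyond what the snippet shows.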
Can Korkmaz
02/16/2022, 4:29 PM
LaunchedEffect(key1 = true) { sharedViewModel.getAllTasks() }
val allTasks by sharedViewModel.allTasks.collectAsState()
This is from Stevdza's course. How can he be sure that getAllTasks() will have completed (fetched all tasks from Room) before the second line?
Mikhail Buzuverov
02/17/2022, 4:22 AM
import kotlinx.coroutines.*
fun main() = runBlocking {
    val average = GlobalScope.calculateAverageAsync(emptyList())
    try {
        println(average.await())
    } catch (ex: ArithmeticException) {
        println("Calculation error")
    }
}
fun CoroutineScope.calculateAverageAsync(list: List<Int>): Deferred<Int> = async {
    list.sum() / list.size
}
And it prints (as expected):
Calculation error
Process finished with exit code 0
But if I call calculateAverageAsync in runBlocking's scope:
import kotlinx.coroutines.*
fun main() = runBlocking {
    val average = this.calculateAverageAsync(emptyList())
    try {
        println(average.await())
    } catch (ex: ArithmeticException) {
        println("Calculation error")
    }
}
fun CoroutineScope.calculateAverageAsync(list: List<Int>): Deferred<Int> = async {
    list.sum() / list.size
}
I get a different result:
Calculation error
Exception in thread "main" java.lang.ArithmeticException: / by zero
at MainKt$calculateAverageAsync$1.invokeSuspend(main.kt:14)
at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:106)
at kotlinx.coroutines.EventLoopImplBase.processNextEvent(EventLoop.common.kt:279)
at kotlinx.coroutines.BlockingCoroutine.joinBlocking(Builders.kt:85)
at kotlinx.coroutines.BuildersKt__BuildersKt.runBlocking(Builders.kt:59)
at kotlinx.coroutines.BuildersKt.runBlocking(Unknown Source)
at kotlinx.coroutines.BuildersKt__BuildersKt.runBlocking$default(Builders.kt:38)
at kotlinx.coroutines.BuildersKt.runBlocking$default(Unknown Source)
at MainKt.main(main.kt:3)
at MainKt.main(main.kt)
Process finished with exit code 1
As far as I can see, the exception handler worked and printed "Calculation error", so the exception was caught. But unlike the previous case, runBlocking was cancelled and the main thread got the same ArithmeticException.
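A minimal sketch of one way to keep such a failure local, assuming the goal is to catch the exception without cancelling runBlocking: when async is a direct child of runBlocking, its failure is also propagated to the parent job, which is why catching it at await() is not enough. Wrapping the call in supervisorScope (or putting the try/catch around a coroutineScope block) confines the failure to that scope. The helper name averageOrNull is hypothetical.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Deferred
import kotlinx.coroutines.async
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.supervisorScope

fun CoroutineScope.calculateAverageAsync(list: List<Int>): Deferred<Int> = async {
    list.sum() / list.size
}

// Hypothetical helper: returns null instead of letting the failure cancel runBlocking.
fun averageOrNull(list: List<Int>): Int? = runBlocking {
    try {
        // The async child belongs to supervisorScope, so its failure is
        // rethrown here by await() and does not cancel the outer runBlocking job.
        supervisorScope { calculateAverageAsync(list).await() }
    } catch (ex: ArithmeticException) {
        null
    }
}

fun main() {
    println(averageOrNull(emptyList()))  // null
    println(averageOrNull(listOf(2, 4))) // 3
}
```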
I don't understand how to handle exceptions with structured concurrency. Nothing I tried works: the exception cancels the jobs and reaches runBlocking's outer code.
Exerosis
02/17/2022, 6:29 AM
class Test<Value> {
    val listeners = ArrayList<suspend (Value) -> (Unit)>()
    val queue = ArrayDeque<() -> (Unit)>()
    suspend fun fire(value: Value) = suspendCoroutine<Unit> { continuation ->
        synchronized(queue) {
            if (queue.isEmpty()) {
                for (listener in listeners)
                    listener(value)
                continuation.resume(Unit)
                while (queue.isNotEmpty())
                    queue.removeLast()()
            } else {
                queue.add {
                    for (listener in listeners)
                        listener(value)
                    continuation.resume(Unit)
                }
                COROUTINE_SUSPENDED
            }
        }
    }
    fun listen(listener: suspend (Value) -> (Unit)) {
        listeners += listener
    }
}
If I have something along these lines, I know this isn't entirely valid... but the idea is that you fire an event and all the listeners should be fired with the given value before any of them are fired with a new value from elsewhere. Would it be possible to detect when a listener fires "recursively" for example:
test.listen { if (it <= 0) test.fire(50) }
And then instead of deadlocking, simply leave that listener "partially executed" and move to the next one until all of the listeners have had a chance to see the first value... then fire this recursively changed value out... then finish executing the partially executed listener... and finally if any other changes were pushed out concurrently elsewhere they would be fired.
I know this is a confusing question, but I figured the only way I could get close to this functionality was by using continuations.
charleskorn
02/19/2022, 4:21 AM
Dispatchers.IO for this, but that seems to only exist on the JVM. Is there something else I should use for Kotlin/Native?
Ruben Quadros
02/19/2022, 8:11 AM
CoroutineScope as a dependency which every module can access, then we can launch different coroutines using this one scope? Each child coroutine can then have its own CoroutineExceptionHandler as well. Is my understanding correct?
janvladimirmostert
02/19/2022, 11:11 AM
flow.take(1).collect {
    println(it)
}
flow.take(4).collect {
    println(it)
}
This is for parsing a byte flow that I'm reading from a socket, so in the case of take(4), it would be nice to do fold(4) that then spits out an Int.
Just not sure what options there are to only read a limited number of bytes and then let the next flow operation continue.
Abhi
02/21/2022, 7:10 AM
internal interface ExclusiveObject {
    val mutex: Mutex
}
internal suspend inline fun <R> ExclusiveObject.lockByCoroutineJob(block: () -> R): R {
    return if (mutex.holdsLock(coroutineContext.job)) block()
    else mutex.withLock(coroutineContext.job) {
        block()
    }
}
Is this safe? Each coroutine checks whether the mutex is already held by itself, identified by the coroutine's job; if yes, it executes the block directly, otherwise it executes the block under the lock.
What are the pitfalls? Will this scale when there is high contention on the mutex?
Jared Rieger
02/21/2022, 11:53 AM
map function in Kotlin. If I contrast this to Java's parallel stream, the Java stream is far faster than the standard Kotlin map function.
suspend fun <I, O> Iterable<I>.pMap(f: suspend (I) -> O): List<O> = coroutineScope {
    map {
        async { f(it) }
    }.awaitAll()
}
val aP = (1..1000000).toList()
@OptIn(ExperimentalTime::class)
suspend fun main(args: Array<String>) {
    val timeP = measureTime {
        aP.parallelStream().map { it.plus(6) }
        aP.parallelStream().filter { it == 800000 }
    }
    println(timeP)
    val time = measureTime {
        aP.map { it.plus(6) }
        aP.filter { it == 800000 }
    }
    println(time)
}
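One thing worth checking in a benchmark like this: Java streams are lazy, so an intermediate operation such as map or filter does no work until a terminal operation (for example collect) is called. The first timed block above has no terminal operation, and the second block calls the standard map rather than pMap. A minimal sketch of the laziness, using a hypothetical countMapCalls helper:

```kotlin
import java.util.concurrent.atomic.AtomicInteger
import java.util.stream.Collectors

// Hypothetical helper: counts how often the mapping lambda actually runs.
fun countMapCalls(withTerminalOperation: Boolean): Int {
    val calls = AtomicInteger()
    val stream = listOf(1, 2, 3).parallelStream().map {
        calls.incrementAndGet()
        it + 6
    }
    // Without a terminal operation the pipeline is only described, never executed.
    if (withTerminalOperation) stream.collect(Collectors.toList())
    return calls.get()
}

fun main() {
    println(countMapCalls(withTerminalOperation = false)) // 0
    println(countMapCalls(withTerminalOperation = true))  // 3
}
```

A fairer comparison would presumably collect the stream results and call pMap in the second block, ideally with a proper benchmarking harness rather than a single measureTime call.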
What’s going on here? My expectation would have been for the pMap function to be on par with Java’s parallelStream.
mboudraa
02/21/2022, 3:15 PM
Store which is essentially a wrapper on top of 2 flows. From this store I can dispatch events, and those events will trigger coroutines to be executed on a given scope.
However, when I cancel the scope, the coroutines aren't cancelled...
suspend fun dispatch(action: Action) {
    val currentState = stateFlow.value
    _actionFlow.tryEmit(action)
    supervisorScope {
        sideEffects.forEach { sideEffect ->
            launch {
                sideEffect(currentState, action)?.let { dispatch(it) }
            }
        }
    }
}
Here's how my test is written
@Test
fun should_cancel_side_effect() = runTest {
    launch {
        val store = createStore(this) {
            registerSideEffect sideEffect@{ _, action ->
                if (action !is TestAction.Add) return@sideEffect null
                delay(1_000)
                return@sideEffect TestAction.Remove(3)
            }
        }
        store.stateFlow.test {
            CoroutineScope(Job()).launch childScope@{
                store.dispatch(TestAction.Add(3))
                cancel()
            }.join()
            assertEquals(TestCounterState(0), awaitItem())
            this@launch.cancel()
        }
    }.join()
}
But the side effect returns the TestAction.Remove(3) like I never called cancel() on the scope that launched the coroutine.
harry.singh
02/21/2022, 8:46 PM
coroutineScope by launching a coroutine inside coroutineScope and then cancelling it. I was expecting the child coroutine to cancel, but it seems to continue to run even after being cancelled.
Gabriel Melo
02/22/2022, 5:26 PM
.map operator or something similar to a StateFlow and get another StateFlow back?
eneim
02/23/2022, 1:02 AM
K Merle
02/23/2022, 7:53 AM
zokipirlo
02/23/2022, 9:52 AM
1.
supervisorScope {
    list.map {
        async { doSomeApiCall(it) }
    }.awaitAll()
}
2.
supervisorScope {
    list.map {
        async { doSomeApiCall(it) }
    }
}.awaitAll()
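A minimal sketch of how the two variants above can differ when one call fails, assuming a hypothetical fakeApiCall in place of doSomeApiCall. With awaitAll() inside the scope, the first failure is thrown from the scope's own block, which cancels the remaining children. With awaitAll() outside, supervisorScope first waits for every child to finish, and only then does awaitAll() rethrow the failure.

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.supervisorScope
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stand-in for doSomeApiCall: id 0 fails at once, other ids finish after a short delay.
suspend fun fakeApiCall(id: Int, completed: AtomicInteger): Int {
    if (id == 0) throw IllegalStateException("call $id failed")
    delay(100)
    completed.incrementAndGet()
    return id
}

// Variant 1: awaitAll() inside the scope. Returns how many calls ran to completion.
fun awaitInside(): Int = runBlocking {
    val completed = AtomicInteger()
    runCatching {
        supervisorScope {
            listOf(0, 1).map { async { fakeApiCall(it, completed) } }.awaitAll()
        }
    }
    completed.get()
}

// Variant 2: awaitAll() outside the scope. Returns how many calls ran to completion.
fun awaitOutside(): Int = runBlocking {
    val completed = AtomicInteger()
    runCatching {
        supervisorScope {
            listOf(0, 1).map { async { fakeApiCall(it, completed) } }
        }.awaitAll()
    }
    completed.get()
}

fun main() {
    println(awaitInside())  // 0: the slow call is cancelled when the block fails
    println(awaitOutside()) // 1: the slow call finishes before awaitAll() throws
}
```

When no call fails, both variants should return the same list, and both take roughly as long as the slowest call.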
George
02/24/2022, 2:23 PM
override suspend fun acceptCode(code: String, account: SolidAccount): Unit = coroutineScope {
    val codeVerifierDeferred = async { sessionStorage.retrieveCodeVerifier(account) }
    val urlDeferred = async { sessionStorage.retrieveUserUrl(account) }
    authRequest(codeVerifierDeferred, urlDeferred, code, account)
}
override suspend fun acceptCode(code: String, account: SolidAccount): Unit = coroutineScope {
    val codeVerifierDeferred = async { sessionStorage.retrieveCodeVerifier(account) }
    val urlDeferred = async { sessionStorage.retrieveUserUrl(account) }
    authRequest(codeVerifierDeferred.await(), urlDeferred.await(), code, account)
}
Is there a real difference between these two? Thanks in advance!
Matthew Gast
02/24/2022, 2:41 PM
authRequest, the second block will call authRequest after both sessionStorage.retrieveCodeVerifier and sessionStorage.retrieveUserUrl have completed execution. If you are familiar with Java, Future is an analogue to Deferred (noting that Future.get will block while Deferred.await suspends).
George
02/24/2022, 2:45 PM
webClient.post().apply {
    val codeVerifier = codeVerifierDeferred.await()
    bodyValue(TokenRequestBody(code, codeVerifier).asUrlBody())
    val opUrl = urlDeferred.await()
    val dpopHeader = generateDPoPHeader(HttpMethod.POST, OpUrl(opUrl))
    header("DPoP", dpopHeader)
    header("content-type", "application/x-www-form-urlencoded")
    uri(opUrl.toTokenEndpoint()).exchangeToFlow {
        flowOf(it)
    }.collect {
My guess is that it would make a difference if I were really awaiting much later in the function?
Matthew Gast
02/24/2022, 2:51 PM
George
02/24/2022, 3:29 PM