AG
08/23/2020, 8:57 AM
I get "IllegalStateException: Flow invariant is violated: emission from another coroutine is detected" when running this code. Is it possible to fix this without using channelFlow?
flow<Action> {
    val result = downloadFile(url, directory) { event: ProgressEvent ->
        emit(DownloadProgress(event))
    }
    emit(DownloadFinished("path"))
}.onEach { L.d(it) }.launchIn(viewLifecycleOwner.lifecycleScope)
downloadFile method:
suspend fun downloadFile(
    url: String,
    fileDir: String,
    onProgressEvent: suspend (event: ProgressEvent) -> Unit = {}
) {
    val progressChannel = Channel<ProgressEvent>()
    CoroutineScope(coroutineContext).launch {
        progressChannel.consumeAsFlow().collect { onProgressEvent.invoke(it) }
    }
    /* rest of the code */
}
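One possible way to avoid the exception without channelFlow, sketched under assumptions: the flow builder rejects emit calls made from a different coroutine, but calls made inside a coroutineScope { } block opened within the flow body are allowed. If downloadFile runs the transfer in a child coroutine and consumes the progress channel in the calling coroutine, the callback (and therefore emit) stays on the collector's side. The name downloadFileSketch, the Int progress values, and the simulated download loop are hypothetical stand-ins for downloadFile, ProgressEvent, and the real download code.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for downloadFile: progress is reported as an Int percentage.
suspend fun downloadFileSketch(onProgressEvent: suspend (Int) -> Unit) = coroutineScope {
    val progressChannel = Channel<Int>()

    // The producer runs in a child coroutine and only sends to the channel.
    launch {
        try {
            for (percent in listOf(0, 50, 100)) progressChannel.send(percent) // simulated download
        } finally {
            progressChannel.close() // lets the consuming loop below terminate
        }
    }

    // The channel is consumed in the calling coroutine, so the callback is
    // never invoked from a coroutine other than the caller's.
    for (event in progressChannel) onProgressEvent(event)
}

fun downloadActions(): Flow<String> = flow<String> {
    downloadFileSketch { percent -> emit("progress $percent") }
    emit("finished")
}

fun main() = runBlocking {
    downloadActions().toList().forEach(::println)
}
```

Because coroutineScope waits for its children, downloadFileSketch returns only after the producer has finished, which matches the sequential shape of the original flow body.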
Carrascado
08/23/2020, 3:34 PM
(1..3).asFlow().onEach { number -> println("$number") }.collect()
And
(1..3).asFlow().collect { number -> println("$number") }
Why would I want to use onEach when I can use collect directly?
Carrascado
08/23/2020, 4:06 PM
• launch: Creates a new coroutine. Code below launch {} continues being executed instantly (although, because of structured concurrency, the coroutine will eventually wait for its children to finish).
• async: Creates a new coroutine. Code below async {} continues being executed instantly (although, because of structured concurrency, the coroutine will eventually wait for its children to finish).
• runBlocking: Creates a new coroutine. Execution is blocked. Code below runBlocking {} won't execute until runBlocking has ended.
• coroutineScope: Creates a new coroutine. Execution is suspended (it's the suspending version of runBlocking). Code below coroutineScope {} won't execute until coroutineScope has ended.
Now, if this is correct, the only coroutine builder whose name makes sense is runBlocking 🤷‍♂️ The other ones seem to have absolutely random names. All 4 coroutine builders "launch" the coroutine, two of them are "asynchronous" in some sense (both launch and async), and again 4 of 4 have a "coroutine scope". What am I not understanding here? Why do they have these names?
Gopal S Akshintala
08/24/2020, 11:55 AM
delay()
)
uhe
08/24/2020, 2:27 PM
Result as a return type and ran into a weird issue. Does anybody know if this is already known? I couldn't find anything in the issue tracker.
suspend fun foo() = suspendCancellableCoroutine<Unit> {
    thread {
        it.resumeWithException(RuntimeException("foo"))
    }
}
suspend fun fooWithResult(): Result<Unit> = runCatching { foo() }
@Test
fun resultTest() = runBlocking {
    // when fooWithResult is not inlined the exception is not caught by "runCatching"
    val result = fooWithResult()
    Assertions.assertTrue { result.isFailure }
    delay(5000)
    Unit
}
The test passes if I mark fooWithResult as inline. Without it the exception is not caught by runCatching.
ryan413
08/24/2020, 3:08 PM
fun returnFuture(): CompletableFuture<String> {
    return runBlocking {
        future {
            // blocking code
            "result"
        }
    }
}
or
fun returnFuture(): CompletableFuture<String> {
    return GlobalScope.future {
        // blocking code
        "result"
    }
}
We have coroutine code further up the chain that then calls GraphQL-java, which in turn is wired with DataFetchers that implement an interface returning a CompletableFuture. Ideally we want to share the same coroutine context throughout (so we can leverage MDC via MDCContext, etc.)
jean
08/24/2020, 6:20 PM
this.coroutineContext[ContinuationInterceptor] as CoroutineDispatcher
class CoroutineTestRule : TestWatcher(), TestCoroutineScope by TestCoroutineScope() {
    override fun starting(description: Description?) {
        super.starting(description)
        Dispatchers.setMain(this.coroutineContext[ContinuationInterceptor] as CoroutineDispatcher)
    }
    override fun finished(description: Description?) {
        super.finished(description)
        Dispatchers.resetMain()
    }
}
What does it do exactly?
While debugging, I see that this.coroutineContext[ContinuationInterceptor] returns a TestCoroutineDispatcher. Is that because the rule implements TestCoroutineScope, which has a TestCoroutineDispatcher as its default dispatcher?
camdenorrb
08/24/2020, 7:03 PM
ByteBuffer.allocateDirect(bufferSize) suspending, would you need to make a thread pool and use callbacks to resume a coroutine, or is there a better way?
Andrea Giuliano
08/25/2020, 7:28 AM
Luis Munoz
08/26/2020, 3:27 AM
Lukas Lechner
08/26/2020, 7:48 AM
CancellationExceptions. If we caught CancellationExceptions, the coroutine would continue to run until the next suspension point instead of cancelling itself. Is there some convenience function in the stdlib that does this? Or have you written your own one?
Simon Lin
08/26/2020, 7:56 AM
Andrea Giuliano
08/26/2020, 7:57 AM
suspend fun myFun() = coroutineScope {}
I wonder, given that a suspending function must be called within a scope, what's the point of creating a new scope like that?
rrva
08/26/2020, 11:49 AM
Andrea Giuliano
08/26/2020, 2:58 PM
suspend fun doSomething() = coroutineScope {
    val channel: Channel<Int> = Channel(capacity)
    val results = aggregator.aggregate(channel)
    channel.close()
    results
}
where aggregator is a class that exposes a suspending function aggregate that uses the channel to listen for messages with channel.receive().
Now, would channel.close() ever be invoked before the aggregator finishes receiving all the messages? Or, since I'm not creating a new scope (by using launch or async, for example), is it guaranteed that each line of my snippet runs only after the previous one finishes? And if that's the case, what's the purpose of marking a function like aggregate() with the suspend modifier (although it does suspend, since it waits for messages in the channel)?
Carrascado
08/26/2020, 6:43 PM
produce? I don't want to avoid produce, it's just out of curiosity.
rrva
08/26/2020, 9:06 PM
import kotlinx.coroutines.sync.Semaphore
import kotlinx.coroutines.withTimeout
private val concurrentRequests = Semaphore(50)
suspend fun foo() {
    withTimeout(10) {
        concurrentRequests.acquire()
    }
    try {
        doStuff()
    } finally {
        concurrentRequests.release()
    }
}
Fernando Branbila Cunha Junior
08/26/2020, 11:07 PM
Andrea Giuliano
08/27/2020, 10:14 AM
interface MyGuy {
    suspend fun CoroutineScope.doSomething()
}
or is there a better way when it comes to interface declaration?
Fabio
08/28/2020, 1:47 AM
Lukas Lechner
08/28/2020, 8:53 AM
bsimmons
08/28/2020, 11:56 AM
Deferred<T>s and reuse them? Or is this a coroutine anti-pattern? I'm pretty new to coroutines, so I'm just wondering if there is a better way.
private val notesDeferred = HashMap<String, Deferred<Resource<String>>>()
suspend fun getNotes(id: String): Resource<String> {
    if (notesDeferred.containsKey(id) && notesDeferred.get(id)!!.isActive) {
        return notesDeferred.get(id)!!.await()
    } else {
        val newRequest = GlobalScope.async {
            remoteSource.getNotes(id)
        }
        notesDeferred.put(id, newRequest)
        return newRequest.await()
    }
}
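A minimal sketch of one way to share in-flight requests, under stated assumptions: the map is guarded by a Mutex so concurrent callers cannot race on it, and each Deferred is started in a scope passed in by the owner rather than in GlobalScope. NotesCache, its fetch parameter, and the plain String result are hypothetical names standing in for remoteSource.getNotes and Resource<String>.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Deferred
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock

// Hypothetical cache: callers asking for the same id while a request is
// still running share one Deferred instead of starting a second request.
class NotesCache(
    private val scope: CoroutineScope,             // owner-controlled lifetime, instead of GlobalScope
    private val fetch: suspend (String) -> String  // stand-in for remoteSource.getNotes
) {
    private val mutex = Mutex()
    private val inFlight = HashMap<String, Deferred<String>>()

    suspend fun getNotes(id: String): String {
        // Only the map lookup and update happen under the lock; awaiting happens outside it.
        val request = mutex.withLock {
            inFlight[id]?.takeIf { it.isActive }
                ?: scope.async { fetch(id) }.also { inFlight[id] = it }
        }
        return request.await()
    }
}

fun main() = runBlocking {
    var fetchCount = 0
    val cache = NotesCache(this) { id ->
        fetchCount++
        delay(50) // simulated network latency
        "notes for $id"
    }
    // Two concurrent callers for the same id.
    val results = listOf(async { cache.getNotes("a") }, async { cache.getNotes("a") }).awaitAll()
    println("$results, fetches = $fetchCount")
}
```

Like the original snippet, this reuses only requests that are still active; once a request has completed or failed, the next call starts a new fetch.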
Andrea Giuliano
08/28/2020, 4:59 PM
@Test
fun testingScope() = runBlocking {
    myfun1()
}
suspend fun myfun1() = coroutineScope {
    val handler = CoroutineExceptionHandler { _, exception ->
        println("CoroutineExceptionHandler got $exception")
    }
    launch(handler) {
        throw Exception()
    }
}
The test is failing because the exception bubbles up to runBlocking even though I'm using an exception handler. While debugging, I've seen that the handler is set in the right child context, but it is still never called. Does anybody know why?
pambrose
08/28/2020, 6:48 PM
| +--- io.ktor:ktor-client-core-jvm:1.4.0
| | +--- org.jetbrains.kotlin:kotlin-stdlib:1.4.0 (*)
| | +--- org.slf4j:slf4j-api:1.7.30
| | +--- org.jetbrains.kotlin:kotlin-stdlib-common:1.4.0
| | +--- org.jetbrains.kotlinx:kotlinx-coroutines-core:1.3.8-native-mt-1.4.0-rc -> 1.3.9 (*)
as well as ktor-network:
| | +--- io.ktor:ktor-network:1.4.0
| | | \--- io.ktor:ktor-network-jvm:1.4.0
| | | +--- org.jetbrains.kotlin:kotlin-stdlib:1.4.0 (*)
| | | +--- org.slf4j:slf4j-api:1.7.30
| | | +--- org.jetbrains.kotlin:kotlin-stdlib-common:1.4.0
| | | +--- org.jetbrains.kotlinx:kotlinx-coroutines-core:1.3.8-native-mt-1.4.0-rc -> 1.3.9 (*)
| | | \--- io.ktor:ktor-utils:1.4.0 (*)
louiscad
08/29/2020, 5:43 PM
Ryan Pierce
08/29/2020, 9:07 PM
org.jetbrains.kotlinx:kotlinx-coroutines-core:1.3.9 coroutines and the 1.4.0-release-Studio4.0-1 Kotlin plugin. However, I don't see coroutines in my debugger menu as shown below. I've restarted and have "Disable coroutine agent" unchecked. What am I missing?
Simon Lin
08/31/2020, 9:48 AM
infix fun <E, R> ReceiveChannel<E>.zip(
    other: ReceiveChannel<R>
): ReceiveChannel<Pair<E, R>>
is deprecated. What is the alternative?
Maciek
08/31/2020, 12:02 PM
Sebastien Leclerc Lavallee
08/31/2020, 2:55 PM
AuthState.performActionWithFreshTokens(callback: (freshToken: String?))
which uses a callback to give me the fresh token. And my transport service uses coroutines, something like:
class TransportService {
    suspend fun getAllTasks(): List<Task> {
        authState.performActionWithFreshTokens { freshToken ->
            return ktorClient.get<List<Task>>()
        }
    }
}
Now, I have an error where I use the ktor client: "suspend function can be called only within coroutines scope". That is because the callback function is not a coroutine scope. If I create a new scope within the callback, will this work well with the main function? What can I do to keep suspend functions in my transport service and not use completion handlers everywhere? Thanks 🙂
ansman
08/31/2020, 3:56 PM
ansman
08/31/2020, 3:56 PM
bezrukov
08/31/2020, 5:28 PM
ansman
08/31/2020, 5:29 PM
ensureActive() after my withContext since I don't think it'll throw if the job has been cancelled during the delay
bezrukov
08/31/2020, 5:32 PM
ansman
08/31/2020, 5:45 PM
runBlocking {
    val parent = Job()
    val child = launch {
        withContext(parent) {
            delay(100L)
        }
        println("After")
    }
    delay(50L)
    child.cancel()
}
This code still prints after
ensureActive() before println it works as expected
bezrukov
08/31/2020, 5:50 PM
ansman
08/31/2020, 5:50 PM