kawmra
09/04/2019, 3:56 PM
[…] `send` without suspension?
It seems `Channel(0)#send` suspends unless a receiver is consuming values.
It is okay if values are lost when no receivers are consuming.
elizarov
09/05/2019, 4:57 AM
`kotlinx.coroutines` version 1.3.1 is released with several minor fixes.
Note: Kotlin/Native artifacts are now published with Gradle metadata format version 1.0, so you will need Gradle version 5.3 or later to use this version of kotlinx.coroutines in your Kotlin/Native project.
Full list of changes: https://github.com/Kotlin/kotlinx.coroutines/releases/tag/1.3.1
svenjacobs
09/05/2019, 10:47 AM
[…] `channelFlow` and `callbackFlow`? Both have the same signature, except that the latter is `inline`. Is this the only difference?
David Glasser
09/05/2019, 2:48 PM
[…] `get()` method which launches a coroutine using `CoroutineScope.future{}`. That coroutine should not be scoped to the `get()` method's lifetime, because if `get()` is called concurrently with the same key, both `get()` calls will end up waiting on the same CompletableFuture, so cancelling the first coroutine shouldn't cause the second coroutine to fail. So I'm currently passing a `CoroutineScope` into the constructor of the cache and using that to launch the `future`s.
However, my current implementation means that if any of the calls inside `future{}` fail, the long-lived CoroutineScope associated with the cache is canceled, which I don't want. What I want basically is a supervisor scope nested inside the passed-in CoroutineScope, created at cache construction time. But `supervisorScope` doesn't seem to be what I want: I can't call that at cache construction time because I don't want to actually wait before returning like a supervisorScope does! Is there some other API I should be using here? I feel like maybe the answer is `SupervisorJob()`, but I am still kinda confused about the distinction between Job and CoroutineScope: I can make a `SupervisorJob(coroutineScope.coroutineContext[Job])`, but I'm confused about how to extract a CoroutineScope from it that I can use for launching coroutines.
Jan Stoltman
09/05/2019, 2:51 PM
David Glasser
09/05/2019, 6:52 PM
cache.getIfPresentAsync("x").let { deferred ->
    assertNotNull(deferred)
    assertTrue(deferred.isCompleted)
    assertEquals(42, deferred.await())
}
IntelliJ is giving me an inspection "Deferred result is never used" on the word `assertNotNull`. Is there actually anything wrong with my code?
It looks like the inspection goes away if I delete the `assertNotNull` line and change the `assertTrue` to `deferred!!.isCompleted`, but I'm wondering if I'm missing something
zak.taccardi
09/06/2019, 12:56 AM
[…] `Flow<T>` into a `ReceiveChannel<T>`?
I’ve tried the following so far, but it’s deadlocking
@Suppress("DeprecatedCallableAddReplaceWith")
@Deprecated("You should be using Flows, not channels.")
fun <T> Flow<T>.collectAsReceiveChannel(scope: CoroutineScope): ReceiveChannel<T> {
    val flow = this
    val channel = Channel<T>()
    val flowJob = scope.launch {
        flow
            .onEach { item -> channel.send(item) }
            .collect()
    }
    channel.invokeOnClose {
        flowJob.cancel()
    }
    return channel
}
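A possible explanation for the hang in the snippet above, offered as a sketch and not a confirmed diagnosis: the hand-made `Channel` is never closed when the flow finishes, so a consumer iterating over it keeps waiting. The `produce` builder closes its channel when its block returns, and cancelling the returned channel cancels the producing coroutine. The name `asReceiveChannelSketch` is hypothetical, `produce` is an experimental API, and the library's own `produceIn` operator may already cover the same need.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.produce
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical helper for illustration; not part of kotlinx.coroutines.
@OptIn(ExperimentalCoroutinesApi::class)
fun <T> Flow<T>.asReceiveChannelSketch(scope: CoroutineScope): ReceiveChannel<T> {
    val flow = this
    return scope.produce {
        // Forward each item; send suspends until a receiver takes it
        // (produce uses a rendezvous channel by default).
        flow.collect { item -> send(item) }
        // When this block returns, produce closes the channel,
        // so a consumer's for-loop terminates.
    }
}

fun main() {
    runBlocking {
        val channel = flowOf(1, 2, 3).asReceiveChannelSketch(this)
        for (item in channel) println(item)
    }
}
```

Because the channel is closed once collection completes, the `for` loop in `main` ends and `runBlocking` can return.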
svenjacobs
09/06/2019, 6:01 AM
[…] `callbackFlow`, I have now seen two different "official" approaches for dealing with upstream exceptions returned by the wrapped callback. Should I call `close(exception)` or `cancel(CancellationException("Error occurred", exception))`?
davide
09/06/2019, 7:20 AM
Zach Klippenstein (he/him) [MOD]
09/06/2019, 6:26 PM
[…] `callbackFlow` and `channelFlow` (e.g. https://github.com/Kotlin/kotlinx.coroutines/issues/1500).
The rationale for having two separate aliases for the same function is that they “have different names to tailor their docs to the specific use-case and to enable the code using them to convey your intention.”
Is the distinction between them really providing enough value to justify all the confusion they seem to be creating? It seems like the documentation could be merged, and I don’t fully understand how much value is really added by making “intent” clearer – in what case would choosing one over the other actually make code significantly clearer? If callbacks are being used, it will be apparent in the code because there’ll be lambdas and `addListener` calls everywhere. Additionally, RxJava users have gotten used to using `Observable.create { }` for integrating with callback-based APIs even though it doesn’t have “callback” in the name anywhere.
matt tighe
09/06/2019, 7:53 PM
@Test
fun listCanBeObserved() = runBlocking {
    dao.insert(thing)
    val data: List<Thing> = dao.observeAll().single()
    val expected = Thing()
    assertEquals(expected, data)
}
but if I manually collect each emission I can see the assertion is hit successfully in debug mode
fun listCanBeObserved() = runBlocking {
    dao.insert(thing)
    val data: List<Thing> = dao.observeAll().collect {
        val expected = Thing()
        assertEquals(expected, data)
    }
}
but then the test will never complete
Rafa
09/07/2019, 3:31 PM
GlobalScope.launch {
    Log.d("rafa", "thread:${Thread.currentThread()}") // this prints Thread[DefaultDispatcher-worker-1,5,main]
    someTextView.text = "new test"
}
Does anyone know why that doesn't crash ^? Even though it's changing text in the UI on a worker thread?
darkmoon_uk
09/08/2019, 1:46 AM
[…] `Iterable<Deferred<T>>`?
Sangeet
09/08/2019, 5:16 PM
@ExperimentalCoroutinesApi // Since 1.2.1, tentatively till 1.3.0
public suspend inline fun <T> ObservableSource<T>.collect(action: (T) -> Unit) =
    openSubscription().consumeEach(action)
Will there be any changes to the above method in coming releases?
Adriano Celentano
09/09/2019, 2:15 PM
Dico
09/09/2019, 3:53 PM
private val processor = scope.launch {
    val batch = ArrayList<String>(MAX_BATCH)
    suspend fun sendBatch() {
        val payload = batch.joinToString(separator = "\n")
        batch.clear()
        return sendPayload(payload)
    }
    try {
        while (isActive) {
            batch.add(hitChannel.receive())
            if (batch.size == MAX_BATCH) {
                sendBatch()
            }
        }
    } finally {
        if (batch.isNotEmpty()) {
            sendBatch()
        }
    }
}
I expect that the final `sendBatch` will throw an exception because the coroutine is no longer `isActive`. I can't do `withContext(NonCancellable)` either at that point.
mingkangpan
09/09/2019, 4:00 PM
[…] `BroadcastArrayChannel` could someone try to explain it to me, especially in which scenario this channel can be used?
Kroppeb
09/09/2019, 4:58 PM
[…] `start` function
class EncryptionStep {
    var key: SecretKey? = null
    val input: ByteWriteChannel get() = _input
    val output: ByteReadChannel get() = _output
    private val _input = ByteChannel()
    private val _output = ByteChannel()

    @UseExperimental(InternalAPI::class)
    fun CoroutineScope.start(encrypted: Pair<ByteReadChannel, ByteWriteChannel>) {
        launch {
            _input.read {
                runBlocking {
                    when (val key = key) {
                        null -> encrypted.second.writeAvailable(it)
                        else -> encrypted.second.writeAvailable(encryptData(key, it.moveToByteArray()))
                    }
                }
            }
        }
    }
}

fun CoroutineScope.encryptionStep(encrypted: Pair<ByteReadChannel, ByteWriteChannel>): EncryptionStep {
    val step = EncryptionStep()
    with(step) {
        start(encrypted)
    }
    return step
}
savrov
09/09/2019, 7:32 PM
[…] `ConflatedBroadcastChannel`, but I don't know how to do it. Please share your thoughts.
rook
09/09/2019, 10:10 PM
E/File$object: kotlinx.coroutines.channels.ClosedSendChannelException: Channel was closed
    at kotlinx.coroutines.channels.Closed.getSendException(AbstractChannel.kt:1048)
    at kotlinx.coroutines.channels.AbstractSendChannel.offer(AbstractChannel.kt:166)
    at kotlinx.coroutines.channels.ChannelCoroutine.offer(ChannelCoroutine.kt)
marcinmoskala
09/10/2019, 7:35 AM
[…] `runBlocking` starts by default on a thread named "main" instead of using `Dispatchers.Default`?
fun main() = runBlocking() {
    print(Thread.currentThread().name) // main
}
fun main() = runBlocking(Dispatchers.Default) {
    print(Thread.currentThread().name) // DefaultDispatcher-worker-1
}
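For context, the `runBlocking` documentation describes it as blocking the current thread and, when no dispatcher is supplied, processing continuations on that blocked thread through an internal event loop. `Dispatchers.Default` is the fallback for builders such as `launch` only when the context carries no dispatcher at all. A small sketch of that behaviour follows; both helper names are made up for illustration.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical helper: true if the block and a plain child both ran on the caller's thread.
fun plainRunBlockingStaysOnCaller(): Boolean {
    val caller = Thread.currentThread()
    return runBlocking {
        var childThread: Thread? = null
        // The child inherits runBlocking's event-loop dispatcher from the scope.
        launch { childThread = Thread.currentThread() }.join()
        Thread.currentThread() === caller && childThread === caller
    }
}

// Hypothetical helper: true if the block ran on the caller's thread
// even though Dispatchers.Default was requested.
fun defaultDispatcherStaysOnCaller(): Boolean {
    val caller = Thread.currentThread()
    return runBlocking(Dispatchers.Default) { Thread.currentThread() === caller }
}

fun main() {
    println(plainRunBlockingStaysOnCaller())
    println(defaultDispatcherStaysOnCaller())
}
```

The first helper should report that everything stayed on the calling thread. The second normally reports the opposite when called from an ordinary thread, since the block is handed to a worker while the caller waits.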
Eugen Martynov
09/10/2019, 8:47 AM
rocketraman
09/10/2019, 4:36 PM
fun main() {
    runBlocking {
        throw Exception("e")
    }
}
I see the expected stack with the `main.runBlocking` call in the stack. However, if I change the dispatcher to explicitly specify `Dispatchers.Default`, e.g. `runBlocking(Dispatchers.Default)`, then all I see is the `BaseContinuationImpl.resumeWith` in the stack, and the resume point. This is doubly weird, because I thought `Dispatchers.Default` was... well, the default.
Sergio C.
09/10/2019, 7:38 PM
[…] `Module with the Main dispatcher is missing. Add dependency providing the Main dispatcher, e.g. 'kotlinx-coroutines-android'`??
Started happening out of the blue! 😮
vngantk
09/11/2019, 8:06 AM
fun main() = runBlocking {
    val job = Job()
    launch {
        delay(1000)
        job.completeExceptionally(IllegalStateException("Failed"))
    }
    try {
        job.join()
        println("foo")
    } catch (e: Exception) {
        println("bar")
        e.printStackTrace()
    }
}
Adriano Celentano
09/11/2019, 8:39 AM
vineethraj49
09/11/2019, 10:04 AM
dave08
09/11/2019, 12:56 PM
curioustechizen
09/11/2019, 3:41 PM
Aaron Stacy
09/11/2019, 6:53 PM
[…] `kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.trySteal()Lkotlinx/coroutines/scheduling/Task` every time, which leads me to believe there's either a race in the coroutines library that's not affecting my code, or TSAN is unaware of some invariants in coroutines. Does anyone else have experience with TSAN and Kotlin Coroutines? Any idea if this is a bug or a config issue? I'm happy to share the trace if that'd help.
Vsevolod Tolstopyatov [JB]
09/12/2019, 9:02 AM
Aaron Stacy
09/12/2019, 1:32 PM