cy
06/14/2018, 9:15 AMkotlinx.coroutines
version 0.23.2
fixes bug in kotlinx-coroutines-io
that causes content damagepakoito
06/14/2018, 9:26 AMcoder82
06/14/2018, 9:24 PMeygraber
06/15/2018, 8:45 AMspand
06/15/2018, 12:49 PMval cache = ConcurrentHashMap<Int, Deferred<String>>()
suspend fun getById(id: Int) : String {
return cache.computeIfAbsent(id) {
async { asyncAction(id) }
}.await()
}
suspend fun asyncAction(id: Int): String = TODO()
uli
06/16/2018, 5:43 PMashdavies
06/16/2018, 11:19 PMfilterIsInstance
method available for ReceiveChannel
?
If not, should I propose one as a PR? on KEEP or on the main repo?Daniel Tam
06/17/2018, 12:59 PMhannesstruss
06/17/2018, 5:45 PMrx.Observable.switchMap
for Channels: https://gist.github.com/hannesstruss/927ec8120d7cb312d80685f230d50c6e
It's kind of working ("just" deadlocking occasionally), but I found the way there to be very brittle, so much depends on using the right coroutine contexts in the right places. E.g. when not using Unconfined
for produce
in the transform
function, the wrong channel will be selected in whileSelect
and the outcome will be wrong.
Any general hints on how to approach problems like that?Paul Woitaschek
06/18/2018, 7:19 AMuli
06/18/2018, 1:34 PMconsumeEach
will execute code after the consumeEach
. Is this expected behaviour?
In the below example, if job.cancel()
is called, while the job is waiting for new Items from receiveChannel
it will print out Done
job = launch(coroutineContext) {
receiveChannel.consumeEach { r ->
output.send(r)
}
println("Done")
output.close()
}
If relevant, I do have a self contained sample, which is an alternative implementation of @hannesstruss switchMap example.
Please let me know if I should post it.hannesstruss
06/18/2018, 1:35 PMuli
06/18/2018, 1:43 PMuli
06/18/2018, 3:01 PMproduce
, does `this.close()`cancel the producer? Is this documented?uli
06/18/2018, 3:42 PMA channel that was closed without a cause throws ClosedSendChannelException on attempts to send or receive. A channel that was closed with non-null cause is called a failed channel. Attempts to send or receive on a failed channel throw the specified cause exception.
Documentation for produce
mentions:
The running coroutine is cancelled when its receive channel is cancelled.
But does not talk about the SendChannel being closedEvgeniy Zaharov
06/20/2018, 3:19 PMlaunch
to async
and join
to await
everything start to work as expect, I see all three catch (as described in https://github.com/Kotlin/kotlinx.coroutines/issues/61). But with launch
its very strange that method-catch
doesn’t callFabio Tudone
06/21/2018, 1:01 PMpakoito
06/23/2018, 7:52 PMpakoito
06/24/2018, 10:44 AMScheduler.trampoline()
pakoito
06/24/2018, 1:03 PMpakoito
06/24/2018, 1:04 PMpakoito
06/24/2018, 1:24 PMdave08
06/24/2018, 3:34 PMChannel
like this:
val eventQueue = Channel<ApplicationInstallRequest>()
init {
initEventProcessor()
}
private fun initEventProcessor() = launch(Unconfined) {
for (request in eventQueue) {
processRequest(request)
}
}
override fun onEvent(event: ApplicationInstallRequest) {
eventQueue.sendBlocking(event)
}
from an Android SyncAdapter, but even when the sync is finished, the process keeps on going (some kind of leak...), does Unconfined have it's own thread or something?dave08
06/25/2018, 4:26 PMwhileSelect
, but what's the boolean that needs to be returned for? It doesn't look docuemented in the guide's examples..?dave08
06/26/2018, 5:20 PMlittlelightcz
06/27/2018, 8:06 AMwithContext(CommonPool) { ... }
just like withContext { ... }
? (similarly as async { } can be called on default pool)
However just withContext { ... }
might seem confusing, so maybe it would be nice to add some shorthand with a different name for this if there isn't anything like that already 🙂dave08
06/27/2018, 1:17 PMMutex
will anyways protect that var syncRequested
... I don't need complex logic, just to squash the next bunch of requests to only one syncRequest... thanks! Right now, we're still on targetSdk 22 🙈 (I know I'll have to change that sooner or later)... But I don't want the sync to suddenly be stopped in the middle if the user goes out of the app, so how can it be foreground? @louiscadDaniel Tam
06/27/2018, 1:20 PMdave08
06/27/2018, 1:23 PMLifecycleService
also manages its own thread?rocketraman
06/27/2018, 3:23 PMByteReadChannel
that I need to adapt to a rx2 API that uses Flowable<ByteBuffer>
. Here are the methods I've come up with, with some guidance from @Deactivated User and @cy on #ktor, to adapt these in both directions -- these seem to work but opinions on correctness, efficiency, and style are welcome:
Adapting Flowable<ByteBuffer>
to ByteReadChannel
suspend fun Flowable<ByteBuffer>.toByteReadChannel(parent: Job = Job()) = writer(Unconfined, parent = parent, autoFlush = true) {
try {
this@toByteReadChannel.consumeEach {
channel.writeFully(it)
}
} catch (e: Throwable) {
channel.close(e)
}
}.channel
and in the other direction:
suspend fun ByteReadChannel.toFlowable(): Flowable<ByteBuffer> {
return rxFlowable {
this@toFlowable.lookAheadSuspend {
consumeEachRemaining {
// we have to make a copy as the ByteBuffer here comes from a reused ring buffer
this@rxFlowable.channel.send(it.copy())
true
}
this@rxFlowable.close()
}
}
}
Note the copy()
in the second code snippet. I don't like this but don't know how to get rid of it.rocketraman
06/27/2018, 3:23 PMByteReadChannel
that I need to adapt to a rx2 API that uses Flowable<ByteBuffer>
. Here are the methods I've come up with, with some guidance from @Deactivated User and @cy on #ktor, to adapt these in both directions -- these seem to work but opinions on correctness, efficiency, and style are welcome:
Adapting Flowable<ByteBuffer>
to ByteReadChannel
suspend fun Flowable<ByteBuffer>.toByteReadChannel(parent: Job = Job()) = writer(Unconfined, parent = parent, autoFlush = true) {
try {
this@toByteReadChannel.consumeEach {
channel.writeFully(it)
}
} catch (e: Throwable) {
channel.close(e)
}
}.channel
and in the other direction:
suspend fun ByteReadChannel.toFlowable(): Flowable<ByteBuffer> {
return rxFlowable {
this@toFlowable.lookAheadSuspend {
consumeEachRemaining {
// we have to make a copy as the ByteBuffer here comes from a reused ring buffer
this@rxFlowable.channel.send(it.copy())
true
}
this@rxFlowable.close()
}
}
}
Note the copy()
in the second code snippet. I don't like this but don't know how to get rid of it.this@rxFlowable.close()
was occasionally causing the library consuming the Flowable
to throw an error. I've removed it, but is that ok to do in this case?