dave08
02/10/2020, 5:15 PM
Flow has singleOrNull but not firstOrNull?
Erik
02/11/2020, 3:54 PM
TestCoroutineDispatcher or TestCoroutineScope, then any exceptions are swallowed (although the stack traces are printed). What is the general pattern to work with expected exceptions in unit tests and coroutines on Android?
Jiri Bruchanov
02/11/2020, 9:51 PM
'More than one file was found with OS independent path 'META-INF/kotlinx-coroutines-core.kotlin_module'?
voben
02/11/2020, 11:43 PM
suspend fun doSomething(): String {
    val myFlow = flow { emit("hello") }
    myFlow.collect { myString ->
        return myString
    }
}
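A minimal sketch, assuming the goal of the snippet above is to return the first value the flow emits. It uses the first() terminal operator from kotlinx.coroutines.flow rather than returning from inside collect; the main function is only there to make the example runnable.

```kotlin
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking

// Returns the first value the flow emits. first() stops collecting after that
// value and throws NoSuchElementException if the flow completes without emitting.
suspend fun doSomething(): String {
    val myFlow = flow { emit("hello") }
    return myFlow.first()
}

fun main() = runBlocking {
    println(doSomething()) // prints "hello"
}
```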
mng
02/12/2020, 12:51 AM
Flows, is a channelFlow what I need for my particular use-case? There would be one singular source-of-truth that multiple call sites will be subscribing to for the User’s location and would be a cold stream as the location would only be updated based on a method invocation.
Abhishek Bansal
02/12/2020, 4:49 PM
flowOf(
    remoteDataSource.getDataFromCache() // suspending function returning Flow<Data>
        .catch { error -> Timber.e(error) },
    remoteDataSource.getDataFromServer() // suspending function returning Flow<Data>
).flattenConcat().collect {
    Timber.i("Response Received")
}
My expectation here was that I would get the first result first and then the second after some time, as the response comes from the server. But the problem is that collect only gets called after the server returns a result.
Arkadii Ivanov
02/12/2020, 9:18 PM
CoroutineDispatcher implementation using this implementation as a reference: https://github.com/Kotlin/kotlinx.coroutines/blob/master/reactive/kotlinx-coroutines-rx2/src/RxScheduler.kt However, the Delay interface is marked as InternalCoroutinesApi. Is there a good way to implement delaying?
LastExceed
02/12/2020, 11:45 PM
import io.ktor.network.selector.ActorSelectorManager
import io.ktor.network.sockets.*
import kotlinx.coroutines.*
import kotlinx.coroutines.io.*
import java.net.InetSocketAddress
import java.nio.channels.ClosedChannelException

suspend fun main() {
    val echoServer = EchoServer()
    supervisorScope {
        launch {
            echoServer.start()
        }
        while (readLine() != "exit") {
            println("unknown command")
        }
        println("stopping server")
        echoServer.stop()
    }
}

class EchoServer {
    private val listener: ServerSocket =
        aSocket(ActorSelectorManager(Dispatchers.IO)).tcp().bind(InetSocketAddress(12345))
    private val clients = mutableListOf<Socket>()

    suspend fun start() {
        coroutineScope {
            while (true) {
                val client =
                    try {
                        listener.accept()
                    } catch (_: ClosedChannelException) {
                        break
                    }
                launch {
                    handleClient(client)
                }
            }
        }
    }

    private suspend fun handleClient(client: Socket) {
        println("new client connected")
        clients.add(client)
        val reader = client.openReadChannel()
        val writer = client.openWriteChannel(true)
        while (true) {
            val message = reader.readUTF8Line() ?: run {
                println("client disconnected")
                return
            }
            writer.writeStringUtf8("$message\n")
        }
    }

    fun stop() {
        listener.close()
        clients.forEach {
            it.close()
        }
        clients.clear()
    }
}
CLOVIS
02/13/2020, 12:46 PM
map, etc) in parallel and send elements as soon as possible, even if the order is lost?
taer
02/13/2020, 3:05 PM
sample operation is almost exactly what I want. Except for that small warning: "Note that the latest element is not emitted if it does not fit into the sampling window."
I want to collapse elements (just like sample does), but the last element at any time is kinda important. Any pointers?
vineethraj49
02/13/2020, 7:59 PM
block: suspend () -> T and block: suspend CoroutineScope.() -> T types?
getabu
02/13/2020, 9:21 PM
svenjacobs
02/14/2020, 8:39 AM
CoroutineScope that keeps on running even if the scope is cancelled and if another (new) scope is accessing the same function/computation, the already computed value should be returned? Sounds like a job for a ConflatedChannel, right? I tried something with Deferred but the code (test code!!) is far from beautiful:
var result: Deferred<Int>? = null

fun longComputationAsync(): Deferred<Int> {
    result = result ?: GlobalScope.async {
        println("longComputation")
        delay(1000)
        1337
    }
    return result!!
}

val scope1 = CoroutineScope(Dispatchers.Default)
scope1.launch {
    println("scope 1")
    println(longComputationAsync().await())
}
scope1.cancel()

val scope2 = CoroutineScope(Dispatchers.Default)
val job = scope2.launch {
    println("scope 2")
    println(longComputationAsync().await())
}
runBlocking {
    job.join()
}
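One possible way to tidy the Deferred approach above is a lazily initialised property. This is only a sketch: the name longComputation is an assumption, and GlobalScope is kept from the snippet above so the work is not tied to any caller's scope. Cancelling a coroutine that is waiting in await() cancels that waiter only, not the shared Deferred.

```kotlin
import kotlinx.coroutines.*

// Started at most once, on first access; `lazy` is synchronized by default,
// so concurrent callers all see the same Deferred.
val longComputation: Deferred<Int> by lazy {
    GlobalScope.async {
        println("longComputation")
        delay(1000)
        1337
    }
}

fun main() = runBlocking {
    val scope1 = CoroutineScope(Dispatchers.Default)
    scope1.launch { println(longComputation.await()) }
    scope1.cancel() // cancels only the awaiting coroutine, not the shared Deferred

    val scope2 = CoroutineScope(Dispatchers.Default)
    val job = scope2.launch { println(longComputation.await()) }
    job.join()
}
```

Compared with the nullable var plus `!!`, the property cannot be observed in a half-initialised state, and every caller gets the same Deferred.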
Chills
02/14/2020, 11:16 AM
Evan R.
02/14/2020, 3:52 PM
SuspendingView class which allows you to invoke a suspendingAction { } block as a coroutine replacement for asyncAction { } (i.e. the block gives you a CoroutineScope for invoking suspending functions). It also auto-cancels launched coroutines if the view is undocked.
Is this something anyone else would be interested in? If so, what’s the best way to contribute this back to the TornadoFX project?
Here’s the current implementation in my project: https://github.com/emanguy/GitlabTimeTracker/blob/master/src/main/kotlin/ui/util/SuspendingView.kt
And here’s somewhere I use it to call suspending functions: https://github.com/emanguy/GitlabTimeTracker/blob/master/src/main/kotlin/ui/view/LoginView.kt#L33
CLOVIS
02/15/2020, 3:48 PM
next() or something similar), and from one-to-many producers to add new data to it.
That sounds awfully similar to what a Channel is, however from my understanding the consumer(s) of a channel act as soon as an element is produced, rather than when they want to?
Basically I'm searching for a way to have the client decide when new elements are produced, and that data structure would have some kind of buffer of size n so the client could almost always get the data instantaneously, and the producer(s) would get to work immediately when the buffer gets too small to add new elements.
Ahmed Ibrahim
02/15/2020, 9:30 PM
PublishProcessor in the Coroutines world?
Saket Poddar
02/17/2020, 9:02 AM
gsala
02/17/2020, 9:17 AM
fun connect(bluetoothDevice: BluetoothDevice) : Observable<RxBleConnection>. The reason this returns an Observable instead of a Single is that the life-cycle of the Observable is tied to the connection. So this observable will only ever emit one item, but it will stay alive without completing so the connection stays alive until we dispose.
I want to wrap this to use coroutines instead.
I'm thinking about having a function like suspend fun connect(bluetoothDevice : BluetoothDevice) : RxBleConnection.
Any ideas on how to handle the life-cycle of the Observable? If I just use connectObservable.awaitFirst() it will take the first value, and dispose, which will stop the connection.
myanmarking
02/17/2020, 1:13 PM
rkeazor
02/17/2020, 3:06 PM
Michael Friend
02/17/2020, 5:26 PM
Robert Jaros
02/18/2020, 1:59 PM
withContext(Dispatchers.IO) { } when using ConcurrentHashMap inside a suspending function?
Brendan Weinstein
02/18/2020, 7:22 PM
Michael Friend
02/18/2020, 8:35 PM
try {
    exceptionalMethod()
} catch (e: Exception) {
    if (e is CancellationException) {
        throw e
    }
    // handle other exceptions
}
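The rethrow-on-cancellation pattern above could be packaged as a small helper. The following is a minimal sketch, assuming Kotlin 1.5 or later (so that Result can be used as a return type); the name runCatchingCancellable is hypothetical and not part of any library. Like the snippet above, it catches Exception rather than Throwable, which differs from the standard runCatching.

```kotlin
import kotlin.coroutines.cancellation.CancellationException

// Hypothetical helper: behaves like runCatching for ordinary exceptions, but
// lets CancellationException propagate so coroutine cancellation is not swallowed.
inline fun <T> runCatchingCancellable(block: () -> T): Result<T> =
    try {
        Result.success(block())
    } catch (e: CancellationException) {
        throw e
    } catch (e: Exception) {
        Result.failure(e)
    }

fun main() {
    val ok = runCatchingCancellable { 42 }
    val failed = runCatchingCancellable<Int> { error("boom") }
    println(ok.getOrNull())                    // prints 42
    println(failed.exceptionOrNull()?.message) // prints boom
}
```

Because the helper is inline, suspending calls are allowed inside the block when it is invoked from a suspend function.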
Would it be worth writing some sort of top-level function similar to runCatching that does the above? Or are there cleaner solutions?
Mohamed Ibrahim
02/18/2020, 10:43 PM
async and await?
JoakimForslund
02/19/2020, 1:54 PM
Expected property 'PLATFORM_DISPATCHER' has no actual declaration in module
The following declaration is incompatible because return type is different:
internal actual var PLATFORM_DISPATCHER: [ERROR : CoroutineDispatcher]
I can't compile my iosArm32 module after updating to 1.3.61
Mohamed Ibrahim
02/19/2020, 2:07 PM
continuation object which is passed around with suspend functions?
Sanket Mehta
02/19/2020, 2:45 PM
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.async
import kotlinx.coroutines.runBlocking
import java.util.stream.IntStream

fun main() {
    runBlocking {
        val asyncs = IntStream.range(0, 6).mapToObj {
            GlobalScope.async {
                handleData("handler-$it")
            }
        }
        asyncs.forEach {
            runBlocking {
                it.await()
            }
        }
    }
}

private fun handleData(handlerName: String) {
    println(handlerName)
    while (true) {
    }
}
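A possible reading of the snippet above: Java streams are lazy, so mapToObj creates each async only when forEach pulls that element, and because the first await() never returns, the later handlers are never started. The sketch below assumes the aim is to start every handler before waiting on any of them. It builds the Deferreds eagerly in a Kotlin List and uses awaitAll. The handleData here is a terminating stand-in (an assumption) so the example can finish; with the original infinite loops, each handler would also occupy a Dispatchers.Default thread permanently, so how many start would depend on the number of CPU cores.

```kotlin
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.runBlocking

// Stand-in for the original handler; it returns so the example terminates.
fun handleData(handlerName: String): String {
    println(handlerName)
    return handlerName
}

// Creates all six Deferreds up front (a Kotlin List is eager, unlike a Java
// stream), then waits for all of them together.
fun startHandlers(): List<String> = runBlocking {
    val asyncs = (0 until 6).map { index ->
        GlobalScope.async { handleData("handler-$index") }
    }
    asyncs.awaitAll()
}

fun main() {
    startHandlers()
}
```

The printed order of the handler names is not guaranteed, since the six coroutines run concurrently; the list returned by awaitAll keeps the original order.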
Current output:
handler-0
Expected output:
handler-0
handler-1
handler-2
handler-3
handler-4
handler-5
myanmarking
02/19/2020, 3:56 PM