Derek Berner
10/08/2019, 5:05 PMConcurrentHashMap
and ReadWriteLock
in a library anywhere?Robert Jaros
10/08/2019, 5:42 PMFlux<T>
to Channel<T>
?Paul Woitaschek
10/09/2019, 7:26 AMfun functionImpl(a: Int, b: String) {}
fun nonSuspending(function: (Int, String) -> Unit) {}
fun suspending(function: suspend (Int, String) -> Unit) {}
fun test() {
nonSuspending(::functionImpl)
suspending(::functionImpl)
}
ylemoigne
10/09/2019, 8:33 AMIn 'uncaughtExceptionHandler' ...
is called ?Jan Skrasek
10/09/2019, 1:04 PM2019-10-09 14:49:52.581 4418-4418/my.app.id E/AndroidRuntime: FATAL EXCEPTION: main
Process: my.app.id, PID: 4418
kotlinx.coroutines.JobCancellationException: Job was cancelled; job=SupervisorJobImpl{Cancelled}@b43ae6d
val myEvent = Channel<Unit>(Channel.RENDEZVOUS)
viewModel.myEvent.consumeAsFlow()
.onEach {
Toast.makeText(requireContext(), R.string.error, Toast.LENGTH_LONG).show()
}
.launchIn(lifecycleScope)
1. it is not obivous what caused the crash from the stack trace; emiting to closed channel?
2. how can I workaround it?igorvd
10/09/2019, 5:20 PMMatt Thiffault
10/09/2019, 11:09 PMMatt Thiffault
10/10/2019, 2:05 AMgroostav
10/10/2019, 8:26 AMFlow<Change<oldValue: String, newValue: String>>
and I want to have a behaviour whereby:
1. we use debounce
for 200 ms, such that as fast edits are made to this text, no changes are published
2. when there is a 200ms pause, it publishes a Change
with the old value from before any changes were made, and the new value after all changes
3. this can be short-circuited by "focus lost", if focus changes from true to false the 200ms wait is interrupted, a change is published right away, and any pending changes are cancelled.
Anybody done something like this? Care to share?
If not, care to speculate with me?Dominaezzz
10/10/2019, 10:19 AMcoroutineScope
vs withContext
question. Does coroutineScope
change the coroutineContext
like withContext
does? (I'm referring to the intrinsic coroutineContext
and not CoroutineScope.coroutineContext
). The docs aren't explicit enough to inspire confidence.bnn
10/10/2019, 11:12 AMonReceiveOrNull
defined on ReceiveChannel
interface is now deprecated, and onReceiveOrNull
extension function defined on ReceiveChannel
is now ExperimentalCoroutinesApi
. I'm trying to use extension version of onReceiveOrNull but I can't. Is there anything wrong?Marko Mitic
10/10/2019, 11:45 AMdelay()
)Paul Woitaschek
10/11/2019, 6:27 AMRxJavaPlugins.setErrorHandler { }
val flow = rxSingle<Unit> { throw IOException("") }.toFlowable().asFlow()
runBlocking {
repeat(10000) {
combine(flow, flow) { _, _ -> Unit }
.catch {}
.collect { }
}
}
Why does this crash?miguelsesma
10/11/2019, 9:35 AMprivate val job = Job()
private val coroutineContext = job + Dispatchers.Main
private val scope: CoroutineScope = CoroutineScope(coroutineContext)
override fun onResume() {
super.onResume()
scope.launch { subscribe() }
}
private suspend fun subscribe() {
dataStore.subscribe()
val channel: ReceiveChannel = dataStore.receiveMessage()
for (message in channel) {
when (message.type) {
"error" -> textView_scrollable.addTextNl("error: ${message.message}")
"ticker" -> textView_scrollable.addTextNl("ticker: ${message.sequence}, ${message.productId}, ${message.price}")
}
}
}
override fun onPause() {
dataStore.unsubscribe()
job.cancel()
super.onPause()
}
But I have tried the same using consumeEach
instead the for loop:
private suspend fun subscribe() {
dataStore.subscribe()
val channel: ReceiveChannel = dataStore.receiveMessage()
channel.cosumeEach { message ->
when (message.type) {
"error" -> textView_scrollable.addTextNl("error: ${message.message}")
"ticker" -> textView_scrollable.addTextNl("ticker: ${message.sequence}, ${message.productId}, ${message.price}")
}
}
}
It still works receiving the information, but when I cancel the job I get a JobCancellationException
. consumeEach
Documentation says:
Performs the given action for each received element and cancels the channel after the execution of the block. If you need to iterate over the channel without consuming it, a regular for loop should be used instead.
I'm not sure what it means with cancels the channel after the execution of the block. Maybe it is related. I have tried other things like try-finally
, NonCancellable
, SupervisorScope
... But if I get rid of the exception I get a memory leak in exchange because the coroutine remains alive after the fragment dies.
I know that probably is something I'm understanding badly. But after reading everything I have found, I'm still lost.Sebouh Aguehian
10/11/2019, 1:21 PMDico
10/11/2019, 4:32 PMprivate val termination = CountDownLatch(1)
private val scope = dispatcher + Job().apply {
thread {
// use non-dameon thread to keep application alive until all tasks completed.
termination.await()
while (children.any { it.isActive }) Thread.sleep(50)
}
}
fun shutdown() {
termination.countDown()
}
SrSouza
10/12/2019, 2:04 PMdelay
?
I want to know because when a launch a new job in Dispatchers.Main and use Thread.sleep, the codes blocks, and its okay, this make sense, but, how delay
doesn't block when I use it in the Dispatchers.Main? He create a new single thread pool and waits three and use suspendCoroutine and resume after that?zak.taccardi
10/13/2019, 1:44 AMDispatchers.Main.immediate
on Android?napperley
10/14/2019, 2:41 AMNoSuchMethodError
when running a program that uses a Kotlin library I have developed called KMQTT Client ( https://gitlab.com/napperley/kmqtt-client ), which uses coroutines. The error occurs when invoking the publish
function. Below is the function's definition ( https://gitlab.com/napperley/kmqtt-client/blob/master/src/jvmMain/kotlin/org/digieng/kmqtt/client/MqttClient.kt#L37 ) in the library:
actual suspend fun publish(topic: String, msg: MqttMessage, timeout: Long): MqttError? = try {
client.publish(topic, PahoMqttMessage(msg.payload.toByteArray()))
null
} catch (mqttPersistenceEx: MqttPersistenceException) {
MqttError(mqttPersistenceEx.message ?: "Message persistence failed.", MqttStatus.MSG_PERSISTENCE_FAILED)
} catch (mqttEx: MqttException) {
MqttError(mqttEx.message ?: "Message delivery failed.", MqttStatus.MSG_DELIVERY_FAILED)
}
Below are some of the error details:
Exception in thread "main" java.lang.NoSuchMethodError: 'java.lang.Object org.digieng.kmqtt.client.MqttClient.publish$default(org.digieng.kmqtt.client.MqttClient, java.lang.String, org.digieng.kmqtt.client.MqttMessage, long, kotlin.coroutines.Continuation, int, java.lang.Object)'
Alexjok
10/15/2019, 8:14 AMmyanmarking
10/15/2019, 2:50 PMmainScope.launch{
onDateChanged // conflatedBroadcastChannel
.asFlow()
.distinctUntilChanged()
.collect{ ... }
onTimeChanged // conflatedBroadcastChannel
.asFlow()
.distinctUntilChanged()
.collect{ ... }
}
Tristan Caron
10/15/2019, 4:47 PMCasey Brooks
10/15/2019, 7:26 PMreadLine()
in a while loop?Rohan Maity
10/16/2019, 4:17 AMTsvetozar Bonev
10/16/2019, 10:50 AMDominaezzz
10/16/2019, 10:51 AMonStart
and onCompletion
of a Flow
. (Basically open a resource in onStart
and close it in onCompletion
) What's the most idiomatic way besides reinventing the wheel?Pablichjenkov
10/16/2019, 4:00 PMsuspend
. My naive response was:
Basically, if you declare a function `suspend`, when this function gets called, its actual invocation is placed in a Queue of Jobs to be executed in some Dispatcher.
If the Dispatcher Thread happen to be the same as the calling Thread, it may be possible that the invocation happens immediately in the same running loop.
After the *suspending function* executes free of exceptions, then code will resume at the line right after the function call site.
Then he replied: What happened to the original Thread that called the suspending function
.
Then I said: The calling Dispatcher Thread returns at the calling site. When the *suspend function* is over the Continuation mechanics make sure it resumes in the same caller Dispatcher, not necessarily the same caller Thread but the same caller Dispatcher
Is anything terribly misleading in my brief summary of suspension?napperley
10/17/2019, 3:24 AMException in thread "main" java.lang.NoSuchMethodError: org.digieng.kmqtt.client.MqttClient.connect$default(Lorg/digieng/kmqtt/client/MqttClient;Lorg/digieng/kmqtt/client/MqttConnectionOptions;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object;
at org.example.mqtt_test.MainKt$main$1.invokeSuspend(main.kt:11)
at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
at kotlinx.coroutines.DispatchedTask.run(Dispatched.kt:241)
at kotlinx.coroutines.EventLoopImplBase.processNextEvent(EventLoop.common.kt:270)
at kotlinx.coroutines.BlockingCoroutine.joinBlocking(Builders.kt:79)
at kotlinx.coroutines.BuildersKt__BuildersKt.runBlocking(Builders.kt:54)
at kotlinx.coroutines.BuildersKt.runBlocking(Unknown Source)
at kotlinx.coroutines.BuildersKt__BuildersKt.runBlocking$default(Builders.kt:36)
at kotlinx.coroutines.BuildersKt.runBlocking$default(Unknown Source)
at org.example.mqtt_test.MainKt.main(main.kt:8)
at org.example.mqtt_test.MainKt.main(main.kt)
Which Issue Tracker should I use for creating the issue (Kotlin or KotlinX Coroutines)?alexfacciorusso
10/18/2019, 10:37 AMprivate val _state = liveData<LoginViewState> {
coroutineScope {
launch {
usernamePasswordChannel.consumeEach { (username, password) ->
validateLoginDataUseCase.validateLoginData(
username,
password
).perform(latestValue!!).also { emit(it) }
}
}
launch {
isLoggingInChannel.consumeEach {
usernamePasswordChannel.value.let { (username, password) ->
performLoginUseCase.login(
username,
password
)
}.perform(latestValue!!).also { emit(it) }
}
}
}
}
It looks a bit odd, has anyone any suggestion about something I’m misusing or that can be optimised in terms of code?Sujit
10/18/2019, 5:11 PMModule with the Main dispatcher had failed to initialize. For tests Dispatchers.setMain from kotlinx-coroutines-test module can be used
. I do have the following in my unit tests:
@ObsoleteCoroutinesApi
private lateinit var mainThreadSurrogate: ExecutorCoroutineDispatcher
@ObsoleteCoroutinesApi
@ExperimentalCoroutinesApi
@Before
fun before() {
mainThreadSurrogate = newSingleThreadContext("UI thread")
Dispatchers.setMain(mainThreadSurrogate)
}
@ObsoleteCoroutinesApi
@ExperimentalCoroutinesApi
@After
fun tearDown() {
Dispatchers.resetMain() // reset main dispatcher to the original Main dispatcher
mainThreadSurrogate.close()
}
This only happens on one of many unit tests in the same class, which is in this place: https://github.com/UrbanCompass/Snail-Kotlin/blob/master/snail-kotlin/src/test/java/com/compass/snail/ObservableTests.kt#L221
I'm making this into a kotlin multiplatform project, and running unit tests through the androidTest
. Any pointers on this please?Sujit
10/18/2019, 5:11 PMModule with the Main dispatcher had failed to initialize. For tests Dispatchers.setMain from kotlinx-coroutines-test module can be used
. I do have the following in my unit tests:
@ObsoleteCoroutinesApi
private lateinit var mainThreadSurrogate: ExecutorCoroutineDispatcher
@ObsoleteCoroutinesApi
@ExperimentalCoroutinesApi
@Before
fun before() {
mainThreadSurrogate = newSingleThreadContext("UI thread")
Dispatchers.setMain(mainThreadSurrogate)
}
@ObsoleteCoroutinesApi
@ExperimentalCoroutinesApi
@After
fun tearDown() {
Dispatchers.resetMain() // reset main dispatcher to the original Main dispatcher
mainThreadSurrogate.close()
}
This only happens on one of many unit tests in the same class, which is in this place: https://github.com/UrbanCompass/Snail-Kotlin/blob/master/snail-kotlin/src/test/java/com/compass/snail/ObservableTests.kt#L221
I'm making this into a kotlin multiplatform project, and running unit tests through the androidTest
. Any pointers on this please?zak.taccardi
10/18/2019, 5:12 PMCoroutineScope
runBlocking { }
or runBlockingTest { }
for unit testsSujit
10/18/2019, 5:13 PMobservable.debounce(Dispatchers.Default, delayMs = delayMs).subscribe(Dispatchers.Default, next = {
received.add(it)
})
Dispatchers.Main
beforezak.taccardi
10/18/2019, 5:14 PMlouiscad
10/18/2019, 5:14 PM@Before
annotated function.zak.taccardi
10/18/2019, 5:14 PMSujit
10/18/2019, 5:16 PMDispatchers
reference initialized in a different class, that gets injected to the class I'm testing. Is that not the way to do Coroutines@Before
annotation here. Here's what I've in setup:
private lateinit var subject: Observable<String>
private lateinit var strings: MutableList<String>
private var error: Throwable? = null
private var done: Boolean? = null
@ObsoleteCoroutinesApi
private lateinit var mainThreadSurrogate: ExecutorCoroutineDispatcher
@ObsoleteCoroutinesApi
@ExperimentalCoroutinesApi
@Before
fun before() {
runBlockingTest {
mainThreadSurrogate = newSingleThreadContext("UI thread")
Dispatchers.setMain(mainThreadSurrogate)
subject = Observable()
strings = mutableListOf()
error = null
done = null
subject.subscribe(Dispatchers.Unconfined,
next = { strings.add(it) },
error = { error = it },
done = { done = true }
)
}
}
runBlocking{}
scope, that throws out the same error. Sometimes, the test case, pass, but that error message of using setMain
still shows up in the logTestCoroutineDispatcher()
instead of newSingleThreadContext
😒imple_smile:zak.taccardi
10/18/2019, 6:27 PMSujit
10/18/2019, 6:28 PMzak.taccardi
10/18/2019, 6:29 PMinterface AppDispatchers {
val main: CoroutineContext get() = Dispatchers.Main
val default: CoroutineContext get() = Dispatchers.Default
val io: CoroutineContext get() = <http://Dispatchers.IO|Dispatchers.IO>
}
This allows for easy testingSujit
10/18/2019, 6:30 PMzak.taccardi
10/18/2019, 6:32 PMEspressoDispatchers()
fun EspressoDispatchers() : AppDispatchers = object : AppDispatchers {
override val default = AsyncTask.THREAD_POOL_EXECUTOR.asCoroutineDispatcher()
override val io = AsyncTask.THREAD_POOL_EXECUTOR.asCoroutineDispatcher()
}
CoroutineScope