louiscad
03/25/2020, 11:56 AMrrva
03/25/2020, 1:19 PMSam Garfinkel
03/25/2020, 2:32 PMCoroutineScope
(with a delegate) on a class so that you can use Deferred
as the value of its properties? This is kind of useful for creating lazily computed properties that are backed by network I/O (I’m using Ktor):
class Foo(val client: HttpClient) : CoroutineScope by GlobalScope {
val bar: Deferred<String> by lazy {
async {
client.get<String>("<https://www.example.com>")
}
}
}
vineethraj49
03/25/2020, 5:46 PMvineethraj49
03/25/2020, 5:49 PMsuspend fun foo() {
val context = coroutineContext
dbi.withHandle { conn ->
async(context) {
bizLogic.suspendingFunction()
}
}.await()
}
Maciek
03/25/2020, 8:08 PMrunBlockingTest
function. From this test I'm running code that launches coroutine from MainScope
object that is suppose to have TestCoroutineDispatcher
but I can't seem to run delay
function inside that coroutine because I'm getting UncompletedCoroutinesError
exception. Is my test setup wrong or I just can't test a code that has delay
functions?
I don't really need that delay but was just wondering why it doesn't work, I thought that runBlocking and TestCoroutineDispatcher will take care of such casesthana
03/26/2020, 10:02 AMinvokeSuspend
takes a considerbale amount of time. How do you cope with this situation? can we expect the situation will improve when the jvm itself supports suspendable methods?Pacane
03/26/2020, 2:19 PMsuspend
functions, but I could get my code to work with the GlobalScope.launch
function. Now I've read in several places that I should avoid using the GlobalScope. So my question is, how can I create a coroutine scope for a given method (not the main) and still use coroutines for a fire and forget type of work.
I've tried doing something like fun myMethod() = runBlocking { .... }
and then inside use CoroutineScope.launch
but the code after the launch
block is still blocked by the launch
block. What could I do to make this work?Czar
03/26/2020, 2:57 PMfun serviceSseRequest(sseEmitter: SseEmitter) {
val liveOutput: Channel<String> = doSomethingInBackground()
liveOutput.consumeAsFlow().collect { sseEmitter.emitServerSideEvent(it) }
}
How do I determine that there won't be any more messages in the channel? Maybe there is some other component that I can use instead of the channel here?Orhan Tozan
03/26/2020, 3:53 PM// option 1
interface HomeViewModel {
fun onLoginButtonClick()
}
// option 2
interface HomeViewModel {
val loginButtonClicks: SendChannel<ButtonClick>
object ButtonClick
}
// option 3
interface HomeViewModel {
val loginButtonClicks: Flow<ButtonClick>
object ButtonClick
}
David Glasser
03/26/2020, 5:51 PMwithTimeout(5000L) {
withContext(<http://Dispatchers.IO|Dispatchers.IO>) {
somePotentiallySlowThreadBlockingFunction()
}
}
Note that I'm perfectly fine with the fact that the timeout won't actually stop the work from happening in somePotentiallySlowThreadBlockingFunction
. This is happening at global process shutdown so the important thing is being able to move on to the next step of shutdown in finite time, not absolutely making sure that somePotentiallySlowThreadBlockingFunction
doesn't continue.
Like, I know withTimeout
alone won't work with non-cooperative code, but does the withContext maybe make it work?Tmpod
03/26/2020, 11:30 PMasyncio.create_subprocess_shell
)JP
03/27/2020, 12:16 PMAdam Grzybkowski
03/27/2020, 12:25 PMmaster
and sqldelight
On master branch I've tried to reproduce the original problem I've encountered when testing SqlDelight but without the SqlDelight code.
The problem
Tests are stuck when TestCoroutineDipsatcher
is injected and used in flowOn
operator here.
fun getData(): Flow<Pair<String?, String?>> {
val flow1 = channel1.asFlow()
.flowOn(dispatcher)
val flow2 = channel2.asFlow()
.flowOn(dispatcher)
return flow1.combine(flow2) { data1, data2 ->
data1 to data2
}
.flowOn(dispatcher)
}
suspend fun addData1(value: String?) = withContext(dispatcher) {
channel1.send(value)
}
@Test
fun `receives data1`() = runBlocking {
exampleDataSource.getData()
.test {
assertEquals(null to null, expectItem())
exampleDataSource.addData1("data1")
assertEquals("data1" to null, expectItem())
cancel()
}
}
So far I've been able to make the tests pass by removing the flowOn
operator or by changing the injected dispatcher to Dispatchers.Unconfined
Question
Why is this happening? Could anyone explain me why using flowOn
operator is causing this?
I may be using this wrong coming from Rx word 😕
repository url https://github.com/AdamGrzybkowski/coroutineTestingProblemMichael de Kaste
03/27/2020, 1:07 PMOrhan Tozan
03/28/2020, 4:27 PMval textField: Flow<String>
that depends on a val textInputEvents: Channel<TextInputEvent>
and a val clearTextFieldEvents: Channel<ClearTextInputEvent>
. I'm trying to initialize the textField flow by combining the textInputEvents and clearTextFieldEvents channels and decide the value of the text field based of those two event streams, only problem is that just
val textField: Flow<String> = textInputEvents
.consumeAsFlow()
.combine(clearTextFieldEvents.consumeAsFlow()) { textInputEvent, clearTextFieldEvent ->
// Which one of the two just emitted?
// return textInputEvent.text or ""
}
wouldn't do it, since I have to know which one of the channels emitted value is the most recent one, which isnt possible.
So my question kinda is: is there a combine operator that also tells you which flow emitted the most recent value?
EDIT: perhaps this is a usecase for the select expression (https://kotlinlang.org/docs/reference/coroutines/select-expression.html) ?Erik
03/28/2020, 8:30 PMChannel
and bro.receiveAsFlow()
, but that results in the fan-out principle where the four values sent are handled once by any of the available collectors, but not each by all collectors. The alternative is to use a BroadcastChannel
and bro.asFlow()
instead, but then the collectors receive... nothing! :\
1. Why don't they receive any values? I.e. what am I doing wrong?
2. How can I make this work?
As far as I understand BroadcastChannel
can be used to send a value to multiple receivers. And the asFlow()
extension on it should open a new subscription for every collector? Clearly I'm doing something wrong, but I don't see it.
Bonus questions I don't know the answer of:
3. Is this possible at all with channels and flows? I found this open issue (https://github.com/Kotlin/kotlinx.coroutines/issues/1261) about a Flow.share
operator, which obviously doesn't exist yet. Is this what I might be looking for?jeggy
03/29/2020, 4:02 PMrunBlockingTest
and my test fails with "This job has not completed yet" what are some ways of figuring out what is causing this?
I like that it does fail, but it doesn't bring any information on where the problem is.myanmarking
03/29/2020, 6:56 PMJakub Pi
03/29/2020, 10:46 PMMap<FileKey, Deferred<String>>
where I want to cache (expensive-to-compute) SHA-512 results. My objective is to only calculate hash values once, on the first retrieval from the Map. So I know I want some kind of Future object. I know I can start the computation in the getter (unless it's already complete) and then just return the result normally. But is Deferred the best way to do this or is there a different construct/pattern I should be looking at instead? I probably want to block on the retrieval, so Deferred may not be the best choice.vineethraj49
03/30/2020, 6:30 AM<http://Dispatchers.IO|Dispatchers.IO>
"clone" which has an even lesser number of threads?
I see that <http://Dispatchers.IO|Dispatchers.IO>
is defined as LimitingDispatcher(this, parallelism, TaskMode.PROBABLY_BLOCKING)
, should it be possible to re-use the same implementation?Paul Woitaschek
03/30/2020, 3:32 PMkevinherron
03/30/2020, 8:06 PMFlow.debounce
that has a “failsafe” timeout - so that in the face of continuously emitted events at a rate faster than the debounce timeout something is at least occasionally emittedspierce7
03/31/2020, 4:54 AMFlow
if nothing else is output, and it completes, what’s the best way to do that?Jan Skrasek
03/31/2020, 9:42 AMimport kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.flow.*
import java.util.concurrent.atomic.*
fun <T> Flow<T>.share(): Flow<T> {
val channel = ConflatedBroadcastChannel<T>()
val counter = AtomicInteger()
var job: Job? = null
return channel
.asFlow()
.onStart {
if (counter.incrementAndGet() == 1) {
job = GlobalScope.launch {
this@share
.catch {
channel.close(it)
}
.collect {
channel.offer(it)
}
}
}
}
.onCompletion {
if (counter.decrementAndGet() == 0) {
job?.cancelAndJoin()
job = null
}
}
}
Pacane
03/31/2020, 12:26 PMsuspend fun fastSubscribe(path: String): Value =
suspendCancellableCoroutine { cont: CancellableContinuation<Value> ->
val handler = object : Handler<SubscriptionValue> {
override fun handle(event: SubscriptionValue) {
requester.unsubscribe(path, this)
cont.resume(event.value)
}
}
requester.subscribe(path, handler)
}
Here I want to finish the execution with the value in the callback when I have one, but I'd like to add a timeout for if I never get a value (ie: when handle
is never called)
I think I should use a withTimeout
in there but I don't know where exactly. On timeout I need to call requester.unsubscribe(handler)
, so having the withTimeout
on the call site of fastSubscribe
isn't really a solution. Is there anything I can do with this?Antoine Gagnon
03/31/2020, 7:06 PMrrva
04/01/2020, 5:35 PMMartinZaubitzer
04/01/2020, 8:35 PMAndrey Stepankov
04/02/2020, 1:57 PMclass MainActivity : AppCompatActivity(), CoroutineScope by MainScope() {
private val singleThread = this + newSingleThreadContext("SingleThread")
override fun onCreate(savedInstanceState: Bundle?) {
Timber.plant(Timber.DebugTree())
super.onCreate(savedInstanceState)
setContentView(R.layout.activity_main)
val api = PokemonApi.createApi()
button.setOnClickListener {
singleThread.launch {
try {
val names = async { api.getPokemonNames() }
val types = async { api.getPokemonTypes() }
val stages = async { api.getPokemonStages() }
val owners = async { api.getPokemonOwners() }
names.join()
types.join()
stages.join()
owners.join()
} catch (e: UnknownHostException) {
// exception is not consumed
}
}
}
val crashHandler = Thread.getDefaultUncaughtExceptionHandler()
val exceptionHandler = Thread.UncaughtExceptionHandler { thread, exception ->
try {
Timber.tag("UncaughtException").e(exception)
} finally {
crashHandler?.uncaughtException(thread, exception)
}
}
Thread.setDefaultUncaughtExceptionHandler(exceptionHandler)
}
interface PokemonApi {
@GET("/names")
suspend fun getPokemonNames()
@GET("/types")
suspend fun getPokemonTypes()
@GET("/stages")
suspend fun getPokemonStages()
@GET("/owners")
suspend fun getPokemonOwners()
companion object {
fun createApi(): PokemonApi {
return Retrofit.Builder()
.baseUrl("<https://unavailable-host.com>")
.build()
.create(PokemonApi::class.java)
}
}
}
}
stacktrace
E/UncaughtException: <http://java.net|java.net>.UnknownHostException: Unable to resolve host "<http://unavailable-host.com|unavailable-host.com>": No address associated with hostname
at <http://java.net|java.net>.Inet6AddressImpl.lookupHostByName(Inet6AddressImpl.java:156)
at <http://java.net|java.net>.Inet6AddressImpl.lookupAllHostAddr(Inet6AddressImpl.java:103)
at <http://java.net|java.net>.InetAddress.getAllByName(InetAddress.java:1152)
at okhttp3.Dns$Companion$DnsSystem.lookup(Dns.kt:49)
at okhttp3.internal.connection.RouteSelector.resetNextInetSocketAddress(RouteSelector.kt:164)
at okhttp3.internal.connection.RouteSelector.nextProxy(RouteSelector.kt:129)
at okhttp3.internal.connection.RouteSelector.next(RouteSelector.kt:71)
at okhttp3.internal.connection.ExchangeFinder.findConnection(ExchangeFinder.kt:199)
at okhttp3.internal.connection.ExchangeFinder.findHealthyConnection(ExchangeFinder.kt:108)
at okhttp3.internal.connection.ExchangeFinder.find(ExchangeFinder.kt:76)
at okhttp3.internal.connection.RealCall.initExchange$okhttp(RealCall.kt:245)
at okhttp3.internal.connection.ConnectInterceptor.intercept(ConnectInterceptor.kt:32)
at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.kt:100)
at okhttp3.internal.cache.CacheInterceptor.intercept(CacheInterceptor.kt:82)
at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.kt:100)
at okhttp3.internal.http.BridgeInterceptor.intercept(BridgeInterceptor.kt:83)
at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.kt:100)
at okhttp3.internal.http.RetryAndFollowUpInterceptor.intercept(RetryAndFollowUpInterceptor.kt:74)
at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.kt:100)
at okhttp3.internal.connection.RealCall.getResponseWithInterceptorChain$okhttp(RealCall.kt:197)
at okhttp3.internal.connection.RealCall$AsyncCall.run(RealCall.kt:502)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1167)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:641)
at java.lang.Thread.run(Thread.java:919)
Suppressed: <http://java.net|java.net>.UnknownHostException: Unable to resolve host "<http://unavailable-host.com|unavailable-host.com>": No address associated with hostname
... 24 more
Caused by: android.system.GaiException: android_getaddrinfo failed: EAI_NODATA (No address associated with hostname)
at libcore.io.Linux.android_getaddrinfo(Native Method)
at libcore.io.ForwardingOs.android_getaddrinfo(ForwardingOs.java:74)
at libcore.io.BlockGuardOs.android_getaddrinfo(BlockGuardOs.java:200)
at libcore.io.ForwardingOs.android_getaddrinfo(ForwardingOs.java:74)
at <http://java.net|java.net>.Inet6AddressImpl.lookupHostByName(Inet6AddressImpl.java:135)
... 23 more
Caused by: android.system.GaiException: android_getaddrinfo failed: EAI_NODATA (No address associated with hostname)
at libcore.io.Linux.android_getaddrinfo(Native Method)
at libcore.io.ForwardingOs.android_getaddrinfo(ForwardingOs.java:74)
at libcore.io.BlockGuardOs.android_getaddrinfo(BlockGuardOs.java:200)
at libcore.io.ForwardingOs.android_getaddrinfo(ForwardingOs.java:74)
at <http://java.net|java.net>.Inet6AddressImpl.lookupHostByName(Inet6AddressImpl.java:135)
... 23 more
E/AndroidRuntime: FATAL EXCEPTION: SingleThread
Process: com.example.myapplication, PID: 24936
Andrey Stepankov
04/02/2020, 1:57 PMclass MainActivity : AppCompatActivity(), CoroutineScope by MainScope() {
private val singleThread = this + newSingleThreadContext("SingleThread")
override fun onCreate(savedInstanceState: Bundle?) {
Timber.plant(Timber.DebugTree())
super.onCreate(savedInstanceState)
setContentView(R.layout.activity_main)
val api = PokemonApi.createApi()
button.setOnClickListener {
singleThread.launch {
try {
val names = async { api.getPokemonNames() }
val types = async { api.getPokemonTypes() }
val stages = async { api.getPokemonStages() }
val owners = async { api.getPokemonOwners() }
names.join()
types.join()
stages.join()
owners.join()
} catch (e: UnknownHostException) {
// exception is not consumed
}
}
}
val crashHandler = Thread.getDefaultUncaughtExceptionHandler()
val exceptionHandler = Thread.UncaughtExceptionHandler { thread, exception ->
try {
Timber.tag("UncaughtException").e(exception)
} finally {
crashHandler?.uncaughtException(thread, exception)
}
}
Thread.setDefaultUncaughtExceptionHandler(exceptionHandler)
}
interface PokemonApi {
@GET("/names")
suspend fun getPokemonNames()
@GET("/types")
suspend fun getPokemonTypes()
@GET("/stages")
suspend fun getPokemonStages()
@GET("/owners")
suspend fun getPokemonOwners()
companion object {
fun createApi(): PokemonApi {
return Retrofit.Builder()
.baseUrl("<https://unavailable-host.com>")
.build()
.create(PokemonApi::class.java)
}
}
}
}
stacktrace
E/UncaughtException: <http://java.net|java.net>.UnknownHostException: Unable to resolve host "<http://unavailable-host.com|unavailable-host.com>": No address associated with hostname
at <http://java.net|java.net>.Inet6AddressImpl.lookupHostByName(Inet6AddressImpl.java:156)
at <http://java.net|java.net>.Inet6AddressImpl.lookupAllHostAddr(Inet6AddressImpl.java:103)
at <http://java.net|java.net>.InetAddress.getAllByName(InetAddress.java:1152)
at okhttp3.Dns$Companion$DnsSystem.lookup(Dns.kt:49)
at okhttp3.internal.connection.RouteSelector.resetNextInetSocketAddress(RouteSelector.kt:164)
at okhttp3.internal.connection.RouteSelector.nextProxy(RouteSelector.kt:129)
at okhttp3.internal.connection.RouteSelector.next(RouteSelector.kt:71)
at okhttp3.internal.connection.ExchangeFinder.findConnection(ExchangeFinder.kt:199)
at okhttp3.internal.connection.ExchangeFinder.findHealthyConnection(ExchangeFinder.kt:108)
at okhttp3.internal.connection.ExchangeFinder.find(ExchangeFinder.kt:76)
at okhttp3.internal.connection.RealCall.initExchange$okhttp(RealCall.kt:245)
at okhttp3.internal.connection.ConnectInterceptor.intercept(ConnectInterceptor.kt:32)
at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.kt:100)
at okhttp3.internal.cache.CacheInterceptor.intercept(CacheInterceptor.kt:82)
at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.kt:100)
at okhttp3.internal.http.BridgeInterceptor.intercept(BridgeInterceptor.kt:83)
at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.kt:100)
at okhttp3.internal.http.RetryAndFollowUpInterceptor.intercept(RetryAndFollowUpInterceptor.kt:74)
at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.kt:100)
at okhttp3.internal.connection.RealCall.getResponseWithInterceptorChain$okhttp(RealCall.kt:197)
at okhttp3.internal.connection.RealCall$AsyncCall.run(RealCall.kt:502)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1167)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:641)
at java.lang.Thread.run(Thread.java:919)
Suppressed: <http://java.net|java.net>.UnknownHostException: Unable to resolve host "<http://unavailable-host.com|unavailable-host.com>": No address associated with hostname
... 24 more
Caused by: android.system.GaiException: android_getaddrinfo failed: EAI_NODATA (No address associated with hostname)
at libcore.io.Linux.android_getaddrinfo(Native Method)
at libcore.io.ForwardingOs.android_getaddrinfo(ForwardingOs.java:74)
at libcore.io.BlockGuardOs.android_getaddrinfo(BlockGuardOs.java:200)
at libcore.io.ForwardingOs.android_getaddrinfo(ForwardingOs.java:74)
at <http://java.net|java.net>.Inet6AddressImpl.lookupHostByName(Inet6AddressImpl.java:135)
... 23 more
Caused by: android.system.GaiException: android_getaddrinfo failed: EAI_NODATA (No address associated with hostname)
at libcore.io.Linux.android_getaddrinfo(Native Method)
at libcore.io.ForwardingOs.android_getaddrinfo(ForwardingOs.java:74)
at libcore.io.BlockGuardOs.android_getaddrinfo(BlockGuardOs.java:200)
at libcore.io.ForwardingOs.android_getaddrinfo(ForwardingOs.java:74)
at <http://java.net|java.net>.Inet6AddressImpl.lookupHostByName(Inet6AddressImpl.java:135)
... 23 more
E/AndroidRuntime: FATAL EXCEPTION: SingleThread
Process: com.example.myapplication, PID: 24936
streetsofboston
04/02/2020, 2:02 PMAndrey Stepankov
04/02/2020, 2:03 PMlouiscad
04/02/2020, 2:03 PMasync
in a local coroutineScope
, and put your try
block around it. Without it, the launch
gets the crash, cancels all children coroutines and crashes its parent scope.streetsofboston
04/02/2020, 2:05 PMawait()
on async calls’ Deferred<T>
results be sufficient?Andrey Stepankov
04/02/2020, 2:05 PMlouiscad
04/02/2020, 2:10 PMCancellationException
) will cancel its parent scope. So if you want to handle errors without crashing the parent scope, you need a middle scope (coroutineScope { ... }
), and catch exceptions/throwables around it. The intent of the behavior is to cancel fast work in the same scope if one fails, to avoid pursuing unneeded work that cannot be used as a part already failed.streetsofboston
04/02/2020, 2:12 PMSupervisorJob
MainActivity’s CoroutineScope in the should be sufficient to not cancel the MainActivity’s CoroutineScope after a failure happens.
In my opinion, if names
fails, you want the others (types
, stages
, etc) to be cancelled as well.Andrey Stepankov
04/02/2020, 2:13 PMlaunch {
try {
getCats()
} catch() {
// no-op
}
}
// getCats will throw UnknownHostException
suspend fun callApi() = httpClient.getCats()
louiscad
04/02/2020, 2:15 PMlifecycleScope
extension brought by AndroidX Lifecycle runtime KTX 2.2.0+ that already uses a SupervisorJob
for the case where you use independant async
where one can fail while letting the other still be useful on its own.Andrey Stepankov
04/02/2020, 2:17 PMMainScope() + newSingleThreadContext("SingleThread")
streetsofboston
04/02/2020, 2:17 PMval names = async { api.getPokemonNames() }
val types = async { api.getPokemonTypes() }
val stages = async { api.getPokemonStages() }
val owners = async { api.getPokemonOwners() }
val listOfResults = listOf(names, types, stages, owners).awaitAll()
... or call `await()` on each one individually depending on your use-case...
lifecycleScope
instead. It is all set up correctly already (lifecycleScope.launch { …. }
) 🙂Andrey Stepankov
04/02/2020, 2:28 PM