Sam Gammon
12/02/2022, 8:24 PM
Gioele Dev
12/04/2022, 3:09 PM
viewModelScope.launch(Dispatchers.IO)
is it correct?
He
12/05/2022, 3:00 AM
fun wait() {
    val timeout = TimeHelper.getNow() + timeoutInterval // <- part of class constructor
    if (processor.getLag() > 0) {
        val currentTime = TimeHelper.getNow()
        if (currentTime >= timeout) { System.exit(42) }
        Thread.sleep(someTime)
    }
}
And I want to use a coroutine with runBlocking and delay. Would it be something like this?
fun wait() = runBlocking {
    launch { delayMethod() }
    if (processor.getLag() > 0) {
        val currentTime = TimeHelper.getNow()
        if (currentTime >= timeout) { System.exit(42) }
    }
}
suspend fun delayMethod() {
    delay(someTime)
}
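Note that `launch { delayMethod() }` starts the delay in a separate child coroutine, so the lag check that follows runs right away instead of waiting for it. A minimal sketch of one alternative, assuming the goal is to poll until the lag clears or a deadline passes: `LagSource`, `awaitNoLag`, and `waitBlocking` are hypothetical names standing in for `processor.getLag()` and `wait()`, and the sketch returns `false` on timeout rather than calling `System.exit(42)`.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeoutOrNull

// Hypothetical stand-in for `processor.getLag()` from the question.
fun interface LagSource {
    fun getLag(): Long
}

// Suspends until the lag reaches zero; returns false if `timeoutMillis` elapses first.
suspend fun awaitNoLag(lag: LagSource, timeoutMillis: Long, pollMillis: Long): Boolean {
    val finished = withTimeoutOrNull(timeoutMillis) {
        while (lag.getLag() > 0) {
            delay(pollMillis) // suspends without blocking the thread
        }
    }
    return finished != null // null means the timeout fired before the lag cleared
}

// Blocking bridge for callers that are not suspending, mirroring the original wait().
fun waitBlocking(lag: LagSource, timeoutMillis: Long, pollMillis: Long): Boolean =
    runBlocking { awaitNoLag(lag, timeoutMillis, pollMillis) }
```

Callers that still want the original behaviour could call `System.exit(42)` when `waitBlocking` returns `false`.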
Patrick Steiger
12/06/2022, 2:41 AM
coroutineScope { ensureActive() }?
Gioele Dev
12/06/2022, 3:32 AM
rrva
12/06/2022, 8:47 PM
coroutinedispatcher
12/06/2022, 9:16 PM
suspend fun timeDifference(): Int {
    val x = System.currentTimeMillis()
    delay(5.toDuration(DurationUnit.SECONDS))
    val y = System.currentTimeMillis()
    return TimeUnit.MILLISECONDS.toSeconds(y - x).toInt()
}
and then write a test for this:
@Test
fun test() = runTest {
    val result = timeDifference()
    advanceUntilIdle()
    println(result)
}
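Worth keeping in mind for a test like this: `runTest` skips `delay` calls using virtual time, while `System.currentTimeMillis()` keeps reading the real clock, so the measured difference reflects how long the test actually ran rather than 5 seconds. A minimal sketch, assuming the clock can be passed in: `timeDifferenceSeconds` and `virtualTimeDemo` are hypothetical names, and `currentTime` is the virtual clock of the `TestScope`.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.delay
import kotlinx.coroutines.test.currentTime
import kotlinx.coroutines.test.runTest
import kotlin.time.Duration.Companion.seconds

// Measures elapsed seconds against a caller-supplied clock (in milliseconds).
suspend fun timeDifferenceSeconds(nowMillis: () -> Long): Long {
    val start = nowMillis()
    delay(5.seconds)
    return (nowMillis() - start) / 1000
}

// Runs the measurement under runTest, reading the scheduler's virtual clock.
@OptIn(ExperimentalCoroutinesApi::class)
fun virtualTimeDemo(): Long {
    var result = -1L
    runTest {
        // `currentTime` is TestScope.currentTime, which advances when delays are skipped.
        result = timeDifferenceSeconds { currentTime }
    }
    return result
}
```

With the virtual clock the measured difference is the 5 seconds of the `delay`, independent of how fast the test machine is.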
Is it true that the output would be inconsistent? I tried it many times and it doesn’t print the “predicted result” (in this case 5 seconds) but rather some other number.
Jeong Rok Suh
12/07/2022, 6:07 AM
Anuta Vlad Sv
12/07/2022, 8:46 AM
inline fun <A> Flow<A>.bindState(
    lifecycleOwner: LifecycleOwner,
    crossinline bind: suspend (A) -> Unit,
) =
    lifecycleOwner.lifecycleScope.launchWhenStarted {
        collect(bind)
    }
Recently I've updated the coroutines library to 1.6.4 (from 1.5.2), and it looks like Flow.collect(crossinline action: suspend (value: T) -> Unit) doesn't work with the newer version of the coroutines dependencies. Flow.collect() now takes a FlowCollector functional interface argument instead of a suspend (T) -> Unit. Any suggestions for how I can still keep this as an inline function?
arekolek
12/08/2022, 3:36 PM
fun main() = runBlocking {
    val service = createFooService()
    val response = withContext(Dispatchers.IO) {
        service.foo()
    }
    println(response)
}
interface FooService {
    @POST("foo")
    suspend fun foo(): FooResponse
}
dependencies {
    implementation("com.jakewharton.retrofit:retrofit2-kotlinx-serialization-converter:0.8.0")
    implementation("com.squareup.okhttp3:okhttp:4.10.0")
    implementation("com.squareup.retrofit2:retrofit:2.9.0")
    implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core-jvm:1.6.4")
    implementation("org.jetbrains.kotlinx:kotlinx-serialization-json:1.4.1")
}
Roberto Fernandez Montero
12/09/2022, 9:40 AM
fun start() {
    coroutineScope.launch {
        socketIoServer.startAsync()
    }
}
//Another class
socket.start()
or
fun start() {
    socketIoServer.startAsync()
}
//Another class
scope.launch { socket.start() }
George
12/09/2022, 11:11 AM
Stylianos Gakis
12/09/2022, 11:38 AM
TestScope provided inside the runTest{} function to pass it to a launchIn function to create a hot StateFlow.
The problem is that the test never ends this way, since the coroutineScope always has something to do, so runTest times out and fails my test.
I’ve “fixed” this by adding the line this.coroutineContext.cancelChildren() as the last line of my test, after everything I’ve asserted was correct and so on. Is this considered okay, or should I really be careful with something like this? It feels like I’m patching the problem rather than solving the root cause, but I could be wrong.
Stylianos Gakis
12/09/2022, 1:48 PM
okhttp3.Authenticator which unfortunately isn’t suspending, and I am using runBlocking in there to bridge this gap. My problem is as follows:
In the body of that authenticator, everything is wrapped with a runBlocking, but that one in turn calls a suspending function from another service.
That function in turn calls other suspending functions, which at some point use the CoroutineContext given from TestScope.backgroundScope (or even TestScope itself).
This means that runBlocking is waiting for that to run, while the test is hanging waiting for runBlocking to finish at the same time, not giving me an opportunity to call runCurrent() or something like that to make the coroutine actually run.
I guess this brings me to the question: should I be going with an UnconfinedTestDispatcher here? I am kind of scared of using that usually, since I don’t have good control of what’s going on, but I don’t know if that’s the more “appropriate” way to run tests with runTest {}. I usually default to not using UnconfinedTestDispatcher unless really needed; is this a bad habit?
stefano
12/09/2022, 3:55 PM
val oas = specs.map {
    async {
        try {
            client.get(it).body<OpenApiSpec>() // Ktor HTTP client
        } catch (e: ClientRequestException) {
            null
        }
    }
}.awaitAll()
I'd like to abstract away the exception handling, so I've tried something like this:
suspend inline fun <reified T> HttpResponse.bodyOrNull(): T? = try {
    body()
} catch (e: ClientRequestException) {
    null
}
val oas = specs.map {
    async {
        client.get(it).bodyOrNull<OpenApiSpec>()
    }
}.awaitAll()
But instead of swallowing the exceptions and returning null as in the example above, I get the exception bubbling up. I suspect I'm not yet fully understanding the exception handling model for coroutines, could anybody help shed some light? Thanks in advance.
(Btw, I obtain the same result if I wrap the try/catch with coroutineScope)
janvladimirmostert
12/10/2022, 11:50 PM
Possibly blocking call in non-blocking context could lead to thread starvation
Dispatchers.IO somehow manages to make such warnings go away. I want to make those warnings go away for my custom dispatcher too.
launch(Dispatchers.CUSTOM) {
    println(Thread.currentThread())
    Thread.sleep(5_000) // <-- warning: Possibly blocking call ...
    delay(5_000)
    println("done!!!")
}
Osmium
12/11/2022, 3:48 PM
spierce7
12/11/2022, 5:04 PM
Hakon Grotte
12/12/2022, 11:07 AM
Flow? I am trying to do this with Turbine and `TestCoroutineScheduler.currentTime`:
runTest {
    val myFlow: Flow<Int> = flow<Int> {
        (1..5).forEach {
            delay(100)
            emit(it)
        }
    }
    myFlow.test {
        repeat(5) {
            awaitItem()
            println(testScheduler.currentTime)
        }
        awaitComplete()
    }
}
The described approach does not work: for each run my test prints different/random values, e.g. "300,500,500,500,500" etc.
I have tried using the myFlow.flowOn() operator with both test dispatchers available.
ursus
12/12/2022, 7:00 PM
interface SpeedApi {
    @Streaming
    @GET
    suspend fun download(@Url url: String): Response<ResponseBody>
}
val response = speedApi.download("...")
response.body.use {
    it.byteStream().copyTo(...)
}
works, but the stream copying makes that blocking, and therefore not cancellable
ursus
12/12/2022, 7:01 PM
body.use {
    it.byteStream().use { iss ->
        var bytesCopied: Long = 0
        val buffer = ByteArray(DEFAULT_BUFFER_SIZE)
        var bytes = iss.read(buffer)
        while (bytes >= 0 && coroutineContext.isActive) { // <-------------
            ...
            bytesCopied += bytes
            bytes = iss.read(buffer)
        }
        bytesCopied
    }
}
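Another option that is sometimes used for blocking copies like this is `runInterruptible`, which interrupts the blocked thread when the coroutine is cancelled. A minimal sketch, not Retrofit-specific: `copyInterruptibly` is a hypothetical helper, and whether a blocked `read` actually reacts to the interrupt depends on the stream implementation.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.runInterruptible
import java.io.InputStream
import java.io.OutputStream

// Copies `input` to `output` on Dispatchers.IO and returns the number of bytes copied.
// If the calling coroutine is cancelled, the thread running the block is interrupted.
suspend fun copyInterruptibly(input: InputStream, output: OutputStream): Long =
    runInterruptible(Dispatchers.IO) {
        input.copyTo(output)
    }
```

The caller stays responsible for closing both streams, for example with `use` as in the snippet above.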
I do get that cancellation is cooperative etc., I'm just looking for an idiomatic Retrofit way
jw
12/12/2022, 7:14 PM
Ron Aharoni
12/13/2022, 8:53 AM
class KafkaConsumer {
    private val scope = CoroutineScope(Dispatchers.IO)
    fun onMessage(message: KafkaMessage<K, V>?) {
        print("before launch") // this always shows
        scope.launch {
            print("in coroutine") // this stops showing sometimes
            process() // Async operation that might take a second or two but also with some small runBlocking inside
        }
    }
}
Inside of process we again make use of the IO dispatcher to make an HTTP call.
Are there debugging tools / techniques we could use?
Would using a separate dispatcher inside process help with isolating the problem?
Thanks
franztesca
12/13/2022, 10:03 AM
synchronized or a Mutex. Both work. Which one should I use? On one side, synchronized can block the thread of a coroutine but it's much faster; on the other side, Mutex never blocks the thread but it's slower. Is there a preferred option, in general? Thanks
Gordon
12/14/2022, 9:05 AM
maxmello
12/15/2022, 4:11 PM
withContext(Dispatchers.IO) { new { ... } }
I have DB code from Exposed (new entity creation) that is not suspending and has this parameter: init: T.() -> Unit, which as you can see does not allow suspension.
If I now really need to call a suspending function, I would usually do so before the new block and reference the result. Now I thought, what happens if I just call runBlocking with my suspending code inside new? Will it “block more” this way, or be the same since the new block has no suspending capabilities anyway?
jeggy
12/16/2022, 10:25 AM
Vikas Singh
12/16/2022, 10:57 AM
Exerosis
12/18/2022, 8:10 AM
withContext(CoroutineExceptionHandler { ... }) {
}
I feel like it's a very confusing system as it is now. If I had to explain it to someone else, I would say that uncaught exceptions first try to go up to the parent job somehow. Otherwise they look for a CoroutineExceptionHandler, falling back on the system unhandled exception handler, with runBlocking being a special case that ignores CoroutineExceptionHandler despite being "root", and supervisor scope being another special case where all direct children are "root". Finally, you can use CoroutineScope(CoroutineExceptionHandler {}), but it will be overridden if you provide one to launch.
What is the reason it doesn't just use the context's CoroutineExceptionHandler and have scope constructors add a default handler that delegates to system unhandled? (So every context has a CoroutineExceptionHandler and a Job.)
Also, is withContext(currentCoroutineContext()) the same as coroutineScope at the end of the day?
Claude Brisson
12/18/2022, 3:02 PM
suspend fun someExternalSuspendFun(): String
fun returnResult(): String {
    return runBlocking {
        async {
            return@async someExternalSuspendFun()
        }.await()
    }
}
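Since `runBlocking` returns the value of its block, the `async { ... }.await()` pair is not needed to get the result out. A minimal sketch, with a hypothetical body for `someExternalSuspendFun` so that it runs on its own; `runBlocking` still blocks the calling thread, so whether this is safe depends on where `returnResult` is called from.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for the external suspend function in the question.
suspend fun someExternalSuspendFun(): String {
    delay(10)
    return "rendered"
}

// runBlocking evaluates to the last expression of its block,
// so the suspend call can be made directly inside it.
fun returnResult(): String = runBlocking { someExternalSuspendFun() }
```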
Is there a simpler way to do this?
efemoney
12/18/2022, 4:07 PM
kevin.cianfarini
12/18/2022, 5:22 PM
Runs a new coroutine and blocks the current thread until its completion. This function should not be used from a coroutine. It is designed to bridge regular blocking code to libraries that are written in suspending style, to be used in main functions and in tests.
withContext(singleThreadedDispatcher)
Claude Brisson
12/18/2022, 6:20 PM
kevin.cianfarini
12/18/2022, 6:48 PM
val mySingleThreadedDispatcher = Executors.newFixedThreadPool(1).asCoroutineDispatcher()
...
withContext(mySingleThreadedDispatcher) { someTemplateStuff() }
class SuspendingTemplateThing(private val templateThing: TemplateThing) {
    private val singleThreadContext = Executors.newFixedThreadPool(1).asCoroutineDispatcher()
    suspend fun doAThing() = withContext(singleThreadContext) { templateThing.doAThing() }
}
This ensures your suspending functions are main safe.
runBlocking from a suspend function because iirc it can cause deadlock. If the docs mention you shouldn't do it, I doubt your use case is safe from the reasons they warn against using it.
Furthermore, runBlocking will block the current thread. If you happen to be calling the templating thing from multiple different coroutines all running on different threads (such as with Dispatchers.IO), then the runBlocking calls to the templating thing would run on more than one thread.
Nick Allen
12/18/2022, 11:37 PM
runBlocking is safe or not. The warning against using it may be accurate, or this could be exactly the scenario that runBlocking was designed for. It really depends on what code is calling returnResult and the thread that it's running on.
If returnResult is a callback from a Java class/library that calls its callbacks from a dedicated thread for callbacks and expects callbacks to block, then you are using runBlocking as intended.
There is a gotcha even in the intended use case: if it's possible for other code to post to that thread, then a dispatcher could exist that posts to that thread, and then you could potentially deadlock (runBlocking calls a suspend method that uses said dispatcher, which queues the coroutine to the thread blocked by runBlocking).
kevin.cianfarini
12/19/2022, 1:10 AM
returnResult above is run within a coroutine, which I believe it is since this is all happening in the context of a Ktor request, then I don't believe runBlocking is safe to use.
Nick Allen
12/19/2022, 7:32 AM
// suspending wrapper of non-suspend API
suspend fun SingleThreadedClass.printStuff(getMessage: suspend () -> String) {
    suspendCoroutine { cont ->
        printStuff(
            //Using runBlocking here:
            getMessage = { runBlocking { getMessage() } },
            onDone = { cont.resume(Unit) }
        )
    }
}
class SingleThreadedClass() {
    private val executor = ThreadPoolExecutor(0, 1, 0, TimeUnit.MILLISECONDS, LinkedBlockingQueue())
    /**
     * Executes block on private background thread to determine what to print.
     */
    fun printStuff(getMessage: () -> String, onDone: () -> Unit) {
        executor.execute {
            println(getMessage())
            onDone()
        }
    }
}
While runBlocking is being used from code that is part of a coroutine, it is not being called from a thread that is part of dispatching coroutines. When the docs say "This function should not be used from a coroutine", it means that it should not be called if a suspend function is on the call stack. When execution moves to the private executor thread, there's no coroutine on the call stack. This sort of situation, where a callback is called from a private thread (pool), is exactly what runBlocking is intended for.
I could see this sort of situation being the case depending on the details of the "Java single threaded templating rendering", but it depends.
kevin.cianfarini
12/19/2022, 4:05 PM
Java single-threaded template rendering to mean that the templating engine isn't thread safe and must be confined to a single thread. In your above example, if the templating engine is a worker of sorts, then yes, I believe you're correct: runBlocking would be fine. Regardless, it seems like it might be easier to decouple these two concerns. Separately fetch the DB data and pass that data to the templating engine, and you wouldn't need to worry about concurrency semantics here.