sjthn
06/09/2019, 5:09 PM
fun onCreate() {
    val user = profile.getUser()
    if (!user.contains("token")) {
        coroutineScope.launch(coroutineExceptionHandler) {
            launch(Dispatchers.IO) {
                try {
                    val token = network.fetchToken()
                    // ...
                } finally {
                    withContext(Dispatchers.Main) {
                        view.hideProgressBar()
                    }
                }
            }
        }
    } else {
        // ...
    }
}
I wrote a test function to verify that network.fetchToken() is called when the condition is met. I wrapped it in runBlocking, but the test fails. Any ideas on how to test this?
cypher121
06/10/2019, 5:25 AM
Matej Drobnič
06/10/2019, 8:11 AM
runBlockingTest (so it will automatically advance non-time tasks, but not time-based tasks)?
Andrew Gazelka
06/10/2019, 9:18 PM
CLOVIS
06/11/2019, 3:56 AM
brett.wooldridge
06/11/2019, 6:16 AM
brett.wooldridge
06/11/2019, 6:16 AM
brett.wooldridge
06/11/2019, 6:16 AM
Matej Drobnič
06/11/2019, 8:00 AM
maxmello
06/11/2019, 9:44 AM
fun a() {
    Log.i("Log info")
    launch {
        myAtomicLong.set(SystemClock.elapsedRealtime())
        Log.i("Log info in launch")
    }
}
The second Log.i never gets executed. It’s inside a class implementing CoroutineScope on Dispatchers.IO. The AtomicLong is used in multiple of those launch blocks.
mbonnin
06/11/2019, 12:38 PM
GlobalScope.launch {
    val deferred = CompletableDeferred<Unit>()
    Thread {
        val result = doSomething()
        if (result == SUCCESS) {
            deferred.complete(Unit)
        } else {
            deferred.completeExceptionally(Exception("oopps"))
        }
    }.start()
    deferred.await()
}
This works well in the nominal case, but in the error case nothing in the stack trace indicates where the await() is done. Is there any chance I could get that info somehow to help debug a crash?
Marko Mitic
06/11/2019, 2:59 PM
Marc Knaup
06/11/2019, 3:11 PM
Is Job.cancel() thread-safe?
Reisishot
06/11/2019, 3:54 PM
keys.forEach { priority ->
    get(priority)?.forEachLimitedParallel(callable)
        ?: throw IllegalStateException("No list found for priority $priority")
}
asad.awadia
06/12/2019, 11:53 AM
bloder
06/12/2019, 12:12 PM
ConflatedBroadcastChannel, this is what I want:
val conflated = ConflatedBroadcastChannel<Int>()
val stringConflated: ConflatedBroadcastChannel<String> = conflated.toStringConflated()
launch {
    stringConflated.consumeEach { println(it) }
}
launch {
    conflated.send(2) // will print "2" (as string)
}
I've tried some approaches to toStringConflated, such as:
suspend fun ConflatedBroadcastChannel<Int>.toStringConflated(): ConflatedBroadcastChannel<String> =
    ConflatedBroadcastChannel<String>().apply {
        this@toStringConflated.consumeEach { /* map value here */ }
    }
But when I apply consumeEach to my ConflatedBroadcastChannel<Int>, my ConflatedBroadcastChannel<String> stops consuming values. Is there a way to do what I'm trying?
kevin.cianfarini
06/12/2019, 1:42 PM
zak.taccardi
06/12/2019, 2:38 PM
actor + a ConflatedBroadcastChannel to safely update state and observe changes to that state. Does this pattern change at all with the introduction of Flow?
class CountRepository: CoroutineScope by CoroutineScope(Dispatchers.Default) {
    private val stateChannel = ConflatedBroadcastChannel<Int>(0)
    private val actor = actor<Int>(capacity = Channel.UNLIMITED) {
        var count = stateChannel.value
        for (message in channel) {
            count += message
            stateChannel.send(count)
        }
    }
    fun states(): ReceiveChannel<Int> = stateChannel.openSubscription()
    fun increment() {
        actor.offer(1)
    }
    fun decrement() {
        actor.offer(-1)
    }
}
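A minimal sketch of how the repository above might look with a flow-based state holder, assuming a kotlinx.coroutines release that ships StateFlow (an API that did not exist when this question was asked). FlowCountRepository is a hypothetical name, and this is only one possible shape, not a recommendation from the thread. MutableStateFlow.update applies each change atomically, which is the job the actor does in the code above.

```kotlin
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.asStateFlow
import kotlinx.coroutines.flow.update

// Hypothetical flow-based counterpart of CountRepository.
class FlowCountRepository {
    // Holds the current count and replays it to every new collector.
    private val state = MutableStateFlow(0)

    // Read-only view for observers.
    fun states(): StateFlow<Int> = state.asStateFlow()

    // update retries its lambda until the compare-and-set succeeds,
    // so concurrent increments and decrements are not lost.
    fun increment() = state.update { it + 1 }

    fun decrement() = state.update { it - 1 }
}

fun main() {
    val repository = FlowCountRepository()
    repository.increment()
    repository.increment()
    repository.decrement()
    println(repository.states().value) // prints 1
}
```

Observers would collect states() from their own coroutine; like the conflated channel, a StateFlow only keeps the latest value, so slow collectors can skip intermediate counts.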
Paul N
06/12/2019, 7:15 PM
Joan Colmenero
06/12/2019, 8:54 PM
bloder
06/12/2019, 9:43 PM
class ConflatedTests: CoroutineScope by CoroutineScope(Dispatchers.Unconfined) {
    private val channel = ConflatedBroadcastChannel<Int>()
    @ExperimentalCoroutinesApi
    fun main() = launch {
        launch {
            channel.consumeEach {
                println(it)
            }
        }
        dispatch(channel, 1)
        dispatch(channel, 2)
        dispatch(channel, 3)
        dispatch(channel, 4)
    }
    @ExperimentalCoroutinesApi
    private fun <T> CoroutineScope.dispatch(channel: ConflatedBroadcastChannel<T>, action: T) = launch {
        channel.send(action)
    }
}
The problem here is that it consumes just the first and last emissions, printing 1 and 4. Probably the other emissions are being suspended for some reason... Could anyone help me with that? What am I missing here?
voben
06/13/2019, 12:52 AM
coroutineScope function is doing. How come we can’t hit the myDeferred2 line without waiting 7 secs? Shouldn’t myDeferred2 get scheduled as soon as myDeferred1 is suspended?
viewModelScope.launch {
    val myDeferred1 = coroutineScope {
        delay(7000)
    }
    val myDeferred2 = coroutineScope {
        delay(2000)
    }
}
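A small sketch of the behavior asked about above, with shorter delays than in the question. coroutineScope suspends its caller until the block and all of its children finish, and it returns the block's value rather than a Deferred, so two calls in a row run one after the other. Using async inside a single scope is one way to overlap the two delays. The function names here are illustrative only.

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import kotlin.system.measureTimeMillis

// Each coroutineScope call completes before the next line runs,
// so the total time is about the sum of both delays.
suspend fun sequential(): Long = measureTimeMillis {
    coroutineScope { delay(300) }
    coroutineScope { delay(200) }
}

// Both async children start right away inside one scope,
// so the total time is about the longer of the two delays.
suspend fun concurrent(): Long = measureTimeMillis {
    coroutineScope {
        val first = async { delay(300) }
        val second = async { delay(200) }
        first.await()
        second.await()
    }
}

fun main() = runBlocking<Unit> {
    println("sequential: ${sequential()} ms") // roughly 500 ms
    println("concurrent: ${concurrent()} ms") // roughly 300 ms
}
```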
mzgreen
06/13/2019, 5:43 AM
async block and inside of this method I use a synchronized block? Will it work, or do I have to use Mutex as stated here https://kotlinlang.org/docs/reference/coroutines/shared-mutable-state-and-concurrency.html#mutual-exclusion ?
Jonathan Mew
06/13/2019, 7:40 AM
Harun
06/14/2019, 5:18 AM
louiscad
06/14/2019, 7:45 AM
broadcastIn for Flow is not @ExperimentalCoroutinesApi but still @FlowPreview in 1.3.0-M1?
Serge
06/14/2019, 7:11 PM
fun main() {
    val list = listOf("1", "2", "3", "4", "5") // listOf("1", "2", "3", "4", "5", "6", "7", "8", "9", "10", "11", "12", "13", "14", "15")
    val threadPool = Executors.newFixedThreadPool(3)!!
    // val threadPoolAsCoroutineDispatcher = threadPool.asCoroutineDispatcher()
    fun <A> Iterable<A>.parallelIterWithExecutorAttempt(
        exec: ExecutorService,
        block: (A) -> Unit) {
        this.asIterable().map { exec.submit { block(it) } }.forEach { it.get() }
    }
    suspend fun work(item: String) =
        withContext(Dispatchers.Default) {
            // https://github.com/Kotlin/kotlinx.coroutines/blob/master/kotlinx-coroutines-core/jvm/test/guide/example-cancel-07.kt
            // https://medium.com/@elizarov/blocking-threads-suspending-coroutines-d33e11bf4761
            val result = withTimeoutOrNull(timeMillis = 130L) {
                repeat(10) { i ->
                    println("ListItem=$item -> I'm sleeping on iteration -> $i ...")
                    delay(50L)
                    // Thread.sleep(50L) // to mimic a non-coroutine external I/O operation
                }
                println("process=$item ->Done") // will get cancelled before it produces this result
            }
        }
    println("parallelIterWithExecutor")
    runBlocking {
        list.parallelIterWithExecutorAttempt(threadPool) { item ->
            launch {
                work(item)
            } // timeout...
        }
    }
    println("end of life for process")
}
nimtiazm
06/15/2019, 1:23 AM
await() extension for CompletableFuture and been able to use Mono results. What do I do for Flux results? How can the results be collected in a list asynchronously on a Publisher interface?
Alexjok
06/15/2019, 10:33 AM
MrPowerGamerBR
06/16/2019, 8:51 PM
delay(...) stops working, no matter what dispatcher I use (Dispatchers.Default, Dispatchers.IO, a custom dispatcher from a custom executor, etc.). It just hangs on the delay call and never continues.
The dispatcher/thread pool isn't overloaded, because the application is still alive and processing every request without any delay (except when there is a delay call somewhere, then the code hangs).
Restarting the JVM magically fixes the issue, until after a few days it stops working again. Using JVM 12, Kotlin 1.3.21 + Coroutines 1.2.1. (Yes, I know, it is outdated. I didn't update yet because I couldn't get Kotlin's script engine working on 1.3.31. If reeeally needed I can update, but I decided to ask here before I do it to figure out if anyone knows what could cause this issue, or if it was already fixed.)
MrPowerGamerBR
06/16/2019, 8:51 PM
delay(...), no matter what dispatcher I use.
https://cdn.discordapp.com/attachments/393332226881880074/589923294916902961/SayRQ1OGN5.gif
https://cdn.discordapp.com/attachments/392476688614948897/589958622000185549/unknown.png
https://cdn.discordapp.com/attachments/392476688614948897/589958706410553356/unknown.png
delay seems to have that strange issue (even suspending functions work fine, except if they call delay).
elizarov
06/16/2019, 11:34 PM
delay schedule. It is called “Default Executor”.
MrPowerGamerBR
06/17/2019, 12:52 AM
val coroutineExecutor = createThreadPool("Coroutine Executor Thread %d")
val coroutineDispatcher = coroutineExecutor.asCoroutineDispatcher()
fun createThreadPool(name: String): ExecutorService {
    return Executors.newCachedThreadPool(ThreadFactoryBuilder().setNameFormat(name).build())
}
elizarov
06/17/2019, 12:54 AM
delay in the way you describe.
MrPowerGamerBR
06/17/2019, 12:55 AM
suspend methods, etc), but not delay
elizarov
06/17/2019, 12:58 AM
delay its thread is automatically shut down, so next time you need to delay it needs to create a thread... But in this thread dump I see the thread up & running
MrPowerGamerBR
06/17/2019, 1:03 AM
elizarov
06/17/2019, 1:03 AM
jmap?
jmap -dump:live,format=b,file=heap.bin <pid>
MrPowerGamerBR
06/17/2019, 1:04 AM
elizarov
06/17/2019, 1:06 AM
MrPowerGamerBR
06/17/2019, 1:07 AM
elizarov
06/17/2019, 1:08 AM
MrPowerGamerBR
06/17/2019, 1:11 AM
elizarov
06/17/2019, 1:12 AM
delay in your code?
MrPowerGamerBR
06/17/2019, 1:17 AM
delay)
elizarov
06/17/2019, 1:17 AM
delay(Long.MAX_VALUE) to wait forever?
MrPowerGamerBR
06/17/2019, 1:24 AM
now - 100 years in milliseconds)
elizarov
06/17/2019, 1:27 AM
MrPowerGamerBR
06/17/2019, 1:31 AM
elizarov
06/17/2019, 1:32 AM
MrPowerGamerBR
06/17/2019, 1:33 AM
elizarov
06/17/2019, 1:34 AM
System.nanoTime() (and jumps back and forth and something like that), but that kind of broken nano-time would wreak a lot of havoc elsewhere. I doubt a running system can have a problem with it.
MrPowerGamerBR
06/17/2019, 1:34 AM
elizarov
06/17/2019, 1:38 AM
MrPowerGamerBR
06/17/2019, 1:42 AM
elizarov
06/17/2019, 1:43 AM
delay after all, but those threads were not available (missing) at the time?
MrPowerGamerBR
06/17/2019, 1:48 AM
Executors.newCachedThreadPool(ThreadFactoryBuilder().setNameFormat(name).build())
Only Kotlin Coroutines uses that executor, so it can't be something else.
Also, wouldn't GC remove cached threads (that are unused) after 60s, causing them to create new threads (which will have a new ID assigned to them)?
And if it was the dispatcher, wouldn't using a different dispatcher work (like Dispatchers.Default or Dispatchers.IO)?
elizarov
06/17/2019, 3:36 AM
GlobalScope.launch(Dispatchers.Unconfined) {
    println("(0) at ${Instant.now()}")
    delay(1)
    println("(1) at ${Instant.now()}")
}
Do you have console output of that process redirected somewhere?
MrPowerGamerBR
06/17/2019, 10:03 AM
(0) line but never displays the (1)
elizarov
06/17/2019, 3:25 PM
delay uses.
MrPowerGamerBR
06/17/2019, 3:40 PM
-Dkotlinx.coroutines.debug to the JVM arguments
6. Added a task that keeps creating coroutines, waits 60s, then prints how much time it took and logs it.
7. Added a way to dump currently running coroutines (with DebugProbes)
8. Enabled remote debugging
I know some of the changes don't have any relation to the issue, but I was like "let's try changing everything and see what sticks"
elizarov
06/17/2019, 11:04 PM
DefaultExecutor object instance.
MrPowerGamerBR
06/18/2019, 12:34 AM
elizarov
06/19/2019, 9:01 PM
MrPowerGamerBR
06/19/2019, 11:31 PM
elizarov
06/20/2019, 3:42 AM
delay()? In theory it should not be a problem, since the delay queue uses an O(log n) algorithm.
MrPowerGamerBR
06/20/2019, 1:26 PM
DefaultExecutor, while most of them I was able to, getting the _queue and _delayed values from the EventLoopImplBase (which I guess are the ones that you will need) is kinda difficult: they don't show up on the declaredMembers list, only on the declaredFields list, which then returns an AtomicReferenceFieldUpdater on the _queue$FU field... but what are the variables' types? In Kotlin it is an atomic<Any?>(null) with AtomicFu, but what would that be in Java itself?
elizarov
06/20/2019, 1:29 PM
Object in Java. Atomics are erased during compilation.
MrPowerGamerBR
06/20/2019, 5:08 PM
AtomicReferenceFieldUpdater has two types? <*, *>... well, actually I don't need to know the two types because you can just cast it with <*, *>. However, I wanted to know what value should be used in the _queue.get(SomeObjectHere) call. I tried looking at the Kotlin Coroutines code, but because it uses AtomicFu, it is just a _queue.value call
elizarov
06/20/2019, 7:30 PM
_queue.value is actually stored in the _queue field.
MrPowerGamerBR
06/20/2019, 8:54 PM
_delayed and _queue fields were null, but that's because I never spawned any coroutine task.
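A sketch of reading such a field with plain Java reflection, assuming (as stated above) that the value lives directly in a private field typed as Object. FakeEventLoop and readPrivateField are hypothetical stand-ins made up for illustration; the code does not touch any kotlinx.coroutines internals.

```kotlin
// Hypothetical stand-in for a class that keeps its state in a private
// volatile field, similar in shape to the fields discussed above.
class FakeEventLoop {
    @Volatile
    private var _queue: Any? = listOf("task-1", "task-2")
}

// Reads a private instance field by name. getDeclaredField only sees fields
// declared on the given class itself, not on its superclasses.
fun readPrivateField(target: Any, name: String): Any? {
    val field = target.javaClass.getDeclaredField(name)
    field.isAccessible = true // bypass the private modifier
    return field.get(target)
}

fun main() {
    val queue = readPrivateField(FakeEventLoop(), "_queue")
    println(queue) // prints [task-1, task-2]
}
```

For a field declared in a base class, the lookup would have to start from that base class (for example by walking superclass until getDeclaredField succeeds) instead of target.javaClass.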
Anyway, here are some of the dumped variables from the EventLoopImplBase
EventLoopImplBase:
_queue = kotlinx.coroutines.internal.LockFreeTaskQueueCore@44170e37
Size: 0
_queue$FU = java.util.concurrent.atomic.AtomicReferenceFieldUpdater$AtomicReferenceFieldUpdaterImpl@671fab50
_delayed = kotlinx.coroutines.internal.ThreadSafeHeap@3d30a5ae
Size: 3300
_delayed$FU = java.util.concurrent.atomic.AtomicReferenceFieldUpdater$AtomicReferenceFieldUpdaterImpl@3ad4a630
isCompleted = false
_delayed ThreadSafeHeap, but maybe that's a normal size if you are running a big application (my test instance returns 9 for that same var)
elizarov
06/20/2019, 9:05 PM
a inside the ThreadSafeHeap instance. It contains instances of DelayedTask. Can you dump all of them? I’d need index and nanoTime for each one to see if it is consistent
MrPowerGamerBR
06/20/2019, 10:05 PM
nanoTime, there doesn't seem to be anything wrong
elizarov
07/01/2019, 4:53 PM
System.nanoTime() is not monotonic on your system.
delay(Long.MAX_VALUE), weren’t you?
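The thread as preserved here does not state a root cause, so the following is only an illustration of why extreme delay values such as Long.MAX_VALUE deserve suspicion. It is not the kotlinx.coroutines implementation: naiveDeadline and saturatingDeadline are hypothetical helpers that turn a millisecond delay into an absolute nanosecond deadline, and they show how plain Long arithmetic can wrap around.

```kotlin
// Hypothetical helper: converts a millisecond delay into a nanosecond deadline.
// Both the multiplication and the addition can silently overflow.
fun naiveDeadline(nowNanos: Long, delayMillis: Long): Long =
    nowNanos + delayMillis * 1_000_000L

// Same idea, but clamps to Long.MAX_VALUE instead of wrapping around.
// Assumes a non-negative delay.
fun saturatingDeadline(nowNanos: Long, delayMillis: Long): Long {
    require(delayMillis >= 0) { "delay must be non-negative" }
    val maxMillis = Long.MAX_VALUE / 1_000_000L
    val delayNanos =
        if (delayMillis >= maxMillis) Long.MAX_VALUE else delayMillis * 1_000_000L
    val deadline = nowNanos + delayNanos
    // With a non-negative delay, a result below "now" means the addition wrapped.
    return if (deadline < nowNanos) Long.MAX_VALUE else deadline
}

fun main() {
    // The wrapped product turns a "wait forever" request into a deadline in the past.
    println(naiveDeadline(0L, Long.MAX_VALUE))      // prints -1000000
    println(saturatingDeadline(0L, Long.MAX_VALUE)) // prints 9223372036854775807
}
```

A time-ordered structure that mixes wrapped and unwrapped deadlines can end up ordered inconsistently, which is one reason to compare the index and nanoTime of every queued task as requested above.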