Devan Lai
03/15/2022, 10:50 PM
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.consumeEach
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlin.concurrent.thread
var readCount = 0
fun blockingSerialRead(): String {
    val message = "MSG$readCount"
    readCount += 1
    // Pretend to take some time, blocking.
    Thread.sleep(500L)
    return message
}
fun main(args: Array<String>) {
    runBlocking {
        val messages = Channel<String>()
        thread(start = true) {
            while (true) {
                val message = blockingSerialRead()
                println("Read message $message")
                launch {
                    messages.send(message)
                }
            }
        }
        // Start a coroutine that does work on each received message
        launch {
            var takeExtraTime = false
            messages.consumeEach {
                // Do some suspendable work with the message
                // For example, maybe we'll send a message back
                // and wait for the response.
                if (takeExtraTime) {
                    // Pretend to do some expensive suspendable work
                    delay(1000L)
                    takeExtraTime = false
                } else {
                    takeExtraTime = true
                }
                println("Processed message $it")
            }
        }
    }
}
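A possible alternative for the hand-off from the blocking thread, sketched under assumptions: instead of launching a new coroutine per message, the reader thread can call `trySendBlocking`, which blocks that thread until the channel accepts the element, so the reader cannot run ahead of the consumer. The function `readAll` and the generated `"MSG$i"` strings are hypothetical stand-ins for `blockingSerialRead()`, not part of the original code.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.trySendBlocking
import kotlinx.coroutines.runBlocking
import kotlin.concurrent.thread

// Hypothetical helper: reads `count` messages on a plain thread and
// collects them from a coroutine through a rendezvous channel.
fun readAll(count: Int): List<String> = runBlocking {
    val messages = Channel<String>()
    thread {
        repeat(count) { i ->
            val message = "MSG$i" // stand-in for blockingSerialRead()
            // Blocks this (non-coroutine) thread until the consumer takes the message.
            messages.trySendBlocking(message)
        }
        messages.close() // lets the consumer loop below finish
    }
    val received = mutableListOf<String>()
    for (message in messages) {
        received += message
    }
    received
}
```

With this shape, `readAll(3)` yields `[MSG0, MSG1, MSG2]`, and at most one message is waiting to be handed over at any time.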
allan.conda
03/16/2022, 9:23 AM
coroutineScope {
    var count = 0
    val mutableList = mutableListOf<Int>()
    launch {
        while (isActive) {
            mutableList.add(count++)
            delay(1000)
        }
    }
    launch {
        while (isActive) {
            delay(10000)
            mutableList.take(10).also(mutableList::removeAll)
        }
    }
}
Michal Klimczak
03/16/2022, 10:59 AM
anImportantBlockingCall resembles okhttp.Authenticator.authenticate - it's a blocking call, which in my case will run a runBlocking coroutine that synchronises a few things and waits for them. So it really is important that this remains blocking in this test case. 🧵
christophsturm
03/17/2022, 10:36 AM
withTimeout is it possible to get a stacktrace of the thread where the timeout occurred when it occurred?
Tower Guidev2
03/17/2022, 10:56 AM
Brais Gabin
03/18/2022, 9:49 AM
UnconfinedTestDispatcher. Can I use UnconfinedTestDispatcher as a StandardTestDispatcher, but one that does the advanceUntilIdle automatically for me? Am I missing something here? I know that I'll lose some flexibility, but in a lot of my tests I don't need that flexibility.
hfhbd
03/20/2022, 2:18 PM
sharedFlow builder with the same signature as the cold flow builder: fun <T> sharedFlow(block: suspend FlowCollector<T>.() -> Unit): SharedFlow<T>? It always requires a CoroutineScope...
Kamila
03/21/2022, 7:13 AM
CompletableFuture of different entities, for example:
getFoo: CompletableFuture<List<FooDto>>
getBar: CompletableFuture<List<BarDto>>
getWhatever: CompletableFuture<List<WhateverDto>>
Would it be possible to use coroutines to trigger each of the call in parallel, and combine final object to be:
Object {
foo: List<FooDto>
bar: List<BarDto>
whatever: List<WhateverDto>
}
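One way the combination above might look with coroutines, as a minimal sketch under assumptions: the DTO classes, the `Combined` holder and the three fetchers are hypothetical stand-ins, and `await` is the `CompletionStage` extension from `kotlinx.coroutines.future` (shipped in the `kotlinx-coroutines-jdk8` module in older releases and in the core artifact in newer ones). Creating the three futures already starts the calls, so awaiting them one after another does not serialize the work.

```kotlin
import kotlinx.coroutines.future.await
import kotlinx.coroutines.runBlocking
import java.util.concurrent.CompletableFuture

// Hypothetical stand-ins for the entities named in the question.
data class FooDto(val id: Int)
data class BarDto(val id: Int)
data class WhateverDto(val id: Int)
data class Combined(
    val foo: List<FooDto>,
    val bar: List<BarDto>,
    val whatever: List<WhateverDto>,
)

// Hypothetical fetchers; real ones would call the remote services.
fun getFoo(): CompletableFuture<List<FooDto>> =
    CompletableFuture.supplyAsync { listOf(FooDto(1)) }
fun getBar(): CompletableFuture<List<BarDto>> =
    CompletableFuture.supplyAsync { listOf(BarDto(2)) }
fun getWhatever(): CompletableFuture<List<WhateverDto>> =
    CompletableFuture.supplyAsync { listOf(WhateverDto(3)) }

suspend fun fetchCombined(): Combined {
    // All three calls are in flight once the futures exist.
    val foo = getFoo()
    val bar = getBar()
    val whatever = getWhatever()
    // await() suspends the coroutine instead of blocking a thread.
    return Combined(foo.await(), bar.await(), whatever.await())
}

// Blocking entry point for callers that are not in a coroutine.
fun fetchCombinedBlocking(): Combined = runBlocking { fetchCombined() }
```

The total wait is roughly that of the slowest call, because each `await()` only waits for whatever is still outstanding.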
It is not so easy and obvious to solve with `CompletableFuture`s in Java, maybe it would be easier with coroutines? 🤔 The point is I need to fetch multiple different entities before I can calculate the final object. I can easily fetch them in sequence, but they are not dependent on each other, so they should really be fetched in parallel.
Sergio C.
03/22/2022, 5:29 PM
callbackFlow { } but the code inside the block never gets called, am I doing something wrong?
rkeazor
03/23/2022, 4:22 AM
class SimpleChannelTest {
    val simpleChannel = Channel<Int>()
    fun simpleFunction() {
        runBlocking {
            launch {
                simpleChannel.consumeEach { println(it) }
            }
            var x = 0
            repeat(10) {
                simpleChannel.send(x++)
                println("sent Message")
            }
            simpleChannel.close()
        }
    }
}
I would expect that each time send is called, the main coroutine suspends until the data is received. However, I end up with this as the result:
0
sent Message
sent Message
1
2
sent Message
sent Message
3
4
sent Message
sent Message
5
6
sent Message
sent Message
7
just wondering why this is?
Sam Stone
03/23/2022, 1:29 PM
bnn
03/24/2022, 2:03 AM
delay function call which is used inside the Library implementation never completes. As a result, Future.get() blocks forever.
I know that calling Future.get() is not recommended practice. But I would like to avoid this issue, because it's too disastrous given that library clients have no idea whether the delay suspend function is used under the hood.
The following is the complete example code that reproduces the issue.
https://gist.github.com/bananaumai/da23494ed2ba1d22c39af23a7c07ba2f#file-mainactivity-kt
Any advice and/or recommendations would be appreciated.
Thanks in advance.
Paul Woitaschek
03/24/2022, 6:36 PM
@Test
fun test() = runTest {
    repeat(5) {
        launch {
            println("enter")
            Thread.sleep(Random.nextLong(10))
            println("exit")
        }
    }
    println("start yield")
    yield()
}
Will print:
start yield
enter
exit
enter
exit
...
Is there any way to use runTest and still have parallelism?
Alexander Maryanovsky
03/24/2022, 7:22 PM
lifecycleScope.launch {
    lifecycle.repeatOnLifecycle(Lifecycle.State.STARTED) {
        flow.collect {
            ...
        }
    }
}
I would like the collection, and the entire coroutine, to stop when the flow completes. But repeatOnLifecycle never returns, even after collect returns, and if the activity is stopped and restarted, collection is launched again.
It seems I need runOnceOnLifecycle.
Colton Idle
03/25/2022, 9:15 PM
Flowable but that smells like Hungarian notation... but I do remember a book/blog post (Effective Java?) that said it should be clear when a method is doing stuff with threads/async.
Thoughts?
interface MyRepository {
    suspend fun getLatestBooks(): List<Book>
    fun getLatestBooksWithLiveUpdates(): Flow<List<Book>>
    suspend fun getBookDetail(): BookDetail
    fun getBookDetailWithLiveUpdates(): Flow<BookDetail>
}
Thomas
03/26/2022, 1:08 AM
limitedParallelism on Kotlin/Native? When I try to use it I just get a ClassCastException. More details here: https://github.com/Kotlin/kotlinx.coroutines/issues/3223
janvladimirmostert
03/26/2022, 4:59 PM
Brendan Campbell-hartzell
03/28/2022, 2:10 AM
runTest doesn't seem to wait for a collector running in another scope to fully process the emitted value before returning. Keeping everything in the runTest scope and context just hangs the test because Flow.collect on a SharedFlow never returns.
ursus
03/28/2022, 10:43 PM
timer type flow? it’s not really idiomatic, is it? or maybe akin to backpressure, i.e. downstream telling upstream to stop?
Exerosis
03/30/2022, 1:14 AM
suspend fun debug() {
    println("${coroutineContext[STreeEvent]}")
}
fun main() = runBlocking {
    val test = SSubject(0)
    SComponent().apply {
        println("Component: $this")
        println("Test: $test")
        test(this, BEFORE) {
            println("${coroutineContext[STreeEvent]}")
            debug()
        }
        ...
Can someone help me understand why:
println("${coroutineContext[STreeEvent]}")
debug()
prints:
null
1212899836
Why on earth would the context change between these two places?
Pablo
03/30/2022, 11:24 PM
ViewModel I have injected a CoroutineDispatcher, something like
class FooViewModel @Inject constructor(private val dispatcher: CoroutineDispatcher){}
Then when I try to run the test, I see that the test and the code in the ViewModel are running on different threads. I know this because I've added some logs
println("[${Thread.currentThread().name}] viewModel")
println("[${Thread.currentThread().name}] test")
And in the console I see:
[Test worker] test
[Test worker @coroutine#3] viewModel
In the test I'm doing it as:
private val testDispatcher = UnconfinedTestDispatcher()
private val testCoroutineScope = TestScope(testDispatcher)
@Test
fun test() = testCoroutineScope.runTest {
    val viewModel = FooViewModel(testDispatcher)
    ...
}
What am I missing?
Kamila
03/31/2022, 6:53 AM
private suspend fun fetchExternalData(futures: List<CompletableFuture<Reply>>): List<Reply> = coroutineScope {
    futures.map {
        async {
            it.join()
        }
    }.awaitAll()
}
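For comparison, a non-blocking variant might look like the sketch below. It assumes the `await` extension from `kotlinx.coroutines.future` is on the classpath; `awaitAllFutures` and `demoAwaitAll` are hypothetical names. `join()` and `get()` block the calling thread, whereas `await()` suspends. Because each `CompletableFuture` is already running once it exists, no `async` wrapper is needed to keep the requests parallel.

```kotlin
import kotlinx.coroutines.future.await
import kotlinx.coroutines.runBlocking
import java.util.concurrent.CompletableFuture

// Hypothetical helper: suspends until every future has completed and
// returns the results in the same order as the input list.
suspend fun <T> awaitAllFutures(futures: List<CompletableFuture<T>>): List<T> =
    futures.map { it.await() }

// Small usage example with two already-started futures.
fun demoAwaitAll(): List<Int> = runBlocking {
    awaitAllFutures(
        listOf(
            CompletableFuture.supplyAsync { 1 },
            CompletableFuture.completedFuture(2),
        )
    )
}
```

If any future completes exceptionally, the corresponding `await()` throws, so the failure surfaces in the calling coroutine.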
I have a list of CompletableFuture, which I would like to run asynchronously and then act on when all of them complete. Is this the right place to call join (or get()) to get the reply?
The idea is to take a bunch of requests, send them in parallel (fork) and wait for all of them (join), then combine the results.
Thomas Kranzer
03/31/2022, 10:41 AM
TestCoroutineDispatcher, it fails (TimeoutCancellationException) with the new coroutines version and the UnconfinedTestDispatcher.. any ideas how this issue can be resolved?
// ViewModel
val loading = MutableStateFlow(false)
fun onReloadClicked() {
    viewModelScope.launch {
        loading.value = true
        // load data
        loading.value = false
    }
}
// Test Code
private val testDispatcher = UnconfinedTestDispatcher()
@Before
fun setUp() {
    Dispatchers.setMain(testDispatcher)
}
@After
fun tearDown() {
    Dispatchers.resetMain()
}
@Test
fun testLoading() = runTest(testDispatcher) {
    viewModel.loading.test {
        assertFalse(awaitItem())
        viewModel.onReloadClicked()
        assertTrue(awaitItem()) // fails with TimeoutCancellationException
        assertFalse(awaitItem())
    }
}
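One possible contributing factor, assuming the "load data" step does not actually suspend in the test: a `StateFlow` is conflated and skips values equal to the last one delivered, so a collector that cannot run between `loading.value = true` and `loading.value = false` never observes `true`. The sketch below shows only that conflation with plain `kotlinx.coroutines`, without Turbine or the test dispatchers; `observedLoadingStates` is a hypothetical name, and whether this is what happens in the test above is a guess.

```kotlin
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.yield

// Hypothetical demo: returns the values a collector sees when the state
// flips to true and back to false without the collector running in between.
fun observedLoadingStates(): List<Boolean> = runBlocking {
    val loading = MutableStateFlow(false)
    val seen = mutableListOf<Boolean>()
    // UNDISPATCHED: the collector runs immediately and records the initial `false`.
    val collector = launch(start = CoroutineStart.UNDISPATCHED) {
        loading.collect { seen += it }
    }
    // Both writes happen before the collector is resumed.
    loading.value = true
    loading.value = false
    yield() // give the collector a chance to run
    collector.cancelAndJoin()
    seen
}
```

Here `observedLoadingStates()` returns `[false]`: by the time the collector resumes, the current value equals the one it already delivered, so nothing new is emitted.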
Ruben Quadros
03/31/2022, 1:20 PM
List<Flow<T>> what is the best way to keep collecting from each Flow<T>?
CRamsan
03/31/2022, 2:59 PM
Vladimir
03/31/2022, 8:50 PM
mp
04/01/2022, 9:38 AM
kotlin.concurrent
@kotlin.internal.InlineOnly
public inline fun <T> Lock.withLock(action: () -> T): T {
    contract { callsInPlace(action, InvocationKind.EXACTLY_ONCE) }
    lock()
    try {
        return action()
    } finally {
        unlock()
    }
}
but it says The 'checkRegistry' suspension point is inside a critical section
Newbie with coroutines/suspend, so can anyone share a documentation reference that will help me understand what I’ve missed?
CLOVIS
04/01/2022, 8:37 PM
val task = scope.async {
    foo()
}
suspend fun bar() {
    // All calls require the task to be over,
    // however only the first one(s) will really suspend,
    // since it will be ready for the others
    val foo = task.await()
    doSomething(foo)
}
The problem is that this requires a scope to be available when the task is instantiated. Since I do not care whether the task starts when it is instantiated or when the first call to bar is made, is there a way to adapt this pattern without requiring the scope?
CRamsan
04/03/2022, 3:27 AM
Khan
04/04/2022, 11:51 AM
getDataUseCase().onEach { _ ->
}.launchIn(viewModelScope)
stojan
04/04/2022, 12:00 PM
Khan
04/04/2022, 1:11 PM