zeugederunity
06/30/2021, 1:07 PMDaniele Segato
06/30/2021, 10:59 PMMutableSharedFlow<Something?>(
replay = 1,
extraBufferCapacity = 1,
onBufferOverflow = BufferOverflow.DROP_OLDEST
)
vs
MutableStateFlow<Something?>(null)
napperley
07/01/2021, 2:32 AMkotlin.IllegalStateException: There is no event loop. Use runBlocking { ... } to start one.
at kfun:kotlinx.coroutines.takeEventLoop#internal (0x312759)
at kfun:kotlinx.coroutines.DefaultExecutor#dispatch(kotlin.coroutines.CoroutineContext;kotlinx.coroutines.Runnable){} (0x312570)
at kfun:kotlinx.coroutines.internal#resumeCancellableWith@kotlin.coroutines.Continuation<0:0>(kotlin.Result<0:0>;kotlin.Function1<kotlin.Throwable,kotlin.Unit>?){0§<kotlin.Any?>} (0x30d572)
at kfun:kotlinx.coroutines.intrinsics#startCoroutineCancellable$default@kotlin.coroutines.SuspendFunction1<0:0,0:1>(0:0;kotlin.coroutines.Continuation<0:1>;kotlin.Function1<kotlin.Throwable,kotlin.Unit>?;kotlin.Int){0§<kotlin.Any?>;1§<kotlin.Any?>} (0x3118a4)
at kfun:kotlinx.coroutines.AbstractCoroutine#start(kotlinx.coroutines.CoroutineStart;0:0;kotlin.coroutines.SuspendFunction1<0:0,1:0>){0§<kotlin.Any?>} (0x2f8f69)
at kfun:kotlinx.coroutines#launch$default@kotlinx.coroutines.CoroutineScope(kotlin.coroutines.CoroutineContext?;kotlinx.coroutines.CoroutineStart?;kotlin.coroutines.SuspendFunction1<kotlinx.coroutines.CoroutineScope,kotlin.Unit>;kotlin.Int){}kotlinx.coroutines.Job (0x2f9d37)
at _636f6d2e636861706d616e3a626f6c742d646174612d636f6e73756d6572_kncfun86 (0x333419)
at MQTTClient_run (0x34a3c3)
at (0x7f7e4de366db)
at clone (0x7f7e4d947a3f)
at ((nil))
I want to run IO bound tasks (network orientated) that run on a separate thread. The main thread with the linuxX64 target is mainly used for tasks that need to block (eg printing messages to the console). Linux unlike JS (the JavaScript platform) for example allows for multiple threads to run at once.Sourabh Rawat
07/02/2021, 2:52 AMclass Executor {
// private val scope = CoroutineScope(Job() + Executors.newFixedThreadPool(10).asCoroutineDispatcher())
private val context = Executors.newFixedThreadPool(10).asCoroutineDispatcher()
suspend fun submit(value: Any) {
withContext(context) {
logger.debug { "Successfully received: $value" }
}
}
fun stop() {
// TODO
}
}
How can I cancel the submitted jobs? And fail submission of a new job if the context is "cancelled"William Reed
07/02/2021, 12:05 PMflow {
emit(suspendFunction())
}
I know I could use flowOf(…)
but the call site is not in a suspend function alreadyFoRRestDp
07/03/2021, 4:56 PMReadWriteLock
in a library anywhere?Erik
07/05/2021, 8:12 AMCoroutineContext
from Thread.currentThread()
? I don't control the current thread instance, but I want to specifically switch to it using e.g. withContext(/*..*/) {}
.Noushad Chullian
07/06/2021, 6:41 AMoverride suspend fun getProducts()=flow<State<List<ProductListFromEntity>>> {
try {
val it = productDao.getProducts()
if (it.isEmpty()) {
loadProducts()
} else {
emit(State.success(it))
}
}catch(e:Exception){
emit(State.error(e.localizedMessage.toString()))
e.printStackTrace()
}
}
//Dao function
@Query("SELECT product.productId, product.productCode, product.name FROM Product product")
abstract suspend fun getProducts(): List<ProductListFromEntity>
//viewModel
suspend fun getProducts() {
productRepository.getProducts().collectLatest {
when (it) {
is State.Error -> {
_productListStates.value =
ProductListStates.Error(it.message, UIComponentType.Dialog())
}
is State.Success -> {
parseGivenProductList(it.data)
}
}
}
}
i am calling this function from viewmodel (android), But it is always throwing error saying, Child of the scoped flow was cancelled
without any stacktrace. Am i doing anything wrong here...ermac10k
07/06/2021, 1:04 PMtask.join()
and this join never returns than i do have a memory leak, don’t i?Slackbot
07/06/2021, 3:39 PMxxfast
07/06/2021, 11:53 PMFlow<T>.flowOn(dispatcher)
equivalent that that affects downstream? so that i can do
results
.map { result -> format(result) }
.flowOn(<http://Dispatchers.IO|Dispatchers.IO>)
.collectOn(Dispatchers.Main)
.collect { presenter.display(it) }
instead of
withContext(Dispatchers.Main) {
results
.map { result -> format(result) }
.flowOn(<http://Dispatchers.IO|Dispatchers.IO>)
.collect {
presenter.display(it)
}
}
Gus
07/07/2021, 10:45 AMBroadcastChannel
, but when I upgrade to coroutines 1.4.0 (or 1.5.0), the consumer's lambda function doesn't get called (even though my debugging shows that ackChannel.sendBlocking(...)
is indeed called). Here's how it's consumed:
ackChannel
.asFlow()
.onEach {
<http://logger.info|logger.info>("onEach called with $it")
ackObserver.onNext(it.toCargoDeliveryAck())
}
.onCompletion { ackObserver.onCompleted() }
.collect()
(Full method definition on GitHub)
I've also tried replacing the BroadcastChannel
with MutableSharedFlow<String>(0, extraBufferCapacity = 1)
(and replacing calls to ackChannel.sendBlocking()
with mutableFlow.tryEmit()
), but the exact same issue persists.
Any idea what I'm doing wrong? I created a PR where this issue is easily reproducible because it breaks a unit test.Stefan Oltmann
07/07/2021, 12:03 PMursus
07/07/2021, 4:15 PMfun OkhttpAuthenticator.authenticate() {
...
synchronized(this) {
runBlocking { retrofitService.suspendingRefreshTokens() }<---
...
}
}
Could this get broken somehow? runBlocking seems to swap to caller thread context after the network call, so the mutex should be released
or is it not guaranteed? (Because I see certain bugs in production where the code just stops; and not sure if its runBlocning or retrofit related)jeggy
07/07/2021, 11:02 PMEmission from another coroutine is detected.
zsperske
07/07/2021, 11:37 PMfrankelot
07/08/2021, 2:01 PM.map
on a StateFlow
returns a Flow
?melatonina
07/09/2021, 12:22 PMinterface BindableMutableStateFlow<T> : MutableStateFlow<T> {
val coroutineScope: CoroutineScope
fun bind(other: StateFlow<T>)
fun unbind()
}
and
fun <T> ViewModel.BindableMutableStateFlow(initialValue: T) : BindableMutableStateFlow<T> =
MutableStateFlow<T>(initialValue).let { _stateFlow ->
object : BindableMutableStateFlow<T>, MutableStateFlow<T> by _stateFlow {
override val coroutineScope: CoroutineScope get() = viewModelScope
private var job: Job? = null
override fun bind(other: StateFlow<T>) {
unbind()
job = viewModelScope.launch {
other.collect {
value = it
}
}
}
override fun unbind() {
job?.cancel()
job = null
}
}
}
Am I duplicating or misusing (under-using) any existing Kotlin API?
Do you see any problems with this code?William Reed
07/09/2021, 12:30 PMMutableStateFlow<Map<..., ...>>
what’s the most idiomatic way to mutate the map and update the StateFlow
? seems like I need to make a copy of the current value and then update it?
something like this seems verbose and awkward
stateFlow.value = stateFlow.value.toMutableMap().apply { … modify …}
Vsevolod Tolstopyatov [JB]
07/09/2021, 3:11 PMkotlinx.coroutines
1.5.1 is here!
• Bi-directional cancellation in Play Services integration
• Behavioural improvements in CopyableThrowable
• Fixed annoying but rare crash with java.lang.ClassCastException: kotlin.coroutines.jvm.internal.CompletedContinuation cannot be cast to kotlinx.coroutines.internal.DispatchedContinuation
• New API reference site: https://kotlin.github.io/kotlinx.coroutines/
• Various bug fixes and improvements
Full changelog: https://github.com/Kotlin/kotlinx.coroutines/releases/tag/1.5.1Lilly
07/09/2021, 5:04 PMchannelFlow
hot? I would like to open a stream but defer the collecting. What I'm currently doing:
// cold
fun readByteArrayStream(): Flow<ByteArray> = channelFlow { .. }
// hot
val bytesChannel: Channel<ByteArray> by lazy { Channel(Channel.UNLIMITED) }
suspend fun openByteArrayStream() {
readByteArrayStream().onEach { bytesChannel.trySend(it) }.collect()
}
Dominaezzz
07/10/2021, 1:48 PMval stream = MutableSharedFlow<Unit>()
val unused = stream.buffer(Channel.RENDEZVOUS).produceIn(GlobalScope)
stream.emit(Unit)
Eugen Martynov
07/12/2021, 9:28 AMrunBlockingTest
And it swallows exception without failing testWilliam Reed
07/12/2021, 2:18 PMsuspend fun
exposed from a library like Retrofit
or Room
or SqlDelight
- is there a need to change the dispatcher to IO
or since they are code generating a suspend fun
is it safe to assume that its ‘main-safe’?Daniele Segato
07/13/2021, 2:40 PMgetDataFlow()
.onStart {
coroutineScope {
launch { repository.refresh() }
} // this wait refresh to be finished
}
the above code in onStart wait the refresh to finish according to coroutineScope
documentation, I need the repository.refresh()
to run in parallel and just be canceled with the flowVikas Singh
07/13/2021, 5:08 PMjulian
07/13/2021, 9:23 PMmarcinmoskala
07/14/2021, 6:47 AMOlli Helenius
07/14/2021, 7:19 AMval x = (async { foo() }).await()
ever different from just val x = foo()
?Andrew Ebling
07/14/2021, 3:37 PMhwData
. Rather than sleeping/spinning until the data becomes available, is there a coroutines-based solution to this?
embeddedServer(Netty, 8080) {
routing {
get("/my_endpoint") {
triggerDataReadFromHardware()
while(hwData == null) { Thread.sleep(100) }
call.respond("${hwData!!}\n")
hwData = null
}
}
}.start(wait = false)
Andrew Ebling
07/14/2021, 3:37 PMhwData
. Rather than sleeping/spinning until the data becomes available, is there a coroutines-based solution to this?
embeddedServer(Netty, 8080) {
routing {
get("/my_endpoint") {
triggerDataReadFromHardware()
while(hwData == null) { Thread.sleep(100) }
call.respond("${hwData!!}\n")
hwData = null
}
}
}.start(wait = false)
ephemient
07/14/2021, 3:44 PMAndrew Ebling
07/14/2021, 3:50 PMstreetsofboston
07/14/2021, 4:19 PMDeferred<HwData>
... then you can do call.respond("${triggerDataReadFromHardware().await()}\n")
Andrew Ebling
07/14/2021, 4:27 PMephemient
07/14/2021, 4:59 PM.complete()
streetsofboston
07/14/2021, 7:21 PMDeferred<HwData> hwData = async(<http://Dispatchers.IO|Dispatchers.IO>) { hardware.poll(...) }
hardware.trigger(...)
return hwData