rrva
01/03/2020, 1:02 PM
jimn
01/04/2020, 9:45 AM
rrva
01/04/2020, 5:58 PM
stringBuilder.append(charBuffer) an Inappropriate blocking method call when used in a coroutine if I inspect the code in IntelliJ?
CLOVIS
01/04/2020, 9:43 PM
Dispatchers.IO or is there something else that is recommended instead? From what I understand, it is the dispatcher created to handle blocking IO stuff
mzgreen
01/05/2020, 6:14 PM
GlobalScope.async {
    while (isActive) {
        println("test")
    }
}
This works on Android and JVM but doesn’t work on iOS. I don’t get any error, it just doesn’t do anything. Is it possible to make it work somehow? GlobalScope.launch(context = Dispatchers.IO) would be fine too I guess but I can’t see Dispatchers.IO on iOS 🤔
diesieben07
01/06/2020, 8:59 PM
Flow and I'd like to introduce backpressure (the API has a request mechanic, with which I can request more elements from upstream). I'd like to only request more elements from upstream if the channel (created by callbackFlow) doesn't already have a full buffer (i.e. my consumer is slower than the producer).
Dominaezzz
01/06/2020, 11:27 PM
suspendCancellableCoroutine be enough or should I wrap it in async(Dispatchers.IO)? I'm not completely sure how suspendCancellableCoroutine works here.
rrva
01/07/2020, 6:51 PM
bob
01/07/2020, 7:45 PM
diesieben07
01/08/2020, 10:14 AM
onCompletion, but it has no information about whether the flow was completely consumed, or if consumption was cancelled using e.g. the take operator.
onCompletion also has no context information:
flow {
    val callback = MyCallbackHandler(this@flow)
    api.call(callback)
}.onCompletion { t ->
    // Was the stream cancelled? I don't know.
    // and how do I access callback here to (maybe) tell it to cancel the request
}
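One possible direction for the callback-access part of the question, as a rough sketch only: callbackFlow keeps the request handle in scope, and awaitClose runs its block both when the collector stops early (for example via take) and when the channel is closed. FakeApi, its call function and the cancel handle it returns are hypothetical stand-ins for the real API, and trySend assumes a recent kotlinx.coroutines version. This does not by itself distinguish cancellation from normal completion.

```kotlin
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.callbackFlow
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for the callback-based API; not a real library.
class FakeApi {
    var cancelled = false
        private set

    // Delivers three values synchronously, then leaves the request open.
    // Returns a function that cancels the request.
    fun call(onNext: (Int) -> Unit): () -> Unit {
        for (value in 1..3) onNext(value)
        return { cancelled = true }
    }
}

fun FakeApi.asFlow(): Flow<Int> = callbackFlow {
    // Forward each callback value into the flow's buffer.
    val cancelRequest = call { value -> trySend(value) }
    // Invoked when collection is cancelled or the channel is closed;
    // the cancel handle is still accessible here.
    awaitClose { cancelRequest() }
}

fun main() = runBlocking {
    val api = FakeApi()
    val received = api.asFlow().take(2).toList()
    println("received=$received cancelled=${api.cancelled}")
}
```

Because the cleanup lives inside the flow builder rather than in onCompletion, nothing about the callback has to leak out to the call site.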
myanmarking
01/08/2020, 10:43 AM
Maciek
01/08/2020, 11:58 AM
voben
01/08/2020, 7:43 PM
Shmuel Rosansky [G]
01/08/2020, 10:32 PM
diesieben07
01/09/2020, 9:36 AM
flow solution (it doesn't work for bidirectional streaming), so I am back to fighting with channels.
I need to explicitly request more elements from upstream when the channel runs empty (i.e. receive needs to suspend because the buffer is empty).
I thought I could "just implement ReceiveChannel", but this proved difficult, because it's not just a simple interface with suspend fun receive...
override suspend fun receive(): E {
    val element = delegate.poll()
    if (element != null) return element
    api.request(bufferSize)
    return delegate.receive()
}
This is not correct, because other ways to receive (such as onReceive) would have to all be implemented from scratch... This doesn't sound right...
Is there really no simple way to do this?
diesieben07
01/09/2020, 9:43 AM
Flow.asObservable it just dumps the flow into the emitter as fast as it can, without regard for the backpressure mechanisms 😞 (https://github.com/Kotlin/kotlinx.coroutines/blob/master/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt#L80-L107). The opposite (convert Observable into ReceiveChannel) does the same: https://github.com/Kotlin/kotlinx.coroutines/blob/master/reactive/kotlinx-coroutines-rx2/src/RxChannel.kt#L65-L96
dave08
01/09/2020, 1:03 PM
Flow that sends three states Starting, Progress and Finished (from a sealed class), into a Flow that assures receiving Starting and Finished but conflates Progress for backpressure on latency of the collector...?
Animesh Sahu
01/10/2020, 4:13 AM
Matej Drobnič
01/10/2020, 6:11 AM
private var actorChannel = Channel(Channel.UNLIMITED)
private var actorJob: Job? = null
fun performSomeOtherCommand() {
    if (actorJob?.isActive != true) {
        restartActor()
    }
    actorChannel.offer(...)
}
suspend fun stopActor() {
    actorChannel.offer(StopCommand)
    actorJob?.join()
}
private fun restartActor() {
    actorJob = GlobalScope.launch {
        actorChannel.consumeEach {
            ...
            if (it == StopCommand) {
                return@launch
            }
        }
    }
}
But the problem with the above example is that it is not thread safe. One thread could call performSomeOtherCommand() while the actor is still being stopped by stopActor(). Easiest solution would be to just not stop the actor and let it live forever, but since it's in GlobalScope, it would never be garbage collected when not needed. I could use mutexes and synchronized blocks but I feel that this kinda ruins the point of having an actor.
thana
01/10/2020, 1:34 PM
Mono into a Deferred? I can see we can transform it into a Flow which feels wrong as the Mono provides at most one value
jimn
01/10/2020, 2:35 PM
Luis Munoz
01/10/2020, 7:48 PM
Sowmya V
01/11/2020, 2:42 AM
Big Chungus
01/12/2020, 11:25 AM
MutableSet from different threads with coroutines on native without getting the InvalidMutabilityException?
Nikky
01/12/2020, 8:22 PM
runBlocking in coroutines-core-js
Big Chungus
01/13/2020, 11:40 AM
Big Chungus
01/14/2020, 8:29 AM
fun runPeriodicallyAsync(intervalMS: Long, action: () -> Unit): () -> Unit {
    val job = GlobalScope.launch {
        coroutineScope {
            while (true) {
                println("Executing")
                action()
                println("Delaying $intervalMS ms")
                delay(intervalMS)
            }
        }
    }
    println("Returning")
    return {
        println("Disposing")
        job.cancel()
    }
}
Flo
01/14/2020, 10:59 AM
val clicked = mutableListOf(false, false, false)
val buttonFlow = baseFlow.map { it.clicked = clicked[it.idx] }.onEach { buttonLiveData.postValue(it) }
val job = buttonFlow.launchIn(scope)
val onClickEvent: Channel<List<Boolean>> = Channel()
    .consumeEach { job.cancel(); job = buttonFlow.launchIn(scope) }
fun buttonWasClicked(idx: Int) { onClickEvent.offer(/* do our list creation logic */) }
But that seems awfully overkill.
dkhusainov
01/14/2020, 4:32 PM
Flow? With callbackFlow or a channel and a separate coroutine that reads from socket and sends to channel I end up with a Flow which isn't cancelable because socket.receive is a blocking call (which supposedly never returns) inside a while loop.
jeggy
01/14/2020, 6:27 PM
withContext function, it waits for that coroutine to finish before continuing. But I want to launch multiple coroutines into another context and then later on, I will call join on that coroutine when I know I'm finished. How would I achieve something like this? I wrote a bare minimum example of what I'm trying to achieve here: https://pl.kotl.in/Ns6Hwa_8u
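A minimal sketch of one way this could look, assuming the JVM and not based on the linked playground code: launch with an explicit dispatcher returns a Job immediately instead of suspending like withContext, so the jobs can be kept and joined later. The function name runInBackground and the counter are made up for illustration.

```kotlin
import kotlinx.coroutines.*
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical helper: starts taskCount coroutines on Dispatchers.Default,
// continues without waiting, and joins them at the end.
suspend fun runInBackground(taskCount: Int): Int = coroutineScope {
    val completed = AtomicInteger(0)

    // launch returns immediately; each Job is a handle to join later.
    val jobs = List(taskCount) { index ->
        launch(Dispatchers.Default) {
            delay(10L * index) // placeholder for real work
            completed.incrementAndGet()
        }
    }

    // Other work can happen here while the launched coroutines run.

    // Suspend until every launched coroutine has finished.
    jobs.joinAll()
    completed.get()
}

fun main() = runBlocking {
    println("completed tasks: ${runInBackground(5)}")
}
```

coroutineScope would wait for its children on exit anyway; the explicit joinAll only marks the point where the results are needed.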