nitrog42
04/20/2020, 2:38 PMflow {
while (true) {
delay(1000)
emit(Unit)
}
}
?Marc Knaup
04/20/2020, 3:53 PMFlow.distinctUntilChangedOrCollected()
or a simple way to implement it?
Basically a value in the Flow
should be ignored if
a) it’s equal to the previous value and
b) the previous value hasn’t been collected downstream yet (due to mapLatest
being slow).
If I only use distinctUntilChanged()
then the second value would obviously be ignored independent of b).gabin
04/20/2020, 5:40 PMBrendan Weinstein
04/21/2020, 1:03 AMflow.colle
is not the extension function that takes a lambda but the collect that takes a flowCollector. Not sure if there is something the library could do to improve that, or if it's more something for intellij (if I'm always selecting the 2nd suggestion, make it the first suggestion from there on?).Rechee Jozil
04/21/2020, 8:31 AMnapperley
04/22/2020, 2:47 AMzak.taccardi
04/22/2020, 8:23 PMval parentScope = CoroutineScope(Dispatchers.Default)
val childScope = parentScope + Job(parent = scope.coroutineConext[Job]!!)
zak.taccardi
04/22/2020, 9:11 PMCoroutineScope
and make sure realize the side effects of breaking structured concurrency. ex: CoroutineScope(Dispatchers.Default)
explicitly breaks structured concurrency.
This has testing implications like passing tests even when exceptions are thrown https://gist.github.com/ZakTaccardi/e77d5983660ce4e8e0fa2f2ef0d582eavoben
04/23/2020, 3:52 PMviewModelScope.launch {
val myResponse = fetchMyData() // okhttp moves this work to a background thread
doSomethingWithMyResponse(myResponse) // Does this need to be done on a background thread using withContext?
}
Rechee Jozil
04/23/2020, 6:02 PMhallvard
04/23/2020, 6:45 PMhallvard
04/23/2020, 7:32 PMDavid Glasser
04/23/2020, 8:58 PMfetcher
), like this:
return someCoroutineScope.future(start = UNDISPATCHED) { fetcher() }
UNDISPATCHED has exactly the semantics I need — the code in fetcher
can safely call the "only before suspending" DataLoader methods as long as it does so before suspending.
This works in the sense that it allows me to write code that works. However, if I make an error and call a DataLoader method after having suspended, my code just hangs (because that's the symptom I'm trying to avoid). I'd prefer that it failed fast. (Well, I'd really prefer that none of this junk was necessary, there's an issue on the library about fixing it... but trying to work around today.)
So my question is: If I'm running code inside the context of a CoroutineScope.future(start = UNDISPATCHED) { ... }
, is there any way to know if I'm still in the undispatched part of the execution? (Happy to track this state in a boolean or something if there's a way to get a hook on dispatch.)taer
04/23/2020, 11:11 PMinstanceScope
)Diogo Ribeiro
04/24/2020, 11:00 AMCoroutineScope(Default).launch {
//do on background
formDataDTO?.ad?.characteristics?.find { it.id == GENERATION }?.let { cha ->
withContext(Main) {
//do on ui thread
characteristicBinder.bind(cha)
}
}
this.cancel("has done his job")
}
Pacane
04/24/2020, 5:30 PMConflatedBroadcastChannel
) and ideally, one subscriber could unsubscribe without disrupting the other subscribers.Pacane
04/24/2020, 5:35 PMConflatedBroadcastChannel
but I need to make sure everyone subscribes at the same time, before any data comes to the channel for all the initial data to be there, for everyone, which is not exactly what I wantDavid Glasser
04/25/2020, 12:27 AMwithLoggingField("key", "value") {
// maybe do other stuff, maybe this next line is actually in some other function called by this block
<http://log.info|log.info>(...)
}
which, if we're not in a suspend function just does an MDC.put and resets it, but if we are in a suspend function uses kotlinx.coroutines.slf4j.MDCContext to make the MDC context thread-local workchristophsturm
04/25/2020, 11:43 AMjackson.readValue(string, class)
as Inappropiate blocking call in a suspend method?Andrew
04/25/2020, 9:20 PMMiguel Vargas
04/26/2020, 5:05 AMAnimesh Sahu
04/27/2020, 8:24 AMJoan Colmenero
04/27/2020, 10:12 AMLuis Munoz
04/28/2020, 7:17 AMprotected suspend fun stdout(message: String, emit: suspend (WfResult) -> Unit) {
emit(WfResult.Stdout(message))
}
protected suspend fun stderr(message: String, emit: suspend (WfResult) -> Unit) {
emit(WfResult.Stderr(message))
}
open fun main() : Flow<Any> = flow {
stdout("someString" , ::emit)
}
How do I make my own flow so I can use it like this (or how do I extend flow to add more methods):
open fun main() : Flow<Any> = myFlow {
stdout("someString")
stderr("someString")
}
Adam Grzybkowski
04/28/2020, 8:46 AMCody Engel
04/28/2020, 8:47 PMCoroutineScope
which I want to run in the background while it's waiting for updates to be sent via a Channel, but once the update is sent, I want the result to be executed on the main thread. In that case, would I want to use Dispatchers.Main.immediate
, something like this CoroutineScope(Dispatchers.Main.immediate + Job())
?
I think I'm getting a little hung up as I'm not sure when I would have a Scope that doesn't use that dispatcher (since I can launch a suspending function with the scope, but telling it to switch to a different dispatcher, say IO
).taer
04/28/2020, 11:14 PMBroadcastChannel<T>.asFlow()
The docs state: `
If the flow consumer fails with an exception, subscription is cancelled.
I have a
broadcastChannel.asFlow().onCompletion{ printSomething } .map { codeThatThrowsException }.catch { print MessageAboutError } . collect { collector}
The onCompletion fires, then the catch. Afterwards, the collect never sees a new message.. I just added the onComplete to validate. I expected the .catch
to handle the error so the flow doesn't get canceled. I can easily fix the codeThatThrowsException
but want to catch unexpected errors in the future. It looks like the downstream catch doesn't get a chance to handle the error prior to the upstream channel getting unsubscribed fromsnowe
04/29/2020, 4:40 AMprivate suspend fun DeepRecursiveScope<Any?, Unit>.ppAny(obj: Any?) {
DeepRecursiveFunction<Any?, Unit> { obj ->
ppObj(obj)
}
}
private suspend fun DeepRecursiveScope<Any?, Unit>.ppObj(obj: Any) {
ppAny(obj)
}
I need to declare both as suspend and both as extension functions in order for them to be callable from within the DeepRecursiveScope. But ppAny
is not callable from within ppObj
, with the error
[NON_LOCAL_SUSPENSION_POINT] Suspension functions can be called only within coroutine body
Now, I understand the issue (I think), in that I need to be calling from within a context with, with(this@DeepRecursiveScopeImpl)
, but I do not have access to DeepRecursiveScopeImpl
. Can I use another scope for this? If not, how can I get access to this scope to call my method with? Am I doing stuff completely wrong here???Marc Knaup
04/29/2020, 7:19 AMMutex
deadlocks caused by recursion?igor.wojda
04/29/2020, 8:18 AMsuspend fun sleepSort(list: List<Int>): List<Int> {
val listInt = listOf(1, 2, 3)
val listDeferred = list.map {
GlobalScope.async {
println("start $it")
}
}
listDeferred.first().await()
return listOf()
}
The above code gives me this result
start 1
start 2
start 3
I wonder why all deferred jobs are launched instead of jus the first one 🤔igor.wojda
04/29/2020, 8:18 AMsuspend fun sleepSort(list: List<Int>): List<Int> {
val listInt = listOf(1, 2, 3)
val listDeferred = list.map {
GlobalScope.async {
println("start $it")
}
}
listDeferred.first().await()
return listOf()
}
The above code gives me this result
start 1
start 2
start 3
I wonder why all deferred jobs are launched instead of jus the first one 🤔octylFractal
04/29/2020, 8:19 AMlist.map
is not lazy -- it returns a List itself.asSequence()
firstigor.wojda
04/29/2020, 8:20 AMaraqnid
04/29/2020, 9:21 AM