zak.taccardi
11/18/2019, 5:27 PMbdawg.io
11/18/2019, 6:00 PM.singleCollector() shouldn’t be in the call stack. It builds a new stream that delegates with the additional stuff. It would have to actually consume the Flow and throw an exception for it to be in the call stack tracezak.taccardi
11/18/2019, 6:01 PMbdawg.io
11/18/2019, 6:04 PMFlow inside of that function, so you can’t return a wrapper Flow.
.singleCollector(), in your example, takes a Flow and returns a Flow, which returns before the flow is collected, so onStart and onCollect never runs in the .singleCollector() call stackbdawg.io
11/18/2019, 6:06 PMflow
.map { error("immediate error") }
.collect { println("never prints") } .map won’t be in the call stack because that function takes a Flow and returns a Flow without ever calling the lambda passed to itzak.taccardi
11/18/2019, 6:42 PMFlow issues very difficult to debugzak.taccardi
11/18/2019, 6:44 PMzak.taccardi
11/18/2019, 6:45 PMwithout ever calling the lambda passed to itCan you elaborate? which lambda are you referring to?
bdawg.io
11/18/2019, 6:50 PMCan you elaborate? which lambda are you referring to?The lambda passed to the
.map function is never called until .collect initiates the call stack
{ error("immediate error") } is the lambda I’m referring tobdawg.io
11/18/2019, 6:51 PM.map function call in the callstack
immediate error
at .emit
at .collect
at .collect
at .mainbdawg.io
11/18/2019, 6:59 PMintermediate operator functions won’t show up in the call stack, by design they are intermediate.
terminal operations are what produce the call stack where the .onStart and .onComplete are invoked, which is why you’re seeing .collect immediately prior to your SingleCollectFlow code’s callstack
java.lang.IllegalStateException: `.singleCollector()` was used - this Flow cannot be subscribed to again while it already has an active subscription. Please unsubscribe the first flow before subscribing a second.
at c.e.e.i.c.SingleCollectFlow$actual$1.invokeSuspend(FlowExtensionsInternal.kt:160)
at c.e.e.i.c.SingleCollectFlow$actual$1.invoke(Unknown Source:10)
at kotlinx.c.f.F$o$$i$unsafeFlow$1.collect(SafeCollector.kt:126)zak.taccardi
11/18/2019, 7:57 PM.combine() operator uses a channel internally, that is the .collect() that gets reported to the stacktrace
https://github.com/Kotlin/kotlinx.coroutines/blob/1.3.2/kotlinx-coroutines-core/common/src/flow/internal/Combine.kt#L97zak.taccardi
11/18/2019, 7:58 PM.collect() to be reportedzak.taccardi
11/18/2019, 8:10 PM.collect() callzak.taccardi
11/18/2019, 8:11 PM.map { } (your example) or the .singleCollect() (my example) to include the stacktrace - I just want the final .collect(..) call to be in the stacktracebdawg.io
11/18/2019, 9:25 PMcollect on the flow its given to produce a new flow.
iirc, combine is eager and starts collecting at least one item immediately into the “fair” channel that you’re seeing. Have you tried commenting out the code after the .combine ... to see if you still get the exception? My suspicion is that it’s not the final .collect(..) that’s hitting your exceptionzak.taccardi
11/19/2019, 4:50 AMzak.taccardi
11/19/2019, 4:51 AM.collect() uses .combine(..) upstream which is where the error occurs