I’m curious if this example is my fault, or if it’...
# coroutines
z
I’m curious if this example is my fault, or if it’s a coroutines issue - but I want to get an easy to debug stacktrace https://github.com/Kotlin/kotlinx.coroutines/issues/1661
b
.singleCollector()
shouldn’t be in the call stack. It builds a new stream that delegates with the additional stuff. It would have to actually consume the Flow and throw an exception for it to be in the call stack trace
z
is there something that can be done to make it a part of the call stack?
b
You’d have to collect the
Flow
inside of that function, so you can’t return a wrapper
Flow
.
.singleCollector()
, in your example, takes a
Flow
and returns a
Flow
, which returns before the flow is collected, so
onStart
and
onCollect
never runs in the
.singleCollector()
call stack
take this example,
Copy code
flow
    .map { error("immediate error") }
    .collect { println("never prints") }
.map
won’t be in the call stack because that function takes a
Flow
and returns a
Flow
without ever calling the lambda passed to it
z
this is going to make any type of
Flow
issues very difficult to debug
my concern is that it will be very difficult to trace these exceptions if this crash happens in prod :(
without ever calling the lambda passed to it
Can you elaborate? which lambda are you referring to?
b
Can you elaborate? which lambda are you referring to?
The lambda passed to the
.map
function is never called until
.collect
initiates the call stack
{ error("immediate error") }
is the lambda I’m referring to
See https://pl.kotl.in/hgKKzsel_ There is no
.map
function call in the callstack
Copy code
immediate error
    at .emit
    at .collect
    at .collect
    at .main
intermediate
operator functions won’t show up in the call stack, by design they are
intermediate
.
terminal
operations are what produce the call stack where the
.onStart
and
.onComplete
are invoked, which is why you’re seeing
.collect
immediately prior to your
SingleCollectFlow
code’s callstack
Copy code
java.lang.IllegalStateException: `.singleCollector()` was used - this Flow cannot be subscribed to again while it already has an active subscription. Please unsubscribe the first flow before subscribing a second.
        at c.e.e.i.c.SingleCollectFlow$actual$1.invokeSuspend(FlowExtensionsInternal.kt:160)
        at c.e.e.i.c.SingleCollectFlow$actual$1.invoke(Unknown Source:10)
        at kotlinx.c.f.F$o$$i$unsafeFlow$1.collect(SafeCollector.kt:126)
z
digging further into the stacktrace, it looks like because the
.combine()
operator uses a channel internally, that is the
.collect()
that gets reported to the stacktrace https://github.com/Kotlin/kotlinx.coroutines/blob/1.3.2/kotlinx-coroutines-core/common/src/flow/internal/Combine.kt#L97
where I would want the downstream
.collect()
to be reported
It sounds like the issue is that when you have a flow that internally collect other flows, you lose the downstream
.collect()
call
I don’t actually need the
.map { }
(your example) or the
.singleCollect()
(my example) to include the stacktrace - I just want the final
.collect(..)
call to be in the stacktrace
b
Most operators
collect
on the flow its given to produce a new flow. iirc,
combine
is eager and starts collecting at least one item immediately into the “fair” channel that you’re seeing. Have you tried commenting out the code after the
.combine ...
to see if you still get the exception? My suspicion is that it’s not the final
.collect(..)
that’s hitting your exception
z
I was able to identify what call throws the error through trial and error, but my concern is a future scenario if it ever happened elsewhere
my final
.collect()
uses
.combine(..)
upstream which is where the error occurs