raulraja
05/31/2019, 9:04 AMFlow
a user may destructure and compute over the Flow with:
val flow = flowOf(1, 2, 3, 4, 5)
fx {
val n = !flow
n + 1 //n is each of the elements in the flow
}
// flowOf(2, 3, 4, 5, 6)
This same idiom can work with Deferred
, Observable
, IO
, and pretty much anything that contains or can implement a flatMap
method.
For this to work with pure values and across all data types we have to reset the continuation stack so the continuation can be invoked multiple times.
We would like to request that some kind of mechanism for library authors to be able to access the intrinsics of the kotlin.coroutines.jvm.internal.ContinuationImpl
.
At the moment resetting the coroutine stack is an exercise of heavy java reflection when this could be avoided if we had protected access to those fields or a means to obtain/clone the continuation state.
https://github.com/arrow-kt/arrow/blob/7d54b31fe240688b4481b8f91767cbb82b9ad78c/modules/core/arrow-typeclasses/src/main/kotlin/arrow/typeclasses/ContinuationUtils.kt#L7
The use of reflection and manual copying the fields like that imposes unnecessary overhead and does not give us a solid solution going forward.
As far as I understand suspension, suspension has been added to the Kotlin std library so other libraries can build on top of it.
This is great because we don’t have the issues in Kotlin I had in Scala from people abusing Future
just because it was in the std lib and opens the doors for others to create async/concurrent and in general suspended based frameworks just with the suspension apis.
While this in concept is great, at the moment it just seems to be serving KotlinX Coroutines Core library use cases.
There is other libraries like Arrow Fx where single shot / eager async continuations are not enough to cover these use cases.
Can we get access to those fields without reflection or does anyone know of an alternative to accomplish this that does not require dealing with the ContinuationImpl
stack?
We are happy to collaborate with changes in the std lib or whatever it takes to get passed this issue.orangy
elizarov
05/31/2019, 12:24 PMarrow-kt
succeed, so I’m open to suggestions on what kind of safe primitive we could add to stdlib that would enable it.elizarov
05/31/2019, 12:27 PMimmutable
modifier), then one could safely introduce a Continuation.copy()
extension that creates a separately-resumable copy of continuation or throws exception if that continuation closed over some mutable state that cannot be safely resumed again. But we don’t have this distinction, so we cannot provide this method. Without the appropriate safety precautions it is too brittle.raulraja
05/31/2019, 4:57 PMimmutable
modifier contain only val
in their property list?, or would it require all inner properties to also be immutable
?
We are happy to work on a KEEP if you think this is desirable and a better path to support copying the continuation state.
I think an interim solution based on intrinsics may also work. Currently we have suspendCoroutineUninterceptedOrReturn
, could we have a similar utility method that is explicit to what it does and automatically copies/rewinds the state? Our current pattern where this is needed looks like:
suspendCoroutineUninterceptedOrReturn { c ->
val labelHere = c.stateStack // save the whole coroutine stack labels
value = flatMap { x: B ->
c.stateStack = labelHere
c.resume(x)
value
}
COROUTINE_SUSPENDED
}
This is necessary for a couple of reasons:
- Accessing the inner A
value in for example a Flow<A>
is multi shot since the Flow may contain multiple values
- Other data types such as IO
produce pure values that can be invoked a la carte N times by a user. If we don’t rewind the state they can only invoke this once.elizarov
05/31/2019, 5:01 PMsuspendCoroutineUninterceptedOrReturn
is not safe, but it is easy to clearly specify under which conditions it can be used. The problem with continuation copying is that there is no way to clearly specify when it is safe to do so. And generally we’d like to avoid this kind of “I feel lucky” functions in Kotlin standard libraryelizarov
05/31/2019, 5:03 PMraulraja
05/31/2019, 5:13 PMraulraja
05/31/2019, 5:14 PMfor comprehensions
and it’s what users demand and are familiar with for the most part.elizarov
05/31/2019, 5:23 PMraulraja
05/31/2019, 5:24 PMflatMap
method similar to how Kotlin treats today Iterator
raulraja
05/31/2019, 5:24 PMfor .. in
pakoito
05/31/2019, 6:38 PMelizarov
05/31/2019, 8:27 PMpakoito
05/31/2019, 10:09 PMelizarov
05/31/2019, 10:15 PMelizarov
05/31/2019, 10:21 PMpakoito
06/01/2019, 10:19 AMelizarov
06/03/2019, 9:29 AMflow
implementation would not work with the notation you’ve outlined at the beginning of the thread, because if a flow
happens to use mutable data structure it will get corrupted by an attempt to resume it twice. However, you can use a different notation. Instead of val n = !flow; ...rest...
write flow.collect { n -> ... rest ... }
and you’ll get the same effect using only single-shot continuations.elizarov
06/03/2019, 9:31 AMpakoito
06/03/2019, 11:15 AMbecause if aCould you please expand on this? It it because it captures a user-defined mutable data structure, or because one's used to define some operators? AFAIK Flows are lazy and do not memoize unless explicitly requested and if that's the case then any reuse is bogus anyway, putting it on a different category than observables (!).happens to use mutable data structure it will get corrupted by an attempt to resume it twiceflow