# flow
Hi, can anyone help me figure out how to "cancel" an action inside a flow based on incoming values? Ex:
root flow: OperationStart ------ AnotherOperation ------ OperationCancel -------- AnotherBOperation
next flow: OperationStart, perform long running action while !OperationCancel
I don't want either flow to actually cancel though, I just want the long-running op to stop if a stop event is emitted upstream
Like a download use case where the user can cancel the download, but I don't want to cancel the flow
Not quite following but does this help?
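channelFlow {
    // Must be a var so it can be reassigned when an operation starts.
    var job: Job? = null
    rootFlow.collect {
        when (it) {
            is OperationStart -> {
                job = launch { send(resultOfLongRunningThing()) }
            }
            is OperationCancel -> {
                job?.cancel() // stops only the long-running work, not the flow
            }
            else -> Unit // other operations are ignored here
        }
    }
}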
um might work, i'll mess around to see if that interleaves the way i hope
only problem i see off the bat is that it makes the flow stateful, cause it has to retain a job
How do you imagine cancelling work without a reference to that work?
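For what it's worth, here is a minimal self-contained sketch of the Job-tracking idea from the channelFlow snippet above. The `Op` types, `longRunningThing()` and `runCancellable()` are made-up stand-ins, not anything from the real codebase. The point it tries to show is that the `Job` reference is local to each collection, so the flow stays cold and nothing is shared between collectors.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical event types standing in for the real operations.
sealed interface Op
object OperationStart : Op
object OperationCancel : Op
object AnotherOperation : Op

// Hypothetical long-running work; delay() makes it cancellable.
suspend fun longRunningThing(): String {
    delay(1_000)
    return "done"
}

// The Job lives inside the builder block, so every collector gets its own.
fun Flow<Op>.runCancellable(): Flow<String> = channelFlow {
    var job: Job? = null
    collect { op ->
        when (op) {
            is OperationStart -> {
                job?.cancel() // assumption: a new start replaces any running work
                job = launch { send(longRunningThing()) }
            }
            is OperationCancel -> job?.cancel()
            is AnotherOperation -> Unit
        }
    }
}

fun main() = runBlocking {
    val cancelled = flow<Op> {
        emit(OperationStart)
        delay(100)
        emit(OperationCancel)
    }
    println(cancelled.runCancellable().toList()) // []

    val completed = flow<Op> {
        emit(OperationStart)
        emit(AnotherOperation)
    }
    println(completed.runCancellable().toList()) // [done]
}
```

In the second case the upstream finishes first, but `channelFlow` waits for the launched child before completing, so the result still arrives.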
i'm not sure, but in rxjs this is possible with takeUntil. I know it's not a direct mapping to flows, but here is an rxjs example:
import { ajax } from 'rxjs/ajax';

const fetchUserEpic = action$ => action$.pipe(
  ofType(FETCH_USER),
  mergeMap(action => ajax.getJSON(`/api/users/${action.payload}`).pipe(
    map(response => fetchUserFulfilled(response)),
    takeUntil(action$.pipe(ofType(FETCH_USER_CANCELLED)))
  ))
);
That is, when upstream emits FETCH_USER, do the ajax call and map it to a result; but if upstream emits FETCH_USER_CANCELLED during that async call, don't take the ajax result
IE there is a reference somewhere, but I was hoping it was something being managed by FLOW not me, as I am sure I will mess it up 🙂
(not a JS dev, trying to map the rxjs in my head to rxjava equivalent)
ya thats my problem too, map to java then to flow
not sure which operators to use
Note used to
but this is what the translation seems like to me:
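// Emits values from this flow until `other` emits anything.
@Suppress("UNCHECKED_CAST")
private fun <T> Flow<T>.myTakeUntil(other: Flow<Any?>): Flow<T> {
    val DONE = Any() // unique marker object for "stop now"
    val untyped = merge(this, other.map { DONE })
        .takeWhile { it != DONE }
    return untyped as Flow<T>
}
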
fun fetchUserFulfilled(response: Any?): Any? = null
fun downloadFromService(path: String): Flow<Any> = emptyFlow()

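fun fetchUserEpic(rootFlow: Flow<MyOperation>): Flow<Any?> {
    return rootFlow
        .filter { it is OperationStart }
        .flatMapMerge { fetchUser ->
            // The original message was cut off here: the path argument is unknown,
            // and the two filters are guesses based on the rxjs example.
            downloadFromService(TODO("path elided in the original"))
                .map { response -> fetchUserFulfilled(response) }
                .myTakeUntil(rootFlow.filter { it is OperationCancel })
        }
}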
ok that will take me a while to parse and I am in meetings the rest of the day. I'll try it out later. Thanks so much trying to help!
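One thing to watch with the `merge` + `takeWhile` translation: `merge` only completes once both of its inputs complete, so if the cancel flow never completes, the inner flow won't complete either, even after the download has finished. Here is a rough alternative sketch that completes as soon as the source does. `takeUntilSignal` is a made-up name, not a kotlinx.coroutines operator.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical operator: forwards values from this flow until `signal` emits.
fun <T> Flow<T>.takeUntilSignal(signal: Flow<*>): Flow<T> = channelFlow {
    // Forward upstream values from a child coroutine so it can be cancelled on its own.
    val upstream = launch { this@takeUntilSignal.collect { send(it) } }
    // Cancel the forwarding coroutine on the first signal value.
    val stopper = launch { signal.take(1).collect { upstream.cancel() } }
    upstream.join()  // returns when upstream completes or is cancelled
    stopper.cancel() // stop listening so the resulting flow can complete
}

fun main() = runBlocking {
    val slow = flow<String> { delay(1_000); emit("result") }

    val cancelSoon = flow<Unit> { delay(100); emit(Unit) }
    println(slow.takeUntilSignal(cancelSoon).toList()) // []

    // A signal that never emits and never completes does not block completion.
    val never = flow<Unit> { awaitCancellation() }
    println(slow.takeUntilSignal(never).toList()) // [result]
}
```

Cancelling the child coroutine also cancels the cold upstream work itself, rather than just dropping its result.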
Note: I'm guessing downloadFromService should really just be a suspend method that returns a value instead of returning a Flow, because I'm guessing it only returns one value. But I left it as-is to try to make it as equivalent as possible.
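If the download were a suspend function as the note suggests, one option that lets Flow hold the reference for you might be `transformLatest`, which cancels the previous block whenever a new upstream value arrives. This is only a sketch under assumptions: `Op`, `FetchUser`, `FetchUserCancelled`, `downloadUser` and `fetchUsers` are all invented for illustration. Also note this is closer to rxjs `switchMap` than `mergeMap`, since a new `FetchUser` cancels an in-flight one too.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical event types.
sealed interface Op
data class FetchUser(val id: String) : Op
object FetchUserCancelled : Op

// Hypothetical suspend replacement for downloadFromService.
suspend fun downloadUser(id: String): String {
    delay(1_000)
    return "user-$id"
}

@OptIn(ExperimentalCoroutinesApi::class)
fun fetchUsers(ops: Flow<Op>): Flow<String> =
    ops.transformLatest { op ->
        // Any new upstream value cancels this block, including an in-flight download.
        if (op is FetchUser) emit(downloadUser(op.id))
    }

fun main() = runBlocking {
    val ops = flow<Op> {
        emit(FetchUser("1"))
        delay(100)
        emit(FetchUserCancelled) // cancels the download for user 1
        emit(FetchUser("2"))     // runs to completion
    }
    println(fetchUsers(ops).toList()) // [user-2]
}
```

The outer flow keeps running after a cancel; only the work started for the previous value is stopped.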