https://kotlinlang.org logo
#flow
Title
# flow
j

Justin Tullgren

08/02/2021, 1:15 PM
Hi, can anyone help me figure out how "cancel" an action inside a flow based on incoming values? Ex:
Copy code
root flow: OperationStart ------ AnotherOperation ------ OperationCancel -------- AnotherBOperation
next flow: OperationStart, perform long running action while !OperationCancel
I don't want either flow actual cancel though, i just want the long running op to stop if a stop event is emitted upstream
Like a download usecase where the user can cancel but i don't want to cancel the flow
n

Nick Allen

08/02/2021, 5:55 PM
Not quite following but does this help?
Copy code
channelFlow {
    val job: Job? = null
    rootFlow.collect {
        when (it) {
            is OperationStart -> {
                job = launch { send(resultOfLongRunningThing()) }
            }
            is OperationCancel -> {
                job?.cancel()
            }
            ...
        }
    }
}
j

Justin Tullgren

08/02/2021, 5:58 PM
um might work, i'll mess around to see if that interleaves the way i hope
only problem i see off the bat is that it makes the flow stateful, cause it has to retain a job
n

Nick Allen

08/02/2021, 6:08 PM
How do you imagine cancelling work without a reference to that work?
j

Justin Tullgren

08/02/2021, 6:10 PM
i'm not sure but in rxjs this is possible with takeUntil. I know its not a direct mapping to flows but here is an rxjs example:
Copy code
import { ajax } from 'rxjs/ajax';

const fetchUserEpic = action$ => action$.pipe(
  ofType(FETCH_USER),
  mergeMap(action => ajax.getJSON(`/api/users/${action.payload}`).pipe(
    map(response => fetchUserFulfilled(response)),
    takeUntil(action$.pipe(
      ofType(FETCH_USER_CANCELLED)
    ))
  ))
);
that is when upstream emits FETCH_USER do the ajax and map to a result, but durring that async upstream emits FETCH_USER_CANCELLED, then don't take the ajax result
IE there is a reference somewhere, but I was hoping it was something being managed by FLOW not me, as I am sure I will mess it up 🙂
n

Nick Allen

08/02/2021, 6:14 PM
(not a JS dev, trying to map the rxjs in my head to rxjava equivalent)
j

Justin Tullgren

08/02/2021, 6:15 PM
ya thats my problem too, map to java then to flow
not sure which operators to use
n

Nick Allen

08/02/2021, 6:56 PM
Note used to
pipe
but this is what the translation seems like to me:
Copy code
private fun <T> Flow<T>.myTakeUntil(other: Flow<Any?>): Flow<T> {
    val DONE = Any()
    val untyped = merge(this, other.map { DONE })
        .takeWhile { it != DONE }
    return untyped as Flow<T>
}

fun fetchUserFulfilled(response: Any?): Any? = null
fun downloadFromService(path: String): Flow<Any> = emptyFlow()

fun fetchUserEpic(rootFlow: Flow<MyOperation>): Flow<Any?> {
    return rootFlow
        .filterIsInstance<FetchUser>()
        .flatMapMerge { fetchUser ->
            downloadFromService("/api/users/${fetchUser.payload}")
                .map { response -> fetchUserFulfilled(response) }
                .myTakeUntil(rootFlow.filterIsInstance<FetchUserCancelled>())
        }
}
j

Justin Tullgren

08/02/2021, 6:59 PM
ok that will take me a while to parse and I am in meetings the rest of the day. I'll try it out later. Thanks so much trying to help!
n

Nick Allen

08/02/2021, 7:25 PM
Note: I'm guessing
downloadFromService
should really just be a suspend method that returns a value instead of returning a
Flow
because I'm guessing it only returns one value. But I left it as-is to try to make it as equivalent as possible.