Justin Tullgren
08/02/2021, 1:15 PM
root flow: OperationStart ------ AnotherOperation ------ OperationCancel -------- AnotherBOperation
next flow: OperationStart, perform long running action while !OperationCancel
Nick Allen
08/02/2021, 5:55 PM
channelFlow {
    // Must be a var so the launched job can be stored and cancelled later.
    var job: Job? = null
    rootFlow.collect {
        when (it) {
            is OperationStart -> {
                job = launch { send(resultOfLongRunningThing()) }
            }
            is OperationCancel -> {
                job?.cancel()
            }
            ...
        }
    }
}
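A runnable sketch of the channelFlow idea above, for anyone who wants to try it. The `Operation` types, the `results` function name, and the body of `resultOfLongRunningThing` are hypothetical stand-ins, not something from the thread; the delays only simulate a long running action.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical operation types standing in for the root flow's elements.
sealed interface Operation
object OperationStart : Operation
object OperationCancel : Operation
object AnotherOperation : Operation

// Hypothetical long running action: takes 200 ms, then yields a value.
suspend fun resultOfLongRunningThing(): String {
    delay(200)
    return "done"
}

fun results(rootFlow: Flow<Operation>): Flow<String> = channelFlow {
    var job: Job? = null
    rootFlow.collect { op ->
        when (op) {
            // Start the work in a child coroutine so collection keeps going.
            is OperationStart -> job = launch { send(resultOfLongRunningThing()) }
            // Cancel the in-flight work, if any.
            is OperationCancel -> job?.cancel()
            else -> Unit
        }
    }
}

fun main() = runBlocking {
    // Cancel arrives after 50 ms, before the 200 ms action finishes.
    val cancelled = flow<Operation> {
        emit(OperationStart)
        delay(50)
        emit(OperationCancel)
    }
    // No cancel: channelFlow waits for the launched child before completing.
    val completed = flowOf<Operation>(OperationStart)

    println(results(cancelled).toList()) // prints []
    println(results(completed).toList()) // prints [done]
}
```

The sketch leans on channelFlow not completing until its launched children finish, which is why the second case still delivers its result after the root flow has ended.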
Justin Tullgren
08/02/2021, 5:58 PM
Nick Allen
08/02/2021, 6:08 PM
Justin Tullgren
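08/02/2021, 6:10 PM
import { ajax } from 'rxjs/ajax';
import { ofType } from 'redux-observable';
import { map, mergeMap, takeUntil } from 'rxjs/operators';

// Fetch a user on FETCH_USER; drop the in-flight request if FETCH_USER_CANCELLED arrives.
const fetchUserEpic = action$ => action$.pipe(
  ofType(FETCH_USER),
  mergeMap(action => ajax.getJSON(`/api/users/${action.payload}`).pipe(
    map(response => fetchUserFulfilled(response)),
    takeUntil(action$.pipe(
      ofType(FETCH_USER_CANCELLED)
    ))
  ))
);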
Nick Allen
08/02/2021, 6:14 PM
Justin Tullgren
08/02/2021, 6:15 PM
Nick Allen
08/02/2021, 6:56 PM
pipe but this is what the translation seems like to me:
import kotlinx.coroutines.flow.*

// Emits values from this flow until `other` emits anything.
@Suppress("UNCHECKED_CAST")
private fun <T> Flow<T>.myTakeUntil(other: Flow<Any?>): Flow<T> {
    val DONE = Any() // unique sentinel, compared by identity
    val untyped = merge(this, other.map { DONE })
        .takeWhile { it !== DONE }
    return untyped as Flow<T>
}

// Placeholders so the example compiles.
fun fetchUserFulfilled(response: Any?): Any? = null
fun downloadFromService(path: String): Flow<Any> = emptyFlow()

fun fetchUserEpic(rootFlow: Flow<MyOperation>): Flow<Any?> {
    return rootFlow
        .filterIsInstance<FetchUser>()
        .flatMapMerge { fetchUser ->
            downloadFromService("/api/users/${fetchUser.payload}")
                .map { response -> fetchUserFulfilled(response) }
                .myTakeUntil(rootFlow.filterIsInstance<FetchUserCancelled>())
        }
}
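One caveat with the merge-based takeUntil above, offered as a sketch rather than a correction: merge only completes once all of its inputs complete, so if the cancellation flow never completes (a hot flow such as a SharedFlow, for example), the result keeps waiting even after the source has finished. A possible variant, assuming the result should also end when the source ends, appends the sentinel on normal completion of the source. The name `takeUntilSignal` is hypothetical.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Emits values from this flow until `signal` emits or this flow completes.
@Suppress("UNCHECKED_CAST")
fun <T> Flow<T>.takeUntilSignal(signal: Flow<Any?>): Flow<T> {
    val done = Any() // unique sentinel, compared by identity
    val source: Flow<Any?> = this
    val untyped = merge(
        // Emit the sentinel when the source completes normally (cause == null).
        source.onCompletion { cause -> if (cause == null) emit(done) },
        signal.map { done }
    ).takeWhile { it !== done }
    return untyped as Flow<T>
}

fun main() = runBlocking {
    // A signal that never emits and never completes.
    val never = MutableSharedFlow<Unit>()
    println(flowOf(1, 2, 3).takeUntilSignal(never).toList()) // prints [1, 2, 3]

    // A signal that fires immediately, before the slow source emits anything.
    val slow = flow { delay(100); emit(1) }
    println(slow.takeUntilSignal(flowOf(Unit)).toList()) // prints []
}
```

With a cold, finite root flow the original version also terminates, so this only matters when the cancellation source stays open.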
Justin Tullgren
08/02/2021, 6:59 PM
Nick Allen
08/02/2021, 7:25 PM
downloadFromService should really just be a suspend method that returns a value instead of returning a Flow, because I'm guessing it only returns one value. But I left it as-is to try to make it as equivalent as possible.
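A minimal sketch of what the suspend-method version might look like. The operation types, the String return type, and the simulated latency are assumptions for illustration only. Cancellation is left out here; a takeUntil-style operator could be applied to the wrapped one-element flow in the same place as before.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical types standing in for the thread's operations.
sealed interface MyOperation
data class FetchUser(val payload: String) : MyOperation
object FetchUserCancelled : MyOperation

// Hypothetical suspend version: one call, one value.
suspend fun downloadFromService(path: String): String {
    delay(50) // stand-in for network latency
    return "response for $path"
}

// flatMapMerge is a preview/experimental API, so the compiler emits an opt-in warning.
fun fetchUserEpic(rootFlow: Flow<MyOperation>): Flow<String> =
    rootFlow
        .filterIsInstance<FetchUser>()
        .flatMapMerge { fetchUser ->
            // Wrap the single suspend call in a one-element flow so it can still be merged.
            flow { emit(downloadFromService("/api/users/${fetchUser.payload}")) }
        }

fun main() = runBlocking {
    val ops = flowOf<MyOperation>(FetchUser("1"), FetchUser("2"))
    // Both requests run concurrently, so the order of results is not guaranteed.
    println(fetchUserEpic(ops).toList())
}
```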