Dariusz Kuc
07/03/2023, 9:11 PM
kevin.cianfarini
07/03/2023, 9:14 PM
flatMapLatest?
Dariusz Kuc
07/03/2023, 9:15 PM
Dariusz Kuc
07/03/2023, 9:15 PM
Dariusz Kuc
07/03/2023, 9:16 PM
kevin.cianfarini
07/03/2023, 9:17 PM
Dariusz Kuc
07/03/2023, 9:17 PM
kevin.cianfarini
07/03/2023, 9:17 PM
kevin.cianfarini
07/03/2023, 9:17 PM
Dariusz Kuc
07/03/2023, 9:18 PM
Dariusz Kuc
07/03/2023, 9:20 PM
session.send(session.receive().map { toWhatever })
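A note on the shape above: map produces exactly one outgoing element per incoming element, so it cannot express a request that is answered with a stream. The following is a minimal sketch, not the code from this thread, of the same pipeline built with flatMapMerge instead. The session API is not shown in the thread, so the sketch works on plain Flow values; Incoming, Subscribe, Ping and respond are hypothetical names made up for illustration.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical message types, stand-ins for whatever the session delivers.
sealed interface Incoming
data class Subscribe(val id: String) : Incoming
object Ping : Incoming

// Each incoming message becomes its own flow of responses.
// flatMapMerge collects those inner flows concurrently, so a long-running
// subscription does not hold back the reply to a later message.
@OptIn(ExperimentalCoroutinesApi::class, FlowPreview::class)
fun respond(incoming: Flow<Incoming>): Flow<String> =
    incoming.flatMapMerge { msg ->
        when (msg) {
            is Subscribe -> flow {
                // Assumed behavior: a subscription yields several responses over time.
                repeat(3) { i ->
                    delay(100)
                    emit("next:${msg.id}:$i")
                }
            }
            Ping -> flowOf("pong") // single-element response
        }
    }

fun main() = runBlocking<Unit> {
    val incoming = flow {
        emit(Subscribe("a"))
        delay(50) // the ping arrives while the subscription is still running
        emit(Ping)
    }
    respond(incoming).collect { println(it) }
}
```

With these delays the pong should be printed before the first subscription item. Swapping flatMapMerge for flatMapConcat would delay the pong until the subscription flow has finished, and flatMapLatest would cancel the subscription as soon as the ping arrives.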
kevin.cianfarini
07/03/2023, 9:22 PM
kevin.cianfarini
07/03/2023, 9:22 PM
kevin.cianfarini
07/03/2023, 9:22 PM
Dariusz Kuc
07/03/2023, 9:25 PM
start, subscribe, ping, complete)
• return a flow of messages based on the incoming message (i.e. subscribe -> return a flow of arbitrary length; the others return a flow of 1 element or just cancel)
If I'm processing the response for subscribe, I still need to respond to other incoming requests (e.g. users can ping me OR send complete to stop the existing flow)
Dariusz Kuc
07/03/2023, 9:25 PM
kevin.cianfarini
07/03/2023, 9:25 PM
flatMapConcat?
Dariusz Kuc
07/03/2023, 9:26 PM
flatMapConcat / flatMapMerge does work fine -> it is just my test setup that is causing issues
kevin.cianfarini
07/03/2023, 9:32 PM
flatMapMerge.
Dariusz Kuc
07/04/2023, 12:28 AM
val topLevelScope = CoroutineScope(SupervisorJob())
val runningStuff = ConcurrentHashMap<String, Job>()

// within function
// (Foo, Bar, response, id and generateAnotherFlow() are defined elsewhere)
someFlow.flatMapMerge { x ->
    channelFlow {
        when (x) {
            is Foo -> send(response)
            is Bar -> {
                val longRunningProcess = launch {
                    generateAnotherFlow().collect {
                        send(it) // <---- problematic as channelFlow will be closed
                    }
                }
                // keep the Job so it can be looked up later
                runningStuff[id] = longRunningProcess
            }
        }
    }
}
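One possible way to structure the snippet above, offered as a sketch under assumptions rather than as the fix from this thread: a coroutine launched inside channelFlow is a child of that flow's producer scope, so the channel stays open until the child finishes or is cancelled. Storing the Job in a map lets a later message cancel it. Msg, Subscribe, Complete, ticker, running and handle are hypothetical names introduced for illustration.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import java.util.concurrent.ConcurrentHashMap

// Hypothetical message types for this sketch.
sealed interface Msg
data class Subscribe(val id: String) : Msg
data class Complete(val id: String) : Msg

// Jobs of active subscriptions, keyed by subscription id.
val running = ConcurrentHashMap<String, Job>()

// Stand-in for a long-running source that never ends on its own.
fun ticker(id: String): Flow<String> = flow {
    var i = 0
    while (true) {
        delay(100)
        emit("next:$id:${i++}")
    }
}

@OptIn(ExperimentalCoroutinesApi::class, FlowPreview::class)
fun handle(incoming: Flow<Msg>): Flow<String> =
    incoming.flatMapMerge { msg ->
        channelFlow {
            when (msg) {
                is Subscribe -> {
                    // Child of the channelFlow scope: this inner flow stays open
                    // until the job completes or is cancelled.
                    val job = launch { ticker(msg.id).collect { send(it) } }
                    running[msg.id] = job
                    // Remove the entry once the job is done, if it is still ours.
                    job.invokeOnCompletion { running.remove(msg.id, job) }
                }
                is Complete -> {
                    // Cancelling the stored job ends the matching subscription flow.
                    running.remove(msg.id)?.cancel()
                    send("complete:${msg.id}")
                }
            }
        }
    }

fun main() = runBlocking<Unit> {
    val incoming = flow {
        emit(Subscribe("a"))
        delay(250)
        emit(Complete("a"))
    }
    handle(incoming).collect { println(it) }
}
```

Only the inner flow of the subscription waits for the launched job. Because flatMapMerge collects inner flows concurrently (up to its concurrency limit, 16 by default), other incoming messages are still handled while the subscription runs.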
Dariusz Kuc
07/04/2023, 12:29 AM
launch in a channelFlow scope... but that had another side effect: the scope was blocked until the launch completed, so not the behavior I want
Dariusz Kuc
07/04/2023, 12:29 AM
Dariusz Kuc
07/04/2023, 12:30 AM
topLevelScope is just for another timeout job
Dariusz Kuc
07/04/2023, 12:31 AM