Dariusz Kuc

07/03/2023, 9:11 PM
hello 👋 what's the proper way to start a flow based on another flow? i.e. currently I just map the incoming flow to the result flow, but the problem is that it processes elements sequentially instead of returning parallel flows

kevin.cianfarini

07/03/2023, 9:14 PM
I don't know if I'm understanding your question correctly, but it seems like you might want flatMapLatest?

Dariusz Kuc

07/03/2023, 9:15 PM
i don't want to cancel old flow though
i.e. trying to implement a protocol on a websocket
so got a stream of incoming messages and for each one of them I need to return a flow of responses

kevin.cianfarini

07/03/2023, 9:17 PM
Do you need to return a flow of responses, or just launch an associated job?

Dariusz Kuc

07/03/2023, 9:17 PM
it is a streaming response

kevin.cianfarini

07/03/2023, 9:17 PM
So you have a single incoming stream of elements and a single outgoing stream, yeah?
Or does it fan out into multiple outbound connections?

Dariusz Kuc

07/03/2023, 9:18 PM
well it should fan out but it's all on the same session
generic spring websocket impl just follows a basic flux
session.send(session.receive().map { toWhatever })

kevin.cianfarini

07/03/2023, 9:22 PM
I'm personally not familiar with how websockets work at all so if you can distill the problem down to one that doesn't require that domain knowledge I might be able to help
I'm still unsure what you need unfortunately
Like, is the flow of 'responses' intended to be infinite? Finite?

Dariusz Kuc

07/03/2023, 9:25 PM
so at a high level
• given an incoming flow of messages (start, subscribe, ping, complete)
• return a flow of messages based on the incoming message (i.e. subscribe -> return a flow of arbitrary length, others are just a flow of 1 element or just cancel)
If I'm processing the response for subscribe I still need to respond to other incoming requests (e.g. users can ping me OR send complete to stop the existing flow)
actually now that I'm thinking about it

kevin.cianfarini

07/03/2023, 9:25 PM
I think you want flatMapConcat?

Dariusz Kuc

07/03/2023, 9:26 PM
I think that flatMapConcat / flatMapMerge does work fine -> it is just my test setup that is causing issues

kevin.cianfarini

07/03/2023, 9:32 PM
If you want things to all run concurrently, and not sequentially, you definitely want flatMapMerge.
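To illustrate the difference mentioned above, here is a minimal sketch comparing the two operators. `responsesFor` is a hypothetical stand-in for whatever produces the responses to a message, and the opt-in annotation needed for these operators may differ between kotlinx.coroutines versions.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical stand-in: each incoming message yields two delayed responses.
fun responsesFor(message: String): Flow<String> = flow {
    repeat(2) { i ->
        delay(100)
        emit("$message-$i")
    }
}

// flatMapConcat: the inner flow for one message must complete
// before the inner flow for the next message is collected.
@OptIn(ExperimentalCoroutinesApi::class, FlowPreview::class)
suspend fun collectSequentially(incoming: Flow<String>): List<String> =
    incoming.flatMapConcat { responsesFor(it) }.toList()

// flatMapMerge: inner flows are collected concurrently,
// so their emissions may interleave.
@OptIn(ExperimentalCoroutinesApi::class, FlowPreview::class)
suspend fun collectConcurrently(incoming: Flow<String>): List<String> =
    incoming.flatMapMerge { responsesFor(it) }.toList()

fun main() = runBlocking {
    val incoming = flowOf("a", "b")
    println(collectSequentially(incoming)) // [a-0, a-1, b-0, b-1]
    println(collectConcurrently(incoming)) // same elements, order may interleave
}
```

Note that flatMapMerge takes an optional concurrency parameter that limits how many inner flows are collected at once, which matters if many long-lived subscriptions are active at the same time.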

Dariusz Kuc

07/04/2023, 12:28 AM
well actually found a problem in my code, I'm doing something along the lines of
val topLevelScope = CoroutineScope(SupervisorJob())
val runningStuff = ConcurrentHashMap<String, Job>()

// within function
someFlow.flatMapMerge { x ->
  channelFlow {
    when (x) {
      is Foo -> send(response)
      is Bar -> {
        val longRunningProcess = launch {
          generateAnotherFlow().collect {
            send(it) // <---- problematic as channelFlow will be closed
          }
        }
        runningStuff[id] = longRunningProcess
      }
    }
  }
}
previously I was wrapping the launch in a channelFlow scope, but that had the side effect that the scope was blocked until the launch completed, which is not the behavior I want
any thoughts?
doh 🤦 have to just launch it from the flow scope.... topLevelScope is just for another timeout job
looks like it all works now
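
For reference, a rough sketch of what launching from the flow scope might look like. This is not the actual code from the thread: the `Message` types, `subscriptionData`, `running`, and `respond` are all hypothetical names made up for illustration, and the opt-in annotation may vary by kotlinx.coroutines version. The idea is that `launch` inside `channelFlow` creates a child of that flow's own scope, so the channel stays open while the child is sending, and a later `complete` message can cancel the child through the job map.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import java.util.concurrent.ConcurrentHashMap

// Hypothetical message types standing in for the protocol described above.
sealed interface Message
data class Ping(val id: String) : Message
data class Subscribe(val id: String) : Message
data class Complete(val id: String) : Message

// Jobs for active subscriptions, keyed by subscription id.
val running = ConcurrentHashMap<String, Job>()

// Hypothetical stand-in for a long-running stream of subscription data.
fun subscriptionData(id: String): Flow<String> = flow {
    var i = 0
    while (true) {
        delay(50)
        emit("$id:data-${i++}")
    }
}

@OptIn(ExperimentalCoroutinesApi::class, FlowPreview::class)
fun respond(incoming: Flow<Message>): Flow<String> =
    incoming.flatMapMerge { message ->
        channelFlow<String> {
            when (message) {
                is Ping -> send("pong:${message.id}")
                is Subscribe -> {
                    // Child of this channelFlow's scope: the channel stays open
                    // until this job finishes or is cancelled.
                    running[message.id] = launch {
                        subscriptionData(message.id).collect { send(it) }
                    }
                }
                is Complete -> {
                    // Cancelling the child lets the Subscribe flow complete normally.
                    running.remove(message.id)?.cancel()
                    send("complete:${message.id}")
                }
            }
        }
    }

// Hypothetical incoming traffic: subscribe, ping while subscribed, then complete.
fun demoIncoming(): Flow<Message> = flow {
    emit(Subscribe("s1"))
    delay(120)
    emit(Ping("p1"))
    delay(120)
    emit(Complete("s1"))
}

fun main() = runBlocking {
    respond(demoIncoming()).collect { println(it) }
}
```

Because flatMapMerge collects each inner flow concurrently, the long-lived subscription flow does not stop the ping or complete messages from being handled while it is still emitting.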