janvladimirmostert

02/19/2022, 11:11 AM
Is there a way with coroutine Flows to collect exactly 1 element, then collect 4 elements, and then collect more after that? flow.collect just drains the whole flow and flow.take(1) stops the flow, so this doesn't work:
flow.take(1).collect {
    println(it)
}
flow.take(4).collect {
    println(it)
}
This is for parsing a byte flow that I'm reading from a socket, so in the case of take(4), it would be nice to have something like fold(4) that then spits out an Int. I'm just not sure what options there are to read only a limited number of bytes and then let the next flow operation continue.
Rick Clephas

02/19/2022, 11:23 AM
You could use produceIn to convert your Flow to a ReceiveChannel and use receive on that channel to only get a single element at a time. https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/produce-in.html
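A minimal sketch of that suggestion, assuming a Flow<Byte> and a made-up framing of one tag byte followed by a 4-byte big-endian length. The helper names receiveExactly and readHeaderAndLength are hypothetical and not part of kotlinx.coroutines; only produceIn, receive, and cancel are library calls.

```kotlin
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.produceIn
import kotlinx.coroutines.runBlocking

// Hypothetical helper: suspend until exactly `count` elements have been received.
suspend fun <T> ReceiveChannel<T>.receiveExactly(count: Int): List<T> {
    val out = ArrayList<T>(count)
    repeat(count) { out += receive() }
    return out
}

// Hypothetical framing: 1 tag byte, then a 4-byte big-endian length.
suspend fun readHeaderAndLength(bytes: Flow<Byte>): Pair<Byte, Int> = coroutineScope {
    // The flow is collected once, in a child coroutine that feeds the channel.
    val channel = bytes.produceIn(this)
    try {
        val tag = channel.receive()
        val length = channel.receiveExactly(4)
            .fold(0) { acc, b -> (acc shl 8) or (b.toInt() and 0xFF) }
        tag to length
    } finally {
        // Stop the producer so coroutineScope can complete.
        channel.cancel()
    }
}

fun main() = runBlocking {
    val bytes = flowOf<Byte>(0x52, 0x00, 0x00, 0x01, 0x02, 0x7F)
    val (tag, length) = readHeaderAndLength(bytes)
    println("tag=$tag length=$length")
}
```

Unlike take(1) followed by take(4), the channel keeps its position between calls, so each receive continues where the previous one stopped.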
Adam Powell

02/19/2022, 2:26 PM
☝️ That. However, reading single bytes from a channel or flow is an incredibly wasteful way to parse network data.
Orhan Tozan

02/19/2022, 2:28 PM
Looks like you need some sort of split operator for that
janvladimirmostert

02/19/2022, 3:01 PM
I've now implemented it so that it reads into a ByteBuffer, and then I just read the bytes I need from the ByteBuffer. If it runs out of buffered data, it rewinds the buffer and reads more data into it. Using flow at that level does seem wasteful.
👍 1
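A rough sketch of what such a buffered reader could look like. This is not the code from the thread: the class name BufferedMessageReader is hypothetical, a blocking ReadableByteChannel stands in for the coroutine-wrapped socket, and it uses compact() (which preserves unread bytes) instead of the rewind described above.

```kotlin
import java.io.ByteArrayInputStream
import java.io.EOFException
import java.nio.ByteBuffer
import java.nio.channels.Channels
import java.nio.channels.ReadableByteChannel

// Hypothetical reader: buffers chunks from `source` and hands out exactly the bytes requested.
class BufferedMessageReader(private val source: ReadableByteChannel, capacity: Int = 8192) {
    // Kept in "read mode" between calls; flip() on a fresh buffer makes it start empty.
    private val buffer: ByteBuffer = ByteBuffer.allocate(capacity).apply { flip() }

    // Make sure at least `n` unread bytes are buffered, refilling from the source if needed.
    private fun ensure(n: Int) {
        check(n <= buffer.capacity()) { "request larger than buffer" }
        if (buffer.remaining() >= n) return
        buffer.compact() // move unread bytes to the front and switch to write mode
        while (buffer.position() < n) {
            if (source.read(buffer) < 0) throw EOFException("stream ended mid-message")
        }
        buffer.flip() // back to read mode
    }

    fun readByte(): Byte { ensure(1); return buffer.get() }

    // ByteBuffer is big-endian by default.
    fun readInt(): Int { ensure(4); return buffer.getInt() }

    fun readBytes(n: Int): ByteArray { ensure(n); return ByteArray(n).also { buffer.get(it) } }
}

fun main() {
    // Assumed framing for illustration: tag byte, 4-byte length, payload.
    val wire = byteArrayOf(0x52, 0, 0, 0, 3, 1, 2, 3)
    // A tiny capacity forces several refills.
    val reader = BufferedMessageReader(Channels.newChannel(ByteArrayInputStream(wire)), capacity = 4)
    val tag = reader.readByte()
    val length = reader.readInt()
    val payload = reader.readBytes(length)
    println("tag=$tag length=$length payload=${payload.toList()}")
}
```

The socket is read in chunks, while the parser still asks for 1 byte, 4 bytes, or n bytes at a time.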
Orhan Tozan

02/19/2022, 3:06 PM
I think you can do this with just using .take() and .drop()
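For illustration, a sketch of the take()/drop() idea on a cold, replayable flow; the function name splitWithTakeAndDrop is made up. Note that every terminal operator restarts the upstream, which is the catch for a socket-backed flow: the second collection does not continue where the first one stopped.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.drop
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical helper: first element, then the next four, via two separate collections.
suspend fun splitWithTakeAndDrop(source: Flow<Int>): Pair<List<Int>, List<Int>> {
    val first = source.take(1).toList()
    val nextFour = source.drop(1).take(4).toList() // re-runs the upstream from the start
    return first to nextFour
}

fun main() = runBlocking {
    var starts = 0
    val source = flow {
        starts++ // counts how often the upstream block is executed
        for (i in 1..10) emit(i)
    }
    val (first, nextFour) = splitWithTakeAndDrop(source)
    println("first=$first nextFour=$nextFour upstreamStarts=$starts")
}
```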
corneil

02/20/2022, 5:58 PM
I usually use a state machine for parsing and feed bytes into it. The network layer is the best place to do that. The state machine can then generate higher-level events that make sense to the application.
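A small sketch of that approach, under an assumed framing of one tag byte, a 4-byte big-endian length, and a payload. The names Frame and FrameParser are hypothetical, and the sketch does not validate the length field.

```kotlin
// Hypothetical higher-level event, emitted once a full frame has been seen.
data class Frame(val tag: Byte, val payload: List<Byte>)

class FrameParser(private val onFrame: (Frame) -> Unit) {
    private enum class State { TAG, LENGTH, PAYLOAD }

    private var state = State.TAG
    private var tag: Byte = 0
    private var length = 0
    private var lengthBytesSeen = 0
    private val payload = ArrayList<Byte>()

    // Feed one chunk as it arrives; chunk boundaries need not align with frame boundaries.
    fun feed(chunk: ByteArray) {
        for (b in chunk) step(b)
    }

    private fun step(b: Byte) {
        when (state) {
            State.TAG -> {
                tag = b
                length = 0
                lengthBytesSeen = 0
                state = State.LENGTH
            }
            State.LENGTH -> {
                length = (length shl 8) or (b.toInt() and 0xFF)
                lengthBytesSeen++
                if (lengthBytesSeen == 4) {
                    if (length == 0) {
                        emitFrame() // empty payload: the frame is already complete
                    } else {
                        state = State.PAYLOAD
                    }
                }
            }
            State.PAYLOAD -> {
                payload += b
                if (payload.size == length) emitFrame()
            }
        }
    }

    private fun emitFrame() {
        onFrame(Frame(tag, payload.toList()))
        payload.clear()
        state = State.TAG
    }
}

fun main() {
    val frames = mutableListOf<Frame>()
    val parser = FrameParser { frames += it }
    // Two frames split across three arbitrary chunks.
    parser.feed(byteArrayOf(0x44, 0, 0))
    parser.feed(byteArrayOf(0, 2, 10, 20, 0x43))
    parser.feed(byteArrayOf(0, 0, 0, 0))
    println(frames)
}
```

Because the parser keeps its own state, the network code can hand over whatever chunk size the socket delivers.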
janvladimirmostert

02/22/2022, 5:43 PM
Parsing database responses doesn't require a very complicated state machine, but that's exactly what I did here. Some of the responses return multiple messages, which I'll need to combine to return something useful at the API level. I was curious whether I could use flow to stream results to the browser, but I'll need to do that at a higher level: read until I have a complete row, emit it on the Flow, then continue reading until I've finished reading all the rows. On the consumer side, my custom server will push the data to the client via a socket. It's silly to do this at the byte level, but streaming complete rows seems like a nice fit.
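Row-level streaming along those lines could be sketched like this. The message types DataRow and CommandComplete and the nextMessage callback are placeholders for whatever the driver actually decodes, not a real driver API.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical protocol messages, already decoded from bytes by a lower layer.
sealed interface Message
data class DataRow(val columns: List<String>) : Message
object CommandComplete : Message

// Emit one element per complete row; stop once the server signals completion.
fun rows(nextMessage: suspend () -> Message): Flow<List<String>> = flow {
    while (true) {
        when (val msg = nextMessage()) {
            is DataRow -> emit(msg.columns)
            CommandComplete -> return@flow
        }
    }
}

fun main() = runBlocking {
    // Stand-in for messages arriving from the database connection.
    val wire = listOf<Message>(
        DataRow(listOf("1", "alice")),
        DataRow(listOf("2", "bob")),
        CommandComplete,
    ).iterator()

    println(rows { wire.next() }.toList())
}
```

Since emit suspends until the collector has handled the row, a slow client slows down reading from the database connection.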
corneil

02/22/2022, 6:10 PM
I would make sure that whatever you use to write to the browser doesn't block your thread. I would use a channel for that purpose.
janvladimirmostert

02/22/2022, 9:11 PM
The whole stack is using coroutines, and all interactions with sockets are asynchronous and coroutine-wrapped, so nothing is blocked anywhere (that includes the home-grown webserver, the custom DB drivers, all integrations with external services, and all connections between the backend services). To get flow-like mechanics working on a DB, I'd have to keep a connection per flow, which scales poorly. Instead, the flow will need to grab a connection from the pool when there's a pull, run its query to get the data emitted, and then release the DB connection again so that another flow can grab data. Channels are great, but from what I remember the recommendation is to use StateFlow or MutableStateFlow. I also want to explore flows over sockets, and suspending AMQP messaging instead of the blocking consumers. Sure, I can convert CompletableFutures to coroutines, but where's the fun in that!