if I have a `Flow` of data that has separators in ...
# coroutines
b
if I have a `Flow` of data that has separators in it, is there an easy way to split it into a flow of separated chunks? i.e.:
"@some data@more data@" -> [some data, more data]
j
That's like a `flatMapConcat { item -> split(item).asFlow() }`
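For the case Jake describes, where each upstream item is already a complete string, a minimal sketch could look like this. It assumes kotlinx.coroutines is on the classpath; `splitEachItem` is a hypothetical name, and dropping the empty pieces produced by the leading and trailing `@` is a choice made here so the output matches the example above.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.flow.*

// Hypothetical operator: each upstream item is a whole string, so it can be
// split eagerly and its pieces re-emitted in order via flatMapConcat.
@OptIn(ExperimentalCoroutinesApi::class)
fun Flow<String>.splitEachItem(separator: Char): Flow<String> =
    flatMapConcat { item ->
        item.split(separator)
            .filter { it.isNotEmpty() } // drop empties from leading/trailing separators
            .asFlow()
    }
```

This only works when a separator never falls between two upstream items, which is exactly the limitation raised in the next message.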
b
but the items in the initial flow are the individual characters, so how would you split it? (without waiting to collect the whole thing)
j
oh so in this example it's like a flow of characters?
b
right, in reality it's a flow of bytes with a special "message separator" byte
j
ah, got it. i would probably write a custom operator that appends to a StringBuilder until it sees the delimiter, then emits the chunk and resets. can write an example later, but for now i'm off for a few hours.
i'm sure there's some crazy contrivance with built-in operators, but Flow makes it so easy to write custom operators that it shouldn't be too hard.
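Jake's description translates fairly directly into a custom operator. A minimal sketch for the character case, assuming kotlinx.coroutines; `splitOn` is a hypothetical name, and skipping empty chunks is a choice made here to match the `[some data, more data]` example:

```kotlin
import kotlinx.coroutines.flow.*

// Hypothetical operator: accumulate characters until the delimiter arrives,
// then emit the chunk and reset. Each chunk is emitted as soon as its
// delimiter is seen, without waiting for the whole upstream.
fun Flow<Char>.splitOn(delimiter: Char): Flow<String> = flow {
    val current = StringBuilder()
    collect { c ->
        if (c == delimiter) {
            if (current.isNotEmpty()) emit(current.toString()) // skip empty chunks
            current.clear()
        } else {
            current.append(c)
        }
    }
    // collect returns once the upstream completes, so flush any trailing chunk.
    if (current.isNotEmpty()) emit(current.toString())
}
```

Because the operator just wraps `collect`, it finishes when the upstream flow finishes; no separate completion check is needed.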
b
I sort of did that, but I'm having trouble determining when the initial flow is complete so I can stop
np, no rush really, I'm off for a while too
I've just been reading through and trying to figure it out for a while so I posted it here
this is the best i've got so far (byte version):
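```kotlin
import kotlinx.coroutines.flow.*

fun Flow<ByteArray>.parse() = flow {
    // message separator
    val eom = 0x7e.toByte()
    // flatten the ByteArray chunks into a flow of single bytes
    var data = flatMapConcat { it.asList().asFlow() }

    while (true) {
        // skip to just past the next separator
        // (data is cold, so each pass re-collects the upstream from the start)
        data = data.dropWhile { it != eom }.drop(1)
        // gather the bytes up to the following separator
        val msg = data.takeWhile { it != eom }
                .toList().toByteArray()

        if (msg.isEmpty())
            break

        emit(msg)

        // note: drop() returns a new Flow; the result is discarded, so this line has no effect
        data.drop(msg.size)
    }
}
```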
but the `msg.isEmpty()` check is a problem: if there's an empty message it will stop early, but without the check the loop never ends
z
That looks more complicated than it needs to be. I think Jake was suggesting something like this:
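```kotlin
import kotlinx.coroutines.flow.*

val eom = 0x7e.toByte()
fun Flow<ByteArray>.parse() = flow {
  // flatten the ByteArray chunks into a flow of single bytes
  val data = flatMapConcat {
    it.toList().asFlow()
  }
  // bytes of the message currently being assembled
  val buffer = mutableListOf<Byte>()

  // collect returns when the upstream completes, so no explicit end check is needed
  data.collect { byte ->
    if (byte == eom) {
      emit(buffer.toList())
      buffer.clear()
    } else {
      buffer += byte
    }
  }
  // Emit the last buffer (an empty list if the input ended with eom).
  emit(buffer.toList())
}
```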
I would recommend using something like Okio’s `Buffer` instead of a simple mutable list, since it’s probably more efficient, but it’s the same idea.
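A sketch of that suggestion under two assumptions: `java.io.ByteArrayOutputStream` stands in for Okio's `Buffer` (so the example needs only the JDK), and each incoming `ByteArray` is looped over directly instead of being turned into a per-byte flow with `flatMapConcat`. `splitMessages` is a hypothetical name. Unlike the version above, it only emits a trailing message if bytes are left over after the last separator.

```kotlin
import java.io.ByteArrayOutputStream
import kotlinx.coroutines.flow.*

// Hypothetical operator: split a stream of byte chunks into messages
// delimited by eom, even when a message spans several chunks.
fun Flow<ByteArray>.splitMessages(eom: Byte = 0x7e.toByte()): Flow<ByteArray> = flow {
    // growable byte buffer, standing in for Okio's Buffer in this sketch
    val buffer = ByteArrayOutputStream()
    collect { chunk ->
        for (b in chunk) {
            if (b == eom) {
                emit(buffer.toByteArray()) // copy out the completed message
                buffer.reset()             // reuse the same backing array
            } else {
                buffer.write(b.toInt())    // writes the low 8 bits
            }
        }
    }
    // Flush bytes that arrived after the last separator, if any.
    if (buffer.size() > 0) emit(buffer.toByteArray())
}
```

A leading separator still produces an empty first message here, same as in the list-based version.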
u
how come it's so easy to write that operator compared to RxJava? you don't even need synchronization on the `buffer`?
l
@ursus You don't even need synchronization because it's all sequential.
u
@louiscad what if it were multiple `async { }` blocks? then it would be needed, right?
l
@ursus It wouldn't work if you called `emit` from inside them, because the `CoroutineContext` would not be the same (different `Job`). `emit` must be called from the same `CoroutineContext`; that's specified in the documentation and checked at runtime.
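If values really do need to come from several concurrent coroutines, kotlinx.coroutines provides the `channelFlow` builder, where child coroutines call `send` instead of `emit`. A minimal sketch (`concurrentSquares` is a hypothetical example function); the order of the results is not guaranteed:

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.launch

// Calling emit() from a launched child inside flow { } breaks the flow's
// context invariant and fails at runtime. channelFlow is meant for concurrent
// producers: children send() into a channel that backs the resulting flow.
fun concurrentSquares(inputs: List<Int>): Flow<Int> = channelFlow {
    for (x in inputs) {
        launch { send(x * x) } // launched in the channelFlow's own scope
    }
    // the flow completes only after this block and all its children finish
}
```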
u
@louiscad okay thanks, but how about plain coroutines without flow?
l
@ursus What's your question exactly? How what?
u
@louiscad whether the regular old multithreading issues with shared state apply to parallel `async` blocks, etc.
l
@ursus Depends on what you do. You can create concurrency issues even with a single thread. The key is avoiding shared mutable state and using kotlinx.coroutines solutions like `Mutex` or `actor` when you still need it.
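A small sketch of the `Mutex` option, assuming a counter shared by many coroutines running on `Dispatchers.Default` (`countConcurrently` is a hypothetical name):

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock

// Shared mutable state updated from many coroutines on a multi-threaded
// dispatcher. withLock serialises the read-modify-write so no increment is lost.
suspend fun countConcurrently(times: Int): Int {
    val mutex = Mutex()
    var counter = 0
    coroutineScope {
        repeat(times) {
            launch(Dispatchers.Default) {
                mutex.withLock { counter++ }
            }
        }
    } // coroutineScope returns only after every launched child has finished
    return counter
}
```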
u
@louiscad ok thanks, so let's say gathering the results of multiple `async` blocks into an ArrayList needs synchronization?
j
RxJava is all sequential as well
l
@ursus Put all the `Deferred` into a `List<Deferred>` and call `awaitAll()` on it. You shouldn't need extra synchronization if you don't have shared mutable state, as I said.
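The `awaitAll()` pattern, sketched with a hypothetical `squaresInParallel` function: each `async` returns its own value and the result list is built from those values, so no shared list is mutated.

```kotlin
import kotlinx.coroutines.*

// Each async computes one result; awaitAll() collects them into a new list,
// in the same order as the Deferreds were created.
suspend fun squaresInParallel(inputs: List<Int>): List<Int> = coroutineScope {
    inputs.map { x -> async(Dispatchers.Default) { x * x } }.awaitAll()
}
```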
b
Ah, I was trying to avoid using a mutable buffer
z
Why?
b
i was trying to think of a way to do it with just immutable/functional calls; the only thing missing is some sort of `.split` method
z
Any implementation of `split` is probably going to use some mutable data structure or a mutable reference to a persistent immutable data structure under the hood anyway, though.
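To illustrate that point, here is a hypothetical "functional" split built on `scan` (`SplitState` and `splitByScan` are names made up for this sketch). User code holds no mutable buffer, but `scan` itself keeps the running accumulator in a local variable that it reassigns on every element. Appending to an immutable `List` also copies it each time, so this is quadratic in chunk length, and a final chunk with no separator after it is never emitted.

```kotlin
import kotlinx.coroutines.flow.*

// Immutable accumulator: the chunk in progress, plus the chunk completed by
// the current element (null when the current element was not a separator).
data class SplitState(
    val current: List<Byte> = emptyList(),
    val completed: List<Byte>? = null
)

// Hypothetical operator: scan threads the immutable state through the stream,
// and mapNotNull keeps only the states that just completed a chunk.
fun Flow<Byte>.splitByScan(separator: Byte): Flow<List<Byte>> =
    scan(SplitState()) { state, byte ->
        if (byte == separator) SplitState(completed = state.current)
        else SplitState(current = state.current + byte) // copies the list
    }.mapNotNull { it.completed }
```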