# coroutines

bj0

07/09/2019, 11:45 PM
if I have a `Flow` of data that has separators in it, is there an easy way to split it into a flow of separated chunks? i.e.: `"@some data@more data@"` -> `[some data, more data]`

jw

07/09/2019, 11:49 PM
That's like a `flatMapConcat { item -> split(item).asFlow() }`

bj0

07/09/2019, 11:59 PM
but the initial flow is a flow of individual characters, so how would you split it?
(without waiting to collect the whole thing)

jw

07/09/2019, 11:59 PM
oh so in this example it's like a flow of characters?

bj0

07/10/2019, 12:00 AM
right, in reality it's a flow of bytes with a special "message separator" byte

jw

07/10/2019, 12:01 AM
ah, got it. i would probably write a custom operator that kept a StringBuilder until i saw the delimiter and then emit and reset. can write an example later, but for now i'm off for a few hours.
i'm sure there's some crazy contrivance with built-in operators, but Flow makes it so easy to write custom operators it shouldn't be too hard.
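A minimal sketch of the kind of operator described above, for the character version of the problem. The name `splitOn` is hypothetical (it is not part of kotlinx.coroutines), and skipping empty chunks is an assumption made so the output matches the `[some data, more data]` example; drop the `isNotEmpty()` checks if empty messages are meaningful.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical operator name; not part of kotlinx.coroutines.
fun Flow<Char>.splitOn(delimiter: Char): Flow<String> = flow {
    val current = StringBuilder()
    collect { c ->
        if (c == delimiter) {
            // Assumption: empty chunks (e.g. before a leading delimiter) are skipped.
            if (current.isNotEmpty()) {
                emit(current.toString())
                current.clear()
            }
        } else {
            current.append(c)
        }
    }
    // collect returns once the upstream completes, so flush any trailing chunk here.
    if (current.isNotEmpty()) emit(current.toString())
}

fun main() = runBlocking {
    val chunks = "@some data@more data@".asIterable().asFlow().splitOn('@').toList()
    println(chunks) // [some data, more data]
}
```

Because `collect` only returns when the upstream flow has completed, no separate "is the source done?" check is needed.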

bj0

07/10/2019, 12:02 AM
I sort of did that, but I'm having trouble with determining when the initial flow is complete and stopping
np, no rush really, I'm off for a while too
I've just been reading through and trying to figure it out for a while so I posted it here
this is the best i've got so far (byte version):
```kotlin
fun Flow<ByteArray>.parse() = flow {
    // message separator
    val eom = 0x7e.toByte()
    var data = flatMapConcat { it.asList().asFlow() }

    while (true) {
        data = data.dropWhile { it != eom }.drop(1)
        val msg = data.takeWhile { it != eom }
                .toList().toByteArray()

        if (msg.isEmpty())
            break

        emit(msg)

        data.drop(msg.size)
    }
}
```
but the `msg.isEmpty()` is a problem: if there's an empty message it will stop early, but if it's not there the loop never ends

Zach Klippenstein (he/him) [MOD]

07/10/2019, 12:39 AM
That looks more complicated than it needs to be, I think Jake was suggesting something like this:
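```kotlin
import kotlinx.coroutines.flow.*

// Message separator byte.
val eom = 0x7e.toByte()

fun Flow<ByteArray>.parse(): Flow<List<Byte>> = flow {
  // Flatten the incoming chunks into a flow of individual bytes.
  val data = flatMapConcat {
    it.toList().asFlow()
  }
  val buffer = mutableListOf<Byte>()

  data.collect { byte ->
    if (byte == eom) {
      // Emit a copy, since the buffer is cleared and reused.
      emit(buffer.toList())
      buffer.clear()
    } else {
      buffer += byte
    }
  }
  // collect returns when the upstream completes, so emit the last buffer here.
  emit(buffer.toList())
}
```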
I would recommend using something like Okio's `Buffer` instead of a simple mutable list, since it's probably more efficient, but same idea.
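A rough sketch of the same idea with Okio's `Buffer`, assuming Okio is on the classpath. The name `parseWithOkio` and the `EOM` constant are made up for illustration, and the bytes are read straight from each chunk rather than through `flatMapConcat`. Like the list-based version, it emits whatever follows the last separator, even if that is empty.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking
import okio.Buffer

// Message separator byte (same value as in the thread).
const val EOM: Byte = 0x7e

// Hypothetical name; a sketch, not a tuned implementation.
fun Flow<ByteArray>.parseWithOkio(): Flow<ByteArray> = flow {
    val buffer = Buffer()
    collect { chunk ->
        for (byte in chunk) {
            if (byte == EOM) {
                // readByteArray() drains the buffer, so no explicit clear is needed.
                emit(buffer.readByteArray())
            } else {
                buffer.writeByte(byte.toInt())
            }
        }
    }
    // Emit whatever followed the last separator, mirroring the list-based version.
    emit(buffer.readByteArray())
}

fun main() = runBlocking {
    val input = flowOf(byteArrayOf(1, 2, EOM, 3), byteArrayOf(4, EOM))
    val messages = input.parseWithOkio().toList().map { it.toList() }
    println(messages) // [[1, 2], [3, 4], []]
}
```

Note that a message can span two incoming chunks (`3` and `4` above), which is why the buffer lives outside the per-chunk loop.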

ursus

07/10/2019, 3:28 AM
how come it's so easy to write that operator compared to RxJava? you don't even need synchronization on the `buffer`?

louiscad

07/10/2019, 6:42 AM
@ursus You don't even need synchronization because it's all sequential.

ursus

07/10/2019, 6:43 AM
@louiscad what if it were multiple `async { }`? then it would be needed, right?

louiscad

07/10/2019, 6:47 AM
@ursus It wouldn't work if you call `emit` from inside them, because the `CoroutineContext` would not be the same (different `Job`). Calls to `emit` must be made from the same `CoroutineContext`; it's specified in the documentation and checked at runtime.
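A small sketch of that runtime check, under the assumption of a recent kotlinx.coroutines version: emitting from a `launch` inside `flow { }` fails with an `IllegalStateException`. The second half uses `channelFlow`, which is not mentioned in the thread but is the builder kotlinx.coroutines provides for emitting from several coroutines. The names `broken` and `concurrent` are illustrative only.

```kotlin
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// emit is called from a child coroutine (a different Job), which the flow builder rejects.
val broken = flow<Int> {
    coroutineScope {
        launch { emit(1) }
    }
}

// channelFlow allows sending from child coroutines; values go through a channel.
val concurrent = channelFlow<Int> {
    launch { send(1) }
    launch { send(2) }
}

fun main() = runBlocking {
    val failure = runCatching { broken.toList() }.exceptionOrNull()
    println(failure is IllegalStateException) // true

    // Arrival order between the two senders is not guaranteed, so sort before printing.
    println(concurrent.toList().sorted()) // [1, 2]
}
```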

ursus

07/10/2019, 6:51 AM
@louiscad okay thanks, but how about plain coroutines without flow?

louiscad

07/10/2019, 6:52 AM
@ursus What's your question exactly? How what?

ursus

07/10/2019, 6:53 AM
@louiscad whether regular old multithreading issues of shared stuff apply for parallel async blocks etc

louiscad

07/10/2019, 6:55 AM
@ursus Depends on what you do. You can create concurrency issues even with a single thread. The key is avoiding shared mutable state, and using kotlinx.coroutines solutions like `Mutex` or `actor` when you still need it.
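For the case where shared mutable state can't be avoided, here is a minimal sketch using `Mutex.withLock`. The function name `countWithMutex` and the counter scenario are illustrative assumptions, not something from the thread.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import kotlinx.coroutines.withContext

// Hypothetical helper: increments a shared counter from many coroutines running in parallel.
suspend fun countWithMutex(times: Int): Int {
    val mutex = Mutex()
    var counter = 0
    // withContext returns only after every coroutine launched inside it has completed.
    withContext(Dispatchers.Default) {
        repeat(times) {
            launch {
                // Only one coroutine at a time may touch the shared counter.
                mutex.withLock { counter++ }
            }
        }
    }
    return counter
}

fun main() = runBlocking {
    println(countWithMutex(1_000)) // 1000
}
```

Without the `withLock` call, the increments could race on a multi-threaded dispatcher and the result could come out lower than expected.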

ursus

07/10/2019, 6:57 AM
@ursus ok thanks, so let's say gathering multiple asyncs' results into an array list needs synchronization?

jw

07/10/2019, 11:09 AM
RxJava is all sequential as well

louiscad

07/10/2019, 11:42 AM
@ursus Put all the `Deferred` into a `List<Deferred>` and call `awaitAll()` on it. You shouldn't need extra synchronization if you don't have shared mutable state as I said.
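A short sketch of that pattern: each `async` returns its own result and `awaitAll()` gathers them, so nothing is written to a shared list. The function name `squares` and the squaring workload are placeholders.

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking

// Hypothetical workload: square each input concurrently.
suspend fun squares(inputs: List<Int>): List<Int> = coroutineScope {
    inputs
        .map { n ->
            async {
                delay(10) // stand-in for real asynchronous work
                n * n
            }
        }
        // Results come back in the order of the Deferred list, not completion order.
        .awaitAll()
}

fun main() = runBlocking {
    println(squares(listOf(1, 2, 3, 4, 5))) // [1, 4, 9, 16, 25]
}
```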

bj0

07/10/2019, 2:59 PM
Ah I was trying to avoid the use of a mutable buffer

Zach Klippenstein (he/him) [MOD]

07/10/2019, 11:57 PM
Why?

bj0

07/11/2019, 8:41 PM
i was trying to think of a way to do it with just immutable/functional calls, only thing missing is some sort of `.split` method

Zach Klippenstein (he/him) [MOD]

07/11/2019, 9:16 PM
Any implementation of `split` is probably going to use some mutable data structure or a mutable reference to a persistent immutable data structure under the hood anyway though.
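To illustrate the second option, here is a sketch of a generic `split` that holds a mutable reference (`var`) to a read-only list and replaces it instead of mutating it. The operator is hypothetical (kotlinx.coroutines has no `Flow.split`), and Kotlin's read-only `List` is only a stand-in for a true persistent data structure, since `+` copies the list.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical operator: callers see a purely functional API, the state is hidden inside.
fun <T> Flow<T>.split(delimiter: T): Flow<List<T>> = flow {
    // Mutable reference to a read-only list; each update creates a new list.
    var chunk: List<T> = emptyList()
    collect { item ->
        if (item == delimiter) {
            emit(chunk)
            chunk = emptyList()
        } else {
            chunk = chunk + item
        }
    }
    // Flush a trailing chunk that was not terminated by a delimiter.
    if (chunk.isNotEmpty()) emit(chunk)
}

fun main() = runBlocking {
    println(flowOf(1, 2, 0, 3, 0, 4).split(0).toList()) // [[1, 2], [3], [4]]
}
```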