Whats the closest equivalent to below code using `...
# coroutines
p
Whats the closest equivalent to below code using
Flow
. Or in other words the
create
and
just
operator equivalents.
Copy code
Observable
		.create(
			{ emitter: Emitter<Boolean> ->
                                 // Do stuff and onNext
                        },
			Emitter.BackpressureMode.BUFFER
                  )

// Also for this 

Observable.just(true)
I am using bellow builder but wanted to see other alternatives
Copy code
flow { flowCollector -> ...// Do stuff and emit }
e
Observable.just(x)
->
flowOf(x)
Observable.create { e -> ... e.onNext(x) }
->
flow { ... emit(x) }
BackpressureMode.BUFFER
-> append
.buffer()
operator
p
Got you, thnks
p
@elizarov Wasn't that flow builder non thread safe?
e
Indeed, it is not thread-safe. If “do stuff” is doing something across threads, then:
Observable.create({ e -> ... e.onNext(x) }, BUFFERED)
->
callbackFlow { ... offer(x) }
(
callbackFlow
is buffered by default)
p
Maybe it should be called
nonThreadSafeFlow
instead of
flow
😉
e
This is oxymoron.
Flow
is a sequential, not a thread-safe primitive by design.
For thread-safety and communication between coroutines we have a
Channel
To make this difference absolutely explicit we even have different method names.
Flow
->
emit
,
Channel
-> `send`/`offer`.
👍 1
p
What does this thread safety mean in the context of coroutines?
emit
says
This method is not thread-safe and should not be invoked concurrently.
This is correct:
flow<Int> { emitAll(flowOf(1, 2, 3)) }
And this isnt?
Copy code
flow<Int> {
        coroutineScope {
          launch { emitAll(flowOf(1, 2, 3)) }
          launch { emitAll(flowOf(1, 2, 3)) }
        }
g
Yes, second is incorrect
p
Good point @Paul Woitaschek. Then calling
flow.emitt()
from multiple threads does not garantee the events get pipelined? Is that correct? Unless the flow is backed up by a channel?
👌 1