https://kotlinlang.org logo
Title
e

Eric Martori

06/07/2019, 3:00 PM
I am trying to create an InvocableFlow, the idea is being able to send values without outside the declaration site of the flow while still exposing a flow Api, would this implementation actually work? Is there any flow in my approach? :
internal class InvocableFlow<T>(private val scope: CoroutineScope) {
    val flow = flow {
        for (value in channel.openSubscription()) {
            emit(value)
        }
    }

    private val channel = BroadcastChannel<T>(BUFFERED)

    fun invoke(data: T) {
        scope.launch {
            channel.send(data)
        }
    }
}
g

gildor

06/07/2019, 3:19 PM
It's not a good approach, this flow can be used only once, after that it will be consumed and class will not work, instead it should be like:
val flow get() = channel.asFlow()
e

Eric Martori

06/07/2019, 3:27 PM
with this change you propose this should work, right?
z

Zach Klippenstein (he/him) [MOD]

06/07/2019, 4:02 PM
I think using
launch
to start the coroutine to call
send
from could be racy: it could look like data is being sent out-of-order. If
scope
has a dispatcher that is backed by a thread pool, and you call
invoke
multiple times in quick succession from the same thread, the actual order in which the `launch`ed coroutines invoke
send
could be non-deterministic (because they’re running on different threads). If you use
runBlocking
instead, the
invoke
call won’t return until the send is complete. It will also propagate backpressure, but will block the caller thread if the buffer is full (backpressure/blocking concerns depend on your use case).
g

gildor

06/07/2019, 4:11 PM
There is no race, it's safe to use with Channels
Channels are thread safe out of the box, so it's fine approach, depends on what you expect, if you want to preserve order you need another approach with bigger buffer (default is just 16 items) and use offer instead of launch + send
You also can expose SendChannel, so user of your API may decide to use offer or send depending on case
z

Zach Klippenstein (he/him) [MOD]

06/07/2019, 4:23 PM
It’s thread safe, yes, but the order that the sends occur will be non-deterministic if the coroutines invoking
send
are running on a threadpool, which might not be what the caller expects. Offer isn’t necessarily appropriate, since it may drop items. Whether this behavior,
offer
, or
runBlocking
is appropriate depends on the use case.
g

gildor

06/07/2019, 4:37 PM
I know that it will be out of order, but it is not necessary bad, for example it's exactly how flatMap works
So in general it not necessary bad
Offer will not drop items if you use conflated channel or Unlimited (or big enough buffer)
runBlocking is almost always bad and if some of client is really needs 8t I believe it should be explicit and wrapped to runBlockimg on call site, not implicitly and usually shouldn't be exposed in public API
z

Zach Klippenstein (he/him) [MOD]

06/07/2019, 7:13 PM
It’s not necessarily bad, it depends on the use case. I just wanted to call it out as potentially surprising behavior.
e

Eric Martori

06/08/2019, 1:48 PM
Well, this discussion gave me a lot to think about. Thank you. I will need to review how exactly I want to use this. Using the current approach in a small pet project didn't break anything. The main idea is being able to send elements from user interactions so maybe it's not that likely the data will be send out of order