https://kotlinlang.org logo
Title
u

ursus

12/26/2021, 2:10 AM
class Presenter {
   init {
      presenterScope.launch {
         syncer.syncingFinishedEvent
	   .collect {
	      navigateSomewhere()
	   }
      }
      syncer.sync()
   }
}

class Syncer {
   fun sync() {
      syncerScope.launch {
         ...
         syncFinishedEvent.emit(Unit)
      }
   }
}
Is there a way to only call
syncer.sync()
after
syncer.syncingFinishedEvent
is for sure subscribed to? I cannot miss a event
b

bezrukov

12/26/2021, 6:36 AM
You can use
start = Undispatched
in the first launch, in this case you will subscribe to the flow in the same call frame (but in this case it will be executed on the presenter's constructor thread)
n

Nick Allen

12/27/2021, 1:46 AM
Trying to manipulate internal behavior to get desired behavior is fragile at best. A
flowOn
could easily break this approach. Just because you subscribed to a
Flow
does not mean that the upstream source
Flow
has been subscribed. For this particular scenario, I don't even see the need for any
Flow
. If you want to do something and wait for it to finish, just make it a suspend method.
suspend fun sync()
would make it a lot simpler. If you really need/want it to run in it's own scope, then return the Job so you can
join
it.
u

ursus

12/27/2021, 3:11 AM
Well, thats the shape of my api, many callsites can call it so syncs run on own scope, ui just sends requests to it. I dont want to expose suspend fun as then I'd need a mutex, only to return right away after resuming after contention, and then thats needs a flow of results anyways, as I dont want to rerun the sync for the 2nd mutex contender etc Also, exposing a Job.. someone might cancel it, I'm not a huge fan of that What I'd best like here is
doOnSubscribe
from rxJava, but not sure how to implement it as line after
collect,
when creating custom operator, only runs after the flow completes 😔 To be honest I find it silly that I cannot guarantee im collecting a flow, when the same component is initiating work which writes to the flow, so logically, it is present .. why are the hacks needed.. it just worked with rxjava
n

Nick Allen

12/27/2021, 3:30 AM
Seems like you may benefit from something like:
val syncJob = syncerScope.launch(start = LAZY) { ... }
suspend fun sync() = syncJob.join()
The basic equivalent of RxJava's
doOnSubscribe
would be
onStart
. If using a
SharedFlow
there is the
onSubscription
method.
FYI, RxJava`s
doOnSubscribe
only really tells you that subscription is setup downstream , it does not guarantee upstream subscription. The
subscribeOn
operator can mess up such a use case just like
flowOn
with a
Flow
. If you want to wait for something, you should really do it explicitly. That's no different in RxJava than Kotlin coroutines.
I still think this looks more like "state" but if you have a shared event, you can always use
shareIn
operator and subscribers can avoid "missing" it if you use
replay = 1
.
FYI, if you want failure to propagate to the caller:
val syncJob = syncerScope.async<Unit>(start = LAZY) { ... }
suspend fun sync() = syncJob.await()
u

ursus

12/28/2021, 2:12 AM
To be honest it was state, but the thing why im dealing with this is because sometimes I got backpressure in the collector, so it skipped the SYNCING state, then SYNCED got emitted, but lets say SYNCED was default, so its last value the collector seen, so the last SYNCED wont get delivered because of the
distinctUntilChanged
behavior of StateFlow Anyways,
SharedFlow.onSubscription
looks like what I want, but it feels odd to expose flow subclass as public api, or its just me?
n

Nick Allen

12/28/2021, 2:52 AM
it feels odd to expose flow subclass as public api, or its just me?
Yes it's weird, imo, for this use case. In general it's not weird to expose SharedFlow or StateFlow. If you describe a bit more of your use case, I'm guessing a better solution can be found.
u

ursus

12/28/2021, 4:30 AM
I have a Syncer which has its own scope so I can throttle/serialize/return requests to sync from many call sites, and sync should not be canceled when call site dies. I expose
Sycner.syncInProgress: Flow<Boolean>
Now this usecase came where I need to navigate to some blocking ui, trigger a sync, wait for it to finish, then navigate to next screen. My solution was, as it was driven by StateFlow, to drop(1) to get ride of the cached value, and to request the sync. Given its dropped 1, I should only receive "live" values, wait for
filter { it == false
}`` as my signal to navigate to next screen. What however sometimes happens is that, if sync is fast, and collector (main thread drvien) is busy, it skips the
true
emit. And the when state changes back to false, its equal to the initial cached emit in eyes of the collector, so a no-emit. And now my ui is stuck.
n

Nick Allen

12/28/2021, 5:05 AM
I'm reading that as "I want to do something and know when it's done". That's the use case that a suspend method is for. You can do the work on
syncerScope
, and then `join`/`await` it.
I don't advise it but the another way would be with monotonic ids (counter or clock) so calls to
fun sync(): SyncId
could be linked to
Flow<SyncId>
and the monotonic nature means you can miss SyncIds in the
Flow
and still know they happened.
Flow<Boolean>
is really "I want to know if this is syncing" which is not what you want. It's essentially the same for
Flow<Unit>
since there's nothing except vicinity of timing letting you guess that the event and request are related but you may not know for sure.