# coroutines
u
I have a stream of `Meeting`s, where a meeting struct has a `beginsAt: Timestamp` on it. I need to render a list of those meetings and color red those that are, say, 15 minutes before beginning. I’d like to use a combineLatest, but that requires a “time” stream to combine with. I could have a stream that emits every second or so, but that feels like a waste. So my attempt: I observe the stream, calculate the delta between now and “15min before beginning”, and set a `timer` to emit then. Now the question is, should I use this only as a “tickle” into the combineLatest, and let its combiner reevaluate the “15min before beginning” condition, i.e. `combine(meetings, timer, ::combiner)`? Or should I treat `timer` as trusted and just set the meeting’s color directly, i.e. not reevaluate the condition? I like the former, however it requires the `timer` to not emit before the given deadline. Can it be trusted not to do so?
u
What if you have a meeting at 120000 and one at 120001 (or even two meetings at 120000), and the timer is a bit late due to system load, so that when it fires the 2nd meeting is also within 15 minutes? So I’d program a timer for 15min before the first meeting. Whenever the timer emits, I update all meetings that are <15min ahead and set a new timer for the next meeting coming into range. That’s what I would do anyway. And as a benefit, if the timer did actually fire early, you’d update no events.
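A minimal sketch of that re-arming rule as a pure function, assuming timestamps are epoch milliseconds. `Meeting`, `nextTriggerDelay` and the parameter names are made up for illustration and are not from the thread. It only looks at meetings that are not yet in range, so a timer that fired early simply gets re-armed for the remaining time, and a timer that fired late skips every meeting that is already in range.

```kotlin
import kotlin.time.Duration
import kotlin.time.Duration.Companion.milliseconds
import kotlin.time.Duration.Companion.minutes

// Hypothetical model: beginsAt is epoch milliseconds (an assumption).
data class Meeting(val id: String, val beginsAt: Long)

// Delay until the next meeting that is not yet "in range" crosses the
// `lead` threshold, or null if every meeting is already in range.
fun nextTriggerDelay(
    meetings: List<Meeting>,
    nowMs: Long,
    lead: Duration = 15.minutes,
): Duration? {
    val leadMs = lead.inWholeMilliseconds
    return meetings
        .map { it.beginsAt - leadMs - nowMs } // time until each meeting enters the range
        .filter { it > 0 }                     // drop meetings that are already in range
        .minOrNull()
        ?.milliseconds
}
```

With two meetings one second apart, calling this half a second after the first threshold returns 500 ms (the remaining time for the second meeting), and calling it one second before the first threshold returns 1 s.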
u
right, so the timer is only used to reevaluate the conditions with the current time; it doesn’t actually have a payload, right?
u
right
u
so basically
combine(meetings, reevaluateTrigger) { meetings, _ -> evaluateStuff(meetings, clock.now) }
u
yes. and don’t forget to set a new trigger every time.
u
yes, that’s what I had in mind. However, is it possible for the timer to emit early? Say there is only 1 meeting and I set up the reevaluateTrigger to emit in 15 mins; if it emits at 14:59 I’m out of luck, no? The meeting color doesn’t change and nothing else is enqueued.
u
you never know if it is not documented. the set of dispatchers can be extended with new ones any time. and the implementers are not bound by such a guarantee.
“meeting color doesnt change and nothing else is enqueued”
Why would you not enqueue anything? The meeting is 1 sec in the future, so you should schedule a new trigger in 1s.
u
hm, I guess it depends on how the timer is scheduled, this is what I had:
meetings
   .map { it.sortedBy { m -> m.beginsAt }.firstOrNull() }
   .flatMap {
      if (it == null) emptyFlow() else timerFlow(it.beginsAt - clock.now - 15min)
   }
   .collect {
      reevaluateTrigger.emit(somethingUnique())
   }
u
but wouldn’t `it.beginsAt - clock.now` be 1s? And do you mean: `(it.beginsAt - 15min) - clock.now`
u
yea sry
u
so why would this not schedule a new trigger
it might become negative though 🙂
u
a negative timer just fires immediately, that’s fine
u
maybe constrain it to a minimal value of 0 or 100ms
i guess the flatmap is an issue
u
but imagine this emits early (before the deadline), say at 14:59 instead of 15:00 sharp, because the scheduler is weird or something. The `combine` gets reevaluated, nothing changes since the time is still more than 15min before the meeting, and then
meetings
   .map { it.sortedBy { m -> m.beginsAt }.firstOrNull() }
   .flatMap {
      if (it == null) emptyFlow() else timerFlow(it.beginsAt - clock.now - 15min)
   }
   .collect {
      reevaluateTrigger.emit(somethingUnique())
   }
doesn’t get triggered again
I need to attach something to the meeting like, “checkedAt”
u
yeah, you need a combine latest i’d say
u
or add some “fudge factor” to the calculated timer time, to make sure it fires later rather than sooner
and fingers crossed
u
Still don’t get it. In flatMap you set a new timer which will fire in 1s. And then you’ll call reevaluateTrigger again.
u
no, `meetings` won’t emit again. why would it?
combine(meetings, trigger) { meetings, _ -> mapToColoredMeetings(meetings, clock.now) }
   .collect { coloredMeetings -> renderUi(coloredMeetings) }

data class Meetings(id, beginsAt)
data class ColoredMeetings(id, beginsAt, color)
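For illustration, a sketch of what `mapToColoredMeetings` could look like as a pure function. The field types (epoch milliseconds, a two-value `MeetingColor` enum) and the `lead` parameter are assumptions, not something stated in the thread. The point is that the color depends only on the time passed in, so the trigger carries no payload and an early trigger changes nothing.

```kotlin
import kotlin.time.Duration
import kotlin.time.Duration.Companion.minutes

// Hypothetical models; the field types are assumptions.
enum class MeetingColor { DEFAULT, RED }
data class Meeting(val id: String, val beginsAt: Long)
data class ColoredMeeting(val id: String, val beginsAt: Long, val color: MeetingColor)

// Colors a meeting red once `nowMs` is within `lead` of its start.
// In this sketch meetings that have already begun stay red as well.
fun mapToColoredMeetings(
    meetings: List<Meeting>,
    nowMs: Long,
    lead: Duration = 15.minutes,
): List<ColoredMeeting> =
    meetings.map { m ->
        val inRange = nowMs >= m.beginsAt - lead.inWholeMilliseconds
        ColoredMeeting(m.id, m.beginsAt, if (inRange) MeetingColor.RED else MeetingColor.DEFAULT)
    }
```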
u
meetingsFlow
   .flatMap { meetings ->
        // Earliest start time, or null when the list is empty
        val nextBeginsAt = meetings.minOfOrNull { it.beginsAt }
        val timer =
            if (nextBeginsAt == null) emptyFlow()
            else timerFlow((nextBeginsAt - 15min) - clock.now)
        timer.map { meetings }
   }
   .collect { meetings ->
        // This will fire whenever the meetings change or the timer triggers
   }
Only when there are no more meetings will this stop emitting. If you then add a new meeting, the meetingsFlow will restart the machinery.
You can then proceed as you suggested:
.map { meetings -> mapToColoredMeetings(meetings, clock.now) }
.collect { coloredMeetings -> renderUi(coloredMeetings) }
ok, should work, but you’ll have one extra emission every time, because even if the timer is not early, the first meeting will still be in the list and cause a 0 timer
but maybe you have to partition the meetings inside the flatmap into meetings & colored meetings
good luck, let us know what you come up with
u
somehow I feel
meetings
   .transformLatest { meetings ->
      emit(tuple(meetings, clock.now))

      // Earliest start time, or null when the list is empty
      val nextMeetingBeginsAt = meetings.minOfOrNull { it.beginsAt }
      if (nextMeetingBeginsAt != null) {
         delay(nextMeetingBeginsAt - clock.now - 15min + FUDGE_FACTOR)
         emit(tuple(meetings, clock.now))
      }
   }
is more elegant
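A possible variant of this idea that does not rely on a fudge factor, sketched under a few assumptions: the project uses kotlinx.coroutines, timestamps are epoch milliseconds, and `Meeting`, `withReevaluation`, `leadMs`, `minDelayMs` and `nowMs` are hypothetical names. Instead of emitting once after the delay, it loops: after every wake-up it emits the current time, recomputes how long the next not-yet-in-range meeting still needs, and waits again. An early wake-up therefore just causes one more short wait, and the loop ends when nothing is pending until `meetings` changes.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.transformLatest

// Hypothetical model: beginsAt is epoch milliseconds (an assumption).
data class Meeting(val id: String, val beginsAt: Long)

// Emits (meetings, now) whenever the list changes and again each time a
// meeting is expected to cross the `leadMs` threshold.
@OptIn(ExperimentalCoroutinesApi::class)
fun Flow<List<Meeting>>.withReevaluation(
    leadMs: Long = 15 * 60 * 1000L,
    minDelayMs: Long = 100L, // lower bound so an early wake-up cannot spin
    nowMs: () -> Long = System::currentTimeMillis,
): Flow<Pair<List<Meeting>, Long>> = transformLatest { meetings ->
    while (true) {
        val now = nowMs()
        emit(meetings to now) // downstream re-checks the condition against `now`
        // Time until the next meeting that is not yet in range enters it.
        val wait = meetings
            .map { it.beginsAt - leadMs - now }
            .filter { it > 0 }
            .minOrNull()
            ?: break // nothing pending until `meetings` emits again
        delay(wait.coerceAtLeast(minDelayMs))
    }
}
```

Downstream this would be followed by the same `.map { (meetings, now) -> mapToColoredMeetings(meetings, now) }` step as before. Because `transformLatest` cancels the running block when a new list arrives, any pending `delay` is dropped and rescheduled for the new list.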