nino
04/11/2024, 6:59 AMinit
method when starting my server
flow {
val stopTimesFirst = getStopTimesFirst()
println("preparing first data finished")
emit(stopTimesFirst)
val stopTimesSecond = getStopTimesSecond()
println("preparing second data finished")
emit(stopTimesSecond)
}.flatMapConcat { stopTimes ->
tickerFlow(FIVE_MINUTES_IN_MS)
.onEach { now ->
println("Refreshing data: $now")
results = refreshData(
now,
now.plusMinutes(90),
stopTimes.toList(),
)
println("Refreshing data: done")
}
}.flowOn(<http://Dispatchers.IO|Dispatchers.IO>)
.launchIn(GlobalScope)
.start()
whereas tickerFlow
is defined like
fun tickerFlow(period: Long, initialDelay: Long = 0) = flow {
delay(initialDelay)
while (true) {
emit(ZonedDateTime.now(zoneId))
delay(period)
}
}
I need an operator instead of the flatMapConcat
operator that considers every emit
from flow
and runs the tickerFlow. Currently, only the first emit
is considered. When using map
the first emit
doesn't trigger the ticker flow... Any suggestions?Sam
04/11/2024, 7:03 AMtickerFlow
has an infinite loop, I think you're looking for flatMapLatest
. That will cancel each ticker flow so that the next one can start.nino
04/11/2024, 7:05 AMcombine(firstFlow, secondFlow) { stopTimesfirst, stopTimesSecond ->
stopTimesfirst + stopTimesSecond
}.map { stopTimes ->
tickerFlow(FIVE_MINUTES_IN_MS)
.onEach { now ->
//
}
}.flowOn(Dispatchers.IO)
.launchIn(GlobalScope)
.start()
also work? So separating the first flow into two?Sam
04/11/2024, 7:08 AMmap
with mapLatest
to interrupt the ticker flow's infinite loop. But other than that, it should be okay, if combine
gives you the behaviour you want 👍nino
04/11/2024, 7:09 AMnino
04/11/2024, 8:07 AMmerge(firstFlow, secondFlow, tickerFlow(FIVE_MINUTES_IN_MS)).map {
when (it) {
is List<*> -> {
val items = it.filterIsInstance<StopTime>()
println("${ZonedDateTime.now(zoneId)}: Received ${items.size} new stopTimes")
this.stopTimes.addAll(items)
}
else -> {
println("${ZonedDateTime.now(zoneId)}: Refresh flow triggered")
if (this.stopTimes.isEmpty()) return@map
println("${ZonedDateTime.now(zoneId)}: Going into refresh flow as stopTimes exist")
results = refreshData(stopTimes)
}
}
}.flowOn(Dispatchers.IO)
.launchIn(GlobalScope)
.start()
What do you think @Sam?nino
04/11/2024, 8:08 AMnino
04/11/2024, 8:08 AMSam
04/11/2024, 8:36 AMflow { ... }
block if you need to.