nino
04/11/2024, 6:59 AM
In the init method when starting my server:
flow {
    val stopTimesFirst = getStopTimesFirst()
    println("preparing first data finished")
    emit(stopTimesFirst)
    val stopTimesSecond = getStopTimesSecond()
    println("preparing second data finished")
    emit(stopTimesSecond)
}.flatMapConcat { stopTimes ->
    tickerFlow(FIVE_MINUTES_IN_MS)
        .onEach { now ->
            println("Refreshing data: $now")
            results = refreshData(
                now,
                now.plusMinutes(90),
                stopTimes.toList(),
            )
            println("Refreshing data: done")
        }
}.flowOn(Dispatchers.IO)
    .launchIn(GlobalScope)
    .start()
where tickerFlow is defined as:
fun tickerFlow(period: Long, initialDelay: Long = 0) = flow {
    delay(initialDelay)
    while (true) {
        emit(ZonedDateTime.now(zoneId))
        delay(period)
    }
}
I need an operator to replace flatMapConcat, one that reacts to every emission from the outer flow and runs tickerFlow for it. Currently, only the first emission is handled. When I use map instead, not even the first emission triggers the ticker flow. Any suggestions?

Sam
04/11/2024, 7:03 AM
tickerFlow has an infinite loop, so flatMapConcat never gets past the first inner flow. I think you're looking for flatMapLatest. That will cancel each ticker flow so that the next one can start.

nino
04/11/2024, 7:05 AM
Would
combine(firstFlow, secondFlow) { stopTimesFirst, stopTimesSecond ->
    stopTimesFirst + stopTimesSecond
}.map { stopTimes ->
    tickerFlow(FIVE_MINUTES_IN_MS)
        .onEach { now ->
            //
        }
}.flowOn(Dispatchers.IO)
    .launchIn(GlobalScope)
    .start()
also work? So, splitting the first flow into two?

Sam
04/11/2024, 7:08 AM
You'd need to replace map with mapLatest to interrupt the ticker flow's infinite loop. But other than that, it should be okay, if combine gives you the behaviour you want 👍

nino
04/11/2024, 8:07 AM
merge(firstFlow, secondFlow, tickerFlow(FIVE_MINUTES_IN_MS)).map {
    when (it) {
        is List<*> -> {
            val items = it.filterIsInstance<StopTime>()
            println("${ZonedDateTime.now(zoneId)}: Received ${items.size} new stopTimes")
            this.stopTimes.addAll(items)
        }
        else -> {
            println("${ZonedDateTime.now(zoneId)}: Refresh flow triggered")
            if (this.stopTimes.isEmpty()) return@map
            println("${ZonedDateTime.now(zoneId)}: Going into refresh flow as stopTimes exist")
            results = refreshData(stopTimes)
        }
    }
}.flowOn(Dispatchers.IO)
    .launchIn(GlobalScope)
    .start()
What do you think @Sam?

Sam
04/11/2024, 8:36 AM
flow { ... } block if you need to.
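
For anyone reading this thread later, here is a minimal sketch of the flatMapLatest and mapLatest suggestions from earlier in the conversation. It is not the code from the server above: counterTicker, batches, withFlatMapLatest and withMapLatest are hypothetical names made up for the illustration, the ticker emits a counter instead of a ZonedDateTime, and the delays are shortened so the demo finishes quickly. It assumes kotlinx.coroutines is on the classpath.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for tickerFlow: emits an increasing counter forever.
fun counterTicker(periodMs: Long): Flow<Int> = flow {
    var tick = 0
    while (true) {
        emit(tick++)
        delay(periodMs)
    }
}

// Hypothetical stand-in for the outer flow: two batches, the second arriving later.
fun batches(): Flow<String> = flow {
    emit("first")
    delay(250)
    emit("second")
}

// flatMapLatest: each new batch cancels the running ticker and starts a fresh one.
@OptIn(ExperimentalCoroutinesApi::class)
fun withFlatMapLatest(): List<String> = runBlocking {
    batches()
        .flatMapLatest { batch -> counterTicker(100).map { tick -> "$batch#$tick" } }
        .takeWhile { it != "second#1" } // end the demo once the second ticker is running
        .toList()
}

// mapLatest: the lambda has to collect the ticker itself.
// Only building the ticker flow inside map/mapLatest never runs it.
@OptIn(ExperimentalCoroutinesApi::class)
fun withMapLatest(): List<String> = runBlocking {
    val log = mutableListOf<String>() // only touched from the runBlocking thread
    val job = batches()
        .mapLatest { batch -> counterTicker(100).collect { tick -> log += "$batch#$tick" } }
        .launchIn(this)
    delay(400) // long enough for the second batch to take over
    job.cancelAndJoin()
    log
}

fun main() {
    println(withFlatMapLatest())
    println(withMapLatest())
}
```

In both variants the output starts with ticks for "first" and then switches to ticks for "second", because the arrival of the second batch cancels the first ticker. With flatMapConcat the switch would never happen, since the first ticker never completes.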