I run the following code in the `init` method whe...
# flow
n
I run the following code in the
init
method when starting my server
Copy code
flow {
    val stopTimesFirst = getStopTimesFirst()
    println("preparing first data finished")
    emit(stopTimesFirst)
    
    val stopTimesSecond = getStopTimesSecond()
    println("preparing second data finished")
    emit(stopTimesSecond)
}.flatMapConcat { stopTimes ->
    tickerFlow(FIVE_MINUTES_IN_MS)
        .onEach { now ->
            println("Refreshing data: $now")
            results = refreshData(
                now,
                now.plusMinutes(90),
                stopTimes.toList(),
            )
            println("Refreshing data: done")
        }
}.flowOn(<http://Dispatchers.IO|Dispatchers.IO>)
    .launchIn(GlobalScope)
    .start()
whereas
tickerFlow
is defined like
Copy code
fun tickerFlow(period: Long, initialDelay: Long = 0) = flow {
    delay(initialDelay)
    while (true) {
        emit(ZonedDateTime.now(zoneId))
        delay(period)
    }
}
I need an operator instead of the
flatMapConcat
operator that considers every
emit
from
flow
and runs the tickerFlow. Currently, only the first
emit
is considered. When using
map
the first
emit
doesn't trigger the ticker flow... Any suggestions?
s
Since
tickerFlow
has an infinite loop, I think you're looking for
flatMapLatest
. That will cancel each ticker flow so that the next one can start.
n
ok cool. Thanks. Would something like
Copy code
combine(firstFlow, secondFlow) { stopTimesfirst, stopTimesSecond ->
            stopTimesfirst + stopTimesSecond
        }.map { stopTimes ->
            tickerFlow(FIVE_MINUTES_IN_MS)
                .onEach { now ->
                    //
                }
        }.flowOn(Dispatchers.IO)
            .launchIn(GlobalScope)
            .start()
also work? So separating the first flow into two?
s
You'd still need to replace
map
with
mapLatest
to interrupt the ticker flow's infinite loop. But other than that, it should be okay, if
combine
gives you the behaviour you want 👍
n
thanks for the lightning fast response!
🍻 1
🐕 1
Solved it now like that
Copy code
merge(firstFlow, secondFlow, tickerFlow(FIVE_MINUTES_IN_MS)).map {
    when (it) {
        is List<*> -> {
            val items = it.filterIsInstance<StopTime>()
            println("${ZonedDateTime.now(zoneId)}: Received ${items.size} new stopTimes")
            this.stopTimes.addAll(items)
        }

        else -> {
            println("${ZonedDateTime.now(zoneId)}: Refresh flow triggered")
            if (this.stopTimes.isEmpty()) return@map
            println("${ZonedDateTime.now(zoneId)}: Going into refresh flow as stopTimes exist")
            results = refreshData(stopTimes)
        }
    }
}.flowOn(Dispatchers.IO)
    .launchIn(GlobalScope)
    .start()
What do you think @Sam?
This at least does what I want
I don't like the global stopTimes variable in this approach though
s
If it works, it works! It's really only you and the other people working on the codebase who can decide if you're happy with the implementation. I think your instinct to avoid side-effects is a good one. Remember you can always encapsulate that kind of thing inside a new
flow { ... }
block if you need to.
👍 1