```fun simulate() { // get only regular sessio...
# getting-started
s
Copy code
fun simulate() {
    // get only regular session and it's
    val sessionSelectedSymbols = toSelectedSymbols(scanResults).filter {
        market.calendar.sessions.find { it.sessionId == it.sessionId }!!.regular
    }
    val targets = groupBySymbol(sessionSelectedSymbols).toMap()
    runBlocking {
        withContext(Dispatchers.Default) {
            targets.toList().map { launch { runSymbol(it) } }
        }
    }
    // will the tasks all finished when call to simulate() returns? or I have to call join on each launched tasks?
}
n
IIRC
runBlocking
won't return until all subtasks finish
should be easy to test
s
Thanks, that was what I understood ... thanks for confirming, -- have to debug to find out why multiple run on same data/config got different results ...
Not sure where went wrong, but I replaced above code with single threaded, and had three runs got same results, so the runBlocking did not work as expected in my use case, it is pretty wierd..... Here is the whole class I have implemented. Any pointers will appreciated.
Copy code
// simulation - run with each stock, indicator calculated for each trading session
class USStkStrategyWalkforwardSimulator(
    val strategy: (symbol: String, date: LocalDate) -> SessionStrategy,
    val scanResults: List<ScanResults>,
    val market: USHistoricalData, // data sources
    val preLoadDays: Long = 10, //might arbitrary, 10 days, 3900 bars should be enough
    val preProcessor: BarPreProcessor = BarPreProcessor_NOP(),
    val nonZeroVolumeBarsOnly: Boolean = false // if we will trigger the calc only on non-zero volume bars

) {
    val results: MutableMap<String, List<SessionStrategy>> = mutableMapOf()

    fun simulate(): Map<String, List<SessionStrategy>> {
        // get only regular session and it's
        val sessionSelectedSymbols =
            toSelectedSymbols(scanResults.filter {
                market.calendar.sessions.find { it.sessionId == it.sessionId }!!.regular
            })
        val targets = groupBySymbol(sessionSelectedSymbols).toMap()


// looks like this runBlocking did not wait all task finish, because each run will got different results
//        runBlocking {
//            withContext(Dispatchers.Default) {
//                targets.toList().map { launch { runSymbol(it) } }.forEach { it.join() }
//            }
//        }


        // try out to run in main thread to repeat -- runned three times, all get same results
        targets.toList().map { runSymbol(it) }


        return results.toMap()
    }

    fun runSymbol(symbolSessions: Pair<String, List<LocalDate>>): Unit {
        val tsStart = symbolSessions.second.minOrNull()!!.atTime(market.rthOpen()).minusDays(preLoadDays * 2)
        val tsEnd = symbolSessions.second.maxOrNull()!!.atTime(market.rthClose()).plusDays(1)
        val allBars =
            HistDataSource_CSV(market.getCsvFileName("stk", symbolSessions.first)).loadChunk(tsStart!!, tsEnd!!)
        results[symbolSessions.first] = symbolSessions.second.map {
            runSessionStock(it, symbolSessions.first, allBars)
        }.filterNotNull()
    }

    fun runSessionStock(sessionId: LocalDate, symbol: String, allBars: List<Bar>): SessionStrategy? {
        val rth = market.calendar.rth(sessionId)!!
        return when (rth.regular) {
            false -> null
            true -> {
                val barsInRange = allBars.filter { it.ts in rth.open.minusDays(preLoadDays)..rth.close }.toList()
                val bars = when (nonZeroVolumeBarsOnly) {
                    true -> barsInRange.filter { it.volume > 0 }
                    false -> barsInRange
                }
                val mktStream = HistMktDataStream(bars.iterator(), preProcessor)
                val stkStrategy = strategy(symbol, sessionId)
                while (mktStream.hasNext()) {
                    stkStrategy.onBar(mktStream.next(null)!!)
                }
                stkStrategy.close()
                return stkStrategy
            }
        }
    }

}
seems can not figure out what went wrong, might have to go back implement with Java ....
n
well, worst case you can use threads like you would in Java, but in Kotlin 😛
if you push a minimal SSCCE somewhere (gist.github.com?) we can try running it
s
thanks for look after this @nanodeath I just tried with Java way of running it, still got same issue, so that box out the issue with Coroutine, Now I believe it has something to do with the Indicators(some functional class in my code), I have a function to duplicate indicators, might got shared states (it suppose to be a deep copy, the copy should not share anything with original one.) In old java, I usually do in-memory object serailization back and forth to get deep copy for free, then each object can be used in different thread. Now in Kotlin, I could not find a easy way to do that, (tried to use Json serialize back and forth, did not work out -- maybe I did not do it right), so end up use reflection to invoke the primary constructor, -- I think might something funky I did there. Now try to do the deep copy manually, hope that works. anyway, thanks again for looking into this
n
ideally you're using immutable types, which obviates the need for deep copies for thread-safety reasons
s
each Indicator is a stateful object to process continuous timeseries so can not really be immutable (BTW, I do not really understand what you mean immutable type here in kotlin)
n
class with only vals which are either primitives or unwritable classes, e.g. (Kotlin) Map
s
oh, got it, that won't work for me, because the Indicator here has to be stateful ....
n
uh, are you mutating
results
from multiple threads? you need a concurrenthashmap there