Steven Wang
09/14/2020, 10:49 PM
/**
 * Runs [runSymbol] for every symbol selected in a regular trading session,
 * launching one coroutine per symbol on [Dispatchers.Default] and blocking
 * the calling thread inside `runBlocking` while they run.
 */
fun simulate() {
    // Keep only the selections whose own session is a regular one.
    // The outer lambda parameter is named explicitly: with two nested implicit
    // `it`s the inner one shadows the outer, so `it.sessionId == it.sessionId`
    // was always true and only the first calendar session was ever inspected.
    // NOTE(review): assumes each selected-symbol entry exposes `sessionId` — confirm.
    val sessionSelectedSymbols = toSelectedSymbols(scanResults).filter { selected ->
        val session = checkNotNull(market.calendar.sessions.find { it.sessionId == selected.sessionId }) {
            "no calendar session found for session id ${selected.sessionId}"
        }
        session.regular
    }
    val targets = groupBySymbol(sessionSelectedSymbols).toMap()
    runBlocking {
        withContext(Dispatchers.Default) {
            // The launched jobs are children of this scope.
            targets.toList().forEach { target -> launch { runSymbol(target) } }
        }
    }
    // Question: will all the tasks have finished when the call to simulate() returns,
    // or do I have to call join() on each launched task?
}
nanodeath
09/14/2020, 10:52 PM
`runBlocking` won't return until all subtasks finish.
Steven Wang
09/14/2020, 10:54 PM
// Simulation: run each stock separately; indicators are calculated for each trading session.
/**
 * Walk-forward simulator for US stocks.
 *
 * For every symbol found in [scanResults] it loads that symbol's bars from CSV,
 * then replays them session by session through a fresh [SessionStrategy]
 * created by [strategy]. Finished strategies are collected in [results],
 * keyed by symbol.
 *
 * @property strategy factory creating the strategy for one symbol and session date
 * @property scanResults scan output that determines which symbols/sessions are simulated
 * @property market historical data source (calendar, trading hours, CSV file names)
 * @property preLoadDays days of bars fed to the strategy before each session's open
 * @property preProcessor pre-processor handed to the market data stream
 * @property nonZeroVolumeBarsOnly when true, only bars with `volume > 0` are replayed
 */
class USStkStrategyWalkforwardSimulator(
    val strategy: (symbol: String, date: LocalDate) -> SessionStrategy,
    val scanResults: List<ScanResults>,
    val market: USHistoricalData, // data sources
    val preLoadDays: Long = 10, // somewhat arbitrary: 10 days, 3900 bars should be enough
    val preProcessor: BarPreProcessor = BarPreProcessor_NOP(),
    val nonZeroVolumeBarsOnly: Boolean = false // if true, the calc is triggered only on non-zero volume bars
) {
    /**
     * Strategies produced so far, keyed by symbol.
     *
     * [runSymbol] writes into this map, so it is wrapped in a synchronized map:
     * a plain `mutableMapOf()` is not safe for concurrent writes, which is what
     * made parallel runs produce different results. The backing `LinkedHashMap`
     * keeps the insertion order of the original `mutableMapOf()`.
     */
    val results: MutableMap<String, List<SessionStrategy>> =
        java.util.Collections.synchronizedMap(LinkedHashMap<String, List<SessionStrategy>>())

    /**
     * Simulates every symbol selected in a regular session and returns a
     * snapshot copy of [results].
     *
     * @throws IllegalStateException if a scan result's session is not in the market calendar
     */
    fun simulate(): Map<String, List<SessionStrategy>> {
        // Keep only scan results that belong to a regular session. The outer
        // lambda parameter is named explicitly: with two nested implicit `it`s
        // the inner one shadows the outer, so `it.sessionId == it.sessionId`
        // was always true and only the first calendar session was inspected.
        // NOTE(review): assumes ScanResults exposes `sessionId` — confirm.
        val regularScanResults = scanResults.filter { scan ->
            val session = checkNotNull(market.calendar.sessions.find { it.sessionId == scan.sessionId }) {
                "no calendar session found for session id ${scan.sessionId}"
            }
            session.regular
        }
        val targets = groupBySymbol(toSelectedSymbols(regularScanResults)).toMap()

        // Symbols are processed sequentially on the calling thread. An earlier
        // attempt launched one coroutine per symbol on Dispatchers.Default inside
        // runBlocking and saw different results on each run, while this
        // sequential form gave the same results on three runs.
        targets.toList().forEach { runSymbol(it) }
        return results.toMap()
    }

    /**
     * Loads all bars needed for one symbol and runs every requested session,
     * storing the non-null strategies in [results] under the symbol.
     *
     * @param symbolSessions the symbol paired with the session dates to simulate
     * @throws IllegalArgumentException if the session list is empty
     */
    fun runSymbol(symbolSessions: Pair<String, List<LocalDate>>) {
        val (symbol, sessions) = symbolSessions
        val firstSession = requireNotNull(sessions.minOrNull()) { "no sessions to simulate for symbol $symbol" }
        val lastSession = requireNotNull(sessions.maxOrNull()) { "no sessions to simulate for symbol $symbol" }

        // Load twice the pre-load window before the first open and one extra day after the last close.
        val tsStart = firstSession.atTime(market.rthOpen()).minusDays(preLoadDays * 2)
        val tsEnd = lastSession.atTime(market.rthClose()).plusDays(1)
        val allBars = HistDataSource_CSV(market.getCsvFileName("stk", symbol)).loadChunk(tsStart, tsEnd)

        results[symbol] = sessions.mapNotNull { runSessionStock(it, symbol, allBars) }
    }

    /**
     * Replays the bars of one session (plus [preLoadDays] of history) through a
     * new strategy instance.
     *
     * @param sessionId date of the session to simulate
     * @param symbol symbol being simulated
     * @param allBars bars of [symbol] covering at least the session and its pre-load window
     * @return the closed strategy, or `null` when the session is not a regular one
     * @throws IllegalStateException if the calendar has no trading hours for [sessionId]
     */
    fun runSessionStock(sessionId: LocalDate, symbol: String, allBars: List<Bar>): SessionStrategy? {
        val rth = checkNotNull(market.calendar.rth(sessionId)) {
            "no regular-trading-hours entry for session $sessionId"
        }
        if (!rth.regular) return null

        // Computed once instead of once per bar inside the filter.
        val window = rth.open.minusDays(preLoadDays)..rth.close
        val bars = allBars.filter { bar ->
            bar.ts in window && (!nonZeroVolumeBarsOnly || bar.volume > 0)
        }

        val mktStream = HistMktDataStream(bars.iterator(), preProcessor)
        val stkStrategy = strategy(symbol, sessionId)
        while (mktStream.hasNext()) {
            val bar = checkNotNull(mktStream.next(null)) {
                "market data stream returned null although hasNext() was true"
            }
            stkStrategy.onBar(bar)
        }
        stkStrategy.close()
        return stkStrategy
    }
}
nanodeath
09/15/2020, 2:32 PM
Steven Wang
09/15/2020, 2:38 PM
nanodeath
09/15/2020, 2:39 PM
Steven Wang
09/15/2020, 2:42 PM
nanodeath
09/15/2020, 2:43 PM
Steven Wang
09/15/2020, 2:44 PM
nanodeath
09/15/2020, 2:50 PM
`results` from multiple threads? You need a `ConcurrentHashMap` there.