David
09/28/2018, 9:17 AM
David
09/28/2018, 9:17 AM
import kotlin.native.concurrent.*
// Per-thread slot holding the GyroSumComputer instance. It is marked @ThreadLocal,
// so each thread sees its own copy; within this file it is assigned and read only
// from jobs executed on GyroSumWorker's computation worker.
@ThreadLocal
private var gyroSumComputer: GyroSumComputer? = null
/**
 * Runs [GyroSumComputer] operations on a dedicated background [Worker] and delivers
 * each result to [listener] from a second worker.
 *
 * The computer instance is created during construction and handed over to the
 * computation worker, which keeps it in the thread-local `gyroSumComputer` slot.
 */
class GyroSumWorker {
    /**
     * Receiver of computed results. It is frozen when [performOperation] passes it
     * to the listener worker; `null` means results are computed but not reported.
     */
    var listener: GyroSumWorkerListener? = null

    // Worker that owns the computer and performs the operations.
    private val worker = Worker.start()

    // Worker that waits for each result and invokes the listener.
    private val listenerWorker = Worker.start()

    init {
        // Create the computer, transfer it to the computation worker and store it there.
        worker.execute(TransferMode.SAFE, { GyroSumComputer() }) { computer ->
            gyroSumComputer = computer
        }
    }

    /**
     * Freezes [input], schedules the computation on the background worker and
     * schedules a follow-up job on the listener worker that consumes the result
     * and forwards a non-null output to the listener.
     *
     * @param input data for the operation; frozen as a side effect of this call.
     */
    fun performOperation(input: InputData) {
        println("input data received")
        input.freeze()

        val pendingResult = worker.execute(TransferMode.SAFE, { input }) { data ->
            gyroSumComputer?.performOperation(data)
        }

        listenerWorker.execute(
            TransferMode.SAFE,
            { Pair(pendingResult, listener.freeze()) }
        ) { (resultFuture, frozenListener) ->
            resultFuture.consume { outputData ->
                println("output computed")
                if (outputData != null) {
                    frozenListener?.gyroSumUpdate(outputData)
                }
            }
        }
    }
}
/**
 * Callback used by GyroSumWorker to report a computed result.
 */
interface GyroSumWorkerListener {
/**
 * Called with the output of a finished operation; GyroSumWorker invokes this
 * from its listener worker and only for non-null outputs.
 *
 * @param outputData the computed result.
 */
fun gyroSumUpdate(outputData: OutputData)
}
olonho
09/28/2018, 9:20 AM
David
09/28/2018, 9:22 AM
David
09/28/2018, 9:42 AM
olonho
09/28/2018, 11:17 AM
import kotlinx.cinterop.convert
import kotlin.native.concurrent.*
import platform.posix.*
import kotlin.random.Random
/** Result payload of a job: a single mutable integer value. */
data class Event(var data: Int)
/** Callback that receives the [Event] produced by a finished job. */
interface WorkerListener {
/**
 * Called by WorkerProcessor.check for a completed job requested with this listener's key.
 *
 * @param data the event produced by the job.
 */
fun update(data: Event)
}
/**
 * Opaque token identifying a registered listener. It declares no members, so two
 * keys are equal only when they are the same instance.
 */
class ListenerKey
/**
 * Simulates a unit of work: sleeps for a random number of microseconds in
 * [100, 100000) and then produces an [Event] carrying a random value in [0, 100000).
 *
 * @param key the listener key the job was requested for; returned unchanged.
 * @return the given [key] paired with the generated [Event].
 */
fun doJob(key: ListenerKey): Pair<ListenerKey, Event> {
    val delayMicros = Random.nextInt(100, 100000)
    usleep(delayMicros.convert())
    val payload = Random.nextInt(0, 100000)
    return key to Event(payload)
}
/**
 * Schedules jobs on a background [Worker] and dispatches their results to the
 * listeners registered on the calling side.
 *
 * Listeners stay with the owner of this object; only the frozen [ListenerKey]
 * travels to the worker and back, and [check] uses it to find the listener.
 */
class WorkerProcessor {
    private val listeners = mutableMapOf<ListenerKey, WorkerListener>()
    private val worker = Worker.start()

    // Futures of jobs that were requested but whose result has not been consumed yet.
    private val pendingFutures = mutableSetOf<Future<Pair<ListenerKey, Event>>>()

    /**
     * Registers [listener] and returns the key to pass to [requestJob].
     *
     * @param listener callback that receives results of jobs requested with the returned key.
     * @return a new frozen key identifying [listener].
     */
    fun addListener(listener: WorkerListener): ListenerKey {
        val key = ListenerKey().freeze()
        listeners[key] = listener
        return key
    }

    /**
     * Schedules one job for [key] on the worker; the result is delivered by a later [check].
     *
     * @param key key obtained from [addListener].
     */
    fun requestJob(key: ListenerKey) {
        pendingFutures += worker.execute(TransferMode.SAFE, { key }) {
            doJob(it)
        }
    }

    /**
     * Waits up to [timeout] for pending jobs to finish and delivers every result
     * that is ready to the listener registered for its key.
     *
     * @param timeout value passed to `waitForMultipleFutures` (milliseconds in the
     *   Kotlin/Native API).
     * @throws IllegalStateException if a completed job carries a key that has no
     *   registered listener.
     */
    fun check(timeout: Int = 10) {
        val ready = pendingFutures.waitForMultipleFutures(timeout)
        for (future in ready) {
            // A future can be consumed only once, so drop it from the pending set
            // first. Otherwise the set grows without bound and every later wait
            // has to rescan futures that are already consumed. Removing one at a
            // time keeps the remaining ready futures pending if a listener throws.
            pendingFutures.remove(future)
            future.consume { (key, event) ->
                val listener = checkNotNull(listeners[key]) {
                    "No listener registered for the key of a completed job"
                }
                listener.update(event)
            }
        }
    }
}
/**
 * Demo entry point: registers a listener that prints every event, requests one
 * job up front, then requests a further job and polls for results 1000 times.
 */
fun main(args: Array<String>) {
    val processor = WorkerProcessor()
    val printingListener = object : WorkerListener {
        override fun update(data: Event) {
            println("got $data")
        }
    }
    val key = processor.addListener(printingListener)

    processor.requestJob(key)
    repeat(1000) {
        processor.requestJob(key)
        processor.check(10)
        // do smth else here.
    }
}
olonho
09/28/2018, 11:23 AM
olonho
09/28/2018, 11:23 AM