David
09/28/2018, 9:17 AMDavid
09/28/2018, 9:17 AMimport kotlin.native.concurrent.*
@ThreadLocal
private var gyroSumComputer: GyroSumComputer? = null
class GyroSumWorker {
    var listener: GyroSumWorkerListener? = null
    private val worker = Worker.start()
    private val listenerWorker = Worker.start()
    init {
        worker.execute(TransferMode.SAFE, { GyroSumComputer() }) {
            gyroSumComputer = it
        }
    }
    fun performOperation(input: InputData) {
        println("input data received")
        input.freeze()
        val future = worker.execute(TransferMode.SAFE, { input }) {
            gyroSumComputer?.performOperation(it)
        }
        listenerWorker.execute(TransferMode.SAFE, { Pair(future, listener.freeze()) }) { input ->
            input.first.consume { outputData ->
                println("output computed")
                outputData?.let { input.second?.gyroSumUpdate(it) }
            }
        }
    }
}
interface GyroSumWorkerListener {
    fun gyroSumUpdate(outputData: OutputData)
}olonho
09/28/2018, 9:20 AMDavid
09/28/2018, 9:22 AMDavid
09/28/2018, 9:42 AMolonho
09/28/2018, 11:17 AMimport kotlinx.cinterop.convert
import kotlin.native.concurrent.*
import platform.posix.*
import kotlin.random.Random
data class Event(var data: Int)
interface WorkerListener {
    fun update(data: Event)
}
class ListenerKey
fun doJob(key: ListenerKey): Pair<ListenerKey, Event> {
    usleep(Random.nextInt(100, 100000).convert())
    return Pair(key, Event(Random.nextInt(0, 100000)))
}
class WorkerProcessor {
    private val listeners = mutableMapOf<ListenerKey, WorkerListener>()
    private val worker = Worker.start()
    private val pendingFutures = mutableSetOf<Future<Pair<ListenerKey, Event>>>()
    fun addListener(listener: WorkerListener): ListenerKey {
        val key = ListenerKey().freeze()
        listeners[key] = listener
        return key
    }
    fun requestJob(key: ListenerKey) {
        pendingFutures += worker.execute(TransferMode.SAFE, { key }) {
            doJob(it)
        }
    }
    fun check(timeout: Int = 10) {
        val ready = pendingFutures.waitForMultipleFutures(timeout)
        for (future in ready) {
            future.consume {
                it -> listeners[it.first]!!.update(it.second)
            }
        }
    }
}
fun main(args: Array<String>) {
    val handler = WorkerProcessor()
    val key = handler.addListener(object : WorkerListener {
        override fun update(data: Event) {
            println("got $data")
        }
    })
    handler.requestJob(key)
    for (i in 1 .. 1000) {
        handler.requestJob(key)
        handler.check(10)
        // do smth else here.
    }
}olonho
09/28/2018, 11:23 AMolonho
09/28/2018, 11:23 AM