jkbbwr
02/27/2018, 12:04 AMclass Network(val bind: String) {
val zmqContext = ZMQ.context(2)
val pull = zmqContext.socket(ZMQ.PULL)
val recvThread = newSingleThreadContext("zmqRecvThread")
val sendPool = newFixedThreadPoolContext(4, "zmqSendPool")
val peers = mutableListOf<SendChannel<String>>()
val parent = Job()
suspend fun connect(endpoint: String): SendChannel<String> {
val peer = actor<String>(CoroutineName(endpoint), parent=parent) {
log("peer")
val push = zmqContext.socket(ZMQ.PUSH)
push.connect(endpoint)
for (message in channel) {
withContext(sendPool) {
push.send(message)
}
}
}
peers.add(peer)
return peer
}
fun run() = async(parent) {
log("inbox")
pull.bind(bind)
while (true) {
val message = withContext(recvThread) {
pull.recvStr()
}
log(message)
}
}
}