rrva
05/05/2022, 7:32 PMclass ChannelWorker() {
private val requestChannel = Channel<FooRequest>()
private val responseChannel = Channel<FooResult>()
// Starts the request-processing loop as soon as the object is constructed.
init {
// GlobalScope is not tied to any job owned by this class, so nothing here can cancel the loop.
GlobalScope.launch {
while(true) {
// Suspends until a request arrives; receive() throws if requestChannel has been closed.
val receive = requestChannel.receive()
val resp = fetchStuff(receive.id)
// Publishes the result on the single, shared responseChannel.
responseChannel.send(FooResult(resp))
}
}
}
/**
 * Suspends for 1000 ms via `delay`, then returns [id] parsed as an [Int].
 *
 * @throws NumberFormatException if [id] is not a valid integer string.
 */
private suspend fun fetchStuff(id: String): Int {
delay(1000)
return id.toInt()
}
}
version 2:
/**
 * Worker that reads [FooRequest]s from an internal channel, resolves each one with
 * [fetchStuff], and publishes the outcome as a [FooResult] on an internal response channel.
 *
 * The processing coroutine is launched from the initializer in a scope created on the spot.
 */
class ChannelWorker {
    private val requestChannel = Channel<FooRequest>()
    private val responseChannel = Channel<FooResult>()

    init {
        // A fresh scope with an empty context; no reference to it is kept after construction.
        val workerScope = CoroutineScope(EmptyCoroutineContext)
        workerScope.launch {
            // Iteration ends when requestChannel is closed.
            for (request in requestChannel) {
                responseChannel.send(FooResult(fetchStuff(request.id)))
            }
        }
    }

    /**
     * Suspends for 1000 ms, then returns [id] parsed as an [Int].
     *
     * @throws NumberFormatException if [id] is not a valid integer string.
     */
    private suspend fun fetchStuff(id: String): Int {
        delay(1000)
        return id.toInt()
    }
}
Still, I am creating a coroutine in an init block; is that really a good thing? How will the lifecycle of that coroutine be managed? What happens when a ChannelWorker object goes out of scope: how would I stop the channel receiver coroutine? Or is it cleaner if ChannelWorker has start() and stop() methods?
ephemient
05/05/2022, 7:57 PMrrva
05/05/2022, 7:58 PMephemient
05/05/2022, 7:58 PMrrva
05/05/2022, 7:58 PMephemient
05/05/2022, 8:00 PMrrva
05/05/2022, 8:04 PMclass ChannelWorker {
private val requestChannel = Channel<FooRequest>()
private val responseChannel = Channel<FooResult>()
private var isRunning = Semaphore(1)
/**
 * Launches the request-processing loop if the single [isRunning] permit is still available.
 *
 * NOTE(review): `coroutineScope` does not return until every coroutine launched inside it has
 * completed, so the call that wins the permit stays suspended for as long as the loop runs.
 */
private suspend fun start() = coroutineScope {
// tryAcquire() does not wait: it returns false when the permit is already taken.
if (isRunning.tryAcquire()) {
launch {
println("Current scope $this")
// Iteration ends only when requestChannel is closed.
for (message in requestChannel) {
val resp = fetchStuff(message.id)
responseChannel.send(FooResult(resp))
}
}
}
}
/**
 * Suspends for 1000 ms via `delay`, then returns [id] parsed as an [Int].
 *
 * @throws NumberFormatException if [id] is not a valid integer string.
 */
private suspend fun fetchStuff(id: String): Int {
delay(1000)
return id.toInt()
}
/**
 * Calls [start], sends a [FooRequest] for [id], and returns the value carried by the next
 * [FooResult] read from the shared response channel.
 */
suspend fun requestFoo(id: String): Int {
    start()
    val request = FooRequest(id)
    requestChannel.send(request)
    return responseChannel.receive().result
}
}
ephemient
05/05/2022, 8:06 PM
`start()` will not return to the caller until all of its child jobs are complete (`coroutineScope` waits for every coroutine launched inside it).
rrva
05/05/2022, 8:09 PMrrva
05/05/2022, 8:09 PMrrva
05/05/2022, 8:09 PMrrva
05/05/2022, 8:31 PMclass ChannelWorker {
private val requestChannel = Channel<FooRequest>()
private val responseChannel = Channel<FooResult>()
private var isRunning = Semaphore(1)
/**
 * Launches a coroutine in [scope] that becomes the request-processing loop if it obtains the
 * single [isRunning] permit; otherwise that coroutine finishes immediately.
 */
suspend fun start(scope: CoroutineScope) {
    scope.launch {
        // Somebody else already holds the permit: nothing to do.
        if (!isRunning.tryAcquire()) return@launch

        println("Current scope $this")
        for (request in requestChannel) {
            val value = fetchStuff(request.id)
            responseChannel.send(FooResult(value))
        }
    }
}
/**
 * Suspends for 1000 ms via `delay`, then returns [id] parsed as an [Int].
 *
 * @throws NumberFormatException if [id] is not a valid integer string.
 */
private suspend fun fetchStuff(id: String): Int {
delay(1000)
return id.toInt()
}
/**
 * Sends a [FooRequest] for [id] and returns the value carried by the next [FooResult] read
 * from the response channel, which is shared by all callers.
 */
suspend fun requestFoo(id: String): Int {
    val request = FooRequest(id)
    requestChannel.send(request)
    return responseChannel.receive().result
}
}
ephemient
05/05/2022, 8:40 PM
A function should be `suspend`, receive a `CoroutineScope`, or neither, but not both. In this case `start` does not need to be `suspend`. But this is closer.
ephemient
05/05/2022, 8:41 PM
`requestFoo` is not safe for concurrency.
ephemient
05/05/2022, 8:42 PM
With two concurrent `requestFoo()` calls, there is no guarantee that the two `responseChannel.receive()` will execute in the same order that the two `requestChannel.send()` were executed in.
ephemient
05/05/2022, 8:43 PMrrva
05/05/2022, 8:45 PMrrva
05/05/2022, 8:46 PMrrva
05/05/2022, 8:50 PMdata class FooRequest(val id: String, val responseChannel: Channel<FooResult>)
/** Reply message wrapping a single [Int] value in [result]. */
data class FooResult(val result: Int)
/**
 * Processes [FooRequest]s one at a time on a single worker coroutine.
 *
 * Call [start] with the scope that should own the worker, then call [requestFoo] from any
 * coroutine. Every request carries its own reply channel, so concurrent callers always get
 * the answer to their own request.
 */
class ChannelWorker {
    private val requestChannel = Channel<FooRequest>()

    // Single permit: held by the worker loop for as long as it is running.
    private val isRunning = Semaphore(1)

    /**
     * Launches the worker loop in [scope] unless one is already running.
     *
     * The permit is handed back when the loop ends (scope cancelled or request channel
     * closed), so a later call can start a new worker.
     */
    fun start(scope: CoroutineScope) {
        scope.launch {
            if (!isRunning.tryAcquire()) return@launch
            try {
                println("Current scope $this")
                for (message in requestChannel) {
                    try {
                        val resp = fetchStuff(message.id)
                        message.responseChannel.send(FooResult(resp))
                    } catch (e: kotlin.coroutines.cancellation.CancellationException) {
                        // Cancellation must keep propagating so the worker actually stops.
                        throw e
                    } catch (e: Exception) {
                        // Fail only this request: closing with a cause makes the caller's
                        // receive() throw it, and the worker stays alive for other requests.
                        message.responseChannel.close(e)
                    }
                }
            } finally {
                isRunning.release()
            }
        }
    }

    /**
     * Suspends for 1000 ms, then returns [id] parsed as an [Int].
     *
     * @throws NumberFormatException if [id] is not a valid integer string.
     */
    private suspend fun fetchStuff(id: String): Int {
        delay(1000)
        return id.toInt()
    }

    /**
     * Submits [id] to the worker and suspends until its result is available.
     *
     * @return the value computed for [id].
     * @throws Exception whatever the worker's fetch for this request failed with.
     */
    suspend fun requestFoo(id: String): Int {
        // Capacity 1 so the worker's reply never suspends, even if this caller is cancelled
        // before it reaches receive().
        val responseChannel = Channel<FooResult>(1)
        requestChannel.send(FooRequest(id, responseChannel))
        return responseChannel.receive().result
    }
}
ephemient
05/05/2022, 8:53 PM
[…] `fetchStuff()` directly. If you want to ensure serial calls, then use an existing sync mechanism.
rrva
05/05/2022, 8:53 PMrrva
05/05/2022, 8:55 PMrrva
05/05/2022, 8:55 PMephemient
05/05/2022, 8:56 PMval mutex = Mutex()
/** Runs [fetchStuff] for [id] while holding [mutex], so calls execute one at a time. */
suspend fun requestFoo(id: String): Int {
    return mutex.withLock { fetchStuff(id) }
}
No need for any persistent scope or channels.
rrva
05/05/2022, 9:02 PMrrva
05/05/2022, 9:03 PMval mutex = Mutex()
val cache = HashMap<String, Int>()
/**
 * Returns the cached value for [id], or fetches it with [fetchStuff], stores it in [cache]
 * and returns it. The lookup, fetch and store all happen while holding [mutex].
 */
suspend fun requestFoo(id: String): Int = mutex.withLock {
    cache[id] ?: fetchStuff(id).also { fresh -> cache[id] = fresh }
}
ephemient
05/05/2022, 9:09 PM
Prefer `mutableMapOf<String, Int>()` unless you have a particular need for the specific `HashMap` type, and the whole body within `mutex.withLock()` can be replaced by
cache.getOrPut(id) { fetchStuff(id) }
ephemient
05/05/2022, 9:09 PM