If I am writing a piece of code for kotlin multiplatform, I tried to improve the following code to g...
r
If I am writing a piece of code for kotlin multiplatform, I tried to improve the following code to get rid of the GlobalScope.launch()..
Copy code
class ChannelWorker() {
    private val requestChannel = Channel<FooRequest>()
    private val responseChannel = Channel<FooResult>()

    init {
        GlobalScope.launch {
            while(true) {
                val receive = requestChannel.receive()
                val resp = fetchStuff(receive.id)
                responseChannel.send(FooResult(resp))
            }   
        }
    }

    private suspend fun fetchStuff(id: String): Int {
        delay(1000)
        return id.toInt()
    }
}
version 2:
Copy code
class ChannelWorker {
    private val requestChannel = Channel<FooRequest>()
    private val responseChannel = Channel<FooResult>()

    init {
        CoroutineScope(EmptyCoroutineContext).launch {
            for (message in requestChannel) {
                val resp = fetchStuff(message.id)
                responseChannel.send(FooResult(resp))
            }
        }
    }

    private suspend fun fetchStuff(id: String): Int {
        delay(1000)
        return id.toInt()
    }
}
still I am creating a coroutine in a init block, is that really a good thing? how will the lifecycle of that coroutine be managed? What happens when an object of ChannelWorker goes out of scope, how would I stop the channel receiver coroutine? Or is it cleaner if ChannelWorker has start() and stop() methods?
e
creating a CoroutineScope like 2 is a bad idea, your coroutine may get garbage collected
r
so the first solution is actually better?
e
no, better to pass in a scope as a parameter
r
ok... lets say this scope is not available at object construction
e
then try to restructure so that you have a scope or don't launch anything in init
r
try #3
Copy code
class ChannelWorker {
    private val requestChannel = Channel<FooRequest>()
    private val responseChannel = Channel<FooResult>()
    private var isRunning = Semaphore(1)
    
    private suspend fun start() = coroutineScope {
            if (isRunning.tryAcquire()) {
                launch {
                    println("Current scope $this")
                    for (message in requestChannel) {
                        val resp = fetchStuff(message.id)
                        responseChannel.send(FooResult(resp))
                    }
                }
            }
        }

    private suspend fun fetchStuff(id: String): Int {
        delay(1000)
        return id.toInt()
    }

    suspend fun requestFoo(id: String): Int {
        start()
        requestChannel.send(FooRequest(id))
        val result = responseChannel.receive()
        return result.result
    }
    
}
e
that likely doesn't do what you want; structured concurrency enforces that
start()
will not return to the caller until all its children jobs are complete
r
even the launched ones?
ok...
so... how do I do it? it seems async is not available in kotlin mpp
try 4:
Copy code
class ChannelWorker {
    private val requestChannel = Channel<FooRequest>()
    private val responseChannel = Channel<FooResult>()

    private var isRunning = Semaphore(1)
    
    suspend fun start(scope: CoroutineScope) {
        scope.launch {
            if (isRunning.tryAcquire()) {
                println("Current scope $this")
                for (message in requestChannel) {
                    val resp = fetchStuff(message.id)
                    responseChannel.send(FooResult(resp))
                }
            }
        }
    }

    private suspend fun fetchStuff(id: String): Int {
        delay(1000)
        return id.toInt()
    }

    suspend fun requestFoo(id: String): Int {
        requestChannel.send(FooRequest(id))
        val result = responseChannel.receive()
        return result.result
    }

}
e
in general a function should be
suspend
, receive a
CoroutineScope
, or neither, but not both. in this case
start
does not need to be
suspend
. but this is closer
on a different note,
requestFoo
is not safe for concurrency
there is no guarantee that, if you make two simultaneous
requestFoo()
calls, that the two
responseChannel.receive()
will execute in the same order that the two
requestChannel.send()
were executed in
and if it's not safe for concurrency, I don't see what the point of this whole class could be
r
yeah I have to create a separate request/response channel for each call to requestFoo
... and in that case...
Copy code
data class FooRequest(val id: String, val responseChannel: Channel<FooResult>)
data class FooResult(val result: Int)
class ChannelWorker {
    private val requestChannel = Channel<FooRequest>()

    private var isRunning = Semaphore(1)

    fun start(scope: CoroutineScope) {
        scope.launch {
            if (isRunning.tryAcquire()) {
                println("Current scope $this")
                for (message in requestChannel) {
                    val resp = fetchStuff(message.id)
                    message.responseChannel.send(FooResult(resp))
                }
            }
        }
    }

    private suspend fun fetchStuff(id: String): Int {
        delay(1000)
        return id.toInt()
    }

    suspend fun requestFoo(id: String): Int {
        val responseChannel = Channel<FooResult>()
        requestChannel.send(FooRequest(id, responseChannel))
        val result = responseChannel.receive()
        return result.result
    }

}
e
I mean, what does this gain you over calling
fetchStuff()
directly. if you want to ensure serial calls then use an existing sync mechanism
r
I would like to ensure serial calls yes...
basically a LoadingCache like behavior. If cache empty, fetch. Ensure that two fetch requests do not execute concurrently. This is even ok if different keys want refresh in this scenario. Ensure just one concurrent fetch
you mean just use a Semaphore directly?
e
Copy code
val mutex = Mutex()
suspend fun requestFoo(id: String): Int = mutex.withLock {
    fetchStuff(id)
}
no need for any persistent scope or channels
r
hmm thanks
so, expanded to handle a crude cache:
Copy code
val mutex = Mutex()
    val cache = HashMap<String, Int>()
    suspend fun requestFoo(id: String): Int {
        return mutex.withLock {
            val cachedResult = cache[id]
            if(cachedResult != null) {
                cachedResult
            } else {
                val fetched = fetchStuff(id)
                cache[id] = fetched
                fetched
            }
        }
    }
e
stylistically, use
mutableMapOf<String, Int>()
unless you have a particular need for the specific
HashMap
type, and the whole body within
mutex.withLock()
can be replaced by
Copy code
cache.getOrPut(id) { fetchStuff(id) }
1
but essentially yes