
rrva

05/05/2022, 7:32 PM
I am writing a piece of code for Kotlin Multiplatform, and I tried to improve the following code to get rid of the GlobalScope.launch():
class ChannelWorker() {
    private val requestChannel = Channel<FooRequest>()
    private val responseChannel = Channel<FooResult>()

    init {
        GlobalScope.launch {
            while(true) {
                val receive = requestChannel.receive()
                val resp = fetchStuff(receive.id)
                responseChannel.send(FooResult(resp))
            }   
        }
    }

    private suspend fun fetchStuff(id: String): Int {
        delay(1000)
        return id.toInt()
    }
}
version 2:
class ChannelWorker {
    private val requestChannel = Channel<FooRequest>()
    private val responseChannel = Channel<FooResult>()

    init {
        CoroutineScope(EmptyCoroutineContext).launch {
            for (message in requestChannel) {
                val resp = fetchStuff(message.id)
                responseChannel.send(FooResult(resp))
            }
        }
    }

    private suspend fun fetchStuff(id: String): Int {
        delay(1000)
        return id.toInt()
    }
}
Still, I am creating a coroutine in an init block. Is that really a good thing? How will the lifecycle of that coroutine be managed? What happens when an object of ChannelWorker goes out of scope, and how would I stop the channel receiver coroutine? Or is it cleaner if ChannelWorker has start() and stop() methods?

ephemient

05/05/2022, 7:57 PM
creating a CoroutineScope like in version 2 is a bad idea, your coroutine may get garbage collected

rrva

05/05/2022, 7:58 PM
so the first solution is actually better?

ephemient

05/05/2022, 7:58 PM
no, better to pass in a scope as a parameter

rrva

05/05/2022, 7:58 PM
ok... let's say this scope is not available at object construction

ephemient

05/05/2022, 8:00 PM
then try to restructure so that you have a scope or don't launch anything in init
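
A minimal sketch of the "pass in a scope" suggestion, assuming the caller can own a CoroutineScope. The `job` property and the `scopedWorkerDemo` function are hypothetical names added for illustration, and `runBlocking` is only assumed to be available for the demo (it exists on JVM and Native targets, not on every Multiplatform target). Cancelling the caller's scope is what ends the worker's lifecycle here.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel

data class FooRequest(val id: String)
data class FooResult(val result: Int)

// The caller owns the scope; cancelling that scope stops the worker loop.
class ChannelWorker(scope: CoroutineScope) {
    private val requestChannel = Channel<FooRequest>()
    private val responseChannel = Channel<FooResult>()

    // Hypothetical addition: exposing the Job lets callers observe or cancel this worker.
    val job: Job = scope.launch {
        for (message in requestChannel) {
            responseChannel.send(FooResult(fetchStuff(message.id)))
        }
    }

    private suspend fun fetchStuff(id: String): Int {
        delay(10) // stand-in for real work
        return id.toInt()
    }

    // Still only suitable for one caller at a time, like the original.
    suspend fun requestFoo(id: String): Int {
        requestChannel.send(FooRequest(id))
        return responseChannel.receive().result
    }
}

// Demo only: returns the answer and whether the worker ended up cancelled.
fun scopedWorkerDemo(): Pair<Int, Boolean> = runBlocking {
    val scope = CoroutineScope(SupervisorJob() + Dispatchers.Default)
    val worker = ChannelWorker(scope)
    val answer = worker.requestFoo("42")
    scope.cancel()      // ends the worker's lifecycle
    worker.job.join()   // wait until cancellation has finished
    answer to worker.job.isCancelled
}
```

With this shape there is no need for separate start() and stop() methods: whoever creates the scope decides when the worker dies.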

rrva

05/05/2022, 8:04 PM
try #3
class ChannelWorker {
    private val requestChannel = Channel<FooRequest>()
    private val responseChannel = Channel<FooResult>()
    private var isRunning = Semaphore(1)
    
    private suspend fun start() = coroutineScope {
        if (isRunning.tryAcquire()) {
            launch {
                println("Current scope $this")
                for (message in requestChannel) {
                    val resp = fetchStuff(message.id)
                    responseChannel.send(FooResult(resp))
                }
            }
        }
    }

    private suspend fun fetchStuff(id: String): Int {
        delay(1000)
        return id.toInt()
    }

    suspend fun requestFoo(id: String): Int {
        start()
        requestChannel.send(FooRequest(id))
        val result = responseChannel.receive()
        return result.result
    }
    
}

ephemient

05/05/2022, 8:06 PM
that likely doesn't do what you want; structured concurrency enforces that start() will not return to the caller until all its children jobs are complete
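
A small sketch of that behaviour, using hypothetical names (`startAndReturn`, `structuredConcurrencyDemo`) and an event list to record ordering. It assumes `runBlocking` is available for the demo (JVM or Native). The point is that `coroutineScope` suspends until the launched child finishes, so a worker loop that never ends would keep the caller suspended forever.

```kotlin
import kotlinx.coroutines.*

// Launches a child inside coroutineScope and records what happens when.
suspend fun startAndReturn(events: MutableList<String>) {
    coroutineScope {
        launch {
            delay(50)
            events.add("child finished")
        }
        events.add("launch returned")
    } // coroutineScope suspends here until the child completes
}

// Demo only: returns the recorded order of events.
fun structuredConcurrencyDemo(): List<String> = runBlocking {
    val events = mutableListOf<String>()
    startAndReturn(events)
    events.add("start() returned to caller")
    events
}
```

The recorded order is "launch returned", then "child finished", then "start() returned to caller": launch itself returns immediately, but the enclosing coroutineScope does not.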

rrva

05/05/2022, 8:09 PM
even the launched ones?
ok...
so... how do I do it? it seems async is not available in kotlin mpp
try 4:
class ChannelWorker {
    private val requestChannel = Channel<FooRequest>()
    private val responseChannel = Channel<FooResult>()

    private var isRunning = Semaphore(1)
    
    suspend fun start(scope: CoroutineScope) {
        scope.launch {
            if (isRunning.tryAcquire()) {
                println("Current scope $this")
                for (message in requestChannel) {
                    val resp = fetchStuff(message.id)
                    responseChannel.send(FooResult(resp))
                }
            }
        }
    }

    private suspend fun fetchStuff(id: String): Int {
        delay(1000)
        return id.toInt()
    }

    suspend fun requestFoo(id: String): Int {
        requestChannel.send(FooRequest(id))
        val result = responseChannel.receive()
        return result.result
    }

}

ephemient

05/05/2022, 8:40 PM
in general a function should be suspend, receive a CoroutineScope, or neither, but not both. in this case start does not need to be suspend. but this is closer
on a different note, requestFoo is not safe for concurrency
there is no guarantee that, if you make two simultaneous requestFoo() calls, that the two responseChannel.receive() will execute in the same order that the two requestChannel.send() were executed in
and if it's not safe for concurrency, I don't see what the point of this whole class could be

rrva

05/05/2022, 8:45 PM
yeah I have to create a separate request/response channel for each call to requestFoo
... and in that case...
data class FooRequest(val id: String, val responseChannel: Channel<FooResult>)
data class FooResult(val result: Int)
class ChannelWorker {
    private val requestChannel = Channel<FooRequest>()

    private var isRunning = Semaphore(1)

    fun start(scope: CoroutineScope) {
        scope.launch {
            if (isRunning.tryAcquire()) {
                println("Current scope $this")
                for (message in requestChannel) {
                    val resp = fetchStuff(message.id)
                    message.responseChannel.send(FooResult(resp))
                }
            }
        }
    }

    private suspend fun fetchStuff(id: String): Int {
        delay(1000)
        return id.toInt()
    }

    suspend fun requestFoo(id: String): Int {
        val responseChannel = Channel<FooResult>()
        requestChannel.send(FooRequest(id, responseChannel))
        val result = responseChannel.receive()
        return result.result
    }

}
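
As a variation on the per-request response channel above, here is a sketch that swaps the Channel for a CompletableDeferred, which fits a reply that is delivered exactly once. This is a substituted technique, not something proposed in the thread; the `reply` field, the returned Job, the error handling, and `deferredReplyDemo` are all assumptions for illustration, and `runBlocking` is assumed to be available (JVM or Native).

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel

// Each request carries its own one-shot reply slot (hypothetical field name).
class FooRequest(val id: String, val reply: CompletableDeferred<Int>)

class ChannelWorker {
    private val requestChannel = Channel<FooRequest>()

    // Not suspend: it takes a scope and returns immediately with the worker's Job.
    fun start(scope: CoroutineScope): Job = scope.launch {
        for (message in requestChannel) {
            try {
                message.reply.complete(fetchStuff(message.id))
            } catch (e: NumberFormatException) {
                // Report a bad id to that caller instead of killing the loop.
                message.reply.completeExceptionally(e)
            }
        }
    }

    private suspend fun fetchStuff(id: String): Int {
        delay(10) // stand-in for real work
        return id.toInt()
    }

    suspend fun requestFoo(id: String): Int {
        val reply = CompletableDeferred<Int>()
        requestChannel.send(FooRequest(id, reply))
        return reply.await()
    }
}

// Demo only: three concurrent callers, each gets the answer for its own id.
fun deferredReplyDemo(): List<Int> = runBlocking {
    val worker = ChannelWorker()
    val job = worker.start(this)
    val results = listOf("1", "2", "3")
        .map { id -> async { worker.requestFoo(id) } }
        .awaitAll()
    job.cancel() // stop the worker so runBlocking can finish
    results
}
```

Because every caller awaits its own reply, the ordering problem with a shared response channel does not arise.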

ephemient

05/05/2022, 8:53 PM
I mean, what does this gain you over calling fetchStuff() directly. if you want to ensure serial calls then use an existing sync mechanism

rrva

05/05/2022, 8:53 PM
I would like to ensure serial calls yes...
basically a LoadingCache-like behavior: if the cache is empty, fetch, and ensure that two fetch requests do not execute concurrently. In this scenario it is even ok to serialize refreshes for different keys, as long as at most one fetch runs at a time
you mean just use a Semaphore directly?

ephemient

05/05/2022, 8:56 PM
val mutex = Mutex()
suspend fun requestFoo(id: String): Int = mutex.withLock {
    fetchStuff(id)
}
no need for any persistent scope or channels
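
To make the serialization visible, here is a self-contained sketch around the same `mutex.withLock` call. The `SerialFetcher` class, the `inFlight` counters, and `serialFetchDemo` are hypothetical additions for illustration; `runBlocking` is assumed to be available for the demo (JVM or Native).

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock

class SerialFetcher {
    private val mutex = Mutex()
    private var inFlight = 0   // only touched while the mutex is held
    var maxInFlight = 0        // highest number of overlapping fetches observed
        private set

    private suspend fun fetchStuff(id: String): Int {
        inFlight++
        maxInFlight = maxOf(maxInFlight, inFlight)
        delay(10) // stand-in for real work
        inFlight--
        return id.toInt()
    }

    // Callers suspend (without blocking a thread) until the lock is free.
    suspend fun requestFoo(id: String): Int = mutex.withLock { fetchStuff(id) }
}

// Demo only: five concurrent callers on a multi-threaded dispatcher.
fun serialFetchDemo(): Pair<List<Int>, Int> = runBlocking {
    val fetcher = SerialFetcher()
    val results = (1..5)
        .map { n -> async(Dispatchers.Default) { fetcher.requestFoo(n.toString()) } }
        .awaitAll()
    results to fetcher.maxInFlight
}
```

Even with five callers running in parallel, the observed maximum of overlapping fetches stays at one, and no scope or channel outlives the call.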

rrva

05/05/2022, 9:02 PM
hmm thanks
so, expanded to handle a crude cache:
val mutex = Mutex()
val cache = HashMap<String, Int>()
suspend fun requestFoo(id: String): Int {
    return mutex.withLock {
        val cachedResult = cache[id]
        if (cachedResult != null) {
            cachedResult
        } else {
            val fetched = fetchStuff(id)
            cache[id] = fetched
            fetched
        }
    }
}

ephemient

05/05/2022, 9:09 PM
stylistically, use mutableMapOf<String, Int>() unless you have a particular need for the specific HashMap type, and the whole body within mutex.withLock() can be replaced by cache.getOrPut(id) { fetchStuff(id) }
but essentially yes
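
Putting those two suggestions together, a sketch of the crude cache might look like this. The `FooCache` class, the `fetchCount` counter, and `cacheDemo` are hypothetical names added to show that repeated keys are fetched only once; `runBlocking` is assumed to be available for the demo (JVM or Native).

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock

class FooCache {
    private val mutex = Mutex()
    private val cache = mutableMapOf<String, Int>()
    var fetchCount = 0 // how many real fetches happened; only changed under the mutex
        private set

    private suspend fun fetchStuff(id: String): Int {
        fetchCount++
        delay(10) // stand-in for real work
        return id.toInt()
    }

    // getOrPut is an inline function, so its lambda may call suspend functions.
    suspend fun requestFoo(id: String): Int = mutex.withLock {
        cache.getOrPut(id) { fetchStuff(id) }
    }
}

// Demo only: four concurrent requests over two distinct keys.
fun cacheDemo(): Pair<List<Int>, Int> = runBlocking {
    val foo = FooCache()
    val results = listOf("7", "7", "8", "7")
        .map { id -> async { foo.requestFoo(id) } }
        .awaitAll()
    results to foo.fetchCount
}
```

The four requests return 7, 7, 8, 7 while only two fetches run, one per distinct key. Note that the single mutex also makes cache hits wait behind an in-progress fetch, which matches the "just one concurrent fetch" requirement but may not suit every cache.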