https://kotlinlang.org logo
#coroutines
Title
# coroutines
m

Marko Mitic

07/22/2019, 12:30 PM
I need bit of guidance, I'm trying to use coroutines to offload DB writes to another thread. I have this DB table that is read on app start and is all writes after that thanks to caching. I wanted to avoid blocking the calls for clients so I'm offloading writes using a channel (writes should be executed in-order so I used a channel). Is this ok? (GlobalScope will be replaced later)
Copy code
internal class BlockedAppsStoreDatabaseImpl(private val blockedAppDao: BlockedAppDao) : BlockedAppsStore {

    private val channel = Channel<DbOp>(Channel.UNLIMITED)

    init {
        GlobalScope.launch {
            select {
                channel.onReceive { op ->
                    when (op) {
                        is Get -> op.cb.complete(blockedAppDao.getAll().toSet())
                        is Put -> blockedAppDao.putOrReplace(BlockedAppEntity(op.uid))
                        is Remove -> blockedAppDao.remove(op.uid)
                        is PutSet -> blockedAppDao.putOrReplace(op.uids)
                        is Fill -> blockedAppDao.fill(op.uids)
                        is RemoveAll -> blockedAppDao.removeAll()
                    }
                }
            }
        }
    }

    override fun getAll(): Set<Uid> {
        val deferred = CompletableDeferred<Set<Uid>>()
        channel.sendBlocking(Get(deferred))
        return runBlocking { deferred.await() }
    }

    override fun put(uid: Uid) {
        channel.sendBlocking(Put(uid))
    }
    ...
d

Dominaezzz

07/22/2019, 12:42 PM
This still blocks the calling thread anyway. You need to make those functions suspend.
m

Marko Mitic

07/22/2019, 12:46 PM
Get is intentionally blocking, but put shouldn't block, right?
(since channel isn't limited in size)
d

Dominaezzz

07/22/2019, 1:01 PM
That depends on what you want. Do you need the operations to conclude before the function returns?
Also why do you need the channel?
m

Marko Mitic

07/22/2019, 1:14 PM
I don't need DB write to complete before returning, I'm trying to skip that wait
I'm using channel to keep write operations in order
d

Dominaezzz

07/22/2019, 1:21 PM
Then you need to do a
launch
in the set function.
The use suspending
send
.
m

Marko Mitic

07/22/2019, 1:24 PM
sendBlocking
doesn't block unless channel is full
👌 1
it should work for me since channel is unlimited, right?
I've confirmed this works (after putting
select
in a
while
loop), I'm checking whether I'm missing some big-picture issue
m

mbonnin

07/22/2019, 1:30 PM
You could use offer() directly if you know your channel will never block
m

Marko Mitic

07/22/2019, 1:30 PM
nice one, thanks
b

bdawg.io

07/24/2019, 9:27 PM
You could simplify your code to avoid using a
select
and just use
channel.consumeEach { op ->
directly as well.
One other thing you could check out is using an actor directly for your
channel
as well.
Copy code
private val channel = GlobalScope.actor<DbOp>(capacity = Channel.UNLIMITED) {
    for (op in channel) { ... }
}
That will ensure that your channel is properly cleaned up when your scope gets cancelled. (Which would make moving away from GlobalScope easier in future refactors)
m

Marko Mitic

07/24/2019, 11:00 PM
Thanks, that's all usefull
both
actor
and
consumeEach
are experimental, I can't really use them in my library
although they look real nice
b

bdawg.io

07/25/2019, 5:43 PM
`actor`s are "obsolete" because they are being refactored to support complex cases with provided classes, but I doubt they will force us to implement a class just to use an actor with the newer complex stuff.
consumeEach
is tentatively graduating in 1.4.0. even if it doesn't, it's implementation is fairly simple
Copy code
suspend fun <T> ReceiveChannel<T>.consumeEach(block: (T) -> Unit) {
    try {
        for (item in this) block(item)
    } catch (e: Throwable) {
        cancel(e as? CancellationException ?: CancellationException("Channel cancelled, consumer failed", e))
    } finally {
        cancel()
    }
}
4 Views