Maurice Jouvet
02/27/2020, 9:44 AMval entryCoroutineContext = newFixedThreadPoolContext(5, "sync_entry_fixed")
CoroutineScope(entryCoroutineContext).launch {
val syncEntries = syncEntryDao.findAll()
syncEntries.forEach { syncEntry ->
Timber.d("Got this data to sync: $syncEntry") // All data are shown here I exepct just 5 to 5
syncEntry.locked = true
syncEntryDao.update(syncEntry)
val response = webServices.syncEntriesCoroutine(clientName, syncEntry.date, syncEntry.blueAppId, syncEntry.offset, syncEntry.limit)
if (response.isSuccessful) {
...
}
}
robin
02/27/2020, 10:19 AMlaunch
, and everything inside of that runs blocking? The .forEach
call does not parallelize anything, at least if syncEntries
is a standard list, and not an already parallelized stream or flow.Maurice Jouvet
02/27/2020, 10:36 AMTimber.d("Got this data to sync: $syncEntry")
robin
02/27/2020, 12:34 PMsyncEntries
list, on a single thread, one by one in sequence (not at the same time).
What you could do instead is this:
syncEntries.forEach { syncEntry ->
launch(entryCoroutineContext) {
Timber.d("Got this data to sync: $syncEntry")
// etc.
}
}
This will launch a new coroutine for every syncEntry, and will run all these coroutines on your pool of 5 threads concurrently. I'm not sure that's what you actually want, though.Maurice Jouvet
02/27/2020, 1:34 PMJakub Gwóźdź
02/28/2020, 8:27 AMrobin
02/28/2020, 1:39 PMlaunch
and async
. It might not work if those suspend functions do unexpected stuff, but in general, it should work fine! If it does not, you probably have another bug there.Jakub Gwóźdź
02/28/2020, 4:43 PMdata class Counter(var current:Int = 0, var top:Int = 0) {
fun inc() {
synchronized(this) {
current++
top = top.coerceAtLeast(current)
}
}
fun dec() {
synchronized(this) {
current--
}
}
}
val counter = Counter()
val entryCoroutineContext = newFixedThreadPoolContext(5, "sync_entry_fixed")
suspend fun main() = coroutineScope {
(1..10).forEach { syncEntry ->
launch(entryCoroutineContext) {
counter.inc()
println("I'm starting $syncEntry in thread ${Thread.currentThread().name}, counter is $counter")
Thread.sleep(syncEntry * 100L)
counter.dec()
println("I'm ending $syncEntry in thread ${Thread.currentThread().name}, counter is $counter")
}
}
}
And it limits concurrency perfectly.
But as soon as I change Thread.sleep(...) to delay(...) - well, see for yourself. I was surprised too :)robin
02/28/2020, 4:55 PMdelay
), it might suspend and give back the thread it runs on to another coroutine that's waiting to be executed. Which is one of the major benefits of programming with coroutines - why completely block the existing thread while you're waiting, when other work could be done on it? So using a limited thread pool is only a means to restrict how many actual OS threads can be occupied with the Jobs, but not how many are run on them. Keeping the code inside the coroutine completely blocking of course circumvents that, but that's more an accident than desired behavior, really. If you want to actually restrict the number of Jobs that run concurrently, then yes, a Semaphore is exactly what you would use for that.Dispatchers.Default
and <http://Dispatchers.IO|Dispatchers.IO>
to distinguish between CPU- and IO-intensive tasks, and the underlying dynamic thread pool will handle distribution of those tasks optimally for you, even going as far as not needing an actual Thread switch when switching to IO context. You can focus on what's running and the relationships between everything that's running, without worrying about what's running where.Jakub Gwóźdź
02/28/2020, 9:42 PMrobin
02/29/2020, 4:39 PMMaurice Jouvet
03/01/2020, 10:47 AMdarkmoon_uk
03/02/2020, 6:22 AMJakub Gwóźdź
03/16/2020, 10:19 AM