Maurice Jouvet

02/27/2020, 9:44 AM
Do you know how I could limit the number of threads? I would like to execute a task (fetch & sync data) with only 5 threads. Here is what I have done so far, but it's not working for me.
val entryCoroutineContext = newFixedThreadPoolContext(5, "sync_entry_fixed")
CoroutineScope(entryCoroutineContext).launch {
    val syncEntries = syncEntryDao.findAll()

    syncEntries.forEach { syncEntry ->
        Timber.d("Got this data to sync: $syncEntry") // All entries are logged here; I expected only 5 at a time
        syncEntry.locked = true
        syncEntryDao.update(syncEntry)

        val response = webServices.syncEntriesCoroutine(clientName, syncEntry.date, syncEntry.blueAppId, syncEntry.offset, syncEntry.limit)
        if (response.isSuccessful) {
            ...
        }
    }
}

robin

02/27/2020, 10:19 AM
To me it looks like you're not actually doing any work in parallel: you're doing one `launch`, and everything inside of that runs sequentially? The `.forEach` call does not parallelize anything, at least if `syncEntries` is a standard list, and not an already parallelized stream or flow.

Maurice Jouvet

02/27/2020, 10:36 AM
No, and that's my problem. The log shows that all the output from this line is produced at the same time:
Timber.d("Got this data to sync: $syncEntry")

robin

02/27/2020, 12:34 PM
I'm having trouble understanding your desired outcome. Right now, your code is iterating through all entries in your `syncEntries` list, on a single thread, one by one in sequence (not at the same time). What you could do instead is this:
syncEntries.forEach { syncEntry ->
  launch(entryCoroutineContext) {
    Timber.d("Got this data to sync: $syncEntry")
    // etc.
  }
}
This will launch a new coroutine for every syncEntry, and will run all these coroutines on your pool of 5 threads concurrently. I'm not sure that's what you actually want, though.

Maurice Jouvet

02/27/2020, 1:34 PM
Yes thanks a lot. It's what I wanted.

Jakub Gwóźdź

02/28/2020, 8:27 AM
So, well, no, it will work only as long as you don't have any suspend call inside your launch. This I learned the hard way 🙂
(or, scratch that... I was using async(pool), not launch(pool), maybe that's why it did not work for me...) Anyway, I ended up using a semaphore.

robin

02/28/2020, 1:39 PM
@Jakub Gwóźdź you should be perfectly fine calling suspend functions inside both `launch` and `async`. It might not work if those suspend functions do unexpected stuff, but in general, it should work fine! If it does not, you probably have another bug there.

Jakub Gwóźdź

02/28/2020, 4:43 PM
@robin I'm using the playground on https://kotlinlang.org to run this snippet:
import kotlinx.coroutines.*

data class Counter(var current: Int = 0, var top: Int = 0) {
    // Track how many coroutines are in flight and the highest value seen.
    fun inc() {
        synchronized(this) {
            current++
            top = top.coerceAtLeast(current)
        }
    }

    fun dec() {
        synchronized(this) {
            current--
        }
    }
}

val counter = Counter()

val entryCoroutineContext = newFixedThreadPoolContext(5, "sync_entry_fixed")

suspend fun main() = coroutineScope {
    (1..10).forEach { syncEntry ->
        launch(entryCoroutineContext) {
            counter.inc()
            println("I'm starting $syncEntry in thread ${Thread.currentThread().name}, counter is $counter")

            Thread.sleep(syncEntry * 100L) // blocks the pool thread for the duration of the "work"

            counter.dec()
            println("I'm ending $syncEntry in thread ${Thread.currentThread().name}, counter is $counter")
        }
    }
}
And it limits concurrency perfectly. But as soon as I change Thread.sleep(...) to delay(...) - well, see for yourself. I was surprised too :)
newFixedThreadPoolContext might limit the number of threads in the RUNNING state, but it won't affect the number of started coroutines that are suspended or waiting (but with already acquired resources like db connections or open files). That needs to be limited with a Semaphore, I think.
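To make the difference concrete, here is a minimal sketch (not the playground code above) that measures the peak number of coroutines in flight on a 5-thread pool. The function name `peakInFlight` and the 200 ms duration are illustrative assumptions, and `Executors.newFixedThreadPool(5).asCoroutineDispatcher()` is swapped in for `newFixedThreadPoolContext`, which should behave the same way for this purpose.

```kotlin
import java.util.concurrent.Executors
import java.util.concurrent.atomic.AtomicInteger
import kotlinx.coroutines.*

// Hypothetical helper: runs 10 coroutines on a 5-thread pool and returns the
// highest number of coroutines that were between "start" and "end" at once.
suspend fun peakInFlight(useDelay: Boolean): Int {
    val inFlight = AtomicInteger(0)
    val peak = AtomicInteger(0)
    val pool = Executors.newFixedThreadPool(5).asCoroutineDispatcher()
    try {
        coroutineScope {
            repeat(10) {
                launch(pool) {
                    val now = inFlight.incrementAndGet()
                    peak.updateAndGet { maxOf(it, now) }
                    // delay suspends and frees the thread; Thread.sleep keeps it occupied.
                    if (useDelay) delay(200) else Thread.sleep(200)
                    inFlight.decrementAndGet()
                }
            }
        }
    } finally {
        pool.close() // shut the pool down once all coroutines have finished
    }
    return peak.get()
}

fun main() = runBlocking {
    println("Thread.sleep peak: ${peakInFlight(useDelay = false)}")
    println("delay peak:        ${peakInFlight(useDelay = true)}")
}
```

With `Thread.sleep` the peak cannot exceed 5, because each coroutine holds its thread for the whole run. With `delay` the peak typically goes above 5, since suspended coroutines hand their thread to the next one.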

robin

02/28/2020, 4:55 PM
Ah, I get what you're talking about now. Yeah, using coroutines on a fixed thread pool will not limit the number of Jobs that actually run concurrently; it will only limit the number of threads the system will use to run the Jobs, which is an important distinction. Every time a coroutine hits a suspension point (e.g. calling a function like `delay`), it might suspend and give the thread it runs on back to another coroutine that's waiting to be executed. That is one of the major benefits of programming with coroutines: why completely block the existing thread while you're waiting, when other work could be done on it? So using a limited thread pool is only a means to restrict how many actual OS threads can be occupied with the Jobs, not how many Jobs are run on them. Keeping the code inside the coroutine completely blocking of course circumvents that, but that's more an accident than desired behavior, really. If you want to actually restrict the number of Jobs that run concurrently, then yes, a Semaphore is exactly what you would use for that.
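A minimal sketch of the Semaphore approach, assuming the `kotlinx.coroutines.sync.Semaphore` from kotlinx.coroutines. The function name `peakWithSemaphore`, the task count, and the `delay(100)` standing in for a suspending fetch-and-sync call are all illustrative, not taken from the original code.

```kotlin
import java.util.concurrent.atomic.AtomicInteger
import kotlinx.coroutines.*
import kotlinx.coroutines.sync.Semaphore
import kotlinx.coroutines.sync.withPermit

// Hypothetical helper: launches `tasks` coroutines but lets at most `limit`
// of them be inside the guarded section at once; returns the observed peak.
suspend fun peakWithSemaphore(tasks: Int, limit: Int): Int {
    val semaphore = Semaphore(limit)
    val inFlight = AtomicInteger(0)
    val peak = AtomicInteger(0)
    coroutineScope {
        repeat(tasks) {
            launch(Dispatchers.Default) {
                // withPermit suspends (without blocking a thread) until a permit is free.
                semaphore.withPermit {
                    val now = inFlight.incrementAndGet()
                    peak.updateAndGet { maxOf(it, now) }
                    delay(100) // stands in for a suspending network or database call
                    inFlight.decrementAndGet()
                }
            }
        }
    }
    return peak.get()
}

fun main() = runBlocking {
    println("peak = ${peakWithSemaphore(tasks = 20, limit = 5)}")
}
```

Here the limit applies to the work itself rather than to the threads, so it holds even though the body suspends.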
Usually, in a completely coroutines-based program, you should never really have the need to define your own thread pool. Use `Dispatchers.Default` and `Dispatchers.IO` to distinguish between CPU- and IO-intensive tasks, and the underlying dynamic thread pool will handle distribution of those tasks optimally for you, even going as far as not needing an actual Thread switch when switching to IO context. You can focus on what's running and the relationships between everything that's running, without worrying about what's running where.
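As a rough sketch of that split, the two functions below are hypothetical stand-ins: `load` simulates blocking I/O with `Thread.sleep`, and `crunch` does a small CPU-bound computation. Only `withContext`, `Dispatchers.Default`, and `Dispatchers.IO` are real library names.

```kotlin
import kotlinx.coroutines.*

// Hypothetical blocking I/O step, simulated with Thread.sleep.
suspend fun load(): List<Int> = withContext(Dispatchers.IO) {
    Thread.sleep(50)
    listOf(1, 2, 3)
}

// Hypothetical CPU-bound step: sums the squares of the input.
suspend fun crunch(values: List<Int>): Long = withContext(Dispatchers.Default) {
    values.fold(0L) { acc, v -> acc + v.toLong() * v }
}

fun main() = runBlocking {
    println(crunch(load())) // prints 14
}
```

The caller only states what kind of work each step is; which thread ends up running it is left to the dispatchers.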

Jakub Gwóźdź

02/28/2020, 9:42 PM
And again, in an ideal world that would be great, but sometimes you may need to use that one library that does blocking IO, and you'd like to have a larger number of threads than those provided by Dispatchers.Default 🙂

robin

02/29/2020, 4:39 PM
Absolutely, if you're dealing with a blocking library, a larger, separate thread pool could make sense! But that's a different situation from what this thread was about initially.
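For that blocking-library case, a dedicated pool could look roughly like the sketch below. `blockingFetch` is a hypothetical stand-in for the third-party call, and `fetchAll` and the pool size are illustrative assumptions.

```kotlin
import java.util.concurrent.Executors
import kotlinx.coroutines.*

// Hypothetical blocking call standing in for a third-party library.
fun blockingFetch(id: Int): String {
    Thread.sleep(100)
    return "result-$id"
}

// Runs every blocking call on its own dedicated pool of `poolSize` threads,
// so blocked threads never starve the shared dispatchers.
suspend fun fetchAll(ids: List<Int>, poolSize: Int): List<String> {
    val blockingPool = Executors.newFixedThreadPool(poolSize).asCoroutineDispatcher()
    try {
        return coroutineScope {
            ids.map { id -> async(blockingPool) { blockingFetch(id) } }.awaitAll()
        }
    } finally {
        blockingPool.close() // release the threads when the batch is done
    }
}

fun main() = runBlocking {
    println(fetchAll(ids = listOf(1, 2, 3), poolSize = 8))
}
```

Because the calls block, the pool size here really does cap how many fetches run at once, which is the one situation where sizing a thread pool doubles as a concurrency limit.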

Maurice Jouvet

03/01/2020, 10:47 AM
No problem, what you are talking about makes sense to me. It's really helpful; there were too many misunderstandings on my part. Thanks a lot.

darkmoon_uk

03/02/2020, 6:22 AM
@Jakub Gwóźdź Your mix of coroutines, semaphores and `Thread.sleep()` calls brings on a cold sweat 😅

Jakub Gwóźdź

03/16/2020, 10:19 AM
`Thread.sleep` was only there to emulate a time-consuming task 🙂