# coroutines
m
Do you know how I could restrict the number of threads? I would like to execute a task (fetch & sync data) with only 5 threads. Here's what I have done so far, but it's not working for me.
```kotlin
val entryCoroutineContext = newFixedThreadPoolContext(5, "sync_entry_fixed")
CoroutineScope(entryCoroutineContext).launch {
    val syncEntries = syncEntryDao.findAll()

    syncEntries.forEach { syncEntry ->
        Timber.d("Got this data to sync: $syncEntry") // All entries are logged here; I expected just 5 at a time
        syncEntry.locked = true
        syncEntryDao.update(syncEntry)

        val response = webServices.syncEntriesCoroutine(clientName, syncEntry.date, syncEntry.blueAppId, syncEntry.offset, syncEntry.limit)
        if (response.isSuccessful) {
            // ...
        }
    }
}
```
r
To me it looks like you're actually not doing any work in parallel - you're doing one `launch`, and everything inside of that runs blocking? The `.forEach` call does not parallelize anything, at least if `syncEntries` is a standard list, and not an already parallelized stream or flow.
m
Nope, and that's my problem. The log shows that all the output is done at the same time:
```kotlin
Timber.d("Got this data to sync: $syncEntry")
```
r
I'm having trouble understanding your desired outcome. Right now, your code is iterating through all entries in your `syncEntries` list, on a single thread, one by one in sequence (not at the same time). What you could do instead is this:
```kotlin
syncEntries.forEach { syncEntry ->
    launch(entryCoroutineContext) {
        Timber.d("Got this data to sync: $syncEntry")
        // etc.
    }
}
```
This will launch a new coroutine for every `syncEntry`, and will run all these coroutines on your pool of 5 threads concurrently. I'm not sure that's what you actually want, though.
m
Yes thanks a lot. It's what I wanted.
j
So, well, no, it will work only as long as you don't have any suspend calls inside your `launch`. This I learned the hard way 🙂
(or, scratch that... I was using `async(pool)`, not `launch(pool)`, maybe that's why it did not work for me...) Anyway, I ended up using a semaphore
r
@Jakub Gwóźdź you should be perfectly fine calling suspend functions inside both `launch` and `async`. It might not work if those suspend functions do unexpected stuff, but in general, it should work fine! If it does not, you probably have another bug there.
j
@robin I'm using the playground on https://kotlinlang.org to run this snippet:
```kotlin
import kotlinx.coroutines.*

// Tracks the current and peak number of concurrently running coroutines
data class Counter(var current: Int = 0, var top: Int = 0) {
    fun inc() {
        synchronized(this) {
            current++
            top = top.coerceAtLeast(current)
        }
    }
    fun dec() {
        synchronized(this) {
            current--
        }
    }
}

val counter = Counter()

val entryCoroutineContext = newFixedThreadPoolContext(5, "sync_entry_fixed")

suspend fun main() = coroutineScope {
    (1..10).forEach { syncEntry ->
        launch(entryCoroutineContext) {
            counter.inc()
            println("I'm starting $syncEntry in thread ${Thread.currentThread().name}, counter is $counter")

            Thread.sleep(syncEntry * 100L)

            counter.dec()
            println("I'm ending $syncEntry in thread ${Thread.currentThread().name}, counter is $counter")
        }
    }
}
```
And it limits concurrency perfectly. But as soon as I change `Thread.sleep(...)` to `delay(...)` - well, see for yourself. I was surprised too :)
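For reference, the change in question is just this one line inside the `launch` block; once the coroutine suspends at `delay`, its thread goes back to the pool and the next waiting coroutine can start, so all 10 run at once:

```kotlin
// Instead of Thread.sleep(syncEntry * 100L):
delay(syncEntry * 100L) // suspends the coroutine, releasing the thread back to the pool
```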
`newFixedThreadPoolContext` might limit the number of threads in the RUNNING state, but won't affect the number of started coroutines that are suspended or waiting (but with already acquired resources, like DB connections or open files). That needs to be limited with a `Semaphore`, I think
r
Ah, I get what you're talking about now. Yeah, using coroutines on a fixed thread pool will not limit the number of Jobs that actually run concurrently, it will only limit the number of Threads the system will use to run those Jobs, which is an important distinction. Every time a coroutine hits a suspension point (e.g. calling a function like `delay`), it might suspend and give the thread it runs on back to another coroutine that's waiting to be executed. Which is one of the major benefits of programming with coroutines - why completely block the existing thread while you're waiting, when other work could be done on it?

So using a limited thread pool is only a means to restrict how many actual OS threads can be occupied with the Jobs, but not how many Jobs are run on them. Keeping the code inside the coroutine completely blocking of course circumvents that, but that's more an accident than desired behavior, really. If you want to actually restrict the number of Jobs that run concurrently, then yes, a `Semaphore` is exactly what you would use for that.
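A minimal sketch of that `Semaphore` approach, reusing the shape of the playground snippet above (the 5-permit limit and the 1..10 loop are illustrative, taken from the earlier example):

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.sync.Semaphore
import kotlinx.coroutines.sync.withPermit

suspend fun main() = coroutineScope {
    val semaphore = Semaphore(permits = 5) // at most 5 entries in flight, regardless of threads

    (1..10).forEach { syncEntry ->
        launch {
            semaphore.withPermit {
                println("Syncing $syncEntry on ${Thread.currentThread().name}")
                delay(syncEntry * 100L) // suspending work; the permit is held until this block completes
            }
        }
    }
}
```

Unlike the fixed thread pool, a permit is only released when the whole block inside `withPermit` finishes, even if it suspends partway through, so the in-flight count genuinely stays at 5.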
Usually, in a completely coroutines-based program, you should never really have the need to define your own thread pool. Use `Dispatchers.Default` and `Dispatchers.IO` to distinguish between CPU- and IO-intensive tasks, and the underlying dynamic thread pool will handle distribution of those tasks optimally for you, even going as far as not needing an actual thread switch when switching to the IO context. You can focus on what's running and the relationships between everything that's running, without worrying about what's running where.
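As an illustration of that split (the `syncOne` function and the payload are made-up stand-ins, not code from this thread):

```kotlin
import kotlinx.coroutines.*

// Hypothetical sync step: IO-bound fetch, then CPU-bound processing
suspend fun syncOne(id: Int): Int {
    val payload = withContext(Dispatchers.IO) {
        // network, database, or file access belongs here
        "payload-$id"
    }
    return withContext(Dispatchers.Default) {
        // parsing or other CPU-heavy work belongs here
        payload.length
    }
}
```

The dispatcher choice expresses what kind of work it is; the runtime decides where it actually runs.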
j
And again - in an ideal world that would be great, but sometimes you may need to use that one library that does blocking IO, and you'd like to have a larger number of threads than those given by `Dispatchers.Default` 🙂
r
Absolutely, if you're dealing with a blocking library, a larger, separate thread pool could make sense! But that's a different situation than what this thread was about initially.
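A sketch of what that could look like, assuming some blocking `legacyClient` that can't be made to suspend (the names and pool size here are hypothetical):

```kotlin
import kotlinx.coroutines.*

// Dedicated pool sized for the blocking library, separate from Dispatchers.Default
val blockingPool = newFixedThreadPoolContext(32, "legacy-io")

suspend fun fetchLegacy(id: Int): String = withContext(blockingPool) {
    // legacyClient.fetch(id) would block this thread for its full duration,
    // but only threads of this dedicated pool are tied up by it
    Thread.sleep(100) // stand-in for the blocking call
    "result-$id"
}
```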
m
No problem, what you are talking about makes sense to me. It's really helpful for me, too many misunderstandings on my part. Thanks a lot.
d
@Jakub Gwóźdź Your mix of coroutines, semaphores and `Thread.sleep()`'s brings the cold sweat 😅
j
`Thread.sleep` was only there to emulate a time-consuming task 🙂