Добрый день! Подскажите пожалуйста по корутинам, п...
# russian
a
Добрый день! Подскажите пожалуйста по корутинам, при использовании JMS разбираю сообщения асинхронно, код имеет примерно следующий вид:
Copy code
ovveride fun onMessage(message: Message) = runBlocking {
   worker()
}

fun worker(msg: String) = GlobalScope.launch
{
    doSomeWork(msg)//какой-то долгий процесс  
}
Как лучше написать, чтобы была возможность контролировать количество одновременно работающих worker?
r
runBlocking { launch { ... } }
заблокирует текущий поток до конца выполнения корутины, потому такая конструкция не имеет смысла.
a
GlobalScope ведь, не заблокирует. Проблема в другом, для примера скажем, что сообщения весят по 100mb и единовременно их может быть от 1 до 10000 и над ними выполняются тяжелые операции. Хочу контролировать количество единовременно обрабатываемых сообщений, явно указать что число одновременных обработчиков 100 и блокировать onMessage если 100 в работе, и брать следующее сообщение только когда один из worker'ов закончит работу.
r
GlobalScope
разве что не будет самостоятельно дожидаться / отменять корутину в связке с родительским скоупом, потому что его нет. А блокировать будет
runBlocking
. Если ограничить нужно до разумного количества, и есть возможность выделить количество потоков, соответствующее максимальному количеству параллельных задач, то это можно сделать на основании
newFixedThreadPoolContext
-- достаточно запустить на нём
actor
, по идее. Если потоков должно быть меньше, чем количество параллельных задач -- нужно руками следить за количеством незавершённых задач, опираясь на какой-нибудь
AtomicInt
и врапая свой
doSomeWork
в инкремент и декремент счётчика.
a
Я если честно не очень понял, что Вы имеете в виду под блокировкой в данном случае.
Copy code
fun main(args: Array<String>) = runBlocking {
       for (i in 0..10) {
           val word = "Hello"
           onMessage(word)
       }
    println("loop is finish now")
    delay(3000)
}


fun onMessage(message: String) = GlobalScope.launch {
    delay(2000)
    println(message)
}
Это ведь неблокирующий код
проблема в ресурсах
Думал реализовать, то о чем @elizarov рассказывал на kotlinconf, только не пойму как List<Reference> заменить на onMessage блок.
r
Да, я выпил кофе и таки сообразил, что
runBlocking
в данном случае ничего не делает, потому что
GlobalScope.launch
сам не саспендится, а это единственная операция в
onMessage
. То есть
runBlocking
можно вообще со спокойной душой убрать. Правда, вы всё ещё, оперируя глобальным скоупом, теряете исключения и возможность сделать graceful shutdown, так как у вас нету конкретного скоупа, в котором запущены все задачи. Можете конкретную ссылку подкинуть, о чём речь идёт? Там несколько часовых докладов всё-таки.
a
Если про доклад, то я об этом

https://www.youtube.com/watch?v=a3agLJQ6vt8

Но чот не получается в текущей задаче поюзать
e
А какая цель? Чего хочется достичь?
a
@elizarov Роман, добрый день! Идея такая, есть MQ очередь из нее прилетают большие текстовые сообщения в заархивированном виде, далее они обрабатываются на бэке - распаковываются, парсятся из них извлекается нужная инфа, происходят обращения по JDBC, по REST итд. После того как сообщение обработано необходимо отправить ответ об успешной или неуспешной обработке. И мне кажется Ваш кейс отлично укладывается в эту концепцию, я хочу обрабатывать сообщения асинхронно и при этом контролировать количество одновременно обрабатываемых сообщений, после обработки отправлять ответ. Вот ссылка это Ваш пример с презентации, только я его переделал для работы со строками. И он у меня не работает, когда количество worker’ов меньше количества сообщений. https://try.kotlinlang.org/#/UserProjects/qk78mcnfm1m16fnf5m6jqrj9ts/djs2m87ma852b3qa6pou966565
e
Вижу одну потенциальную проблему — если все workers заняты (никто не пытается взять задание из locations), то downloader будет ждать на locations.send(…), не сможет сделать contents.recieve, а значит и worker не сможет освободиться (не сможет сделать contents.receiver) — вот вам и дед лок
Починить можно разными способами. Самый простой и правильный способ это сделать у contents и locations буфер размера N_WORKERS:
Copy code
val locations = Channel<String>(N_WORKERS)
 val contents  = Channel<String>(N_WORKERS)
Это, заодно, увеличит пропускную способность, так как гарантированно позволить каждому woker-у выслать свой ответ и начать работать над следующим заданием. Но само по себе не гарантирует отсутствие дедлока (а просто сделает его менее вероятным). Надо еще в
select
переставить местами блоки — первым обрабатывать
contents.onReceive
(всегда забирать ответы если они есть), а только потом (если нет ответов для обработки) делать
references.onReceive
.
Это вообще большая филофоская проблема в системах построенных на каналах. Всё просто пока работа с данными построено по принципу “pipeline” (двигаются в одну сторону), но как только есть циклы (или другие шаблоны типа запрос/ответ), то сразу возникает потенциал такого рода дедлоков. Спасибо, что ко мне с ней постучались.
a
Роман, спасибо за ответ! Все получилось. Я правильно понимаю, что в какой-то момент даже при таком подходе может возникнуть дедлок? Хотел попробовать в production пустить этот кейс (знаю, что experimental), в любом случае будет много тестов на тестовых средах, если все будет ок - может и пойдет в прод.
e
Если сделать как я сказал (и не забыть первым поставить
references.onReceive
), то легко доказать, что дедлока быть не может.
a
Ясно, буду экспериментировать) K👍