launch в main лишний, не убрал. Пример взят с оф г...
# russian
a
launch в main лишний, не убрал. Пример взят с оф гайда на гитхаб, идея простая, скажем в launchprocessor из produce продюсируем xml в launch там их разбираем и записываем в БД, на каждый produce и launch своя корутина. Описанный пример - моя фантазия, но пока не клеится с реализацией
g
я не вижу изменений, все равно launch в consumeEach и в launchProcessor, и главная пробелма с чтением двумя вложеными корутинами одного и того же канала все так же осталась
Если речь о “Fan-out” примере, то там все как раз верно, там не читаются данные из канала дважды, только процессорами. Там есть несколько процессоров запущеных, они все разом читают один и тот же канал и они получают данные и потом могут с ними что-то делать В примере выше все не так, там сломано чтение из канала и данные сохраняются игнорируя тред сейф в мутабельную переменную. Но да, пример про Fan-out может использоваться для юзкейса с процессингом
На самом деле не понятно зачем в вашм примере нужен producer.consumeEach, так как он читает данные из канала и не является процессором
a
Я правильно понимаю, что если заменить consumeEach на repeat с нужным количеством повторений, то все будет работать корректно?
g
частично да. Там есть еще проблемы
a
Частично меня не устраивает))
g
Например так как хочется запустить все процессоры паралельно, то нельзя будет просто на следующей строчке вызвать
println("Количество ${ok.size}")
, так как нужно будет дождаться окончания выполнения всех процессоров
ну и глобально, нужно как то забьлокировать завершение программы, что бы не усложнять пример с ожиданием всех job можно просто добавить delay
ну или все же сложить все Job в лист и дождаться завершения каждой, что бы вывести результат
так же остается проблема с тем что переменная ok не тред сейф и вообще мутабельный стейт
еще нужно убрать не только
producer.consumeEach
но и
launch(CommonPool)
вы же уже запускаете новый процессор внутри launchProcessor, еще раз оборачивать его в корутину не нужно
a
Наверное с листом job'ов самое простое решение, delay не понятно как будет работать. В моем случае, какое-то время нужно на produce xml и потом какое-то на парсинг в launch, как правильно поставить delay в таком случае я не знаю. Нужно, чтобы job куда-то были подвязаны, подумать надо
И ещё, как все таки consumeEach работает, я вот думал, что он весь поток продюсирует
g
про delay я просто для примера
consumeEach не продюсирует, он читает значения из канала
т.е. сейчас спрева читаете значение из канала, потом этот же канал отдаете процессору, где он тоже уже следующие значение читает
a
Теперь понятно почему примерно половину пишет, спасибо
g
Что-то вроде этого должно у вас получится, я поправил замеченые проблемы и замнил лист результатов просто атомик каунтером, в реальности вы будете писать в файл или базу и там не должно быть проблем https://gist.github.com/gildor/bcc5cad08f6b7226f6bba98778bee99e
a
Андрей, спасибо большое за ваши ответы) Пойду ковыряться :)
там только старая версия kotlinx.coroutines, поэтому
.joinAll()
заменен на
.forEach { it.join() }
a
по улице с телефоном иду) но да работает как я себе представлял, спасибо вам
g
Как вариант проэмулировать хранилище можно через актор, который будет писать данные в одном потоке:
Copy code
val dbData = mutableListOf<Int>()
val actorDb = actor<Int> {
    for (value in channel) {
        // Эмулируем задержку записи
        delay(20)
        dbData += value
    }
}
и потом из процессора просто отправляем данные:
Copy code
actorDb.send(msg)
a
Я до actor ещё не дочитал, меня на channels понесло:)
g
ну актор и есть обертка над каналом, так что правильно что про каналы нужно сперва почитать
актор в данном примере как таковой не нужен, через него тоже можно сделать, но проще через то как это сделано с голым каналом, просто через актор удобно проэмулировать работу с базой или любым другим асинхронным хранилищем
Обновил пример с использованием актора для сохранения данных вместо просто каунтера
a
Сегодня прикручу реальные данные и парсинг с последующим сохранением в БД, думаю будет круто работать
g
но имейте в виду, так как “база” в виде актора одно-поточная, то есть запись всех значений не может быть быстрее чем сумарное время записи всех элементов в одном потоке, с этим тоже можно поиграться, эмулируя реальную базу (меняя capacity у актора, это кеш входящих знаений или меняя задержку на запись)
Только имейте в виду, что если сохранение в базу и парсинг блокирующие операции, они будут блокировать CommonPool, что не хорошо, лучше или что бы они были асинхронные или выполнялись на своих диспатчерах
a
в моем кейсе, мы из продюсера отправляем в парсер кусочек xml, затем распаршенный кусок отправляем в, что-то типа валидатора, там так же работаем над данными, какие-то значения меняем и затем делаем сейв в БД. На самом деле парсер тоже своего рода produce, который данные дальше по цепочке передаёт дёргая оттуда, что нужно, валидатор финальная точка с сохранением в базе. Я думаю на каждый кусок запускать по job, то есть сколько у нас кусков xml будет продюсироваться, столько процессов и создавать.