https://kotlinlang.org logo
#russian
Title
# russian
a

Alexjok

07/30/2018, 4:47 AM
launch в main лишний, не убрал. Пример взят с оф гайда на гитхаб, идея простая, скажем в launchprocessor из produce продюсируем xml в launch там их разбираем и записываем в БД, на каждый produce и launch своя корутина. Описанный пример - моя фантазия, но пока не клеится с реализацией
g

gildor

07/30/2018, 5:00 AM
я не вижу изменений, все равно launch в consumeEach и в launchProcessor, и главная пробелма с чтением двумя вложеными корутинами одного и того же канала все так же осталась
Если речь о “Fan-out” примере, то там все как раз верно, там не читаются данные из канала дважды, только процессорами. Там есть несколько процессоров запущеных, они все разом читают один и тот же канал и они получают данные и потом могут с ними что-то делать В примере выше все не так, там сломано чтение из канала и данные сохраняются игнорируя тред сейф в мутабельную переменную. Но да, пример про Fan-out может использоваться для юзкейса с процессингом
На самом деле не понятно зачем в вашм примере нужен producer.consumeEach, так как он читает данные из канала и не является процессором
a

Alexjok

07/30/2018, 5:13 AM
Я правильно понимаю, что если заменить consumeEach на repeat с нужным количеством повторений, то все будет работать корректно?
g

gildor

07/30/2018, 5:17 AM
частично да. Там есть еще проблемы
a

Alexjok

07/30/2018, 5:17 AM
Частично меня не устраивает))
g

gildor

07/30/2018, 5:18 AM
Например так как хочется запустить все процессоры паралельно, то нельзя будет просто на следующей строчке вызвать
println("Количество ${ok.size}")
, так как нужно будет дождаться окончания выполнения всех процессоров
ну и глобально, нужно как то забьлокировать завершение программы, что бы не усложнять пример с ожиданием всех job можно просто добавить delay
ну или все же сложить все Job в лист и дождаться завершения каждой, что бы вывести результат
так же остается проблема с тем что переменная ok не тред сейф и вообще мутабельный стейт
еще нужно убрать не только
producer.consumeEach
но и
launch(CommonPool)
вы же уже запускаете новый процессор внутри launchProcessor, еще раз оборачивать его в корутину не нужно
a

Alexjok

07/30/2018, 5:27 AM
Наверное с листом job'ов самое простое решение, delay не понятно как будет работать. В моем случае, какое-то время нужно на produce xml и потом какое-то на парсинг в launch, как правильно поставить delay в таком случае я не знаю. Нужно, чтобы job куда-то были подвязаны, подумать надо
И ещё, как все таки consumeEach работает, я вот думал, что он весь поток продюсирует
g

gildor

07/30/2018, 5:27 AM
про delay я просто для примера
consumeEach не продюсирует, он читает значения из канала
т.е. сейчас спрева читаете значение из канала, потом этот же канал отдаете процессору, где он тоже уже следующие значение читает
a

Alexjok

07/30/2018, 5:28 AM
Теперь понятно почему примерно половину пишет, спасибо
g

gildor

07/30/2018, 5:30 AM
Что-то вроде этого должно у вас получится, я поправил замеченые проблемы и замнил лист результатов просто атомик каунтером, в реальности вы будете писать в файл или базу и там не должно быть проблем https://gist.github.com/gildor/bcc5cad08f6b7226f6bba98778bee99e
a

Alexjok

07/30/2018, 5:32 AM
Андрей, спасибо большое за ваши ответы) Пойду ковыряться :)
там только старая версия kotlinx.coroutines, поэтому
.joinAll()
заменен на
.forEach { it.join() }
a

Alexjok

07/30/2018, 5:37 AM
по улице с телефоном иду) но да работает как я себе представлял, спасибо вам
g

gildor

07/30/2018, 5:54 AM
Как вариант проэмулировать хранилище можно через актор, который будет писать данные в одном потоке:
Copy code
val dbData = mutableListOf<Int>()
val actorDb = actor<Int> {
    for (value in channel) {
        // Эмулируем задержку записи
        delay(20)
        dbData += value
    }
}
и потом из процессора просто отправляем данные:
Copy code
actorDb.send(msg)
a

Alexjok

07/30/2018, 6:02 AM
Я до actor ещё не дочитал, меня на channels понесло:)
g

gildor

07/30/2018, 6:03 AM
ну актор и есть обертка над каналом, так что правильно что про каналы нужно сперва почитать
актор в данном примере как таковой не нужен, через него тоже можно сделать, но проще через то как это сделано с голым каналом, просто через актор удобно проэмулировать работу с базой или любым другим асинхронным хранилищем
Обновил пример с использованием актора для сохранения данных вместо просто каунтера
a

Alexjok

07/30/2018, 6:07 AM
Сегодня прикручу реальные данные и парсинг с последующим сохранением в БД, думаю будет круто работать
g

gildor

07/30/2018, 6:09 AM
но имейте в виду, так как “база” в виде актора одно-поточная, то есть запись всех значений не может быть быстрее чем сумарное время записи всех элементов в одном потоке, с этим тоже можно поиграться, эмулируя реальную базу (меняя capacity у актора, это кеш входящих знаений или меняя задержку на запись)
Только имейте в виду, что если сохранение в базу и парсинг блокирующие операции, они будут блокировать CommonPool, что не хорошо, лучше или что бы они были асинхронные или выполнялись на своих диспатчерах
a

Alexjok

07/30/2018, 6:31 AM
в моем кейсе, мы из продюсера отправляем в парсер кусочек xml, затем распаршенный кусок отправляем в, что-то типа валидатора, там так же работаем над данными, какие-то значения меняем и затем делаем сейв в БД. На самом деле парсер тоже своего рода produce, который данные дальше по цепочке передаёт дёргая оттуда, что нужно, валидатор финальная точка с сохранением в базе. Я думаю на каждый кусок запускать по job, то есть сколько у нас кусков xml будет продюсироваться, столько процессов и создавать.