LINUX.ORG.RU

python - распределенные вычисления

 , , ,


1

1

Подскажите, это уже где-то реализовано:

Задача - Конвеерная распределенная обработка данных. в данный момент - реализация на python (воркер) + tarantool queue (очереди - хранение параметров шагов) т.е. грубо говоря скачай видео (аудио, еще что-то), обработай, посчитай показатели, залей куда-то результаты. вся обработка предполагает использование разных ресурсов и разумно делать разные типы заданий на разных железяках так-же нужно горизонтально масштабироваться так-же хотелось бы имлементировать логику конвеера в одном месте.

т.е. что-то вроде
список_урлов = получить_список() # вызывается где угодно
цикл по список_урлов:
список_файлов_скачаных += скачай_файл() # вызывается на машинах с сетью и дисками
склейка =склей_файлы(список_файлов_скачаных ) # вызывается на машинах с толстым процом
расчитай_показатели(склейка ) # вызывается на машинах с gpu

вот логику выше - не хотелось бы размазывать по куче мест, и держать в одном (сейчас это написанно в коде шага (т.е. шаг знает - какой будет следующий). это ведет к сложности изменений / сложности писания разных цепочек обработки. хочется все более-менее нативно. при этом в задаче используются очереди, и в какую очередь - какое задание добавлять - опять-же в коде шага.



Последнее исправление: pumpurumer (всего исправлений: 2)

Не очень понял какого конкретно ты ежа пытаешься родить, но вероятно сгодился бы apache airflow + celery workers

phoen ★★
()
Последнее исправление: phoen (всего исправлений: 1)
Ответ на: комментарий от phoen

хочу что-то вроде:

задание: период времени, место

список_файлов = []
цикл по времени, квант 4 часа:
    урла = await(any_worker) ПолучиУрлуДляСкачки(период)
    список_файлов += await(dl_worker) СкачайПоУрле(урла)

целевойФайл = await(cpu_worker) СклейФайлы(список_файлов)
целевойФайл = await(cpu_worker) СнижениеFPS(целевойФайл)

для каждой области цикл:
    await(gpu_worker) РасчитайПоказателиПоОбласти(целевойФайл, область)

вот эту логику хочу в одном месте держать.

сейчас это написанно так:

Задание_ПолучиУрлуДляСкачи()
  получаем_пачку_урлов
  создаем_пачку_заданий_на_скачку
  создаем_задание_проверки_что_все_скачалось

Задание_скачки()
    качаем_что_нам_сказали_в_файл

Задание_проверки_скачки()
    Проверяем_что_все_скачали
    Если не_скачали: создаем_задание_проверки_что_все_скачалось
    Иначе: создаем_задание_на_склейку

т.е. не хочу размазывать логику последовательности выполнения заданий. как будно await не просто аснихронно выполняет, но еще и выбирает на какой машине это выполнять (в зависимости от параметров)

celery - начал смотреть
ci - там нет удобной передачи параметров между заданиями. и тут вопрос о сотнях экземплярах одной цепочки. цепочек разных - примерно 10.
и между цепочками - переиспользование кода.

pumpurumer
() автор топика
Последнее исправление: pumpurumer (всего исправлений: 2)
Ответ на: комментарий от pumpurumer

Очень похоже что тебе нужен airflow: это и шедуллер, и своего рода оркестратор, и оперирует он понятиями DAG (directed acyclic graph) состоящими из тасков (крайне похожими на твой псевдокод).

Единственная проблема которая может возникнуть - назначать задачу на конкретный воркер, но вроде и она решаема через назначение конкретной queue для таска.

phoen ★★
()
Последнее исправление: phoen (всего исправлений: 3)

Можно на питоне дергать инфраструктуру Hadoop, но там всё размажется и свой подход MapReduce с которым надо уметь работать. Из плюшек можно очень-очень легко и сильно масштабироваться, были бы деньги на железо. Есть конечно ограничения, но они достаточно большие, чтобы у тебя не было проблем, если только ты не делаешь что-то больше мордокниги.

peregrine ★★★★★
()
Последнее исправление: peregrine (всего исправлений: 2)

А мог бы использовать GNU Parallel.

anonymous
()
Ответ на: комментарий от peregrine

Не, там человек под распределёнными вычислениями не то имеет в виду. Просто надо запускать таски на нужных машинах по шедуллеру и всё, т.е. вычислений как таковых и нет.

p.s. С MR в хадупе не так всё мрачно, есть же и PySpark (spark для адептов scala) и hive с его sql-like.

phoen ★★
()
Ответ на: комментарий от phoen

airflow - это то что мне нужно. есть операторы (фигни которое что-то делают), есть сенсоры (которые определяют когда делать), есть DAG (который по факту - в терминах выше - цепочка. описывает взаимосвязи операторов и сенсоров). граф взаимосвязей, что за чем делать.

Из немаловажных для меня фичей - можно породить несколько заданий, потом дождатся всех, или например дождатся / проверить наличие неких результатов от внешней системы, которая слоупочит с разными таймингами.

и похоже есть перезапуск с точки падения. это тоже очень важно, при разработке / отладке / разборе инцидентов.

Осталось только понять, работает ли это с 3тим питоном, потому что в 2018 оно было на втором питоне.

Всем спасибо за внимание, пойду выкидывать свой велосипед :)

pumpurumer
() автор топика
Последнее исправление: pumpurumer (всего исправлений: 1)
Ответ на: комментарий от pumpurumer

Осталось только понять, работает ли это с 3тим питоном, потому что в 2018 оно было на втором питоне.

Работает, у меня на 3.6 так и живёт для кучи ETL процессов.

phoen ★★
()
Ответ на: комментарий от pumpurumer

У нас пробовали использовать Airflow, но потом перешли на Luigi. По причине того, что сам по себе запущенный Airflow потреблял порядочно ресурсов и грел ноутбуки разработчиков (тогда как, по идее, его работа должна быть незаметна для всего остального).

Вообще по постановке задача похожа на ту, которую выполняют шедулеры на кластерах, такие как slurm или pbs. Но они скорее про подбор по ресурсам.

sshestov
()
Вы не можете добавлять комментарии в эту тему. Тема перемещена в архив.