LINUX.ORG.RU

Python celery, многопоточно воркеры, однопоточно обработку результатов

 ,


0

2

Всем доброго дня!

Изучаю celery, есть неясности. Прошу пожалуйста помочь кто в теме.

Дано:

  • 100 миллионов тасков которые надо выполнить, они лежат в базе sqlite в виде разбросанных данных
  • Скрипт воркер, который обрабатывает таски (он запускается посредством celery)
  • Функция db_dumper, которая принимает на входе результат таска и пишет в нужном для меня виде результат в sqlite
  • Celery 3.1.24

Задачи:

  1. Добавить ещё 1 поток который параллельно занимался бы сбором результатов всех успешных тасков, и складывал результаты в sqlite посредством моей функции db_dumper
  2. Как-то нужно удалять успешные таски после п.1, настройки celery сейчас такие:
    BROKER_TRANSPORT='amqp',
    CELERY_RESULT_BACKEND='rpc://'
    на rpc почему-то не пашет result.forget() для таска

Как это можно сделать?

Что пробовал:

  1. Ставил CELERY_RESULT_BACKEND sqlite базу, мне не понравилось что туда пишется информация в своём формате (id таска, статус, результат в бинарном формате), мне это не подходит
  2. Пробовал сделать как в пункте выше, но затем туда подключаться и оттуда выгребать информацию о тасках, получается много лишних движений, хочется это лишнее звено убрать и напрямую из celery брать успешные таски и их результаты писать в базу как мне надо

Ответ на: комментарий от rubro

Ну почему же, можно отнаследоваться от DatabaseBackend и переопределить необходимые методы. Затем в CELERY_RESULT_BACKEND указываешь my_project.celery:MyResultBackend и по идее должно завестись.

Kilte ★★★★★
()
Ответ на: комментарий от Kilte

Хотя не, там всё переопределять придётся. Проще свой запилить. Но всё равно там вроде ничего сложного нет.

Kilte ★★★★★
()
Ответ на: комментарий от Kilte

Большое спасибо Вам за идеи, пока что побаиваюсь туда суваться со своими слабыми скиллами. Если никто более простых вариантов тут не предложит то придётся погружаться туда.

rubro
() автор топика

Как это можно сделать?

Можно «как» заменить на «за сколько» и переместить это в джобс. Вероятность успеха будет сильно больше, чем 0.

Сейчас ты нам просто ретранслируешь задачу, которую тебе поставил работадатель.

Изучаю celery

Ты двигаешься в правильном направлении! По большей части все что ты хочешь уточнить есть в документации.

PS: Термины многопоточно-однопоточно используются некорректно, воркеры масштабируются по процессам (не потоков).

BigAlex ★★★
()
Ответ на: комментарий от BigAlex

Дорогой и многоуважаемый Кэп, я думаю Вы в целом правы, но в данном случае мне Ваши решения не подходят так или иначе. Если у Вас что-то более достойное появится, то напишите пожалуйста.

rubro
() автор топика
Ответ на: комментарий от HeipaVai1o

Я, кстати, так в своё время и сделал для одного проекта. Теперь мои волосы мягкие и шелковистые. Хотя там и своих проблем хватает.

Kilte ★★★★★
()

А, собственно, не дампить ли тебе результат в БД и получать из отдельного процесса? Только sqlite выкинуть придётся, пожалуй.

з.ы. в celery я понимаю только её назначение. Помнится, юзал для 1 велосипеда, но вспоминать я, конечно, не буду.

з.ы.2. я могу ошибаться и не рекомендую изать мои решения никому и ни в какой ситуации.

alex4321
()
Ответ на: комментарий от alex4321

@alex4321, sqlite мне нужен как воздух

Ребят всем спасибо за идеи, пока что остановился на такой схеме. Использую result_backend='rpc//' то есть результаты работы тасков улетают в обычную AMQP очередь rabbitmq, далее я планирую к этой очереди подрубаться из другого скрипта и делать что надо, то есть дампить данные в sqlite.

Сейчас вдруг выяснилось что celery результаты работы тасков пишет в очередь с именем вида 840a9030-a280-3ace-8f28-b010f1d9ccd8, ЕСЛИ кто знает как при отправке таске указать имя очереди куда надо складывать результаты то напишите пожалуйста.

rubro
() автор топика

Коллеги может кто знает, как celery указать в какую очередь результаты тасков складывать? Сейчас создаёт рандомную очередь и туда всё пишет

rubro
() автор топика
Вы не можете добавлять комментарии в эту тему. Тема перемещена в архив.