LINUX.ORG.RU

4
Всего сообщений: 23

Можно ли использовать celery без rabbitmq и redis? Database only

Можно ли использовать celery без rabbitmq и redis? Чтобы работало исключительно с базой данных. celery-beats

 ,

bryak ()

Синхронная обработка конечного числа запросов

Есть сервис А который посылает запросы на API сервиса B. Но сервис В не может обрабатывать запросы одновременно (ломается на 2000 в секунду) и есть лимит в 10000 в день. Клиент А это должен учитывать и иметь доступ к переменной которая условно равна количеству доступных запросов на API B в определённый момент времени, что бы её можно можно было отобразить на сайте и притом иметь регулировку по количеству запросов на сервер в секунду. Может я не туда думаю, но это делается через очереди + воркеры или можно проще?

 , , ,

Cirno ()

С помощью каких инструментов такое делается?

Всем привет. Вопрос насчет celery или агрегации сообщений. Обычно веб работает так:

user1          user2    
  |              |
message       message
  |              |
celery        celery
  |              |
task1          task2   и т.д.

Как добиться такого(ниже)? Какой инструмент для этого есть?

   user1   user2      user3 (и т.д.)
    |       |           |
   message  message   message
    |_______|___________|
        |
     aggregator message (data list [] )
        |
      celery
        |
      будет решать одну задачу (потоки, или асинхронное выполнение)

В celery конечно есть всякие методы (group,chunk) для обработки тасков, но вроде чтоб добиться такого нет.

 , ,

bikes ()

periodic task

@task()
def aaa():
    logger.info("some")

Как мне сделать apply_async() и чтобы эта таска выполнялась периодически? Не с помощью декоратора @periodic_task, а именно в коде указать интервал, между запусками

в доке есть вот такой вот пример

from celery import Celery
from celery.schedules import crontab

app = Celery()

@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    # Calls test('hello') every 10 seconds.
    sender.add_periodic_task(10.0, test.s('hello'), name='add every 10')

    # Calls test('world') every 30 seconds
    sender.add_periodic_task(30.0, test.s('world'), expires=10)

    # Executes every Monday morning at 7:30 a.m.
    sender.add_periodic_task(
        crontab(hour=7, minute=30, day_of_week=1),
        test.s('Happy Mondays!'),
    )

@app.task
def test(arg):
    print(arg)

Но мне надо не при старте, а просто в коде указать

 , ,

Xwo ()

Celery + Peewee

Пытаюсь сейчас использовать для отдельных задач Celery. Первая задача проходит нормально. На всех последующих сыпется: peewee.OperationalError: SSL error: decryption failed or bad record mac

ORM peewee. Пробовал гуглить. Особо полезного мало нашел. Судя по всему, какая-то проблема с переиспользованием соединений. Пробовал использовать базу с пулом соединений. На первой же ошибке валится, что соединение уже открыто.

 , , , ,

crarkie ()

Pyrogram в Celery таске не закрывается.

Возникла такая проблема. Пробую написать обычный скрипт, который подключается к аккаунту телеги на Python, получает информацию обо мне и сразу закрывается:

client.start()
print(client.get_me())
client.stop()

Если же я пробую выполнить такое в Celery таске, то он корректно подключается и на этом зависает. Те печати информации обо мне нету, как и дисконнекта. Последние строчки лога -

[2019-02-17 00:51:32,234: INFO/ForkPoolWorker-2] Connecting...
[2019-02-17 00:51:33,712: INFO/ForkPoolWorker-2] Connected! DC2 - IPv4 - TCPAbridgedO
[2019-02-17 00:51:36,157: INFO/ForkPoolWorker-2] Session initialized: Layer 91
[2019-02-17 00:51:36,157: INFO/ForkPoolWorker-2] Device: CPython 3.6.7 - Pyrogram 🔥 0.11.0
[2019-02-17 00:51:36,158: INFO/ForkPoolWorker-2] System: Linux 4.15.0-43-generic (EN)
Можно ли как-то решить проблему. Как понимаю, проблема тут из-за Celery и асинхронности pyrogram. Но как решить не пойму

 , ,

crarkie ()

Celery запускает планировщик чаще чем указано в настройках

Здравствуйте! Подскажите в чем может быть проблема с Celery worker? Когда запускаю его он начинает выполнять задание чаще чем раз в секунду, хотя стоит интервал в несколько минут.

  • - Запуск воркера: «celery -A market_capitalizations worker -l info -S django»
  • - Запуск бита: «celery market_capitalizations beat -l info --scheduler django_celery_beat.schedulers:DatabaseScheduler»

Настройки:

    INSTALLED_APPS = [
        'django.contrib.admin',
        'django.contrib.auth',
        'django.contrib.contenttypes',
        'django.contrib.sessions',
        'django.contrib.messages',
        'django.contrib.staticfiles',
        'exchange_rates',
        'django_celery_beat',
        'django_celery_results',
        ]
    TIME_ZONE = 'Europe/Saratov'
    USE_I18N = True
    USE_L10N = True
    USE_TZ = True

    CELERY_BROKER_URL = 'redis://localhost:6379'
    CELERY_RESULT_BACKEND = 'redis://localhost:6379'
    CELERY_ACCEPT_CONTENT = ['application/json']
    CELERY_TASK_SERIALIZER = 'json'
    CELERY_RESULT_SERIALIZER = 'json'
    CELERY_TIMEZONE = TIME_ZONE 
    CELERY_ENABLE_UTC = False
    CELERYBEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler'
Ссылки на картинки в image хостинге.

При запуске задачи, не отправляется запрос.

Подскажите ,пожалуйста, как сделать чтоб сельдерей подхватывал время задачи с веб страницы и запускал задачу с нее же?

Пробовал запускать задачу через код, но она все равно выполняется чаще чем в секунду.

    from celery.schedules import crontab
    app.conf.beat_schedule = {
        'add-every-5-seconds': {
            'task': 'save_exchange_rates_task',
            'schedule': 600.0,
            #'args': (16, 16)
        },
    }

 , ,

PavelShturm ()

Свопирование записей Redis

Пробую разобраться.
Redis у меня используется для хранения сообщений от брокера - раздача пакетов заданий на вычислительные ноды.
Согласно redis.io, «Redis is an open source (BSD licensed), in-memory data structure store, used as a database, cache and message broker.»
Но тут проблема. На VDS мало оперативки, а диск - быстрый SSD. Хотелось бы, чтобы по достижении лимита maxmemory, или близко к нему, Redis сбрасывал данные на диск, начиная с самых старых записей. Можно конечно, навелосипедить вручную, установив maxmemory-policy noeviction. Но хочется чтобы БД не ждала до упора, а заранее начинала свопить записи.

При этом, хочется не использовать системный swap.

Или проще в таком случае не заморачиваться, и запихнуть всё в Postgres? Супер-экстремальной производительности мне не требуется, пока достаточно 200-300 записей в секунду, и вряд ли будет сильно больше 1000 записей/сек. Постгрес у меня на сервере всё-равно будет крутиться.

Есть ли простое решение для Redis? P.S.Я решил пока не мучаться с Celery.

 , ,

Mirage1_ ()

Спи спокойно дорогой товарищ

Наверное у всех админов витает в воздухе мысль написать свою собственную вэб-мордочку для централизованного поднятия сервисов и заливки конфигов на сервера. Кому интересна данная тема прошу присоединяйтесь...

https://github.com/dcopm999/ssdt.git

Описание модели deploy:

Service: Перечень сервисов которые используются в вашем проекте

  • name - Название сервиса (nginx, apache2, php-fpm, memcached...)
  • image - Логотип сервиса
  • desc - Описание (реверсивный прокси, ... ) необязательное поле

Address: перечень серверов которые используются в вашем проекте

  • addr - ip адрес сервера
  • auth_type - Тип авторизации (user, ssh-key, ...)
  • username - логин необязательное поле
  • password - пароль необязательное поле

DeployGroup: разделение серверов на логические группы, с одинаковым набором сервисов

  • name - Произвольное название группы серверов
  • servicegroup - Перечень сервисов которые необходимо развернуть в группе серверов
  • addressgroup - Перечень серверов на которых будут развернуты сервисы, указанные в группе сервисов

предполагается два типа авторизации на сверверах:

  • 1) user - авторизация через пользователя имеющего права sudo
  • 2) ssh-key - авторизация рутом через ssh ключ

 , ,

dcopm999 ()

(50$) Python celery, исправить баг

Решено, тема не актуальна.

 , , ,

rubro ()

Python celery, помогите привлечь внимание разработчиков

Дополнение: Проблема решена. Не актуально!

Дополнение: кто хочет за деньги решить проблему вам сюда

Всем доброго дня!

Имеется критический баг в celery, который я очень детально вот здесь расписал разрабочикам.

Проблема в том что разработчики не чешутся по каким-то причинам, баг очень для меня критичный. И я прошу у кого есть возможность и желание помочь апнуть там тему чтобы она наверх снова вылезла.

Если кто в celery шарит и считает что бага нет, то прошу пожалуйста указать на мою ошибку если она есть.

Я также писал создателю celery (товарищ Ask Solen) на мыло, но он проигнорировал.

 ,

rubro ()

Python celery, многопоточно воркеры, однопоточно обработку результатов

Всем доброго дня!

Изучаю celery, есть неясности. Прошу пожалуйста помочь кто в теме.

Дано:

  • 100 миллионов тасков которые надо выполнить, они лежат в базе sqlite в виде разбросанных данных
  • Скрипт воркер, который обрабатывает таски (он запускается посредством celery)
  • Функция db_dumper, которая принимает на входе результат таска и пишет в нужном для меня виде результат в sqlite
  • Celery 3.1.24

Задачи:

  1. Добавить ещё 1 поток который параллельно занимался бы сбором результатов всех успешных тасков, и складывал результаты в sqlite посредством моей функции db_dumper
  2. Как-то нужно удалять успешные таски после п.1, настройки celery сейчас такие:
    BROKER_TRANSPORT='amqp',
    CELERY_RESULT_BACKEND='rpc://'
    на rpc почему-то не пашет result.forget() для таска

Как это можно сделать?

Что пробовал:

  1. Ставил CELERY_RESULT_BACKEND sqlite базу, мне не понравилось что туда пишется информация в своём формате (id таска, статус, результат в бинарном формате), мне это не подходит
  2. Пробовал сделать как в пункте выше, но затем туда подключаться и оттуда выгребать информацию о тасках, получается много лишних движений, хочется это лишнее звено убрать и напрямую из celery брать успешные таски и их результаты писать в базу как мне надо

 ,

rubro ()

Как лучше организовать веб-приложение с долгой обработкой запроса и внешними сервисами?

Собственно, что оно будет делать:

  • запрашивать тексты с 1 сервиса
  • обрабатывать их 2 сервисом
  • возвращать результат обработки

    Притом - обработка может быть весьма долгой.

    Предпочёл бы python (ноды с го это, конечно, хорошо - но не то, освоением чего я бы хотел сейчас заняться). Соответственно - есть мысль сделать следующее :

  • реализовать всю вышеуказанную работу
  • таск под это дело в celery

    Тогда - я смогу запускать задачу в celery, возвращать идентификатор, проверять её статус и по готовности - возвращать инфу о результате.

    Собственно - есть ли явно лучшие методы?

    И да - в python3.5 вроде завезли кучу всего асинхронного. С учётом того, что у меня одно дерганье внешних сервисов - вроде бы может дать профит. Вопрос в том, не будет ли в случае применения celery явных подводных камней?

 ,

alex4321 ()

python3 celery механизм работы, общие вопросы

Всем доброго дня, есть вопросы по celery:

  • 1. Работал сервер, далее его ребутнули, куда деваются таски которые выполнялись в момент выполнения?
  • 2. Верно ли я понимаю что 1 таска = 1 процесс?
  • 3. Можно ли указать сколько процессов юзать до запуска таксков? То есть не в конфиге лазить, а прям перед запуском сказать что вот для этой группы тасков потолок допустим 20 процессов. Требуется это значение менять динамически.
  • 4. Что происходит с тасками после выполнения? У меня такое ощущение что пока я не сделаю вызов .forget() то они (результаты их выполнений) будут болтаться в памяти
  • 5. Добавляю таски, а они все в PENDING, и не ясно почему так, есть механизм как-то форсировать выполнение?
  • 6. Есть ли нормальные туториалы по сабжу? На офф сайте есть немного, но этого мало, я бы хотел посмотреть как в продакшене рабочий код выглядит, разобраться с тем как люди работают, чтобы не городить велосипеды
  • 7. Можно ли собрать исполняемый файл (например exe для windows) да так чтобы celery был уже внутри? И чтобы это всё было без излишней возни

 ,

rubro ()

Celery не может выполнить задачу в режиме демона

Доброго времени суток. Может быть кто-нибудь сталкивался - при запуске celery через supervisor (aptitude install supervisor) или celeryd (и там и там) сталкиваюсь с проблемой - при попытке выполнить задачу в которой используется cx_Oracle возвращается ошибка: ERROR/MainProcess] Unrecoverable error: UnpicklingError('NEWOBJ class argument has NULL tp_new',) И дальше задание прерывается. Если запустить celery от пользователя - /usr/local/bin/celery multi start worker -A django_app - все работает нормально. Собственно меня бы устроил либо вариант авто запуска celery multi без логина либо (предпочтителньый вариант) разобраться как сделать,чтобы задание выполнялось через supervisor нормально. Python - 3 P.S. - пробовал выполнять обычные задачи через supervisor - задача выполняется.

 , ,

Nemui ()

Количество workers и concurrency для celery

В документации прямым текстом говорится, что нужно экспериментировать, чтобы понять какое значение concurrency устанавливать для воркера и сколько этих воркеров создавать, но не говорится как это делать, а для меня это проблема - я действительно не понимаю как подобрать то самое идеальное количество. Может кто сталкивался и может подсказать?

Исходные данные: сервер за, например, 5 долларов на https://www.digitalocean.com/pricing/, на нем крутится nginx и простенький сайт на flask (uwsgi), которому и предстоит работать с celery. Таски для celery отрабатывают не больше, чем за 5-7 секунд.

 , , ,

girel ()

max number of clients reached

В heroku подключен бесплатный rediscloud с лимитом 10 соединений. Задачи celery постоянно выпадают с руганью на превышение количества клиентов redis.

  • Что мешает ему общаться с redis в одном соединении, не превышая лимит?
  • Почему я, подключившись с помощью redis-cli, вижу десяток незакрытых соединений, хотя celeryв этот момент по идее ничего не делает?
  • В чем разница между BROKER_POOL_LIMIT («The maximum number of connections that can be open in the connection pool») и CELERY_REDIS_MAX_CONNECTIONS («Maximum number of connections available in the Redis connection pool»)? Пробовал выставлять первый в 0, оба в 1, ничего не помогло.
  • Почему в некоторых ситуациях <taskname>.delay() вызывает эксэпшн, прерывающий выполнение функции, из которой ставится задание, а в других - эксепшн появляется в логе celery worker'а?

 , , ,

shatsky ()

Celery, проверка результатов асинхронных задач

Все привет. Собираюсь использовать в своем проекте celery, для выполнения некоторых задач. Хочется работы с объектами AsyncResult как с futures. Вот так я работаю с future

def future_wait(f, on_succ=None,
        on_failed=None, on_completed=None):
    def done_callback(f):
        if f.exception():
            if on_failed:
                on_failed(f.exception())
        else:
            if on_succ:
                on_succ(f.result())
        if on_completed:
            on_completed(f)

    f.add_done_callback(done_callback)
    return f

def done_callback(res):
    print res

def failed_callback(res):
    print res

f = do_some_task(...) # f - объект Future

future_wait(f, on_succ=done_callback, on_failed=failed_callback)
Если у future вызвать add_done_callback, то, как только результат таска будет получен, future сам вызовет мой callback. Таким образом, у меня все выполняется асинхронно. А вот в celery такой функции нету, и мне приходится писать некоторый костыль
class ResultKeeper(object):
    _instance = None

    def __new__(cls, *args, **kwargs):
        if not cls._instance:
            cls._instance = super(ResultKeeper, cls).__new__(cls, *args, **kwargs)
        return cls._instance

    def __init__(self):
        self.lock = Lock()
        self.results = list()
        self.thread = Thread(target=self.loop)
        self.thread.daemon = True
        self.thread.start()

    def __call__(self, async_res, on_success=None, on_failue=None):
        with self.lock:
            async_res._callback = {states.SUCCESS: on_success,
                                   states.FAILURE: on_failue}
            self.results.append(async_res)

    @property
    def is_alive(self):
        return True

    def process_result(self, async_res):
        callback = async_res._callback.get(async_res.state)
        self.results.remove(async_res)
        if callable(callback):
            callback(async_res.result)

    def loop(self):
        while self.is_alive:
            with self.lock:
                for async_res in iter(self.results):
                    if async_res.ready():
                        self.process_result(async_res)
            sleep(0.2)

    def _remove(self, async_res):
        with self.lock:
            try:
                self.results.remove(async_res)
            except:
                pass


rk = ResultKeeper()
res=convert_audio.delay(src='dfbxdfb', dst='serg')
rk(res,
   on_success=done_callback,
   on_failue=failed_callback)

В loop мне приходится вызывать для каждого результата read(), и потом делать sleep на некоторое время... Мне этот sleep не нравится. Собственно сам вопрос, как избавиться от этого sleep?

 , ,

energyclab ()

Flask, celery, AttributeError: 'Flask' object has no attribute 'user_options'

подключаю к фласку сельдерей

в обработчике хочу подключаться к базе фласка

делаю вот так http://pastebin.com/DprvYwbQ

но получаю какую то странную штуку при запуске сельдерея

AttributeError: 'Flask' object has no attribute 'user_options'

как это поправить? покажите рабочее подобное

 ,

fMad ()

Django + Celery + Virtualenv

Всем привет! Пытаюсь начать использовать в работе django-celery, да вот косячок на уровне запуска демона celeryd выходит, победить не пока не могу. Посему, ежели кто сталкивался с подобной проблемой, поможИте чем можИте).

Общая структура проекта:


(../ - virtualenv)

mysite/
    manage.py
    mysite/
        __init__.py
        settings.py
        urls.py
        wsgi.py
        celery.py
    myapp/
        tasks.py
        ...

Хочу запускать celeryd как обычную службу debian (7.4). Создаю файл «/etc/init.d/celeryd» (https://raw.github.com/ask/celery/master/contrib/generic-init.d/celeryd)

Cтавлю права как +x.

Создаю файл «/etc/default/celeryd» (по http://docs.celeryproject.org/en/latest/tutorials/daemonizing.html#daemonizing):

CELERYD_CHDIR="path/to/project"
CELERY_BIN="path/to/bin/celery" # virtualenv
CELERY_APP="myapp"
CELERYD_MULTI="$CELERYD_CHDIR/manage.py celeryd_multi"

CELERYD_OPTS="--time-limit=300 --concurrency=8 -B"
CELERYD_LOG_FILE=/var/log/celery/%n.log

# Path to celerybeat
CELERYBEAT="$CELERYD_CHDIR/manage.py celerybeat"
CELERYBEAT_LOG_FILE="/var/log/celery/beat.log"
CELERYBEAT_PID_FILE="/var/run/celery/beat.pid"

CELERY_CONFIG_MODULE="settings"

export DJANGO_SETTINGS_MODULE="settings"

Пытаюсь запустить демон celeryd:

/etc/init.d/celeryd start

Выдает:

...
File "/usr/local/lib/python2.7/dist-packages/django/conf/__init__.py", line 132, in __init__
    % (self.SETTINGS_MODULE, e)
ImportError: Could not import settings 'settings' (Is it on sys.path? Is there an import error in the settings file?): No module named settings

Будут какие-нибудь соображения?

 , , ,

Sektor ()