LINUX.ORG.RU

Правильный multiprocessing pool на python

 , ,


1

2

Есть такая задача: есть большая база данных, оттуда нужно выгребать записи, параллельно обрабатывать и класть обратно. Записей много, поэтому выгребать нужно постепенно. Параллельных процессов тоже много, поэтому не хочется на каждый заводить отдельное подключение к БД.

Итого, нужно в основном процессе выгребать данные, отправлять worker'ам, получать от worker'ов результаты и складывать в базу. Как это лаконичнее всего организовать на питоне?

Первая идея была использовать готовый Pool. Написал тестовую прогу:

import multiprocessing
import sys
import time

def Process(arg):
    print("processing {}".format(arg), file=sys.stderr)
    time.sleep(1)
    return 10000 + arg

def Generate():
    for val in range(1000):
        print("generating {}".format(val), file=sys.stderr)
        yield val

if __name__ == '__main__':
    pool = multiprocessing.Pool(10)

    for res in pool.imap(Process, Generate(), 1):
        print("finalizing {}".format(res), file=sys.stderr)

    pool.close()
    pool.join()

На практике, понятно, в Generate() будет чтение из базы, а в цикле по imap - запись.

Вроде всё зашибись, кроме того что все 1000 значений генерируются в начале программы. На практике это будет означать что программа сразу выгребет из базы кучу данных и выжрет всю память. Причём если в Generate поставить sleep то всё хорошо, обработчики будут ждать данных, но на практике они работают медленнее базы. Обернуть в какую-нибудь lazy штуку не получится, потому что тогда она будет выполняться в worker процессе где не будет подключения к бд. Что можно сделать чтобы imap просил следующую порцию данных только когда у него есть свободный процесс или место в (ограниченной) очереди для её обработки?

Или придётся всё писать руками? Руками создавать N процессов, две очереди (для заданий worker'ам и для ответов от них)? Тогда я не могу придумать как красиво обрабатывать эти две очереди, поскольку нужен аналог select'а, иначе получим дедлоки. Вырисовывается такой уродец:

tasks_in_fly = 0

while True:
  task = get_next_from_db();

  if not task:
    # задания закончились
    break

  try:
    # нельзя просто put, иначе получим deadlock если обе очереди заполнены
    task_queue.put_nowait(task)
    tasks_in_fly += 1
  except queue.Full:
    # если места в очереди нет, нужно разобрать ответы
    try:
      while True:
        # выгребать ответы пока очередь не опустеет и не кинется исключение
        result = result_queue.get_nowait()
        tasks_in_fly -= 1

        put_to_db(result)
    except queue.Empty:
      pass

# и ещё нужно дождаться последних ответов и оработать их
while tasks_in_fly:
  result = result_queue.get()
  tasks_in_fly -= 1

  put_to_db(result)

Если только так, может есть идеи как этот код улучшить?

★★★★★

О, нашёл. Нужно взять multiprocessing.BoundedSemaphore, захватывать его в генераторе и освобождать после обработки. Теперь смущает только одно: при нажатии Ctrl+C всё виснет вместо осмысленного действия (убить процесс как всегда, или хотя бы ничего не делать).

slovazap ★★★★★ ()
Ответ на: комментарий от slovazap

Блджад. В реальной программе даже без семафора оно не выжирает много памяти. Ну ок, значит первый вариант работает.

slovazap ★★★★★ ()
Ответ на: комментарий от slovazap

Добро пожаловать в увлекательный мир Питона: никто не знает что, когда и как происходит. Прогноз невозможен, планирование бессмысленно.

MimisGotAPlan ()

что-то отличное от питона почему бы не юзнуть? Зачем обрабатывать данные с использованием мультипроцессинга или какого-нибудь мультитрединга на языке, который придуман для, и ни для чего больше не годен, как для обучения умственно отсталых азам кодинга, а также для простенького скриптования в шелле и в вебе?

lovesan ★☆ ()
Ответ на: комментарий от lovesan

на языке, который придуман для, и ни для чего больше не годен, как для обучения умственно отсталых азам кодинга, а также для простенького скриптования в шелле и в вебе?

Лол :-) А скажи тогда, почему корпорации Nvidia и Intel поддерживают оптимизации Питона для быстрого выполнения на их GPU/CPU? :-) А вот умным пользователям языка Common Lisp приходится довольствоваться какой-то там библиотечкой cl-cuda в тоже время самое :-)

Кстати, как там SBCL под Windows? :-) Python работает отлично, а SBCL? :-) Борьба продолжается? :-)

anonymous ()
Ответ на: комментарий от slovazap

вот так закрываться по Ctrl+C:

if __name__ == '__main__':
    
    try :
        ms = Sender()
        ms.startWebServer(8081, **webSettings)
        
        ms.startJobManager(**jobSettings)
        
        #logger.debug("hehe")
    except KeyboardInterrupt :
        #signal_term_handler(signal.SIGTERM, None)
        raise "KeyboardInterrupt - Interrupted"

bvn13 ★★★★★ ()
Последнее исправление: bvn13 (всего исправлений: 1)
Ответ на: комментарий от slovazap

Нужно взять multiprocessing.BoundedSemaphore

Используй. Если не нагуглишь, дам больше кода.

from multiprocessing import Manager, Lock

bvn13 ★★★★★ ()
Ответ на: комментарий от anonymous

SBCL работает сто лет как.

Python же не работает. Если мы выйдем за рамки хелловорлдов, то, например, окажется, что винда - вся про треды, там даже форка нет(не считая WSL), и не зря. Многопроцессность там сильно дорогая и ее избегают. Соответственно, выходит, что эти костылики для обхода GIL на винде неюзабельны. Ровно как и работа со сколько-нибудь сложными фичами WinAPI, по причине сломанного мультитрединга в питоне.

А скажи тогда, почему корпорации Nvidia и Intel поддерживают оптимизации Питона для быстрого выполнения на их GPU/CPU?

Потому же, почему корпорация Oracle поддерживает оптимизации для быстрого выполнения JVM. Это ничего не говорит ни о Java как о прикладном инструменте, ни о концептуальном качестве Java/JVM, которое, понятное дело, никудышное, но хоть получше Python. Это говорит о том, что разные идиоты пишут на этом, и от них идут запросы.

lovesan ★☆ ()
Ответ на: комментарий от lovesan

SBCL работает сто лет как.

А можно с CUDA из SBCL под Windows работать? :-)

по причине сломанного мультитрединга в питоне.

А в SBCL многопоточность не сломана, разве? :-) Нет ли там дедлоков при нагрузке? :-) Как оно там называется, freeze_world() или что-то в этом роде :-) Это когда сборщик вызывается, лучше не дышать в этот момент на SBCL :-)

Потому же, почему корпорация Oracle поддерживает оптимизации для быстрого выполнения JVM. Это ничего не говорит ни о Java как о прикладном инструменте

Это говорит о больших деньгах, которые вкладываются корпорациями :-) А там, где деньги, там и кадровый ресурс :-) А там, где кадровый ресурс, там и делаются реальные дела :-) Такие вот дела :-)

Но я не буду, конечно, спорить о том, что Python - слишком простой язык, который не позволяет вдоволь поиграть интеллектом до кошмариков со скобками перед глазами в ночное время :-) Python (или Java) - это для рутинных дел, а не для ЧСВ, конечно же :-)

anonymous ()
Ответ на: комментарий от anonymous

Java для рутинных дел еще туда-сюда подходит. Python же это недоделанная скриптовая игрушка для слабоумных. На уровне PHP(на нем кстати тоже, «рутинные дела», делают).

А в SBCL многопоточность не сломана, разве? :-) Нет ли там дедлоков при нагрузке? :-) Как оно там называется, freeze_world() или что-то в этом роде :-) Это когда сборщик вызывается, лучше не дышать в этот момент на SBCL :-)

Для тех кто в танке - в SBCL generational сборщик мусора с кучей оптимизаций. Не дыши лучше в питоне, пока там за refcount убогий mark-n-sweep говно подтирает.

Это говорит о больших деньгах, которые вкладываются корпорациями :-) А там, где деньги, там и кадровый ресурс :-) А там, где кадровый ресурс, там и делаются реальные дела :-) Такие вот дела :-)

В наркоторговлю и проституцию тоже вкладывают деньги и кадровый ресурс. Реальные дела, чо.

lovesan ★☆ ()

Люблю питон, но сколько не писал многопроцессные проги на нём - это ад и израиль, вкуривать их абстракции, именно с этими пуллами. По сравнению со swift3/objective-c и Grand Central Dispatch, питонячьи костыли просто позор.

menangen ★★★★★ ()
Ответ на: комментарий от lovesan

Java
Python
PHP(на нем кстати тоже, «рутинные дела», делают).

Но ты ведь отдаёшь себе отчёт в том, что на всех перечисленных тобой языках зарабатывают миллиарды, в отличии от :-) Да, языки т.з. теории не идеальны, мягко скажем :-) Только это не сильно волнует ни работодателя, ни конечного пользователя :-) Меня не волнует, что ЛОР написан на Java :-) Хоть на Коболе пусть будет написан, главное что вот пишу очередной смайлик :-)

Для тех кто в танке - в SBCL generational сборщик мусора с кучей оптимизаций.

Это круто, но как насчёт CUDA из SBCL под Windows? :-) А то мне надо :-)

anonymous ()

1. Самый простой путь - это когда сам воркер и генерирует данные и их обрабатывает. 2. Если невозможен пункт 1, тогда две очереди. И если одно из очередей выполняется быстрей другой, то самый простой вариант - лимитировать самую быструю очередь. Т.е, допустим, имеем лимит 100. Нагенерили 100 ? Останавливаемся и ждем, пока записей станет меньше 100. Таким образом у нас всегда будут записи для более медленной очереди т.е медленная очередь всегда будет работать.

Xwo ()

Работа с записью может осуществляться с помощью

collection.deque

Xwo ()

Тред и топик не читал, но multiprocessing это ппц. Короче, мы свой пул процессов написали, с блэкджеком и шлюхами. На что обращать внимание:

1) прочитать все доки целиком, особенно про работу очередей и join()

2) тщательно писать обработчики сигналов

3) понимать как работает fork()

4) избегать расшареных переменных.

5) особое внимание уделить коду остановки.

все 1000 значений генерируются в начале программы. На практике это будет означать что программа сразу выгребет из базы кучу данных и выжрет всю память.

Нагрузку на память регулировать кол-вом воркеров и глубиной очереди заданий.

На счёт селекта: просто сделай два потока: один кладёт задания, другой выгребает результаты.

В общем, удачи.

true_admin ★★★★★ ()
Ответ на: комментарий от menangen

многопроцессные проги на нём - это ад и израиль, вкуривать их абстракции, именно с этими пуллами.

+1, multiprocessing это фееричные грабли.

PS может, ТС поможет. Мы в дочерних сигналах блокируем все сигналы, остановку делаем через обработку multiprocessing.Event который выставляется из родительского процесса. Ну и инициализацию ресурсов делаем после fork(), по понятным причинам.

true_admin ★★★★★ ()
Ответ на: комментарий от true_admin

ТСу поможет только переход на Go и горутины вместо запаров на питоне «в 4 процесса с пайпами».

menangen ★★★★★ ()
Ответ на: комментарий от lovesan

Python же не работает.

Но почему-то активно используется.

SBCL работает сто лет как.

Но почему-то почти никому не нужен.

templarrr ★★★★★ ()
Ответ на: комментарий от bvn13

Ты бот что-ли? Ещё раз, в этом коде нет никакого смысла, к поведению по Ctrl+C он отношения не имеет.

slovazap ★★★★★ ()
Ответ на: комментарий от menangen

ТСу поможет только переход на Go и горутины вместо запаров на питоне «в 4 процесса с пайпами»

Не поможет. Так как на Go это пишется через жопу, только уже в другом месте. Но жопа никуда не девается. ТСу больно от того что он не владеет системным програмированием, как и ты. Но специально для таких придумали Celery, rq, dsq и иже с ними.

anonymous ()

Вроде всё зашибись, кроме того что все 1000 значений генерируются в начале программы.

Генератор ограничен размером пула, так что не бойся.

anonymous ()
Ответ на: комментарий от anonymous

Поправочка, давно дело было, генератор ограничен размером буфера сокета.

import multiprocessing
import sys
import time

def Process(arg):
    print("processing {}".format(arg), file=sys.stderr)
    time.sleep(1)
    return len(arg)


def Generate():
    for val in range(1000):
        print("generating {}".format(val), file=sys.stderr)
        yield str(val) * 100  # "большие" данные

if __name__ == '__main__':
    pool = multiprocessing.Pool(10)

    for res in pool.imap(Process, Generate(), 1):
        print("finalizing {}".format(res), file=sys.stderr)

    pool.close()
    pool.join()

Но в твоем случае, выборка из базы, большие пересылаемые куски оно как раз будет упираться в пропускную способность воркеров, так что все норм.

anonymous ()
Ответ на: комментарий от slovazap

Блджад. В реальной программе даже без семафора оно не выжирает много памяти. Ну ок, значит первый вариант работает.

О, пропустил это сообщение, собственно объяснение почему — чуть выше.

anonymous ()
Ответ на: комментарий от kachan

asyncio почему бы просто не юзнуть?

Потому что ТС не упирается в IO. И вообще, asyncio очень редко когда нужен, только управлять сокетами в количестве > 1k штук. Но тебе до этого еще дорасти надо.

anonymous ()
Ответ на: комментарий от anonymous

А вот нифига. На Go задача тс'а пилится в три притопа, два прихлопа, просто ты не в теме.

menangen ★★★★★ ()
Ответ на: комментарий от anonymous

Эм, если ТС не упирается в IO, сложных задач нет, то нахера педалить велосипед если на asyncio все пишется за 20 минут?

kachan ★★ ()
Ответ на: комментарий от menangen

На Go задача тс'а пилится в три притопа, два прихлопа, просто ты не в теме.

ТС уже напилил рабочий вариант еще в оп-посте, просто на интах ему показалось, что он не рабочий, так что все у него нормально, без go.

anonymous ()
Ответ на: комментарий от lovesan

Python же не работает.

Лежали всем офисом. Пятница удалась.

th3m3 ★★★★★ ()
Вы не можете добавлять комментарии в эту тему. Тема перемещена в архив.