LINUX.ORG.RU

Правильный параллелизм в цикле python3

 , ,


1

2

Задача заключается в том что бы в цикле из 300 объектов обрабатывать параллельно несколько (10-15).
нагугли несколько вариантов, но мне показались они ужастно костыльными и плохо читаемыми. Есть реальные примеры решения этой задачи?

Пробовал так:

def main():
    def make_all(item):
        item.check = check(item.host.strip())

    pool = multiprocessing.Pool(processes=app.config['CHECK_THREADS'])
    pool.map(make_all, (item for item in items) )
Получаю ошибку:
AttributeError: Can't pickle local object 'main.<locals>.make_all'

NUM_PROC=15

def processing_func(some_object, p1, p2):
    # Do stuff

with multiprocessing.Pool(processes=NUM_PROC) as pool:
    result = pool.starmap(processing_func, [(o, param1, param2) for o in some_objects])

Всегда так делал®©™, это самый стандартный путь (за исключением объектов с открытыми файлами и т.п., но их нельзя в принципе просто так передавать между процессами; параллельных потоков без поднятия GIL в cpython нетути). И напиши, что конкретно тебя не устраивает в имеющихся решениях.

EDIT: написал выше до исправления топика. Твоя проблема в том, что функция процессинга внутри другой функции и фактически является локальной переменной. Эта переменная спавнится во время выполнения функции, а не раньше. Поэтому после форка функцию процесса найти не получается. Вынеси make_all наружу на уровень модуля.

EDITT: форматирование

EDITTT: Ты еще забыл про то, что у процессов память отдельная. Поэтому тебе нужно вернуть измененные объекты, иначе они останутся только в памяти воркер-процессов, а не вызывающего.

lu4nik ★★★ ()
Последнее исправление: lu4nik (всего исправлений: 3 )

Сначала тебе нужно понять что тебе для этого нужно : multiproc, threads или asyncio. Что за задачи ?

Jopich1 ()
Ответ на: комментарий от Jopich1

Задача заключается в том что бы вытащить 300 объектов из бд, каждый из них обновить если требуется:

item.check = True
....
db.session.commit()

EDITTT: Ты еще забыл про то, что у процессов память отдельная. Поэтому тебе нужно вернуть измененные объекты, иначе они останутся только в памяти воркер-процессов, а не вызывающего.

Кстати да, спасибо это я по глупости забыл.

EDIT: написал выше до исправления топика. Твоя проблема в том, что функция процессинга внутри другой функции и фактически является локальной переменной. Эта переменная спавнится во время выполнения функции, а не раньше. Поэтому после форка функцию процесса найти не получается. Вынеси make_all наружу на уровень модуля.

спасибо, буду знать

noname_user ★★★ ()

Арихитектурно так и надо. Только в map надо глобальную функцию передавать, а не замыкание.

slovazap ★★★★★ ()
Ответ на: комментарий от lu4nik

Такой вопрос, если я передал объект из бд в функцию вынесенную, изменю его и закомичу, это будет правильно? просто я думал реализовать как то так, что бы коммит в бд был уже тогда, когда я изменю все объекты

EDIT: написал выше до исправления топика. Твоя проблема в том, что функция процессинга внутри другой функции и фактически является локальной переменной. Эта переменная спавнится во время выполнения функции, а не раньше. Поэтому после форка функцию процесса найти не получается. Вынеси make_all наружу на уровень модуля.

Попробовал вынести ф-цию на уровень модуля как ты сказал, в моем случае это заканчивается ошибкой, так как flask ругается на выход из контекста приложения (в app.app_context() придется заворачивать вообще весь модуль)

noname_user ★★★ ()
Последнее исправление: noname_user (всего исправлений: 1 )
Ответ на: комментарий от noname_user

Если ты хочешь просто параллельно обновлять БД, то многопроцессность тебе нафиг не сдалась. У тебя код на питоне просто выставляет опции, ORM коммуницирует с БД, сама БД выставляет значения. Во-первых, это всё выглядит как IO-bound, плохой кейс для multiprocessing. Во-вторых, твои 100500 процессов наверняка будут большую часть времени простаивать по процессору, т.к. будут ждать I/O. multiprocessing полезен в основном тогда, когда ты в процессор упираешься.

Также multiprocessing плохо сочетается с (микро)фреймворками, потому что контекст. С threading у тебя будет та же проблема, но мб с async что-то получится, но я его совсем плохо знаю (фласк на самом деле тоже), может кто-то другой подскажет.

lu4nik ★★★ ()
Ответ на: комментарий от noname_user

Задача заключается в том что бы вытащить 300 объектов из бд, каждый из них обновить если требуется

А не проще ли вытащить за 1 запрос все 300 объектов, а затем в транзакции выполнить для всех 300 объектах обновление базы данных?

Попробовал вынести ф-цию на уровень модуля как ты сказал, в моем случае это заканчивается ошибкой, так как flask ругается на выход из контекста приложения (в app.app_context() придется заворачивать вообще весь модуль)

Если у тебя веб-приложение flask, то я бы рекомендовал использовать что-то стандартное для python/web типа Celery

dicos ★★ ()
Ответ на: комментарий от dicos

А не проще ли вытащить за 1 запрос все 300 объектов, а затем в транзакции выполнить для всех 300 объектах обновление базы данных?

Но ведь у этих объектов могут быть разные значения

noname_user ★★★ ()
Ответ на: комментарий от noname_user

Запили процедурку. Всяко будет адекватнее чем мультипроцессингом это делать.

А так, получил бы их одним запросом, проблема в целом бы и решилась, если ты их по одному коммитить не намерен.

P.S. Искренне сочувствую всем бедолагам, у которых питон всё ещё синхронный. Это печально.

WitcherGeralt ★★ ()
Последнее исправление: WitcherGeralt (всего исправлений: 1 )
Ответ на: комментарий от WitcherGeralt

Даже референсный питон не синхронный: он с давних пор умеет в конкуррентные потоки. Отсутствие параллельности с помощью потоков мешает в основном числодробителям, а в задаче ТСа не так важно, что ждет ответа БД: потоки, процессы или асинхронщина какая.

lu4nik ★★★ ()
Ответ на: комментарий от WitcherGeralt

P.S. Искренне сочувствую всем бедолагам, у которых питон всё ещё синхронный. Это печально.

лорчую

eternal_sorrow ★★★★★ ()
Ответ на: комментарий от WitcherGeralt

А причем здесь фласк? Ну да, процессы/потоки из стандартной библиотеки не работают с фласком из-за использования контекста (никогда не понимал, зачем так запилено). Но питон не ограничивается только фласком.

lu4nik ★★★ ()
Ответ на: комментарий от WitcherGeralt

P.S. Искренне сочувствую всем бедолагам, у которых питон всё ещё синхронный. Это печально.

Согласен, но что взять для веба асинхронного по типу flask? что бы быстро это све деплоить в пет-проектах.

Запили процедурку.

ты имеешь ввиду не использовать контекст фреймворка?

noname_user ★★★ ()
Последнее исправление: noname_user (всего исправлений: 1 )
Ответ на: комментарий от noname_user

Процедуру в БД.

Согласен, но что взять для веба асинхронного по типу flask?

Вагон их:

  • Tornado — бессмысленный копролит, но всё таки он на asycnio теперь, так что при желании можно и его.
  • quart — реализует API фласка, разницы не заметишь, и даже некоторые фласковые расширения сможешь юзать.
  • aio-libs/aiohttp — минималистичная библиотека, умеет ещё и в клиент, который дефолтная замена requests сейчас фактически.
  • huge-success/sanic — тоже на фласк очень смахивает, вполне годно.
  • vibora-io/vibora — хз, не юзал.
  • encode/starlette — минималистичный, но самый быстрый фреймворк из представленных.
  • tiangolo/fastapi — основан на starlette (не форк, старлетт под капотом), имеет поддержку тайпхинтов, по состоянию на 2020 это дефотный фреймворк для запила Web API, как фласк когда-то.

Ссаник популярней чем quart, так что вероятно имеет смысл взять его.

WitcherGeralt ★★ ()
Последнее исправление: WitcherGeralt (всего исправлений: 2 )
Ответ на: комментарий от WitcherGeralt

спасибо за список, буду смотреть туда, что бы не костылить ужас

noname_user ★★★ ()
Ответ на: комментарий от noname_user

А не проще ли вытащить за 1 запрос все 300 объектов, а затем в транзакции выполнить для всех 300 объектах обновление базы данных?

Но ведь у этих объектов могут быть разные значения

Я имел ввиду завернуть все 300 sql запросов UPDATE в транзакцию

BEGIN;
update bla-bla-bla SET column = 'tra-ta-ta' WHERE id = 1;
update bla-bla-bla SET column = 'tra-ta-ta-ta' WHERE id = 2;
update bla-bla-bla SET column = 'ha' WHERE id = 3;
COMMIT;
dicos ★★ ()
Ответ на: комментарий от noname_user

Задача заключается в том что бы вытащить 300 объектов из бд, каждый из них обновить если требуется

А разом нет? Так и на локах базы сесть можно. И вообще если ты при этом обновлении жирный диффур не считаешь - звучит как кейс для asyncio, мультипроц тут только время на старт сожрёт.

upcFrost ★★★★★ ()
Для того чтобы оставить комментарий войдите или зарегистрируйтесь.