Есть такая задача: есть большая база данных, оттуда нужно выгребать записи, параллельно обрабатывать и класть обратно. Записей много, поэтому выгребать нужно постепенно. Параллельных процессов тоже много, поэтому не хочется на каждый заводить отдельное подключение к БД.
Итого, нужно в основном процессе выгребать данные, отправлять worker'ам, получать от worker'ов результаты и складывать в базу. Как это лаконичнее всего организовать на питоне?
Первая идея была использовать готовый Pool. Написал тестовую прогу:
import multiprocessing
import sys
import time
def Process(arg):
    print("processing {}".format(arg), file=sys.stderr)
    time.sleep(1)
    return 10000 + arg
def Generate():
    for val in range(1000):
        print("generating {}".format(val), file=sys.stderr)
        yield val
if __name__ == '__main__':
    pool = multiprocessing.Pool(10)
    for res in pool.imap(Process, Generate(), 1):
        print("finalizing {}".format(res), file=sys.stderr)
    pool.close()
    pool.join()
На практике, понятно, в Generate() будет чтение из базы, а в цикле по imap - запись.
Вроде всё зашибись, кроме того что все 1000 значений генерируются в начале программы. На практике это будет означать что программа сразу выгребет из базы кучу данных и выжрет всю память. Причём если в Generate поставить sleep то всё хорошо, обработчики будут ждать данных, но на практике они работают медленнее базы. Обернуть в какую-нибудь lazy штуку не получится, потому что тогда она будет выполняться в worker процессе где не будет подключения к бд. Что можно сделать чтобы imap просил следующую порцию данных только когда у него есть свободный процесс или место в (ограниченной) очереди для её обработки?
Или придётся всё писать руками? Руками создавать N процессов, две очереди (для заданий worker'ам и для ответов от них)? Тогда я не могу придумать как красиво обрабатывать эти две очереди, поскольку нужен аналог select'а, иначе получим дедлоки. Вырисовывается такой уродец:
tasks_in_fly = 0
while True:
  task = get_next_from_db();
  if not task:
    # задания закончились
    break
  try:
    # нельзя просто put, иначе получим deadlock если обе очереди заполнены
    task_queue.put_nowait(task)
    tasks_in_fly += 1
  except queue.Full:
    # если места в очереди нет, нужно разобрать ответы
    try:
      while True:
        # выгребать ответы пока очередь не опустеет и не кинется исключение
        result = result_queue.get_nowait()
        tasks_in_fly -= 1
        put_to_db(result)
    except queue.Empty:
      pass
# и ещё нужно дождаться последних ответов и оработать их
while tasks_in_fly:
  result = result_queue.get()
  tasks_in_fly -= 1
  put_to_db(result)
Если только так, может есть идеи как этот код улучшить?








