LINUX.ORG.RU

Синхронизация задач в очереди без явного ожидания

 


1

5

Всем привет!

Имеется некоторый проект числодробилки, в нем красной линией проложена очередь задач. Любой запрос от клиента обрабатывается в задаче.

На основе этого я хочу распараллелить запрос, чтобы он сам генерировал задачи, по завершении которых возможна дальнейшая обработка и возможно генерация очередной порции задач уже следующего этапа и этих этапов может быть много, пока все не посчитаем и в итоге в асинхронном режиме не выдадим результат клиенту.

Классический вариант с ожиданием завершения моей группы задач не подходит - ожидая завершения потоков я блокирую один из небольшого числа потоков, который можно было бы задействовать для обработки других запросов:

// Этап 1
auto taskGroup = threadPool.createTaskGroup ();
while (conditionToCreateManyJobsIsTrue ()) {
   addJob (taskGroup, []() {
     //Do some job
   });
}

// Тут мы блокируем один из потоков очереди
// Хуже еще то, что если количество запросов от клиентов будет больше или равно количеству потоков в очереди, то при таком решении сервер просто навсегда зависнет
taskGroup.join ();

// Этап 2 аналогичен по структуре распараллеливания этапу 1
...

Если исходить из того, что действия на 2-ом этапе выполняются после завершения последней задачи на этапе 1 приходит идея, чтобы каждая лямбда функция держала объект со счетчиком ссылок, и действия 2-го этапа начались в деструкторе этого объекта.


std::shared_ptr< NextStage > nextStage = sugarWrapperToCreateExceptionGuardedDestructorClass ([]() {
    // Действия на 2-ом этапе
});

// Этап 1
auto taskGroup = threadPool.createTaskGroup ();
while (conditionToCreateManyJobsIsTrue ()) {
   addJob (taskGroup, [=nextStage]() {
     //Do some job
   });
}

Теперь код задачи после комментария

// Этап 1
завершает свою работу и освобождает поток, который может быть использован для выполнения полезной работы Балансировка нагрузки на ядра процессора тоже довольно ровная

Но мне не нравится, что я определяю 2-ой этап до первого, и поэтому код воспринимается хуже. Если этапов будет еще больше, то определять их нужно в обратном порядке.

У кого-нибудь есть идеи как еще можно решить эту задачу?



Последнее исправление: sotlef (всего исправлений: 1)

У тебя что-то нетак с дизайном. У тебя должно быть по одному воркеру на процессорное ядро, плюс поток набрасывающий задания в очередь. Никто никого не ждет. Стартуют все одновременно, завершаются тоже. Задания выгребаются из очереди по мере выполнения. Балансировкой занимается операционная система.

cvv ★★★★★
()
Последнее исправление: cvv (всего исправлений: 2)
Ответ на: комментарий от anonymous

а, забыл, что в таких случаях они представлены явно в виде отдельных ядер.

anonymous
()
Ответ на: комментарий от cvv

Количество потоков в очереди равно количеству ядер (или меньше, в зависимости от настроек в конфигурации)

Первоначально задача А (код которой схематично приведен в посте) стартует с поступлением запроса с сокета.

Дочерние задачи стартуют одновременно, каждая задача из которых может выполняться разное время - одна может выполниться за миллисекунды, другая - выполняться несколько секунд. И с этим ничего не поделаешь - спицифика задачи.

Чтобы было понятнее, числодробилкой является olap-сервер. В кубе определенные области могут задаваться разными правилами - соответственно, в зависимости от запрашиваемых клиентом срезов куба, вычисление разбивается на множество областей со своим сценарием вычислений. Количество этих областей не определенно, и они между собой могут зависимы.

Поэтому я не могу разбить вычисление на N воркеров, которые завершатся одновременно, так как вычисление некоторой области может потребовать вычисление, например, среза из другого куба.

Набрасывать задания я могу только в определенные моменты - это как раз в точках завершения предыдущего этапа и начала следующего

sotlef
() автор топика
Последнее исправление: sotlef (всего исправлений: 1)
Ответ на: комментарий от sotlef

Ты меня немного не понял. Может другим разом попробую обьяснить другими словами.

cvv ★★★★★
()
Ответ на: комментарий от sotlef

формируй две очереди, очередь на первый этап и очередь на второй, потоки для второго этапа начинают считывать задачи из второй очереди, когда в неё попадут задачи со всеми их зависимыми частями. потоки первого и второго этапа могут работать одновременно, но над отдельными группами задач. т.е. простаивать и кого-то ждать нет смысла, работой можно загрузиться по полной =)

такие вещи лучше описывать в UML.

anonymous
()
Ответ на: комментарий от anonymous

А если задача из второй очереди запустится раньше чем завершится соответствующая задача из первой очереди? Ты имеешь ввиду, что ожидание результата не будет ничего стоить, так как ОС загрузит ядро другим рабочим потоком из этих очередей?

sotlef
() автор топика
Последнее исправление: sotlef (всего исправлений: 1)

Опять программисты, которые не понимают что такое мультипоточность и зачем она нужна. У тебя задачи сильно напряжные для процессора или не очень (может они к памяти гораздо больше обращаются, чем на проце крутятся, тогда всё упрется не в процессор, а в память и переключение между потоками не будет так заметно)?

Я бы юзал пул потоков в твоей задаче. В крестах в стандартной библиотеке его нет, но ты можешь сам его написать, взять чужой, а может ты вообще с Qt разрабатываешь и там свой пул есть.

peregrine ★★★★★
()
Ответ на: комментарий от sotlef

Пока не сформирована группа взаимозависимых задач во второй очереди - потоки занимаются обработкой задач из первой. Как только во второй очереди сформируется группа, готовая для второго этапа - часть потоков начнет её солвить, при этом могут паралельно обрабатываться задачи из первой очереди для другой группы, либо их можно приостановить и перекинуть решать задачи из второй очереди, если есть приоритет быстрее завершить второй этап.

Суть в том, что потоки можно загрузить решать первый этап, чтобы они зря не простаивали в ожидании второго. Но если задачи приходят по одной взаимозависимой группе, то простоя избежать не удастся.

anonymous
()
Ответ на: комментарий от peregrine

Задачи напряжные для процессора Вроде ведь в самом начале видно, что threadPool используется Или вам просто кислотой побрызгать на меня?

sotlef
() автор топика
Ответ на: комментарий от anonymous

Но по сути у меня это и происходит.

Просто может я не акцентировал деталь, что задачи из второй очереди должны быть запущены, только когда _все_ задачи из первой были выполнены.

Вот я вроде как и решаю задачу простоя: выдаю разделяемый объект, последняя выполняемая задача из первой очереди обнуляет счетчик ссылок запуская в рабочем потоке следующую очередь задач

По факту это не значит, что другие потоки будут простаивать, задача которая тут описана как первой очереди может быть вложенной и выполняться как N-ой очереди, а вложенные задачи формируют свои «грозди» подзадач.

sotlef
() автор топика
Ответ на: комментарий от sotlef

задачи из второй очереди должны быть запущены, только когда _все_ задачи из первой были выполнены

т.е. когда группа задач будет польностью решена? можно ведь решать сразу несколько таких групп вперемешку на первом этапе, складывать результаты во вторую очередь, маркируя их айдишником группы, и когда во второй очереди наберется хотябы одна полная группа, готовая ко второму этапу - запускать воркеры второго этапа.

короче, решение найти можно, главное обеспечить достаточным количеством задач.

anonymous
()
Ответ на: комментарий от sotlef

Можно либо перейти на использование модели CSP с короутинами. Тогда у вас будет пул рабочих потоков, на которых вы будете запускать вычислительные задачи. И набор управляющих короутин (возможно, они все будут работать на одной единственной рабочей нити). Тогда в управляющей короутине вы сможете писать последовательный «синхронный» код:

auto taskGroup = threadPool.createTaskGroup ();
while (conditionToCreateManyJobsIsTrue ()) {
   addJob (taskGroup, []() {
     //Do some job
   });
}

// Здесь вашу короутину приостановят до тех пор, пока taskGroup
// не закончит свою работу.
taskGroup.join ();

Либо вы можете перейти на использование Actor Model, где у вас будут вычислительные акторы и актор-координатор. Вычислительные акторы выполняют свою задачу и отсылают сообщение-подтверждение актору-координатору. Тот, получив все подтверждения от вычислительных акторов, переходит к обработке следующей стадии (рассылая новые сообщения вычислительным акторам или создавая новых вычислительных акторов). И т.д.

eao197 ★★★★★
()

У кого-нибудь есть идеи как еще можно решить эту задачу?

переписать на rust, если ты не ровесник Сталина

anonymous
()
Ответ на: комментарий от anonymous

какой смысл писать на мертвом языке ?

я и говорю, на крестах никакого смысла писать нет. весь мир пишет на javascript, python и rust, пока эти старперы упражняются в уязвимостях

ЗЫ АНБ спонсирует курсы по C++, чтобы потом взламывать то что эти неадекваты понапишут, поэтому если ты программируешь на сиплюсплюс ты помогаешь американской разведке

anonymous
()
Ответ на: комментарий от anonymous

если никто не пишет на С++ тогда удалите весь софт который написан на этом языке в вашем планшете, копУтере итд, заходите еще к нам если у вас еще останется рабочее хотя бы одно устройство после таких манипуляций по чистке от софта

anonymous
()
Ответ на: комментарий от anonymous

растоманы такие любители рисковать, но для них это плюсом пока не выходит
ну ничего, еще годик или скорее два и закопают ваш раст вместе с тормозилой

anonymous
()

Может я не понял чего ты хочешь, но по хорошему нужно просто класть все задачи в одну очередь, а в классе задачи(ну или в std::function, пофик на самом деле до определённого момента), принимать решение что именно делать. Тогда потоки не будут впустую драконить ядра пока есть задачи n-ого этапа.

Ещё, я видел эффективное гибридное решение - локфри очередь которая засыпает после того, как разгребётся всё, что было накиданно за время просыпания и гребёт до тех пор пока не пуста. Как только пуста - засыпает.

pon4ik ★★★★★
()
Ответ на: комментарий от sotlef

Вроде ведь в самом начале видно, что threadPool используется Или вам просто кислотой побрызгать на меня?

А теперь сходи и почитай про него и хватит изобретать велосипед которым ты уже пользуешься. Или реализацию меняй если это самописный велосипед который как-то не так работает как надо.

peregrine ★★★★★
()
Последнее исправление: peregrine (всего исправлений: 1)
Ответ на: комментарий от anonymous

Дреппер сидит на лоре? Не знал, не знал :)

yoghurt ★★★★★
()
Вы не можете добавлять комментарии в эту тему. Тема перемещена в архив.