LINUX.ORG.RU

Приоритетная очередь

 


0

1

Приветствую

Через однопоточный обработчик поступают mqtt команды и его никак нельзя тормозить ещё и их обработкой, при этом какие то типы команд могут быть быстро обработаны, какие то существенно медленней.

Если с самой очередью приоритотов вроде есть понимание в использовании std::priority_queue (или лучше мудрить с heap???), то как организовать раздельную обработку каждого приоритета, чтобы длинные низкоприоритетные задачи не тормозили более высокие пока не соображу.

Пока только 2 вида, но может в будущем будет больше и не хочется тупо делать количество очередей просто по количеству приоритотов.

На чем такое реализовать на плюсах???

★★★

однопоточный обработчик

чтобы длинные низкоприоритетные задачи не тормозили

Завести второй обработчик и обрабатывать там низкоприоритетные задачи?

но может в будущем будет больше и не хочется тупо делать количество очередей просто по количеству приоритотов.

Именно так все и делается.

goingUp ★★★★★
()
Последнее исправление: goingUp (всего исправлений: 1)
Ответ на: комментарий от goingUp

Именно так все и делается

Это слишком просто) но главное создаёт вопрос остановки обработки менее приоритетных задач и как следствие спящих потоков.

wolverin ★★★
() автор топика
Ответ на: комментарий от Irma

Ну вот и вопрос как это сделано!? Чтобы более приоритетные задачи не просто брались первыми из очереди, но и не ждали конца выполнения менее приоритетных.

wolverin ★★★
() автор топика
Ответ на: комментарий от wolverin

Нужно универсально и эффективно

Универсально и эффективно вообще под каждый тип задач очередь свою делать) Можно будет ставить обработчик на машинку оптимизированную под нагрузку на процессор, или память, или iops. В этом же и вся суть очередей сообщений, нет?

goingUp ★★★★★
()
Ответ на: комментарий от wolverin

но и не ждали конца выполнения менее приоритетных

Делай задачи в виде корутин из c++20, которые в конце элементарного участка работы делают co_await schedule(); и таким образом отдают процессор. Далее планировщик определяет какую следующую задачу возобновить.

Корутины можно заменить на лямбду с захваченным объектом, который хранит состояние между вызовами.

ox55ff ★★★★★
()
Ответ на: комментарий от wolverin

Кстати, в boost есть корутины и без c++20. Даже две версии. Там другая реализация, чем в c++20. Нужно убедится, что твоя платформа (арм железка) такое поддерживает. Я смотрел запись конференции, на которой чел из яндекса рассказывал как они вроде бы бекенд для такси сделали именно на этих корутинах.

ox55ff ★★★★★
()
Ответ на: комментарий от ox55ff

тогда не понимаю как мне начать выполнять более приоритетную задачу, пока поток занят выполнением менее приоритетной и не ждать его завершения.

бууст из репозитория у меня тоже не весь доступен, где то года до 2015 ) но если подскажите точнее про что речь, я бы поразбирался детальнее.

wolverin ★★★
() автор топика
Ответ на: комментарий от ox55ff

Я смотрел запись конференции, на которой чел из яндекса рассказывал как они вроде бы бекенд для такси сделали именно на этих корутинах.

Они свою разработку уже в OpenSource выложили: https://habr.com/ru/companies/yandex/articles/674902/

Но, мне кажется, вы не делаете поправки на уровень квалификации ТСа.

eao197 ★★★★★
()
Ответ на: комментарий от wolverin

пока поток занят выполнением менее приоритетной

Ты задачу пишешь, не непрерывной портянкой, а разбиваешь на куски. И когда кончается один кусок, то планировщик проверяет есть ли более приоритетная задача. Если есть, то переключается на неё, нет берёт следующий кусок этой же низкоприоритетной задачи.

если подскажите точнее

Выбирай:
boost::coroutine
boost::coroutine2

ox55ff ★★★★★
()

Вообще-то обработка должна быть отделена от получения сообщения из очереди, тем более если ты знаешь что могут быть задержки при обработке.

Поэтому разумно сделать вычитку а дальше пул потоков , в котором запускаются задачи обработки.

alex0x08 ★★★
()
Ответ на: комментарий от Irma

я не могу дробить задачу на куски, т.к., например, может выполняться длительный SQL запрос - в моем случае я XML скармливаю MySQL, но может и еще что то будет подобное.

wolverin ★★★
() автор топика
Ответ на: комментарий от ox55ff

Можно оформить в виде асинхронной задачи и не блокировать поток.

Так ведь для этого нужно, чтобы драйвер доступа к БД поддерживал асинхронную работу. ЕМНИП, в том же Яндексе для своего userver пришлось кастомные драйвера для пары СУБД писать.

eao197 ★★★★★
()
Ответ на: комментарий от eao197

Можно обойтись малой кровью. Скидывать запрос к СУБД в отдельный поток, который будет ждать завершения блокирующий операции и оповещать. У автора явно не фреймворк, который должен держать тысячи запросов в секунду. Можно и выделить поток под работу с бд.

ox55ff ★★★★★
()

смари, для начала безотносительно приоритетов.

поток сообщений (буферизованный. ибо обычно сообщения валятся неравномерно, то пусто то густо). пул потоков-обработчиков. если есть свободный обработчик - отдаем ему сообщение из очереди. нету - курим.

если обработчики не успевают разгрузить очередь - есть разные стратегии. ну, кроме как рыдать и рвать волосы из жопы. например дропать все невпихуемое. или дампить необработанное на диск, до лучших времен. или переезжать на железо получше. или делить очередь на несколько железяк. или комбинация из.

едем дальше. мы не хотим, чтобы низкоприоритетные сообщения засрали пул обработчиков, тем самым похерив обработку высокоприоритетных сообщений. для этого мы делаем что? правильно, два пула обработчиков. или сколько там. по количеству приоритетов.

то есть, получив сообщение, мы укладываем его в (буферизованную) очередь, соответствующую его приоритету. из этой очереди сообщение заберет первый же свободный обработчик из соответствующего пула. тем самым мы фактически свели задачу к простому случаю - к обработке сообщений безо всяких там приоритетов.

схему можем разносить на потоки, процессы, виртуальные машины, реальное железо, здесь, там и хоть на луне.

успехов.

olelookoe ★★★
()

Выглядит, как задача на модифицированный thread-pool.

Т.е. есть пул с, условно, 4 тредами и две очереди 1-HightPriority и 2 - LowPriority.

  • Пока потоков хватает, пул стабильно выгребает задачи и их выполняет, как по классике (сначала все из 1-ой, потом из 2-ой).
  • Когда потоки в пуле заняты, то при поступлении задач для 1-ой очереди поток в пул добавляется (и завершается если нет задач в 1-ой очереди, можно также установить верхний предел на общее количество потоков в «менеджере» тред-пула, (16 например))
  • При поступлении задач для второй очереди они ожидают пока для них освободится поток в пуле (соответственно, выгребутся задачи из приоритетной очереди и количество активных тредов снова вернётся к < 4 штук).
SkyMaverick ★★★★★
()
Ответ на: комментарий от olelookoe

Да так делают, но в вашем решении есть проблема - вы упретесь очень быстро в производительность железки, хотя цели быстро выполнять низкоприоритетные задачи нет.

wolverin ★★★
() автор топика
Последнее исправление: wolverin (всего исправлений: 1)

Сделай по отдельной очереди на каждый приоритет. И сделай по треду на каждый приоритет. Пусть каждый тред работает со своей очередью. В итоге тред обрабатывающий лёгкие задачи будет быстро забирать новые задачи из своей очереди. А тред выполняющий тяжёлые задачи будет медленно забирать новые задачи из своей очереди. В итоге в каком-то роде они будут не мешать друг-другу.

Синхронизацию работы с каждой очередью сделать на std::mutex и std::conditional_variable.

KivApple ★★★★★
()
Ответ на: комментарий от wolverin

Я бы сначала проверил, на твоём месте. Перформанс такая штука, что без бенчмарков никуда.

А что ты хочешь?

Вот твой процесс взял тяжёлую задачу. А потом в очереди появилась лёгая задача.

Тут есть ровно два варианта:

- Он прерывает исполнение тяжёлой задачи и переключается на лёгкую. Но тогда это обычная вытесняющая многозадачность. И как вариант с несколькими тредами для разных приоритетов. В условиях ограниченных ресурсов можно играться количеством тредов (и их приоритетами), количеством приоритетов и размерами стеков. Да, если очень сильно захотеть, можно нагородить свой user-space scheduler, но таким занимаются, когда реально упираются в ресурсы, а не в теории, потому что работы много, а выхлоп не гарантирован (в современных ОС планировщики достаточно оптимизированны). Ещё есть Boost.Fiber для кооперативной многозадачности. В таком случае тебе нужно распихать по реализации тяжёлой задачи точки, где она будет проверять наличие лёгких задач и ставиться на паузу.

- Он выполняет тяжёлую задачу до конца, а потом берёт лёгкую. В таком случае не понимаю вопрос, ты уже нашёл priority queue. Просто бери каждый раз задачу с наивысшим приоритетом и всё. Но как ты верно заметил тяжёлая задача может затормозить лёгкую.

Других вариантов не существует в природе.

KivApple ★★★★★
()
Последнее исправление: KivApple (всего исправлений: 3)
Ответ на: комментарий от wolverin

Задачи всегда мешают друг другу из-за ограниченности ресурсов. Собственно, для того и создана ОС, чтобы решать вопрос.

Если у тебя несколько обработчиков для разноприоритетных задач, то им самим можно задать приоритет средствами ОС и спихнуть задачу на ядро.

Irma ★★
()
Ответ на: комментарий от ox55ff

но и не ждали конца выполнения менее приоритетных

Делай задачи в виде корутин

Корутины не имеют жёстких приоритетов, и подходят скорее для «асинхронности» (отложенных и ленивых вычислений, и т.п.), а не «мультизадачности».

Вероятно, в данном случае лучше честно завести массив очередей с индексацией по приоритету, причём каждый приоритет должен быть поддержан своей нитью или пулом нитей соответствующего приоритета. Это позволит не колхозить с корутинами, а воспользоваться всей мощью ядра. Только это гарантирует ожидаемые тайминги выполнения, и на самом деле конкурировать за ЦП с другими процессами в системе. И, да, придётся подумать, что действительно более приоритетное, а что менее, и стоит ли вообще вводить дополнительные приоритеты.

raspopov
()
Ответ на: комментарий от raspopov

В c++20 корутины очень низкоуровневые. На них можно много чего сделать, а не только попсовый async/await из того же си шарп. В том числе и кооперативную многопоточность.

Если я правильно понял автора, то у него из-за ограниченых ресурсов ЦП нужно уметь прерывать низкоприоритетные задачи и отдавать процессор высокоприоритетной задаче.

Корутину можно прервать на co_await команде. Что позволит решить поставленную задачу. А в вашем случае потоки, обслуживающие низкоприоритетные задачи, продолжат выполнятся параллельно высокоприоритетным и отнимать дефицитные ресурсы.

ox55ff ★★★★★
()
Ответ на: комментарий от ox55ff

Корутину можно прервать на co_await команде

Не можно прервать, а придётся прервать, т.е. заниматься планировкой времени ЦП практически вручную.

Нить же с низким приоритетом будет естественным образом вытеснена с ЦП в соответствии с гибким планированием, включая интересные варианты типа round-robin.

raspopov
()

есть сортированная по приоритету очередь заданий. туда кидаются задания. есть N тредов обработчиков заданий, которые в цикле выбирают задания и исполняют их. тред-обработчик задания может менять свой приоритет в зависимости от приоритета задания.

все остальные «способы» суть производные от этого, или частные(специальные) случаи.

alysnix ★★★
()
Ответ на: комментарий от alysnix

да, но нет.

если у тебя пул из N потоков и тебе прилетело N низкоприоритетных сообщений, то N+1 е высокоприоритетное сообщение курит, пока менее приоритетные сообщения не будут обработаны.

olelookoe ★★★
()
Ответ на: комментарий от olelookoe

Приоритет заканчивается в момент начала обработки, ну у нормальных людей.

Если делать всю обработку целиком согласно приоритетам то это получается скотство на уровне ручных пауз и продолжений выполнения.

Ну и потом процессор не резиновый, сто тысяч медленно выполняющихся потоков куда менее осмысленное явление чем десять быстрых и тыща в ожидании.

alex0x08 ★★★
()
Последнее исправление: alex0x08 (всего исправлений: 1)