LINUX.ORG.RU

Реализация очередей и не только

 


0

2

Допустим, есть структура очереди. В неё можно читать и из неё писать. Объём памяти, выделенный под очередь, фиксированный (задаётся при создании). Очередь в себе хранит указатели на первый и последний элемент (при этом если мы доходим до конца массива во время чтения или записи, то переходим к его началу). Всё хорошо работает, однако возможна ситуация, когда в очереди кончится свободное место. В этом случае надо дождаться, пока оно появиться. Аналогично мы можем хотеть считать из очереди N байт, а если их не хватает, то дождаться их появления.

Busy loop это не круто. Однако у меня есть специальный примитив синхронизации - EventSource. Он поддерживает две основные операции - ожидание сигнала и отправка сигнала. Если поток ждёт сигнал, он не потребляет ресурсы. Отправка сигнала пробуждает все ждущие потоки.

На Linux реализуется этот примитив через pthread_condvar (параметр mutex не используем, передавая свежий mutex, который уничтожаем сразу же после вызова). Под моей личной RTOS реализуется нативно.

Теперь поступаем просто - после каждой записи или чтения посылаем соответственно сигнал о записи или о чтении. Если записали/прочитали не всё, что хотели, то ждём противоположный сигнал (если хотим записать, ждём пока кто-то прочитает, если хотим прочитать, то ждём, пока кто-нибудь запишет).

Возникает логичная проблема. А что если мы отправим сигнал о, скажем, записи. В этот момент нас нагло прервёт проснувшийся поток, который ждал, пока появятся данные в очереди (и мы его разбудили), вытащит из очереди все данные, пошлёт сигнал чтения, а затем снова уснёт от их недостатка. И только теперь к нам возвращается управление и мы начинаем ждать событие чтения. А оно не придёт! Потому что тот кто хотел уже всё прочитал и ждёт, пока нового события записи, которое мы отправили ранее и больше не отправим, пока не запишем ещё.

И что с этим делать? Надо как-то научиться принимать сигналы из прошлого. Скажем, заведём флаг и будем сбрасывать его в false в начале каждой записи/чтения, а после окончания записи (либо если кончилось место/данные в очереди) устанавливаем в true флаг противоположного действия помимо отправки сигнала. В ожидание переходим лишь если флаг остался в false. Но это не поможет - нас могут прервать внутри самой функции ожидания, до того как она собственно начнёт ждать. И она точно также пропустит сигнал.

Конечно, в своей RTOS я могу что-то нахимичить (сделать проверку флага и переход в ожидание, если его нет, атомарным), но не в pthread же.

Как вообще такое надо делать?

Моя кривая реализация очереди, если что - https://github.com/KivApple/controllerFramework/blob/master/libraries/platfor...

★★★★★

параметр mutex не используем, передавая свежий mutex, который уничтожаем сразу же после вызова

Так делать нельзя, ты можешь «потерять событие» из-за гонок.

Посмотри хотя бы на это:https://github.com/shkolnick-kun/bugurtos/blob/master/libs/native/sig.c

Там решение аналогичное, но ничего не теряется, ибо ничего не уничтожается.

Возникает логичная проблема. А что если мы отправим сигнал о, скажем, записи. В этот момент нас нагло прервёт проснувшийся поток, который ждал, пока появятся данные в очереди (и мы его разбудили), вытащит из очереди все данные, пошлёт сигнал чтения, а затем снова уснёт от их недостатка. И только теперь к нам возвращается управление и мы начинаем ждать событие чтения. А оно не придёт! Потому что тот кто хотел уже всё прочитал и ждёт, пока нового события записи, которое мы отправили ранее и больше не отправим, пока не запишем ещё.

Это называется producer-consumer problem, решается при помощи 2 условных переменных на одном мьюьексе или двух счетныз семафоров на 1 мьютексе, читай википедию.

И да, как там твой атомарный проект? Я вот забросил все это, ибо начитался статей про атомарные операции, где написано, что lock-free структуры в большинстве случаев тормознее, чем со спин-локами, как раз из-а атомарных операций.

shkolnick-kun ★★★★★ ()

И таки да, producer-consummer.

invy ★★★★★ ()

Похоже на то, для чего придумали в Haskell монаду STM, которую по привычке называют Software Transactional Memory, но она больше об атомарных операциях. Под покровом реализации там внутри события и слушатели, да еще журналы откатов.

dave ★★★★★ ()
Ответ на: комментарий от shkolnick-kun

Я тоже забил на полный lock-free, использую его лишь в отдельных моментах. Сейчас всё более-менее работает.

Кстати, вот сделал решение для очереди - я перекроил механизм событий. Теперь явно сначала создаётся EventListener для какого-либо EventSource. А уже потом мы запускаем ожидание события. Причём если событие пришло после инициализации слушателя, но перед запуском ожидания, то функция ожидания вернётся тут же. Любое успешное завершение ожидания сбрасывает флаг события.

В итоге в очередях всё просто - создаём слушателя, выполняем чтение/запись сколько влезет, если влезло не всё и мы готовы ждать, то ждём события. Это отлично работает на STM32, однако на Linux я таки смог словить зависание (думаю, что виной более высокая производительность).

Проблема уже в самом коде ожидания события:

while (!listener->fired) {
	ThreadSleep();
}
listener->fired = false;

Что если listener->fired изменится уже после проверки, но до непосредственного засыпания? Будет печально.

На STM32 это можно решить блокировкой системы во время проверки флага и усыплением текущего треда. На Linux вероятно придётся поколдовать с conditional variables.

KivApple ★★★★★ ()
Ответ на: комментарий от KivApple

Значит далее нужно для каждого consumer'а вести свою очередь на вычетку, в которую перебрасывать элементы, которые доступны на момент вычитки. Не знаю как по-русски, но это называется slice. В ядре Linux такое сплош и рядом.

i82 ★★ ()

Зачем всё это барокко? Что не так с решением из учебника, на mutex и condvar, или решением из другого учебника, где задачи отправляются спать, а другие их будят?

tailgunner ★★★★★ ()
Ответ на: комментарий от KivApple

В данном случае лучше применить классическое решение rpoducer-consumer problem, как предлагает tailgunner.

В твоем решении есть как минимум лишний мьютекс, и код переусложнен.

Вот пример реализации: https://gist.github.com/Alexis-D/1801206

Добра тебе!

shkolnick-kun ★★★★★ ()
Вы не можете добавлять комментарии в эту тему. Тема перемещена в архив.