LINUX.ORG.RU

LockFree очередь сообщений в shared memory - какие хорошие реализации?


1

4

Итак, имеем следующую задачу. Один (или несколько) процессов вычитывают из сети сообщения. Несколько других процессов обрабатывают эти сообщения. Т.е. апи приблизительно такой -

  • retcode_t push(void* data, int size);
  • retcode_t pop(void* data, int max_size, int* size);

При том это все рабоатет на уровне сообщения. Т.е. если вставили тру кусочка по 100, 200, и 300 то и выберем 100, 200, 300 а не 600 одним куском.

Хотелось бы это сделать без блокировок. Писателей/читателей от 4 до 50

Хотелось бы это сделать без блокировок

«Есть такая тема - нужна машина, чтобы могла ездить, летать и плавать. Хотелось бы чтобы она была без колес, винтов, крыльев и без двигателя».

Примитивы синхронизации - мутексы, семафоры и т.п. в любом случае будут, явно или неявно.

no-dashi ★★★★★
()
Последнее исправление: no-dashi (всего исправлений: 1)
Ответ на: комментарий от no-dashi

А атомарная операция - это, наверное, такой неявный мутекс, спрятанный внутри процессорной инструкции, а поэтому и весь алгоритм всё равно будет на адских блокировках.

Sorcerer ★★★★★
()
Ответ на: комментарий от no-dashi

Конечно, разве может быть иначе? В современных процессорах вообще имеется высокоуровневая инструкция POST_TO_LOR.

Sorcerer ★★★★★
()
Ответ на: комментарий от no-dashi

Вот тут подборка ссылок на эту тему. Но меня скорее интересует опыт практического использования, чем теоретические изыскания. Кстати, как раз похожая структура встроена в ядро линукса

Мутексы могут и быть, но например использоваться не каждую вставку а один раз из 100. Или вообще без мутекса.

vromanov ★★
() автор топика
Ответ на: комментарий от vromanov

Тебе онанизмом заниматься, или задачу решить? Там же понятным по белому в первой же статье по данной ссылке говорят, что lock-free зачастую намного менее производительны, чем структуры с блокировками.

no-dashi ★★★★★
()
Ответ на: комментарий от no-dashi

Мне важнее не дергать примитивы синхронизации из ядра. Т.к. это приводит к тормозам. Проблема в том, что вот у меня 10 процессов и все они делают похожую работу.

  • выбирают сообщение из очереди
  • проверяют что никто уже не работает с подобным сообщением (от того-же пользователя)
  • логируют его
  • записывают его в буффер трасировки
  • учитывают в статистике
  • обработка, куча запросов к бд, LUA итд, генерация ответа
  • записываем ответ в буфер трассировки
  • логируем ответ
  • помечаем, что закончили работу с этим пользоваетелем
  • засовываем в исходящую очередь

и на каждом шаге блокировка. Проблемы начинаются когда загрузка ЦПУ достаточна велика и количество потоков больше числа ядер. В этом случе система может на 3-5 секунд войти в состяние 100% загрузки ЦПУ. К сожалению, не всегда получается деражать количества процеесов небольшим. Я готов пойти на небольшое падение производительности, но отсутвие затыков для меня КРИТИЧНО, т.к. система почти realtime. Протормозим с ответом - пользователеь не получит доступа к интернету.

vromanov ★★
() автор топика
Ответ на: комментарий от vromanov

В этом случе система может на 3-5 секунд войти в состяние 100% загрузки ЦПУ.

Когда большинство потоков ожидают освобождения мутекса загрузка ЦПУ не может быть 100%

Absurd ★★★
()
Ответ на: комментарий от vromanov

Так блокировки надо делать разумные, там где надо на то что надо ровно на нужное время. И извини, но у тебя просто неправильная архитектура. Введи диспетчер нагрузки, который будет сам опрашивать очередь, помнить какой обработчик чем занимается и озадачивать обработчики - и «внезапно» у тебя проблемы с блокировками в очереди исчезнут сами собой, и останется только синхронизировать принимающую события часть и диспетчер нагрузки.

no-dashi ★★★★★
()
Последнее исправление: no-dashi (всего исправлений: 1)
Ответ на: комментарий от Absurd

Если используются, например, адаптивные мутексы то вполне могут. Просто потоки крутятся в цикле и ждут освобождения лока

vromanov ★★
() автор топика
Ответ на: комментарий от no-dashi

В этом направлении мы тоже думаем. Часть проблем этим будет решена, но не все. Можно будетпросто завести несколько небольших очередей по числу воркеров и перекладывать сообщения из большой очереди в маленькие.

vromanov ★★
() автор топика
Ответ на: комментарий от vromanov

Вообще я похоже писал похожую систему лет 5 назад. Там очередью служила оракловая таблица, в которую новые задания добавлялись при помощи простого insert-а. Треды-рабочие ее разгребали не по одному а сразу по MIN(queue_size, 50) штук. Там в оракле можно писать запросы вида update Job set Status='PENDING' where Status='NEW' and RowNum <50 bulk collect into OutputArray c последующим возвратом этого OutputArray в виде резалтсета. Тред получал эксклюзивную копию этих 50 штук джобов и дальше работал с ними без блокировок. Таким образом количество блокировок на один джоб снижалось в 50 раз.

Absurd ★★★
()
Последнее исправление: Absurd (всего исправлений: 1)
Ответ на: комментарий от no-dashi

У нас кроме этого есть куча других проблем, которые мы успешно решаем. Сервер успевает обрабатывать 8-10 тысяч сообщений по DIAMETER и/или 20-50 тысяч HTTP запросов в секунду на обычном десктопе. При этом он успевает еще выполнить >100 000 запросов в базу данных, реплицировать состояние на второй узел кластера и делать еще кучу полезных дел, напрмер, писать в лог по 20 тысяч строк в секунду

vromanov ★★
() автор топика
Ответ на: комментарий от Absurd

эээээ.... И скорость выборки небось была 100-1000 сообщений в секунду? У нас есть и такое. Но мы просто удаляем по одной строчке с использование DELETE first 1 from events where .... RETURNING col1, col2; Все равно дальше будут блокировки, но речь идет не о блокировках на уровне BD, а на уровне объектов внутри приложения

vromanov ★★
() автор топика
Ответ на: комментарий от vromanov

100 000 запросов в базу данных, реплицировать состояние на второй узел кластера и делать еще кучу полезных дел, напрмер, писать в лог по 20 тысяч строк в секунду

И это все «на обычном десктопе» ? Хороший у вас десктоп, с рейдом из SSD/SAS дисков, похоже.

А что за БД, позволяющая делать 100к QPS?

Slader
()
Ответ на: комментарий от vromanov

Все равно дальше будут блокировки, но речь идет не о блокировках на уровне BD, а на уровне объектов внутри приложения

А что, передавать объекты из треда в тред не по одному а сразу по 100 никак? Я чтобы избежать потерь на синхронизацию просто обмениваюсь большими кусками данных. Делаю рингбуффер в который объекты кладутся по одному, а берутся сразу все первым обратившимся. Жалоб на перформанс нету обычно.

Absurd ★★★
()
Ответ на: комментарий от Slader

База данных TimesTen. На десктопе стоит обычный винчестер.

WDC WD5000AAKS-00E4A0 Intel(R) Core(TM) i7 CPU K 875 @ 2.93GHz

vromanov ★★
() автор топика
Ответ на: комментарий от Absurd

Это будет плохо. Время обработки одного сообщения около 0.3-0.6 ms. Сейчас процессы обрабатывают сообщения сразу после их получения. Если брать 100 штук, то последнее из сотни будет обработано спустя 30-60 мс, что многовато, т.к. сейчас среднее время ответа померянное на стороне тестовой тулы 2 ms. Т.е. при таком походе мы сразу увеличиваем среднее время ответа в 10-20 раз.

vromanov ★★
() автор топика
Ответ на: комментарий от vromanov

Время обработки одного сообщения около 0.3-0.6 ms. Сейчас процессы обрабатывают сообщения сразу после их получения. Если брать 100 штук, то последнее из сотни будет обработано спустя 30-60 мс, что многовато, т.к. сейчас среднее время ответа померянное на стороне тестовой тулы 2 ms.

Ну не фиксированно по 100 штук, а жадным образом. То есть первый пришедший возьмет все что есть. Потом второй пришедший возьмет все что успело накопиться в интервале времени между тем как все забрал первый и освободился второй. Система стабилизируется - скажем, за время пока до очереди добегает очередной освободившийся в ней накапливается в среднем 10 штук. Все равно они обрабатываются последовательно на физическом уровне. Только если делать по одному джобу на тред, эту последовательность будет перематывать планировщик задач.

Absurd ★★★
()
Ответ на: комментарий от Absurd

Делали уже так. Под нагрузкой как раз все стремится к худшему варианту, плюс растет время под локом, т.к. приходится получать пачку данных. С этой точки зрения более оптимально разделить очередь на несколько.

vromanov ★★
() автор топика
Ответ на: комментарий от vromanov

Не пробовали сделать отдельную очередь на каждого читателя, соответственно балансируя нагрузку между очередями?

Думаю все ваши проблемы от неправильной архитектуры строящейся системы

pulo
()
Ответ на: комментарий от pulo

В некоторых местах так уже и сделано. Но есть места, где это сделать проблематично. Например, очередь сообщений в лог. Там очень хочтеся сохранить последовательность.

Система уже пару лет в эксплуатации и вполне успешно работает. Несколько кластеров по всей стране. Это скорее улучшение мелких проблем. Например, эта проблема возникает в среднем 2 раза за сутки при работе под экстремальной нагрузкой, которая в эксплуатации ни разу не достигалась. У конкурентов нормой считается время ответа до 500ms и 1% ошибок, у нас показатели на порядки лучше.

vromanov ★★
() автор топика
Ответ на: комментарий от vromanov

растет время под локом, т.к. приходится получать пачку данных.

Чему там расти? Берущий передает ссылку на массив куда под локом кладется несколько указателей. Массив thread-local фиксированного размера, переаллокаций не происходит.

Absurd ★★★
()
Ответ на: комментарий от Absurd

Можно и так.. Но у нас в очередь запихивает один процесс, а другие выбирают. Т.е. сообщения лежат не в куче, а самой очереди фиксированного размера. При выборке тело сообщения копируется в буффер, после чего на его место уже может быть помещено другое сообщение в случае переполнения очереди. При вашем подходе место придется резревировать до окончания обработки сообщения. А потом еще рализовать алгоритм для склейки освобожденных кусков, если обработка будет заканичиваться не в той последовательности, как вычитывание. Вот это как раз стоит сделать в lockless очереди.

vromanov ★★
() автор топика

Блокировки вообще не нужны: у каждого читателя своя очередь, писатели им по round robin ложат сообщения, делая атомарный своп указателя на последнее.

mv ★★★★★
()
Ответ на: комментарий от mv

А теперь представьте себе следующие случаи

  • Эта очередь сообщений для логирования и хочется сохранить последовательность сообщений
  • Количество читатаелей/писателей может меняться

Ну и простого атомарного свопа недостаточно.. Об этом и был вопрос.

vromanov ★★
() автор топика
Ответ на: комментарий от mv

Блокировки вообще не нужны: у каждого читателя своя очередь, писатели им по round robin ложат сообщения, делая атомарный своп указателя на последнее.

Это не очередь, это стек. Для FIFO нужно dequeue делать из начала, а не с конца.

Absurd ★★★
()
Ответ на: комментарий от vromanov

Количество читатаелей/писателей может меняться

Поэтому тебе нужен диспетчер заданий, который в одиночку будет разруливать все блокировки, «писатели» будут на нём регистрироваться и отправлять сообщения, а «читатели» опять таки регистрироваться и получать сообщения, ни те, ни другие вообще ни о чем не будут думать, поскольку для них диспечтер будет либо бездонной корзиной (для писателей), либо бесконечным источником (для читателей). А что кому направить будет решать диспетчер, чьи блокировки будут легковесными внутрипроцессовыми.

no-dashi ★★★★★
()
Ответ на: комментарий от vromanov

вынеси лог в отдельный поток со своей очередью, обработчики пусть без блокировки толкают сообщения в очередь лога. Все равно запись в файл(или куда ты там пишешь) - это последовательная операция, требующая блокировки. Если логгер будет один, то пропадет проблема непоследовательности записей в лог. Также снизится влияние на риалтайм, т.к. обработчики будут заниматься обработкой данных, а не ждать разблокировки лога. Тоже самое можно сделать со статистикой, и «буффером трассировки»(что бы это ни было у тебя), короче все отложенные операции можно выделить в отдельные «вялотекущие» потоки.

codeogre
()
Ответ на: комментарий от Sorcerer

На просто атомарных далеко не уедешь. Там часто все основано на CAS операциях, а они могут хорошенько бороться за ресурсы на большом количестве потоков.

vertexua ★★★★★
()

Наводящий вопрос. А если без блокировок, то как консюмеры будут ждать когда не будет сообщений? В цикле вращаться?

vertexua ★★★★★
()
Ответ на: комментарий от vromanov

Эта очередь сообщений для логирования и хочется сохранить последовательность сообщений

timestamp/sequence number? Сохраняй как попало, потом на чтении разберешься

vertexua ★★★★★
()
Последнее исправление: vertexua (всего исправлений: 2)

Опишите поподробнее задачу. Оптимизация под запись или под чтение? Сколько читателей сколько писателей? Оптимизация под latency или throughput? Поведение queue или topic? Размеры очереди, всех сообщений, каждого сообщения (средние, максимальные)? Очередь часто пуста или переполнена? Ожидается ли отваливание по ошибке читателей и писателей? Как часто писатели и читатели отключаются?

vertexua ★★★★★
()

Вы уже стали коммитить в ядро?

anonymous
()

Вот что интересно по поводу конкаренси что всё описано и расписано пол века назад, но до сих пор не осиливают. Люди стали глупее. Печально.

anonymous
()
Ответ на: комментарий от pathfinder

о недопустимости хамства

Где это вы углядели?

anonymous
()
Вы не можете добавлять комментарии в эту тему. Тема перемещена в архив.