LINUX.ORG.RU

Posix threads: rwlock

 posix threads


0

3

Задача: нужно создать кучу потоков, которые читают разделяемые данные. И есть один, который иногда (очень редко) меняет эти данные. Напрашивающееся решение: использовать rwlock. Это точно лучше, чем mutex. Но есть засада: при попытке получить rwlock на чтение может быть возвращена ошибка: достигнут предел на количество читателей.

Вопрос: как мне узнать заранее этот предел? Если я буду его знать, то у меня будут варианты: либо молча уменьшить макс. кол-во потоков до этой величины, либо сообщить юзеру, что я не могу обеспечить заданное им кол-во потоков, и завершиться после этого. Так как мне узнать (или задать) эту величину? Google даёт только ссылки либо на тот же man, который у меня и так есть, либо на пересказ этого же man’а своими словами с опечатками.

У меня программы должны работать десятилетиями, я не могу методом тыка определять такие величины: создавать потоки, лочить rwlock, затем при ошибке выдать сообщение о предельном кол-ве потоков, которое вбить гвоздями в программу. Сегодня это одна величина, а через 20 лет на другой версии GNU/linux она будет другой. Есть ли тут аналог sysconf(3)? Вот я могу запросить в run-time sysconf(_SC_OPEN_MAX). И исходя из этого не наступить на проблему, когда мне нужно держать открытыми много файлов одновременно.

А в случае POSIX threads вместо предсказуемого поведения программы мне предлагаются грабли. На которые я могу наступить в неизвестный мне заранее момент времени. Кто вообще писал такую спецификацию? Спек должен быть максимально конкретным, а не вот это вот расплывчатое, невнятное говно. Туда же до кучи: где мне получить величину PTHREAD_STACK_MIN? В man’е pthread_attr_setstacksize(3) она указана как 16384 байта (linux-specific). Я должен вбить гвоздями в программу константу 16384? Где, чёрт возьми, она за-define-нена? В pthread.h её нет. Ни sysconf(3), ни getrlimit(2) не дают её мне. Кто мне гарантирует, что через 10 лет она не изменится, и моя программа не перестанет работать? А где мне взять константу PTHREAD_THREADS_MAX, на которую ссылается man pthread_create(3p)?

Чем больше думаю на эту тему, тем меньше хочется использовать POSIX threads, а больше хочется заюзать напрямую clone(2). И получить предсказуемое поведение. И гори он огнём, этот POSIX с его невнятной спецификацией! Программа, написанная лет 20-25 назад с использованием linux-специфичного packet socket до сих пор работает и есть не просит.

Но всё-таки хочется же переносимости – по возможности. Может кто знает, где нарыть документацию на POSIX threads? Более внятную, чем тупо перепечатка man’ов на функции. И конкретно на rwlock: как получить (указать?) максимальное количество потоков-читателей? Opengroup не предлагать, там те же man’ы.

★★
Ответ на: комментарий от r--r--r--

Ты второй раз отвечаешь на пост, но не отвечаешь на вопрос.

мальчик, а к тебе вообще тут никто не обращался. Ты не ТС, ответы были ни к тебе, твои комментарии тоже не в тему и стартеру не помогают никак. По доброте душевной я тебе ответил.

MKuznetsov ★★★★★
()
Ответ на: комментарий от MKuznetsov

По доброте душевной я тебе ответил.

А если бы ты, девочка, ещё бы умела себя вести в приличном обществе и не щитпостить FUD’ом в интернетах - цены бы тебе не было.

r--r--r--
()
Ответ на: комментарий от vbr

Что означает эта фраза? Там по-любому будет какая-то очередь для этих запросов. А значит будет многопоточная структура данных для реализации этой очереди. И ты просто заменил простой многопоточный примитив сложным. Можно, а зачем?

Представим раннее утро. На улицах ещё пусто, лишь вороны каркают в тиши. К местной поликлинике потихоньку начинает сходиться народ. Двери здания открываются и внутрь заходят первые три человека. Это трое мужчин деревенского вида и коренастого телосложения. Один из них спрашивает на ходу у тетки-регистраторши где сидит врач такой-то. Тётя называет номер кабинета и вся троица прямиком отправляется туда. Спустя минуту входит другой мужчина, сдаёт пальто в раздевалку, надевает бахилы и молча проходит в том же направлении, куда отправились три товарища ранее.

У дверей кабинета начинается словесная перепалка между деревенскими мужчинами за право пройти первым в кабинет. Слово за слово, начинается потасовка. Разгорячённые лица, влетающие в них мощные кулаки, рвутся рубахи и штаны. Дверь кабинета открывается и борьба значительно ускоряется. В ход уже идут удары ниже пояса, укусы и удушающие приёмы. Наконец, один боец прорывается в кабинет врача и дверь за ним захлопывается. Двое других мужчин на минуту успокаиваются, но готовы в любое мгновение возобновить борьбу за право войти в кабинет первым.

Четвертый же мужчина садится у двери в соседний кабинет, достаёт свой телефон и неспешно начинает что-то отмечать в нём. Проходит несколько минут и дверь второго кабинета открывается и медсестра приглашает четвёртого мужчину по имени пройти во второй кабинет.

Деревенские мужики сильно удивились и подскакивают к своей медсестре: «А чего это того мужика позвали по имени, а нам приходится тут драться за право войти в кабинет?». А медсестра им отвечает: «Надо было вам, ребята, записываться на приём заранее, тогда и вас пригласили бы по имени и отчеству!».

Средствами доставки запросов на чтение/запись данных, самих данных для записи и подтверждений об успешности выполнения действий могут быть трубы, очереди сообщений и даже файлы. Суть в том, что в «службохранительном» подходе потокам не нужно драться и тратить свои вычислительные мощности за право получить обслуживание. Это не переизобретение семафора или мьютекса, как тут пишут форумчане выше, это отложенный вид обслуживания по предварительному запросу.

Enthusiast ★★★★
()
Ответ на: комментарий от SkyMaverick

Это ведь не решение вопроса синхронизации доступа к разделяемым данным? Видимо, предполагается, что разделяемых данных при таком подходе вообше не будет? В моём случае это не так.

Выше по топику предложили хорошее решение для моей задачи. Если упрощённо, то выглядит оно так: выделить 1 резервный буфер, в который писатель запишет изменения. После этого указатель на этот буфер атомарно записать вместо одного из рабочих буферов. Указатель бывшего рабочего – запомнить как резервный. При следующей записи использовать его. Поскольку писатель запускается крайне редко и работает сильно дольше читателя, то не нужно даже заморочиваться на проверку перед записью, что бывший рабочий никому давно не нужен.

nobody ★★
() автор топика
Ответ на: комментарий от MKuznetsov

Ок, в твоём случае это, наверно, было оправданно. Но у меня задача организационно проще, поэтому на отдельного диспетчера заморочиваться нет смысла.

nobody ★★
() автор топика
Ответ на: комментарий от MKuznetsov

Дык, я как раз и хочу избежать linux-специфичности по возможности. Не из-за того, что интерфейс для user-space поломать могут (это вряд ли), а потому что переносимый код обходится дешевле в сопровождении. Но плохая проработанность стандарта (неопределённость в некоторых местах) сильно напрягает. По сравнению с этим linux-специфичное решение полностью понятно и предсказуемо.

nobody ★★
() автор топика
Ответ на: комментарий от ttnl

Если бы у меня при включении pthread.h появились требуемые константы, я бы не написал, что их нет. У меня вообще нет такого файла, как pthread_stack_min-dynamic.h. А PTHREAD_STACK_MIN определяется в /usr/include/bits/local_lim.h. Что совершенно неочевидно, поскольку в pthread.h он не включается. А включается в bits/posix1_lim.h, который включен в limits.h.

nobody ★★
() автор топика
Ответ на: комментарий от Enthusiast

Это не переизобретение семафора или мьютекса, как тут пишут форумчане выше, это отложенный вид обслуживания по предварительному запросу.

Это может быть решением для какого-то типа задач, но не для всех и уж точно не для моей.

nobody ★★
() автор топика
Ответ на: комментарий от Enthusiast

Короче я понял, вместо того, чтобы написать три строчки с read write lock ты предлагаешь создавать веб-сервис. Ещё под него можно кластер развернуть, для отказоустойчивости. Одобряю.

vbr ★★★★★
()
Ответ на: комментарий от nobody

несколько атомарных операций не обеспечивают синхронизацию. у тебя будет ситуация когда быстрый читатель держит указатель на невалидные данные который будет менять писатель. в терминах с++ для твоей схемы есть спец shared_mutex с дешевой проверкой на чтение.

anonymous2 ★★★★★
()
Последнее исправление: anonymous2 (всего исправлений: 2)
Ответ на: комментарий от nobody

Но у меня задача организационно проще, поэтому на отдельного диспетчера заморочиваться нет смысла

фига себе попроще...когда треды исчисляешь сотнями :-) у меня читателей десяток, в высоком прыжке двадцать. И то на пороге «взять in-memory db».

когда читателей много, у писателя появляется высокий шанс надолго зависнуть в ожидании. А как правило писатель приоритетный, ему нельзя тормозить.

MKuznetsov ★★★★★
()
Ответ на: комментарий от nobody

А, ну, если те же данные постоянно нужны всем, а не, как я подумал, каждый поток утаскивает из общей кучи данные на обработку (т.е., проще говоря, сводится к очереди, в которую один поток накладывает, остальные утаскивают), то тогда да, не подойдёт такая история.

Выше по топику предложили хорошее решение для моей задачи.

Ну, в принципе, почему-бы и нет. Выглядит рабочим решением. Единственное, ИМХО я-бы как-нибудь помечал, используется-ли в данный момент рабочий буфер или простаивает, а то можно подменить в момент, когда кто-то вычитывает и неприятно получится. Впрочем, если это не важно для задачи, то и пофиг.

SkyMaverick ★★★★★
()
Ответ на: комментарий от nobody

Я бы не делал жестко 2 буфера.

Сделал бы один «указатель» (в смысле, один экземпляр указателя - назовём его P), указывающий на память с иммутабельными данными. Доступ к указателю P - под rwlock. Тип указателя P - с семантикой shared_ptr.

Читатель под read lock делает себе копию P и отпускает read lock. Затем спокойно работает с иммутабельными данными безо всякой синхронизации. Пока есть хотя бы одна живая копия указателя, память остается аллоцированной, данные живут. Когда умирает последняя копия указателя, данные разрушаются (синхронизация тут не нужна, т.к. это был последний живой указатель на данные, с ними 100% уже никто не работает), память отпускается.

Писатель аллоцирует память под новые данные, заполняет её в комфортном для себя темпе (синхронизация не нужна, т.к. кроме писателя пока никто не знает об этой памяти). Затем захватывает write lock, делает себе на стек копию указателя P, меняет P так чтобы тот указывал на новые данные, и отпускает write lock. Затем со стека отпускает свою копию старого значения P (если никто из читателей на этот момент не имеет копии старой версии P, то тут будет происходить умирание старых данных и отпускание старой памяти).

Преимущество этой схемы в том, что она не делает никак предположений о времени жизни данных, о количестве читателей и писателей, о скорости их работы, и тд. Таким образом дальнейшее развитие программы не сковано этими предположениями, и не потребует изменений в этой схеме.

struct Data {....};

using DataP = std::shared_ptr<Data>;
using DataCP = std::shared_ptr<const Data>;

class DataStorage
{
public:
  DataCP get()
  {
    std::shared_lock<std::shared_mutex> read_lock(_rwlock);
    return _P;
  }
  void set(const DataCP &newptr)
  {
    DataCP oldptr;
    {
      std::unique_lock<std::shared_mutex> write_lock(_rwloc);
      oldptr = _P;
      _P = newptr;
    }
    // release oldptr
  }

private:
  DataCP _P;
  std::shared_mutex _rwlock;
};

void reader_thread(DataStorage &storage)
{
   ....
   DataCP data = storage.get();
   ....
}

void writer_thread(DataStorage &storage)
{
   ....
   {
     DataP data = std::make_shared<Data>(....);
     ....
     storage.set(data);
   }
   ....
}

Manhunt ★★★★★
()
Последнее исправление: Manhunt (всего исправлений: 5)

Ну и по теме топика.

Делать потоков много больше, чем есть аппаратных процессорных ядер, кажется странной затеей - не эффективной. Nginx, который обслуживает тысячи одновременных сетевых соединений, почему-то обходится без тысяч потоков - это-жжж неспроста!

А если потоков не очень много, то оно просто из соображений здравого смысла не должно упереться в «ошибка: достигнут предел на количество читателей». Там же наверняка просто целочисленный счетчик под капотом, а ошибка посвящена его переполению; ну не будет же у тебя в программе 33.000 потоков?!

Manhunt ★★★★★
()
Последнее исправление: Manhunt (всего исправлений: 3)
Ответ на: комментарий от Enthusiast

Средствами доставки запросов на чтение/запись данных, самих данных для записи и подтверждений об успешности выполнения действий могут быть трубы, очереди сообщений и даже файлы. Суть в том, что в «службохранительном» подходе потокам не нужно драться и тратить свои вычислительные мощности за право получить обслуживание. Это не переизобретение семафора или мьютекса, как тут пишут форумчане выше, это отложенный вид обслуживания по предварительному запросу.

Такая архитектура имеет право на жизнь для определенного круга задач. Однако в общем случае она может работать не эффективно, поскольку:
1. Однопоточный диспетчер будет «бутылочным горлышком», т.е. будет раздавать свои ответы клиентам не достаточно быстро для того, чтобы те смогли полностью утилизировать все ядра;
2. Велики накладные расходы на сложное взаимодействие с диспетчером. Клиент должен разместить запрос у диспетчера, переключиться на другую работу пока ответ на запрос не готов, следить за готовностью ответа, вернуться к старой работе когда ответ созреет. А таких отложенных запросов может быть не 1 а 100, надо где-то помнить клиентский контекст для них всех, тратить ресурсы на управление им;
3. Latency также далека от идеала (если важен не только throughput);

драться и тратить свои вычислительные мощности за право получить обслуживание

Аналогия с дракой применима в основном только к spinlock. В остальных случаях поток, если нужный ресурс занят, не дерётся и не тратит ресурсы, а засыпает.

Manhunt ★★★★★
()
Последнее исправление: Manhunt (всего исправлений: 9)
Ответ на: комментарий от Manhunt

Клиент должен разместить запрос у диспетчера, переключиться на другую работу пока ответ на запрос не готов, следить за готовностью ответа, вернуться к старой работе когда ответ созреет. А таких отложенных запросов может быть не 1 а 100, надо где-то помнить клиентский контекст для них всех, тратить ресурсы на управление им

Такое управление контекстами сильно пахнет корутинами (await и вот это всё).

Manhunt ★★★★★
()
Ответ на: комментарий от Enthusiast

Перезапись общих данных и их чтение происходят менее отзывчиво, да.

В задаче ТС, где записи редкие, потеря параллельного чтения это весьма плохо.

Зато поток может заниматься своими делами во время ожидания запрошенных данных из службы-хранителя.

Или этим делами может заниматься другой поток. Необходимость экономии на потоках зависит от их «тяжеловесности». Поток, ожидающий своей очереди на каком-нибудь futex, ресурсов CPU не тратит

Ну и чисто для справки «замки» бывают и асинхронные (неблокирующие поток). Например https://github.com/jbaldwin/libcoro#shared_mutex

MirandaUser2 ★★
()
Ответ на: комментарий от MirandaUser2

асинхронные (неблокирующие поток). Например

The lock is acquired strictly in a FIFO manner in that if the lock is currenty held by shared users and an exclusive attempts to lock, the exclusive waiter will suspend until all the current shared users finish using the lock. Any new users that attempt to lock the mutex in a shared state once there is an exclusive waiter will also wait behind the exclusive waiter.

Это очень даже синхронный, блокирующий мьютекс.

r--r--r--
()
Ответ на: комментарий от r--r--r--

Блокируется корутина, а не поток, в котором она исполнялась. Поток (в примере там тредпул из 1 потока) возьмёт на исполнение другую таску.

Manhunt ★★★★★
()
Последнее исправление: Manhunt (всего исправлений: 1)
Ответ на: комментарий от r--r--r--

Блокируется доступ к разделяемому ресурсу (собственно это и есть задача мьютекса или rwlock).
Неблокирующим (асинхронным) является ожидание этого доступа в случае использования shared_mutex из библиотеки libcoro.

В процитированной документации говорится то, что корутины кладутся в FIFO очередь.

MirandaUser2 ★★
()
Ответ на: комментарий от MirandaUser2

Неблокирующим (асинхронным) является ожидание этого доступа

Давай попробуем ещё раз, медленно и по слогам прочитать школьный англюсик:

will suspend
will also wait behind

документации говорится то, что корутины кладутся

Нет. В документации английским по ASCII написано, что выполнение короутин останавливается. Это называется по-русски - блокировка.

r--r--r--
()
Последнее исправление: r--r--r-- (всего исправлений: 1)
Ответ на: комментарий от anonymous2

у тебя будет ситуация когда быстрый читатель держит указатель на невалидные данные который будет менять писатель.

Не, не будет такой ситуации. Читателю незачем держать у себя указатель на постоянной основе (кэшировать). Быстро прошёлся по разделяемым данным и вернулся обрабатывать свои собственные. При следующем разе опять возьмёт указатель на разделяемые данные (уже может быть другие, потому что указатель будет на другую область памяти).

nobody ★★
() автор топика
Ответ на: комментарий от MKuznetsov

фига себе попроще

Конечно проще. Дело ведь не в количестве, а в сложности логики взаимодействия. Для программиста в этом коде

for (i = 0; i < n; i++) do_something(i);

неважно, чему равно n. Хоть 10, хоть 10000. Логически это простая задача. У меня вот типа такой, только n обрабатывается не последовательно, а параллельно. А у тебя там сложная приоритезация, поэтому тебе нужен диспетчер (наверно).

когда читателей много, у писателя появляется высокий шанс надолго зависнуть в ожидании. А как правило писатель приоритетный, ему нельзя тормозить.

У меня не так. Писатель будет долго и упорно получать обновление некоторых данных из сети. Потом делать с ними всякую другую работу. Это всё долго, а срабатывать не будет, скорее всего, ни разу за время работы программы. Ну или разово (пару раз?). Вобщем, Заказчик заложил такую фичу просто на всякий случай. Так что приоритет тут у обрабатывающих потоков (читателей), а писатель может и подождать, на его скорости работы это всё равно не скажется (тормоза у него в другом месте).

nobody ★★
() автор топика
Последнее исправление: nobody (всего исправлений: 1)
Ответ на: комментарий от SkyMaverick

Единственное, ИМХО я-бы как-нибудь помечал, используется-ли в данный момент рабочий буфер или простаивает, а то можно подменить в момент, когда кто-то вычитывает и неприятно получится.

Не получится. Писатель работает ну очень сильно дольше читателя (даже не в разы, а на порядки). Когда писатель захочет взять ранее освободившийся им буфер, читателей у него точно не будет. Я думал об этом, но атомарно сделать пометку не очень получается (более-менее экономно по памяти). Если бы заранее знать, сколько может быть читателей, и их было бы немного, можно было бы внести в буфер поле на n слов, по кол-ву читателей. Читатель перед началом работы ставит слово в 1, перед концом сбрасывает в 0. Перед реюзом буфера писатель проверяет всё это на равно 0. Слово не может установиться в 1 прямо при чтении писателем, потому что этот буфер уже вне адресуемых читателями (с прошлого запуска писателя). Оно может быть или уже равно 0, или сброситься в 0. Но кол-во читателей мне неизвестно, в общем случае это хз какая величина. Так что всё равно потребуется какой-то механизм синхронизации, а это ненужные накладные расходы. Вобщем, не стоит оно того, решил я для себя.

nobody ★★
() автор топика
Ответ на: комментарий от Manhunt

Преимущество этой схемы в том, что она не делает никак предположений о времени жизни данных, о количестве читателей и писателей, о скорости их работы, и тд. Таким образом дальнейшее развитие программы не сковано этими предположениями, и не потребует изменений в этой схеме.

Да я ж не спорю, что rwlock тут сам напрашивается. Заглавный пост был именно про него.

nobody ★★
() автор топика
Ответ на: комментарий от Manhunt

Делать потоков много больше, чем есть аппаратных процессорных ядер, кажется странной затеей - не эффективной.

Да не моё это дело – ограничивать юзера на кол-во потоков. Он лучше меня знает, как ему эффективнее. Я же понятия не имею, что они завтра купят на ВЦ.

А если потоков не очень много, то оно просто из соображений здравого смысла не должно упереться в «ошибка: достигнут предел на количество читателей». Там же наверняка просто целочисленный счетчик под капотом

Опять предположения. Будь добр, перечитай тред, это всё уже обсудили выше. Для меня предложенное решение более предсказуемо, чем предположение о типе счётчика и о каких-то может быть встроенных ограничениях на его макс. значение. Стандарт мне ничего не даёт на этот счёт – из-за чего и появилась эта тема.

nobody ★★
() автор топика
Ответ на: комментарий от nobody

Опять предположения.

Искусственный Идиот подсказывает, что:

  • Linux (glibc): Generally, it can handle over 2 billion (2^{31}-1) concurrent read locks, effectively limited by the integer type used for the counter.
  • macOS: Has been observed to break or overflow around 16 million (2^{24}) simultaneous read locks.


Ты готов заложиться на линукс-специфичный вызов clone, но не готов заложиться на то, что линуксячья реализация pthreads сохранит этот лимит хотя бы на уровне (2^15-1) после сегодняшних (2^31-1) ?

Да не моё это дело – ограничивать юзера на кол-во потоков.

Как раз твоё. Ты, как автор программы, должен донести до пользователя инструкцию по её правильному использованию. Пользователь не должен гадать на кофейной гуще, а иметь в своем распоряжении The Fine Manual.

Manhunt ★★★★★
()
Последнее исправление: Manhunt (всего исправлений: 2)
Ответ на: комментарий от nobody

Да я ж не спорю, что rwlock тут сам напрашивается. Заглавный пост был именно про него.

Я предлагаю захватывать его не на время работы читателя с данными, а на время копирования указателя на данные. Так и писателю меньше ждать придётся, и читателей, одновременно удерживающих лочку, будет меньше.

Manhunt ★★★★★
()
Ответ на: комментарий от Manhunt

Ты готов заложиться на линукс-специфичный вызов clone, но не готов заложиться на то, что линуксячья реализация pthreads сохранит этот лимит хотя бы на уровне (2^15-1) после сегодняшних (2^31-1) ?

Я не хочу закладываться на linux-специфичные вещи. А хочу писать переносимо. Но для этого мне нужна предсказуемость поведения предлагаемых стандартом функций. Я ж про это пол-треда пишу.

Ты, как автор программы, должен донести до пользователя инструкцию по её правильному использованию.

Это правильно. Но я не могу написать в инструкции, что «указывай программе потоков немного». Немного – это сколько? Прочти ещё раз заглавный пост, об этом там написано.

nobody ★★
() автор топика
Ответ на: комментарий от Manhunt

Я предлагаю захватывать его не на время работы читателя с данными, а на время копирования указателя на данные.

А во время копирования читатель с данными работать не будет? Если не будет, то rwlock не нужен. Если будет, то как понимать «Я предлагаю захватывать его не на время работы читателя с данными»?

nobody ★★
() автор топика
Ответ на: комментарий от nobody

Там псевдокод есть, не знаю что ещё добавить. Ни читателю, ни писателю не нужен лок на всё время работы с данными. Лок нужен только на время доступа к той ячейке памяти, где хранится смартпоинтер на данные: чтобы этот смартпоинтер либо скопировать к себе, либо перезатереть.

Manhunt ★★★★★
()
Последнее исправление: Manhunt (всего исправлений: 1)
Ответ на: комментарий от nobody

Но я не могу написать в инструкции, что «указывай программе потоков немного». Немного – это сколько?

Я не знаю деталей твоей программы, но было бы логично, если бы кол-во потоков было приблизительно равно кол-ву аппаратных ядер (меньше - не утилизируешь ядра, сильно больше - будут бессмысленные потери на переключении контекстов).

Manhunt ★★★★★
()
Ответ на: комментарий от Manhunt

Там псевдокод есть, не знаю что ещё добавить.

Попытался со второй попытки распарсить то, что ты называешь псевдокодом (на самом деле там что-то похожее на плюсы, а я их забыл давно). Если я правильно понял, ты предлагаешь подсчёт кол-ва текущих пользователей буфера. Каждый пользователь атомарно инкрементирует счётчик перед началом работы с буфером. После завершения декрементирует. При попытке освободить буфер память будет отдана потом, когда при декременте будет 0. Про нечто подобное я уже писал выше, но тут опять нужен rwlock. Возможно, я его и заюзаю, пока не решил окончательно. Но только если сам напишу его, чтобы работало предсказуемо везде.

nobody ★★
() автор топика
Ответ на: комментарий от Manhunt

Я не знаю деталей твоей программы, но было бы логично, если бы кол-во потоков было приблизительно равно кол-ву аппаратных ядер

А я понятия не имею, на чём будет работать программа когда-нибудь завтра. Сколько там будет процессоров и ядер?

nobody ★★
() автор топика
Ответ на: комментарий от ya-betmen

Видимо man std::atomic_ref

Я так понимаю, что это какая-то надстройка поверх POSIX mutex? В принципе, можно подумать об использовании mutex вместо rwlock для подсчёта пользователей буфера.

nobody ★★
() автор топика
Ответ на: комментарий от nobody

Если переменная достаточно маленькая по размеру (условно, как регистр, но на самом деле чуть сложнее), то некоторые простые действия (инкремент,декремент,условный обмен и ещё что-то) над ней можно безопасно делать без блокировок в понимании ОС. В коде такие атомик-операции могут оформляться по-разному.

Собственно внутри мютексов и прочего такая штука (атомик) внутри всегда есть, чтобы большинство действий с ним (если не требуется ждать другой поток который его уже залочил) делалось сразу же быстро без сисколлов.

Я не очень понял почему он тебе советует атомик, возможно предлагает вместо системного rwlock свою реализацию написать.

firkax ★★★★★
()
Последнее исправление: firkax (всего исправлений: 2)
Ответ на: комментарий от Manhunt

Алиса с моей подачи вот такое насрала взамен shared_ptr. Выглядит правдоподобно. Но Си-шка с её отсутствием RAII, это, конечно, грусть и тоска.

#include <stdatomic.h>
#include <stdlib.h>

// Предположение: структура Data уже определена
typedef struct Data Data;
void data_init(Data *);
void data_destroy(Data *);

// Интрузивный счётчик ссылок внутри структуры
struct SharedData {
    atomic_int ref_count;
    Data data;
};

// Создание shared-указателя (аналог make_shared)
struct SharedData* make_shared_data(void) {
    struct SharedData* ptr = malloc(sizeof(struct SharedData));
    if (!ptr) {
        return NULL;
    }

    // Устанавливаем начальный счётчик ссылок = 1
    atomic_init(&ptr->ref_count, 1);

    // Инициализируем данные
    data_init(&ptr->data);

    return ptr;
}

// Копирование shared-указателя (увеличение счётчика)
struct SharedData* shared_data_copy(struct SharedData* ptr) {
    if (!ptr) {
        return NULL;
    }

    // Атомарно увеличиваем счётчик ссылок
    // memory_order_relaxed достаточно, так как мы только увеличиваем счётчик
    atomic_fetch_add_explicit(&ptr->ref_count, 1, memory_order_relaxed);

    return ptr;
}

// Освобождение shared-указателя (уменьшение счётчика и уничтожение при необходимости)
void shared_data_destroy(struct SharedData* ptr) {
    if (!ptr) {
        return;
    }

    // Атомарно уменьшаем счётчик ссылок и получаем старое значение
    // memory_order_acq_rel гарантирует корректную синхронизацию
    int old_count = atomic_fetch_sub_explicit(
        &ptr->ref_count,
        1,
        memory_order_acq_rel
    );

    // Если счётчик стал 0, уничтожаем данные и освобождаем память
    if (old_count == 1) {
        data_destroy(&ptr->data);
        free(ptr);
    }
}

Manhunt ★★★★★
()
Ответ на: комментарий от Manhunt

Ну а первоначальный псевдокод тогда вот так переписывается (простите за нейрослоп):

#include <pthread.h>
#include <stdio.h>  // fprintf
#include <unistd.h>  // _exit

void terminate_on_error(const char* message) {
    fprintf(stderr, "Error: %s\n", message);
    _exit(EXIT_FAILURE);
}

struct DataStorage {
    struct SharedData* _P;
    pthread_rwlock_t _rwlock;
};

void data_storage_init(struct DataStorage* storage) {
    storage->_P = NULL;
    if (pthread_rwlock_init(&storage->_rwlock, NULL) != 0) {
        terminate_on_error("pthread_rwlock_init");
    }
}

void data_storage_destroy(struct DataStorage* storage) {
    shared_data_destroy(storage->_P);
    if (pthread_rwlock_destroy(&storage->_rwlock) != 0) {
        terminate_on_error("pthread_rwlock_destroy");
    }
}

struct SharedData* data_storage_get(struct DataStorage* storage) {
    struct SharedData* result = NULL;

    if (pthread_rwlock_rdlock(&storage->_rwlock) != 0) {
        terminate_on_error("pthread_rwlock_rdlock");
    }

    result = shared_data_copy(storage->_P);

cleanup:
    if (pthread_rwlock_unlock(&storage->_rwlock) != 0) {
        terminate_on_error("pthread_rwlock_unlock");
    }
    return result;
}

void data_storage_set(struct DataStorage* storage, struct SharedData* newptr) {
    struct SharedData* oldptr = NULL;

    if (pthread_rwlock_wrlock(&storage->_rwlock) != 0) {
        terminate_on_error("pthread_rwlock_wrlock");
    }

    oldptr = storage->_P;
    storage->_P = shared_data_copy(newptr);

cleanup:
    if (pthread_rwlock_unlock(&storage->_rwlock) != 0) {
        terminate_on_error("pthread_rwlock_unlock");
    }
    shared_data_destroy(oldptr);
}

void reader_thread(struct DataStorage* storage) {
    struct SharedData* data = NULL;

    // ....

    shared_data_destroy(data);
    data = data_storage_get(storage);
    if (!data) {
        terminate_on_error("Failed to get shared data from storage");
    }

    // ....

cleanup:
    // ....
    shared_data_destroy(data);
    // ....
}

void writer_thread(struct DataStorage* storage) {
    struct SharedData* data = NULL;

    // ....

    shared_data_destroy(data);
    data = make_shared_data();
    if (!data) {
        terminate_on_error("Failed to create shared data");
    }

    // ....

    data_storage_set(storage, data);
    shared_data_destroy(data);
    data = NULL;

    // ....

cleanup:
    // ....
    shared_data_destroy(data);
    // ....
}

Manhunt ★★★★★
()
Последнее исправление: Manhunt (всего исправлений: 6)
Ответ на: комментарий от Manhunt

То, что казалось на C++ простым и лаконичным, при переписывании на Си превратилось в какого-то монстра. Такое уже и не хочется кому-либо рекомендовать. Увы.

Manhunt ★★★★★
()
Ответ на: комментарий от Manhunt

Если уж на то пошло, так @nobody в принципе может кастомные rwlock-и сделать и не париться. Тоже, кроме атомиков, ничего не нужно и кода там не то, чтобы дофига писать.

Кусок выдрал из кода, фреймворка. Вроде нормально работал.

#define RWLOCK_MAX_REF_COUNT (1ULL << 62) - 1
#define RWLOCK_READ_FLAG 1ULL << 62
#define RWLOCK_WRITE_FLAG 1ULL << 63
#define RWLOCK_COUNTER_MASK (UINT_FAST64_MAX << 2) >> 2

// RWLock-и.
// Определение структуры мютекса
typedef struct {
    atomic_uint_fast64_t flags;
} rwlock_t;

TAF_API uintptr_t
taf_rwlock_create(void) {
    rwlock_t *rwl = taf_zmalloc(sizeof(rwlock_t));
    if (!rwl)
        return 0;
    atomic_init(&(rwl->flags), 0);
    return (uintptr_t)rwl;
}

TAF_API void
taf_rwlock_free(uintptr_t m) {
    if (!m)
        return;

    rwlock_t *rwl = (rwlock_t *)m;
    taf_free_null(rwl);
}

TAF_API int
taf_rwlock_read_lock(uintptr_t m) {

    if (!m)
        return -1;
    rwlock_t *lock = (rwlock_t *)m;

    uint64_t old_flags, new_flags;
    while (1) {
        old_flags = atomic_load(&lock->flags);

        if ((old_flags & RWLOCK_WRITE_FLAG) != 0)
            continue; // Есть пишушие блокировки

        uint64_t ref_count = old_flags & RWLOCK_COUNTER_MASK;
        if (ref_count >= RWLOCK_MAX_REF_COUNT)
            continue; // Превышено количество читателей

        new_flags = old_flags + 1;

        if (atomic_compare_exchange_weak(&lock->flags, &old_flags, new_flags))
            break;
    }
    return 0;
}

TAF_API int
taf_rwlock_write_lock(uintptr_t m) {
    if (!m)
        return -1;
    rwlock_t *lock = (rwlock_t *)m;
    uint_fast64_t old_flags, new_flags;

    // Пробуем атомарно установить RWLOCK_WRITE_FLAG (не затрагивая другие биты)
    while (1) {
        old_flags = atomic_load(&lock->flags);
        if ((old_flags & RWLOCK_WRITE_FLAG) != 0)
            continue; // есть другие писатели

        new_flags = old_flags | RWLOCK_WRITE_FLAG;

        if (atomic_compare_exchange_weak(&lock->flags, &old_flags, new_flags)) {
            break; // Флаг установлен
        }
        // Повторить, если CAS облом
    }

    // Ждём всех читателей (ref_count == 0)
    do {
        old_flags = atomic_load(&lock->flags);
    } while ((old_flags & RWLOCK_COUNTER_MASK) != 0);

    return 0;
}

static inline int
_rwlock_unlock_read(rwlock_t *lock) {
    uint64_t old_flags, new_flags;
    while (1) {
        old_flags = atomic_load(&lock->flags);
        uint64_t ref_count = old_flags & RWLOCK_COUNTER_MASK;
        if (ref_count == 0) {
            // Освобождать нечего, читателей нет
            return -1;
        }

        new_flags = old_flags - 1;

        if (atomic_compare_exchange_weak(&lock->flags, &old_flags, new_flags))
            break;
    }
    return 0;
}

static inline int
_rwlock_unlock_write(rwlock_t *lock) {
    uint64_t old_flags, new_flags;
    while (1) {
        old_flags = atomic_load(&lock->flags);
        new_flags = old_flags & ~RWLOCK_WRITE_FLAG;
        if (atomic_compare_exchange_weak(&lock->flags, &old_flags, new_flags))
            break;
    }
    return 0;
}
TAF_API int
taf_rwlock_unlock(uintptr_t m) {
    if (!m)
        return -1;
    rwlock_t *lock = (rwlock_t *)m;

    uint64_t flags = atomic_load(&lock->flags);
    return (flags & RWLOCK_WRITE_FLAG) ? _rwlock_unlock_write(lock) : _rwlock_unlock_read(lock);
}
SkyMaverick ★★★★★
()
Ответ на: комментарий от MirandaUser2

Попробую сформулировать по-другому:

Не пытайся. И coro::shared_mutex::lock, и std::shared_mutex::lock - оба два блокирующие мьютексы.

P.S. Ты бы вместо демонстрации какой-то фантастической глубины тупняка сходил бы в гугл и узнал бы наконец, чем блокирующий мьютекс отличается от неблокирующего.

r--r--r--
()

Задача: нужно создать кучу потоков, которые читают разделяемые данные. И есть один, который иногда (очень редко) меняет эти данные. Напрашивающееся решение: использовать rwlock. Это точно лучше, чем mutex. Но есть засада: при попытке получить rwlock на чтение может быть возвращена ошибка: достигнут предел на количество читателей.

  1. Используйте trylock с таймаутом.
  2. Обновите системные библиотеки и ядро.
alnkapa
()
Ответ на: комментарий от firkax

Я не очень понял почему он тебе советует атомик, возможно предлагает вместо системного rwlock свою реализацию написать.

А я не очень понял, что именно мне предлагается. Судя по синтаксису, std::atomic_ref – это что-то плюсовое, что находится в стандартной библиотеке C++. Т.е. некая надстройка над системной библиотекой.

Вопрос: надстройка над чем? Над готовыми объектами типа mutex? Или из системной библиотеки торчат наружу какие-то функции типа atomic_cmp_exchange(), с помощью которых программист может написать свои объекты синхронизации, и которые будут работать на любом железе?

Ведь в общем случае, с точки зрения процессора, атомарными операциями с памятью являются только чтение или запись машинного слова по выравненному адресу. Всё остальное – специфично для конкретного железа.

Если под «атомик» понимается некий уровень абстракции, независящий от железа, который предоставляет системная библиотека для задач синхронизации, то мне он неизвестен. Во всяком случае, поиск по atomic в каталогах man’ов ничего не даёт.

nobody ★★
() автор топика
Ответ на: комментарий от Manhunt

То, что казалось на C++ простым и лаконичным, при переписывании на Си превратилось в какого-то монстра.

Спасибо за старания, но я и так уже понял суть – из плюсового кода.

nobody ★★
() автор топика
  • Markdown
Пустая строка (два раза Enter) начинает новый абзац. Знак '>' в начале абзаца выделяет абзац курсивом цитирования.
Внимание: прочитайте описание разметки Markdown.
Используйте Ctrl-Enter для размещения комментария