LINUX.ORG.RU

Data race

 


0

2

Привет, мужчины. Имеется вопрос, возник в ходе работы над завершением программы. Упрощаю до сути - имеется ли здесь data race?

#include <thread>
#include <mutex>
#include <vector>
#include <chrono>
#include <memory>
#include <iostream>
using namespace std;


class Thread_pool {
   vector<thread> th;
   mutex mtx;
   atomic_flag stop_flag;
   bool destruction_stage = false;
public:
   ~Thread_pool() {
      stop_flag.test_and_set(memory_order_relaxed);
      if (true) {
         lock_guard l(mtx);
         destruction_stage = true;
      }
      std::cout << th.size() << "is running" << endl; // 16
      for (thread &t : th)
         t.join();
   }
   bool add_thread(void(f)(atomic_flag *)) {
      lock_guard l(mtx);
      if (destruction_stage)
         return false;
      th.push_back(thread(f, &stop_flag));
      return true;
   }
} th_pool;


void tf(atomic_flag *f) {
   while (! f->test(memory_order_relaxed)) {
      this_thread::sleep_for(1s);
      for (int i = 0;  i < 3;  ++i)
         th_pool.add_thread(tf);
   }
}

int main() {
   th_pool.add_thread(tf);
   this_thread::sleep_for(3s);
}

Имеется разделяемй между потоками ресурс (сам пул в данном случае). Всё внимаение на деструктор пула - логично, что нельзя взять мьютекс и убивать всех не спеша, другой поток так же захочет этот мьютекс и случится deadlock, в нём я ставлю флаг, который не даёт конкурирующим потокам дальше как-то использовать данные, сразу выход в add_thread(). Расчёт на то, что мьютекс при котором флаг установлен - последнее в цепи его событий, ну а раз так, то ни один поток (кроме главного) не может после этого события данные использовать. Но я не уверен, например вот:

atomic_thread_fence imposes stronger synchronization constraints than an atomic store operation with the same std::memory_order. While an atomic store-release operation prevents all preceding reads and writes from moving past the store-release, an atomic_thread_fence with memory_order_release ordering prevents all preceding reads and writes from moving past all subsequent stores.

Т.е. если правильно понял, то atomic_thread_fence может гарантировать, что какая-нибудь запись не пролезет до него, а вот с простыми атомиками такой гарантии уже не будет. А мьютексы?

С санитарами собирал, data race не находят.

PS: ГПТ беседу не вывозит, нужно мнение кожаных экспертов.



Последнее исправление: kvpfs_2 (всего исправлений: 2)

Привет, мужчины.

Привет, сексист.

С санитарами собирал, data race не находят.

TSan проверяет в рантайме, надо гонять под нагрузкой и долго. Надёжнее, чем TSan, ни один кожаный ублюдок не определит.

annulen ★★★★★
()

у вас логикой все нормально? если вы защищаете деструктор пула мьютексом, значит в вашей системе, во время деструкции пула может кто-то к нему обращаться. а раз может «во время», значит может и после деструкции обращаться, а это уже ошибка совсем.

обращение во время деструкции надо оформлять паникой или эксепшеном. и смотреть откуда оно берется вообще, ибо такого быть не может в корректной программе.

в правильной логике сначала стопят всех, кто может лезть в обьект, и потом убивают обьект.

то есть тредпул должен иметь функцию stop_all(), где он переводится в неактивное состояние и не может далее добавлять треды, и в ней он ожидает завершения всех его тредов.

но тредпул должен быть недеструктрурирован, пока есть ненормальные, что могут вызвать его функции.

alysnix ★★★
()
Последнее исправление: alysnix (всего исправлений: 2)
Ответ на: комментарий от annulen

Надёжнее, чем TSan

Я против санитаров ничего не имею, но остался осадочек - он даёт false positive на condition variables, хз, может пофиксили, но доверие слегка упало.

kvpfs_2
() автор топика
Ответ на: комментарий от alysnix

у вас логикой все нормально? если вы защищаете деструктор пула мьютексом, значит в вашей системе, во время деструкции пула может кто-то к нему обращаться.

А почему нет? Программа многопоточная, там плагины со своими потоками, запросы статуса текущих потоков, удаление, добавление плагинов с потоками.

а раз может «во время», значит может и после деструкции обращаться, а это уже ошибка совсем.

Так деструктор этим и занят, что даёт команду на завершение всего этого зоопарка, делает джойн() для всех потоков и выпиливается сам. Не, ну может я там что нарушил вроде: «после начала выполнения деструктора обращение к объекту - ЮБ», но сомневаюсь. И вообще это непринципиально, я могу это и в условный stop() вынести, суть от этого не изменится, всё равно нужно пробежаться по массиву потоков в конкурентной среде, избежать deadlock’ов и модификаций в процессе завершения.

kvpfs_2
() автор топика

Конечно у тебя race, ты сначала проверяешь флаг, затем ждёшь секунду и лезешь в мютекс. Вот в эту секунду мютекс окажется занятым будет либо дедлок если ты терпеливо ждёшь join треда, либо UB если ты его убиваешь.

Просто проверка флага без атомарной его модификации или без ещё одного мютекса снаружи (если ты хочешь связать дальшейшее действие с состоянием флага) - действие бесполезное.

firkax ★★★★★
()
Последнее исправление: firkax (всего исправлений: 2)
Ответ на: комментарий от firkax

Вот в эту секунду мютекс окажется занятым будет либо дедлок если ты терпеливо ждёшь join треда, либо UB если ты его убиваешь.

тредпул сидит на join с отпущенным мьютексом. тред его получит, попытается добавить еще три треда, произойдет выход из add_thread() с false, после чего будет проверен флаг завершения и тред выйдет.

alysnix ★★★
()
Последнее исправление: alysnix (всего исправлений: 1)
Ответ на: комментарий от alysnix

А да и правда. Тогда destruction_stage это и есть флаг, защищённый мютексом, про который я писал. А stop_flag немного лишний и только запутывает читателя кода. Насколько там нужен атомик лень думать, подозреваю что не нужен (после переделывания кода на единый флаг).

Зачем таскать везде указатель на stop_flag тоже непонятно, если всё равно к тредпулу обращаешься в котором он хранится.

firkax ★★★★★
()
Последнее исправление: firkax (всего исправлений: 2)
Ответ на: комментарий от alysnix

Оно не может про него ничего не знать т.к. вызывает add_thread из него. А потом выйдет что stop_flag у него от одного тредпула по указателю, а add_thread вызывается из другого (если их несколько), и благодаря этой путанице программист долго не заметит проблемы.

firkax ★★★★★
()
Последнее исправление: firkax (всего исправлений: 1)
Ответ на: комментарий от firkax

а если будут разные тредпулы? в коих можно запускать треды с данным телом? конкретный тредпул ставит свой адрес флага завершения. тут все верно. куда добавили, того и флаг

alysnix ★★★
()
Последнее исправление: alysnix (всего исправлений: 1)
Ответ на: комментарий от firkax

Зачем таскать везде указатель на stop_flag тоже непонятно, если всё равно к тредпулу обращаешься в котором он хранится.

В программе возможно остановка/добавление отдельных элементов, в некоторые из них нужно «вдуть» поток. Т.к. остановка этих частей тоже предусмотрена и если сделать по твоему, то придётся постоянно дергать тредпул, обращаться к его ассоциативному массиву со своим thread_id в качестве ключа, брать мьютекс. Всё же передать и проверить атомик - как-то сильно проще.

На самом деле в функцию потока передаётся даже целых три флага - 1. требования остановки, 2. признак запущенности (пул узнает, что плагин упал), 3. вроде deferred execution, т.е. сначала инициализируем всю систему как было при последнем выключении, подцепляем все плагины, восстанавливаем все линки между ними, дергаем этот флажок и все потоки побежали.

kvpfs_2
() автор топика
Ответ на: комментарий от kvpfs_2

а раз может «во время», значит может и после деструкции обращаться, а это уже ошибка совсем.

Так деструктор этим и занят, что даёт команду на завершение всего этого зоопарка, делает джойн() для всех потоков и выпиливается сам

Это не работа деструктора, не мешай разные сущности в одну. Правильно тебе пишут, меняй логику, чтобы такая ситуация была в принципе невозможной

XMs ★★★★★
()
Ответ на: комментарий от XMs

Это не работа деструктора, не мешай разные сущности в одну.

В общем соглашусь, лучше сделать отдельный stop_threads().

Правильно тебе пишут, меняй логику, чтобы такая ситуация была в принципе невозможной

Это невозможно, с логикой там всё норм вроде, просто плагины имеют боьшие возможности, могут графики открывать (там поток), цеплять другие плагины с потоками, ну и в это время юзер может кикнуть прогу. Ну что ты тут сделаешь, если нужно одновременно взять мьютекс, и сделать join() на потоке, которому нужен тот же мьютекс.

В общем-то мне кажется, что моё решение должно быть рабочим и решать задачу с deadlock’ом, лишь хочется подтверждения, хотя … ведь санитары со мной согласны …

kvpfs_2
() автор топика

Архитектура. Если кто-то пытается обратиться к инстансу, который уже на фазе разрушения, это… плохо.

Т.е. нужно, как минимум, защитить сам инстанс, снаружи. Что бы тот, кто к нему обращается мог понять, что работать не с чем и пора сворасивать функционал.

hatred ★★★
()
Последнее исправление: hatred (всего исправлений: 1)

Кстати, ещё смежный вопрос - как уведомить главный поток (он стоит в poll(stdin) о приходе сигнала пироге (тот сигнал, который SIGTERM и тп)? Обойтись при этом без всяких «проснулся по таймауту и проверил флаг». Была идея открывать пайп дополнительный и передавать в полл главному, из обработчика сигнала делать клоуз этого пайпа (это сигнал сейф, POSIX говорит). Но там какая-то муть, мол если во время клоуз приходит ещё один сигнал, то его состояние unspicified. Даже не знаю теперь - с пайпом годная идея и как оно спец-но в итоге в линуксе, надо смотреть, или же есть вариант лучше чего-то, что можно кинуть в полл и дёрнуть из сигнала?

kvpfs_2
() автор топика
Ответ на: комментарий от anonymous2

Вообще в пуле два варианта условного add_thread - 1. с пайпом в качестве стоп сигнала, это для тех, кто стоит в poll, 2. С флагом - юзерские плагины-поделки, та что угодно может быть, тупо даю флаг и не парюсь чем там занимаются

kvpfs_2
() автор топика

Кстати, случайно вспомнил как я уже решал такую задачу. Мужчины, мотайте на ус, псевдокод:

class Run_tag {
   atomic_int *m_f;
   Run_tag(atomic_int *f) : m_f(f) {++(*m_f);}
   ~Run_tag() {--(*m_f);}
   Run_tag(Run_tag &) = delete;
   void operator=(Run_tag &) = delete;
};

class Thread_pool {
   atomic_int m_f;
   vector <threads> m_th;
   mutex m_mtx;
public:
   void run(fn, ...) {
      thread t(fn, &m_f, ...);
      ...
   }
   void stop_thread() {
      send_stop_signal();
      while (m_f != 0);
      lock_guard l(m_mtx);
      for (i& : m_th)
         i.join();
   }
};

void thread_fn(Run_tag tag, ...) {}

Во всяком случае как вариант, здесь хотя бы вопросов не должно быть.

kvpfs_2
() автор топика

В ходе дальнешего допиливания оказалось, что мне нужен std::timed_mutex, остальное не подходит. Теперь главный поток ждет бесконченость, остальные потоки пытаются несколько секунд и отваливаются.

Благодарю за помощь, мужчины.

PS: почему мужчины? А разве женщинам это интересно? Разве это поможет им варить борщ? Выше плиты не приыгнут)

kvpfs_2
() автор топика