LINUX.ORG.RU

Синхронизация потоков. std::condition_variable.

 , ,


0

1

Привет!

Есть два потока: первый наполняет буфер (на самом деле буфера два, один наполняется, второй обрабатывается, потом меняются) данными из сети, второй ждёт данные в бесконечном цикле и должен их обрабатывать, как буфер заполнился, а остальное время спать.
Второй может работать дольше первого, соответственно пропустить несколько наполнений. Данные, которые второй поток обработать не успел, нужно отбрасывать.

Сейчас для синхронизации используется std::condition_variable, прямо в виде из примера: https://en.cppreference.com/w/cpp/thread/condition_variable

Проблема в том, что, если второй поток работал слишком долго и пропустил несколько циклов заполнения, то первый поток уже установил переменную, используемую в предикате, в состояние true и вызвал (один или несколько раз) notify_one().
И «cv.wait(lk, []{return ready;});», насколько я понимаю, снимает блокировку сразу, не дожидаясь нового события (новых данных). Второй поток при этом начинает обрабатывать старые данные. Мне это не нужно. Нужно, чтобы он ждал свежие, а те, что не успел обработать, просто отбрасывались бы.

Можно было бы использовать «cv.wait(lk);», т.е. без предиката. Тогда, насколько я понимаю, notify_one() вызванный до wait() будет проигнорирован, а сработает тот notify_one(), что вызывался уже после wait(). Что мне и нужно. Но везде пишут, что без предиката может произойти некое состояние «spurious wakeup» (ложное пробуждение), когда никто не делал notify_one() или notify_all(), а wait() вернул управление и поток-обработчик проснулся просто так.

Как такая задача правильно решается?
Каким чудом эти spurious wakeup вообще случаются?

★★★★

как-то все сумбурно.

вот у тебя 2 очереди:
Поток заполняет очередь, свопает ее со второй и делает нотификацию
Второй поток просыпается, обрабатывает очередь и ждет снова
так?

seryoga ()
Ответ на: комментарий от seryoga

Не две очереди, двойная буферизация.
Первый поток заполняет буфер по указателю P, который указывает на буфер A. Как только заполнил, он выставляет указатель P2 на буфер A, делает нотификацию и меняет указатель P, теперь он указывает на буфер B.
Второй поток просыпается по нотификации и обрабатывает буфер по указателю P2.
Нужно, чтобы второй поток игнорировал пропущенные нотификации и ждал те, которые произойдут уже после того, как он заснул.

ls-h ★★★★ ()
Последнее исправление: ls-h (всего исправлений: 1)
Ответ на: комментарий от ls-h

Нужно, чтобы второй поток игнорировал пропущенные нотификации и ждал те, которые произойдут уже после того, как он заснул.

Так оно и будет. Если CV не ждет, то оно пропустит нотификации. Сработать может по предикату. Если тебе не хватает в предикате флажка ready, добавь туда еще условие. например непустая очередь.

seryoga ()
Ответ на: комментарий от seryoga

Так оно и будет. Если CV не ждет, то оно пропустит нотификации.

Разве? Вот я изменил пример по ссылке:

#include <iostream>
#include <string>
#include <thread>
#include <mutex>
#include <condition_variable>

std::mutex m;
std::condition_variable cv;
std::string data;
bool ready = false;
bool processed = false;

void worker_thread()
{
    // Wait until main() sends data
    std::unique_lock<std::mutex> lk(m);
    cv.wait(lk, []{return ready;});

    // after the wait, we own the lock.
    std::cout << "Worker thread is processing data\n";
    data += " after processing";

    // Send data back to main()
    processed = true;
    std::cout << "Worker thread signals data processing completed\n";

    // Manual unlocking is done before notifying, to avoid waking up
    // the waiting thread only to block again (see notify_one for details)
    lk.unlock();
    cv.notify_one();
}

int main()
{
    data = "Example data";
    // send data to the worker thread
    {
        std::lock_guard<std::mutex> lk(m);
        ready = true;
        std::cout << "main() signals data ready for processing\n";
    }
    cv.notify_one(); ////// Сначала нотификация

    std::thread worker(worker_thread); ////// Потом создание треда

    // wait for the worker
    {
        std::unique_lock<std::mutex> lk(m);
        cv.wait(lk, []{return processed;});
    }
    std::cout << "Back in main(), data = " << data << '\n';

    worker.join();
}


И вот такой результат работы:

main() signals data ready for processing
Worker thread is processing data
Worker thread signals data processing completed
Back in main(), data = Example data after processing
Если бы нотификация была пропущена, то не было бы надписи:
Worker thread is processing data
Или я что-то путаю?

ls-h ★★★★ ()
Последнее исправление: ls-h (всего исправлений: 2)
Ответ на: комментарий от ls-h

Если ожидание сделать вот таким «cv.wait(lk);», т.е. без предиката, то worker_thread() не просыпается. Значит он пропускает нотификацию. И, вроде как, проблема решена.

Но! Везде пишут, что обязательно ещё какое-то условие проверять, т.к. бывают ложные пробуждения (spurious wakeup).

Как это решить просто и красиво?

ls-h ★★★★ ()
Ответ на: комментарий от anonymous

в гугле забанили ?

Это я читал, про spurious wakeup сам писал. Вопрос не в этом, а как игнорировать пропущенные notify_one().

ls-h ★★★★ ()
Ответ на: комментарий от anonymous

доп условиями и флажками по вашему коду

Можно пример? Хочется не городить миллион костылей, но пока простое решение в голову не приходит.

ls-h ★★★★ ()
Ответ на: комментарий от ls-h

пример чего ? это логика вашего кода. вам видней как ее предвидеть

anonymous ()
Ответ на: комментарий от ls-h

Ставь первым потоком перед нотификацией флаг, который второй поток будет сбрасывать перед засыпанием.

anonymous ()

Смотри строчку 15 из примера, читай доку, думай мысли, люби гусей.

pon4ik ★★★★★ ()
Ответ на: комментарий от anonymous

Ставь первым потоком перед нотификацией флаг

Понятно. Соответственно и проверять флаг перед нотификацией. Если ещё поднят, значит нотификацию не делать.
Вопрос в том, где это правильно делать? Внутри вот этого блока (из примера):

{
  std::lock_guard<std::mutex> lk(m);
  ready = true;
  std::cout << "main() signals data ready for processing\n";
}

Соответственно, cv.notify_one() внести внутрь блока с lock_guard?

ls-h ★★★★ ()
Последнее исправление: ls-h (всего исправлений: 1)
Ответ на: комментарий от ls-h

Соответственно, cv.notify_one() внести внутрь блока с lock_guard?

Вообще не принципиально, гонки тут не будет.

anonymous ()
Ответ на: комментарий от anonymous

Вообще не принципиально, гонки тут не будет.

Хорошо, спасибо.
Не совсем понял, когда же происходят эти spurious wakeup.
Можно специально словить?

ls-h ★★★★ ()
Ответ на: комментарий от ls-h

откуда ж вам понять если вы нихрена из гугла и ссылок здесь не читали, хотя заявили что читали

>>в гугле забанили ?

>Это я читал, про spurious wakeup сам писал

anonymous ()
Ответ на: комментарий от ls-h

Можно даже без флага, сделать разделяемый указатель на данные, когда второй поток забирает пакет на обработку - он обнуляет указатель (под защитой mutex), когда первый поток подготовил данные - выставляет указатель (под защитой mutex), и будит второго через condition_variable, связанный с тем же mutex.

Во-первых очереди нет и указатель (лучше умный, unique_ptr) всегда хранит актуальный пакет либо равен nullptr.

Во-вторых борьба с suspicious wakeup упрощается - надо просто проверить указатель.

quiet_readonly ★★★★ ()
Последнее исправление: quiet_readonly (всего исправлений: 1)
Вы не можете добавлять комментарии в эту тему. Тема перемещена в архив.