LINUX.ORG.RU

Конкурирующая очередь в thread C++ (код)

 ,


0

1

В продолжении темы Конкурирующая очередь в thread C++

суть которой в кратце

  • 3 конкурирующие потока
    • п1 самый быстрый читает кадры с камеры
    • п2 записывает кадр как есть на диск через интервалы времени
    • п3 декодирует кадр, затем кодирует в отдельном потоке и пишет в сеть
  • для потоков создан буфер заранее выделенных кадров (длинной 5 шт должно хватать!?)
  • алгоритм конкуренции выглядит следующим образом - двунаправленный список из структур временной метки и индекса буфера, п1 забирает GetIdxPkt из начала списка индекс, пишет кадр по этому номеру буфера и добавляет PutIdxPkt индекс в конец, п2 и 3 забирают индекс с конца списка и после обработки кадра добавляют индекс в начало.

Собсна код очереди, указатель на которую передам в эти потоки, нуждаюсь в оценке его адекватности

typedef struct
{
    time_t ts;
    uint8_t pos;
} stPktLive_t;

enum eQueuePos_t
{
    FRONT,
    BACK
};

class CQueue
{
private:
    const uint8_t maxQ;

    uint8_t cntQ;
    list<stPktLive_t> ltQ;

    mutex mtx;
    condition_variable cvPut, cvGet;
    using guard = unique_lock<mutex>;

public:

CQueue(uint8_t lQueue)
    : maxQ(lQueue), cntQ(0)
{}

~CQueue()
{
    if (ltQ.size() > 0) ltQ.clear();
}

int PutIdxPkt(stPktLive_t stV, eQueuePos_t pos)
{
    guard g(mtx);
    while (cntQ == maxQ) cvPut.wait(g);

    mtx.lock();

    if (pos == FRONT) ltQ.push_front(stV);
    else ltQ.push_back(stV);
    cntQ++;

    cvGet.notify_all();//notify_one()

    mtx.unlock();

    return cntQ;
}

int GetIdxPkt(stPktLive_t & stV, eQueuePos_t pos)
{
    guard g(mtx);
    while (cntQ == 0) cvGet.wait(g);

    mtx.lock();

    if (pos == FRONT)
    {
        stV = ltQ.front();
        ltQ.pop_front();
    }
    else
    {
        stV = ltQ.back();
        ltQ.pop_back();
    }
    cntQ--;

    cvPut.notify_all();

    mtx.unlock();

    return cntQ;
}
};

стар и остановку работу с очередью еще не обдумал

★★★

Последнее исправление: wolverin (всего исправлений: 1)

uint8_t cntQ;

std::size_t

list<stPktLive_t>

Почему не std::queue?

using namespace std

это фу, даже в cpp файле. Но это вкусовщина.

while (cntQ == 0) cvGet.wait(g);

В wait можно передать предикат и избавится от while.

mtx.lock();

При выходе из wait мьютекс будет заблокированным. Зачем второй раз? Если используешь unique_lock, то вручную не надо мьютексом управлять.

mtx.unlock();

Зачем тебе тогда unique_lock? Только потому что std::condition_variable просит. Выкинь всю ручную работу с мьютексами.

ox55ff ★★★★★
()

cvPut.notify_all();
mtx.unlock();

Сначала освобождай мьютекс, а только потом сигналь потокам. Но я вручную не вызываю unlock. Вместо этого блок кода с мьютексом оборачиваю в {}, чтобы сократить время жизни unique_lock.

void func()
{
    {
       std::unique_lock lk(m);
       cv.wait(lk, []{return ready;});
       ...
    } // Здесь мьютекс освободится
    cv.notify_one();
}
ox55ff ★★★★★
()
Последнее исправление: ox55ff (всего исправлений: 1)
Ответ на: комментарий от ox55ff

std::size_t

он сильно отличается по размеру, особенно в разных ОС

Почему не std::queue?

я так понял list быстрее когда удаление и вставка одновременно с разных сторон выполняются

При выходе из wait мьютекс будет заблокированным.

странно, а пишут что вроде как мутекс освобождается, может не правильно понял.

над остальным думаю.

wolverin ★★★
() автор топика
Последнее исправление: wolverin (всего исправлений: 1)
Ответ на: комментарий от ox55ff

а есть ли разница между notify_one и notify_all, если несколько потоков ждут? все равно же неизвестно какой получит первым доступ, но после освобождения мутекса следующий же должен начать выполнение

wolverin ★★★
() автор топика
Ответ на: комментарий от ox55ff

переписал по вашему совету на лабмаду, вроде компилируется, не знаю только работает или нет как надо )

int PutIdxPkt(stPktLive_t stV, eQueuePos_t pos)
{
    {
        guard g(mtx);
        cvPut.wait(g, [&]{return cntQ == maxQ;});

        if (pos == FRONT) ltQ.push_front(stV);
        else ltQ.push_back(stV);
        cntQ++;
    }

    cvGet.notify_all();//notify_one()

    return cntQ;
}

int GetIdxPkt(stPktLive_t & stV, eQueuePos_t pos)
{
    {
        guard g(mtx);
        cvGet.wait(g, [&]{return cntQ == 0;});
        while (cntQ == 0)

        if (pos == FRONT)
        {
            stV = ltQ.front();
            ltQ.pop_front();
        }
        else
        {
            stV = ltQ.back();
            ltQ.pop_back();
        }
    cntQ--;
    }

    cvPut.notify_all();

    return cntQ;
}
wolverin ★★★
() автор топика
Ответ на: комментарий от wolverin

cvPut.wait(g, [&]{return cntQ == maxQ;});

В лямбде нужно писать условие ВЫхода из ожидания. А когда у тебя был цикл while, то в нём было условие продолжения ожидания.

std::size_t сильно отличается по размеру, особенно в разных ОС

Его возвращают контейнеры из stl. Им почему то не мешает. Или ты там всякие хаки с переполнением используешь? Ну тогда да, тебе size_t не подойдёт. Только вот хаки не нужны.

я так понял list быстрее когда удаление и вставка одновременно с разных сторон выполняются

Сложность вставки в начало у list - константа, да. Только вот не забывай, что list хранит элементы в куче по отдельности. А куча это дорого.

странно, а пишут что вроде как мутекс освобождается, может не правильно понял.

Когда поток уходит на ожидание - мьютекс освобождается, когда просыпается - мьютекс берётся повторно.

не знаю только работает или нет как надо

Не надо троллить.

ox55ff ★★★★★
()
Ответ на: комментарий от ox55ff

Спасибо большое, теория стала понятней, не троллю, вопрос не в синтаксисе, а как алгоритм будет работать на практике - с одной стороны читающий поток формирует данные для потоков записи и пересжатия, а с другой это они глобально должны стартануть и остановить чтение… Надо думать дальше )

wolverin ★★★
() автор топика
Последнее исправление: wolverin (всего исправлений: 1)
Ответ на: комментарий от ox55ff

Да я знаю что list побольше в памяти занимает, но я надеюсь буфера в 5 (или немного больше) кадров должно хватать, чтобы ожидание в очереди данных было минимально - нужно выжать максимальный фпс для онлайн видео.

wolverin ★★★
() автор топика
Ответ на: комментарий от wolverin

у тебя пример написан в коде -

https://en.cppreference.com/w/cpp/thread/condition_variable

ps. пример говенный, там нотифицируется только один тред. это частный случай. на очереди в общем случае могут ждать N тредов и нотифицировать надо все, иначе будет хитрая бага.

alysnix ★★★
()
Последнее исправление: alysnix (всего исправлений: 1)
Ответ на: комментарий от wolverin

У тебя операции с очередью будут происходить 25-60 раз в секунду. Это настолько редко, что никакой разницы в бенчмарках не будет заметно. Большую часть вычислительных ресурсов CPU и шину памяти займёт получение, кодирование и отправка кадров.

i-rinat ★★★★★
()
Ответ на: комментарий от wolverin

с таймаутами надо делать такие функции. любой уважающий себя кондвар, во всех приличных реализациях должен иметь wait с таймаутом. и использовать ЕГО. а не простой wait, где можно встать в ожидание навсегда.

alysnix ★★★
()

для потоков создан буфер заранее выделенных кадров

Если емкость очереди заранее ограничена не очень большим числом, то почему не использовать кольцевой буфер? Один раз выделяешь массив фиксированного размера, указатель на голову и хвост и понеслась.

pathfinder ★★★★
()
Ответ на: комментарий от pathfinder

потому что мне не нужны все данные, только последний кадр в момент обращения к очереди, все что осталось в очереди просто перезаписывает поток читающий с камеры.

wolverin ★★★
() автор топика
Ответ на: комментарий от wolverin

вроде ничо, очередь начинает ехать потихоньку, пока в 2 потока чтения с камеры и записи на носитель, единственно бывает kworker сума сходит и грузит одно ядро на 100%

wolverin ★★★
() автор топика