LINUX.ORG.RU

архитектура consumer/producer threads


0

1

Здравствуйте!

Не могу найти оптимальное решение для моей проблемы, поэтому обращаюсь за советом как грамотно построить архитектуру приложения.

В общем ситуация такая: producer thread производит данные (читает данные с сетевухи. после того, как пакеты соберутся выходная информация представляет собой uchar массив размером 3000 байт), consumer thread эти данные обрабатывает. Требуется эти два потока как-то состыковать, чтобы producer не терял пакеты и consumer своевременно обрабатывал.

Я пытался складывать данные в циркулярный буфер на семафорах (см. пример Qt) и потом в потоке обработке считывать, но это решение грузит процессор , относительно конечно, но для моей задачи процент загрузки не приемлим. Можно ли слать данные сигналом и как в таком случае быть уверенным, что все данные автоматически встанут в очередь и не потеряются?

Как грамотно решаются подобные задачи? Заранее спасибо!.

ps: время обработки данных является не постоянной величиной

это решение грузит процессор , относительно конечно, но для моей задачи процент загрузки не приемлим

Как ты пришел к выводу, что именно «это решение» (т.е. очередь) грузит процессор?

tailgunner ★★★★★ ()

Не понял проблему - кто грузит процессор?

Я бы взял boost::asio. Получил данные, сделал

io_service.post(boost::bind(&work_routine, data_buf));
и пошел вычитывать следующее сообщение.

p.s. Я правильно понял, что разговор о C++?

p.p.s data_buf - boost::shared_array<unsigned char> например.

ratatosk ()
Последнее исправление: ratatosk (всего исправлений: 1)

1 тред - producer.

несколько тредов - consumer'ов.

общаются чз lockless ring (1 ring каждому consumer'у - получается относительно ринга - 1 producer/1 consumer)

без сигнализации. consumer крутит busy loop (с разными эвристиками. типа 10 проверок без данных - поспать 1ms, и тд)

anonymous ()

Что-то у меня не хватает фантазии найти где тут проблема и нагрузка на процессор. Пишешь в хвост буфера, читаешь с головы до хвоста, любое чтение и запись указателей хвоста и головы
через pthread_mutex_lock и pthread_mutex_unlock.

ilovewindows ★★★★★ ()

Вот держи, быстро накидал, думаю мысль понятна?

class Test {
    
private:
    std::vector<int> container;
    std::condition_variable m_cvariable;
    std::mutex m_mutex;
    
    void boss()
    {
        std::lock_guard<std::mutex> lock(m_mutex);
        std::cout << "boss: thread_id: " << std::this_thread::get_id() << std::endl;
        container.push_back(rand()%INT32_MAX);
        m_cvariable.notify_one();
    }
    void worker()
    {
        std::unique_lock<std::mutex> lock(m_mutex);
        std::cout << "worker: thread_id: " << std::this_thread::get_id() << std::endl;
        m_cvariable.wait(lock, [&]{return container.size()>25;});
        container.clear();
        lock.unlock();
    }
    
public:
    
    void f1()
    {
        while (true) {
            boss();
        }
    }

    void f2()
    {
        for (;;) {
            worker();
        }
    }
    
};


int main(int argc, const char * argv[])
{
    Test m;
    
    std::thread t1(&Test::f2, &m);
    std::thread t2(&Test::f1, &m);
    std::thread t3(&Test::f1, &m);
     
    t1.join();
    t2.join();
    t3.join();
            
    return 0;
}

Boy_from_Jungle ★★★★ ()
Ответ на: комментарий от tailgunner

Не,ну это уж совсем гадости про товарища думать, крутится в бесконечном цикле ожидаючи 3000 байтов, вообще неизвестно ли придут, сетка все-таки.

ilovewindows ★★★★★ ()

В случае двух потоков для межпоточной передачи данных подойдет любая реализация потокобезопасной очереди сообщений (1). Обычная реализация такой очереди содержит условную переменную с мьютексом и предоставляет пользователю метод добавления сообщения в начало очереди и метод получения сообщенния из ее конца с ожиданием по таймауту.

Однако в связи с тем, что Consumer теоретически может «не справиться» с потоком сообщений от Producer-а, а также при наличии требования по отсутствию потерь данных, во избежание переполнения потокобезопасной очереди сообщений тут лучше подойдет шаблон проектирования Active Object (2, 3), когда Producer выдает поток задач на выполнение не одному, а целому пулу предсозданных потоков (Workers). В этом случае вероятность потерь сообщений можно существенно понизить.

illy ()

Ребята, всем огромное спасибо! Я во всем разобрался.

Со мной случился синдром ночного программирования, сжатые сроки и объем проекта сделали свое дело. Оказалось вообще в стороннем модуле программы, который я отключил на время отладки, одна из функция висела в busy wait - захватывала и освобождала мьютекс, и когда настала пора собирать проект в кучу, то про нее я напрочь забыл. Она то и грузила процессор.

А так решение сделать круговой буфер на семафорах оказалось вполне приемлимым.

svetocopy ()
Ответ на: комментарий от svetocopy

А чем хорош круговой буфер на семафорах? Круговой буфер сам по себе хорош тем, что его несложно сделать lockless, но если на семафорах, то это преимущество теряется. Зато он имеет ограниченный размер - нужно обрабатывать его переполнение.

ratatosk ()
Вы не можете добавлять комментарии в эту тему. Тема перемещена в архив.