LINUX.ORG.RU

100 pthreads, 1 socket


0

1

Есть несколько десятков локальных объектов (тредов), обрабатывающих данные с такого же количества измерителей. Один из тредов поддерживает связь с удаленным сервером для того, чтобы в случае аномальных значений данных отправить сообщение произвольной длины, среди прочего содержащее номер соответсвующего датчика.

Просветите, как эту схему грамотно реализовать на С++ - опыта в подобных задачах нет; в рамках наивного и наскоро слепленного прототипа каждый из объектов пишет сообщение в статический буфер и длину сообщения в статическую же переменную, блокируя их через мютекс. «Серверный» тред регулярно читает переменную длины, дальше понятно, наверное.

Работать оно, вроде работает, но что-то подсказывает мне, что существуют более подходящие технологии для подобных архитектур, а в моём поделии программа загнется под блокировкой и разблокировкой мютекса.


Ну если у тебя серверный поток только и делает, что туда-сюда мьютекс дергает и проверяет переменную в цикле - то да, будет лишняя трата процессорного времени. Используй тогда средства IPC, в которых есть уведомления о приходе данных.

Например, что мешает серверному потоку открыть еще один сокет для приема сообщений от этих 100 потоков? Пусть периодически проверяет сокет (select) и в случае обнаружения там данных уже пересылает их куда надо. Сокеты сделай через UDP - сразу будет принимать от нескольких потоков, не надо тратить время на соединения. Можешь посмотреть на очереди сообщений посиксовые.

MuZHiK-2 ★★★★ ()

А через что осуществляется взаимодействие с измерителями? Тоже через сокеты?

yoghurt ★★★★★ ()
Ответ на: комментарий от MuZHiK-2

что мешает серверному потоку открыть еще один сокет для приема сообщений от этих 100 потоков? Пусть периодически проверяет сокет (select) и в случае обнаружения там данных уже пересылает их куда надо

Я за очередь!

yoghurt ★★★★★ ()

Что из себя представляют измерительные потоки?

baverman ★★★ ()

ябы сделал проще
у каждого из тредов есть номер fd который на запись из pipe
контролирующий тред - читает fd второй из pipe

каждый тред - при изменении данных на датчике или при только аномальном значении - записывает показатель датчика в структуру - и посылает в fd свой номер (тоесть посылает контролиющему свой номер - записывает 1 байт в fd)

контролирующий получает (простым read-ом одного байта) номер сработавшего треда - по номеру читает данные из структуры в памяти и отсылает их

вот такая простейшая схема без мутексов

ae1234 ★★ ()

2 girls, 1 cup.

anonymous ()
Ответ на: комментарий от MuZHiK-2

это потому что ты тупой.

по теме: что мешает через poll или подобное опрашивать измерители и поочерёдно обрабатывать данные, в случае же аномалии асинхронно отправлять сообщение серверу? Никаких тредов, никаких мьютексов.

anonymous ()

в рамках наивного и наскоро слепленного прототипа каждый из объектов пишет сообщение в статический буфер и длину сообщения в статическую же переменную, блокируя их через мютекс.

ППЦ. Меняй схему. Ключевые слова - «событие» и «очередь». Обработчик (обработчики) сенсоров опрашивают сенсоры и сами решают когда случилась проблема. При возникновении проблемы, обработчик сенсора «выстреливает» событие в очередь. На жабе я бы сделал так:

/*
 * Класс читает информацию с сенсора,
 * при возникновении проблемы выстреливает событие
 */
class SensorReader implements Runnable {
    Sensor target;
    public void run() {
        while (!brk) {
            readSensorData(target);
            if (alertCondition(target)) {
                Monitor.pushAlert(target,timestamp);
            }
    }
}

/*
 * Класс ловит события, произошедшие на сенсорах
 */
class Monitor implements Runnable {
    Vector<Sensor> queue = new Vector<Sensor>();
    public static void pushAlert(Sensor source,DateTime whence) {
        synchronized(queue) { queue.add(source); }
    }
    public void run() {
        thisThread = Thread.currentThread();
        while(!brk) {
            SensorReader tgt = null;
            synchronized (queue) {
                if (queue.size()>0) {
                    tgt = queue.elementAt(0);
                    queue.removeElementAt(0);
                }
                if (tgt!=null) {
                    // Послать алерт тут;
                } else {
                    sleep(5); // Засыпаем на 5 миллисекунд если не было событий
                }
            }
        }
    }
}

Будет ли тут по одному SensorReader на каждый датчик, 10 ридеров каждый на 10 датчиков, или один ридер на 100 датчиков - особой разницы уже нет

no-dashi ★★★★★ ()
Ответ на: комментарий от anonymous

что мешает через poll или подобное опрашивать измерители и поочерёдно обрабатывать данные, в случае же аномалии асинхронно отправлять сообщение серверу? Никаких тредов, никаких мьютексов.

«Задрочка под одну задачу» это называется.

Абсолютно негибкое решение, кривое и неуниверсальное, в лучших традициях преждевременной оптимизации. Потом у ТСа встанет проблема усложнить код обработчика - достаточно например просто захотеть писать события в базу, а не только рассылать уведомление по UDP, или просто добавить мало-мальски сложную логику в обработчик алерта - и всё, все гениальные идеи о замечательном AIO начинают хронически сбоить.

Так что реализовывать классическую модель событий, подписчиков и очереди событий, и не сношать более моск.

no-dashi ★★★★★ ()
Ответ на: комментарий от MuZHiK-2

Например, что мешает серверному потоку открыть еще один сокет для приема сообщений от этих 100 потоков?

А потом кто-нибудь изменит настройки iptables или иного файрвола, и все будут безумно счастливы :-)

no-dashi ★★★★★ ()
Ответ на: комментарий от no-dashi

Потом у ТСа встанет проблема усложнить код обработчика - достаточно например просто захотеть писать события в базу, а не только рассылать уведомление по UDP, или просто добавить мало-мальски сложную логику в обработчик алерта - и всё, все гениальные идеи о замечательном AIO начинают хронически сбоить.

Иксперты, такие иксперты.

baverman ★★★ ()

Чего только не выдумают, лишь бы не использовать EPICS.

anonymous ()
Ответ на: комментарий от baverman

Ну, ты можешь делать как захочешь. Правда, после пары-тройки изменений и дополнений в конечном итоге выкинешь всю низкоуровневую лапшу и придешь к той же самой схеме.

no-dashi ★★★★★ ()
Ответ на: комментарий от MuZHiK-2

Дергать сокеты для межпоточной коммуникации это как-то не особо удачно, IMHO - слишком большие накладные расходы. И еще есть минус - через сокеты работа идет как с потоком (байты), а при обычном подходе можно оперировать нормальными структурами.

no-dashi ★★★★★ ()
Ответ на: комментарий от no-dashi

низкоуровневую лапшу

Реактор он и в африке реактор. Чую, ты какой-то эпичный неосилятор.

baverman ★★★ ()
Ответ на: комментарий от no-dashi

Дергать сокеты для межпоточной коммуникации это как-то не особо удачно, IMHO - слишком большие накладные расходы. И еще есть минус - через сокеты работа идет как с потоком (байты), а при обычном подходе можно оперировать нормальными структурами.

Вот видишь, ты противоречишь сам себе :) А если автору понадобится разнести потоки на процессы? А если объем данных передаваемых возрастет? А если надо будет портировать хрен знает куда? Тем более издержки на сокеты не такие большие, да и думается ядро локалхостовые пакеты сразу кидает куда надо, там издержки сократятся.

Я к тому, что универсальных решений не бывает. Равно как и масштабируемых во все стороны. Сперва надо определиться с целями, потом куда возможно расширение, а потом выбирать инструмент. В данном случае ни первого, ни второго озвучено не было, поэтому и было высказано предположение под конкретную задачу, ни больше ни меньше.

MuZHiK-2 ★★★★ ()
Ответ на: комментарий от no-dashi

дергать сокеты равноценно по ресурсам как мутекс
суть в том что и в том и в другом случае приходиться переключаться в ядро
и это и есть основной накладной расход

ae1234 ★★ ()
Ответ на: комментарий от ae1234

Мутекс дешевле. При передаче данных у тебя еще и данные передаются/копируются, их надо сериализовать/десериализовать и так далее.

no-dashi ★★★★★ ()
Ответ на: комментарий от ae1234

pthread_mutex() в наши благословенные времена использует futex, который гораздо дешевле других системных вызовов, т. к., собственно системный вызов делается только в случае, если ко мьютексу одновременно полезли несколько потоков.

Однако, использовать local-сокеты для общения внутри процесса — нормально для не абсолютно критичных по производительности программ, коих большинство. Даже с учетом стоимости системных вызовов. Для оптимизации должны быть показания.

anonymous ()
Ответ на: комментарий от no-dashi

Абсолютно негибкое решение, кривое и неуниверсальное, в лучших традициях преждевременной оптимизации.

Криво и негибко - это работать через sleep, а не по событию. Кстати, приведенное тобой решение - это как раз то, что уже сделал ТС.

P.S. poll - не AIO.

tailgunner ★★★★★ ()
Ответ на: комментарий от tailgunner

Криво и негибко - это работать через sleep, а не по событию

И что? Можно подвесить «вечный sleep» в мониторе и в pushAlert() делать Thread.interrupt(). Суть (модель событие-подписчик-очередь) от этого не изменится.

no-dashi ★★★★★ ()
Ответ на: комментарий от ae1234

а скопировать 1 байтик в моей схеме - это мелоч

У тебя хроническая боязнь мутексов?

Ладно, попробуй ответить на вопрос - а если с датчика надо передать не только сам факт возникновения события, но и дополнительную информацию, размер которой заранее неизвестен - мало ли, вдруг датчик у него это какой-нибудь измеритель состава воздуха, или это какой-нибудь плотномер, который меряет уровень+плотность+температуру?

Я просто модифицирую метод pushAlert, либо меняю его интерфейс чтобы он сразу принимал «экземпляр события», но существенной перемены логики при этом не происходит, а с твоей завязкой на «передачу одного байта» ты получаешь проблему - SensorReader уже не может просто послать событие что «у меня случился алярм», ему еще придется подождать, пока с него не снимут состояние. Либо ты начнешь реализовывать очередь событий. И не приведи господи, организуешь её внутри SensorReader, вот тогда и начнется настоящее веселье. Вот кстати еще один нюанс - а если будет 257 датчиков, что будешь делать тогда, ведь порядок следования байтов становится важен? :-)

P.S.: вообще-то, я писал подобную логику несколько раз (четыре раза как минимум). После энного количества итераций всегда приходил к схеме очереди событий.

P.P.S.: когда я вижу число «100», то автоматически считаю «от 1 до 100000», когда вижу фразу «отправить сообщение» то читаю «а также записать в базу, в логфайл, и еще наверное вызвать программу». Наверное потому, что много работал с постановщиками, которые сами не поняли сути задачи.

no-dashi ★★★★★ ()
Ответ на: комментарий от no-dashi

эка задачка
по порядку

когда сенсор послал сигнал проверящему - сенсор вполне может все также измерять данные свои - и класть в структуру свою - единственно различие - что это нужно делать атомарно

257 так 257 - fd может быть и не один :)

суть в том что метод работы с fd ничем принципиально не отличаеться от мутексов
зато бывает понятнее для обьяснения

ae1234 ★★ ()
Ответ на: комментарий от ae1234

суть в том что метод работы с fd ничем принципиально не отличаеться от мутексов

Отличается. Операция с дескриптором - это обязательно вход в ядро (и выход из него); операции с мютексом на линуксе - не обязательно.

tailgunner ★★★★★ ()
Ответ на: комментарий от tailgunner

наивность ...
в ядро все равно нужно свитчиться - если не вериш - то проверь

равноценны они
просто по одной тупой причине - что взаимодействие между процесами - без участия третье стороны (ядра) иль неизобрели иль ресурсоемки

ae1234 ★★ ()
Ответ на: комментарий от ae1234

наивность ...

Расскажи мне об этом.

в ядро все равно нужно свитчиться - если не вериш - то проверь

Что именно во фразе «не обязательно» ты не понял?

tailgunner ★★★★★ ()
Ответ на: комментарий от anonymous

для схемы той что описал - НИКАКИХ локов и унлоков ненужно
все делает третья сторона - ядро

ae1234 ★★ ()

Спасибо всем. Однозначного ответа я не получил, но кое-что намотал на ус. Теперь присматриваюсь к conditional variables и к очередям. По первому пункту есть вопрос: будет ли оно потреблять меньше ресурсов чем непрерывное mutex_lock-чтение-mutex_unlock? В туториале пишут, что будет, но я не вижу, засчёт чего происходит этот выигрыш. Механизм ведь в принципе тот же, разве нет?

stormy ()
Вы не можете добавлять комментарии в эту тему. Тема перемещена в архив.