LINUX.ORG.RU

Завершение многопоточного приложения

 , ,


0

4

Всем привет. Есть некоторое приложение (еще недопиленное), порождается некоторое кол-во потоков. Некоторые потоки идут на блокирующие операции (чтение с консоли, сокеты). В связи с этим вопрос возникает - а как это хозяйство завершить?

  1. Делать для всех потоков join() - потоки заблокированы и хз когда освободятся, отметаем.
  2. Сгодится что-то вроде terminate(), но без всякого - «аварийный останов, корки скинуты на диск». exit() - начинает разрушать софтину деликатно - с разрушением ГВ, а потоки-то живут, в общем опять может случиться какой-нибудь «сег фолт».
  3. Сделать потоком detach(), опять же - когда софтина начнет завершаться, потоки живут, и потом - должны ведь и библиотеки отваливаться начать. Я вообще никогда не видел описания процесса отстегивания либ при завершении - сначала перестает выделяться процессорное время всем тредам, а потом отстегивается, или как-то иначе.
  4. Сделать все глобальные объекты неудаляемыми, т.е. вначале main() делать new, и пусть живут «вечно». Но опять же вопрос с моментом отстегивания либ - до или после момента, когда потокам перестает выделяться процессорное время. Это если либы so’шки, а если статические, то вообще без вариантов - никакой надежды на «либы переживут треды» нет.

Понятно, что как-то в любом случае завершится, но хочется без всяких жалоб в консоль на мою криворукость.
Минимальный пример смоделированный на коленках (понятно, что момент выгрузки либ он не затрагивает), который падаете с сег фолтом:

#include <thread>
#include <chrono>
using namespace std;

struct Q {
	int *i;
	Q() {i = new int;}
	~Q() {delete i; i = 0; this_thread::sleep_for(5s);}
};
int *get() {
	static Q q;
	return q.i;
}
void f() {
	this_thread::sleep_for(2s);
	*get() = 5;
}
int main() {
	get();
	thread t(f);
	t.detach();
}
★★

В идеале - нужна функция kill_totally_and_not_complain(), которая убьет процесс, и мне вообще неважно что насколько корректно это произойдет. Пусть сначала остановятся все потоки, а потом пусть завершается как ядро хочет и не говорит, что я говнокодер.

pavlick ★★
() автор топика

Вариант 1.

потоки заблокированы и хз когда освободятся

You are doing it wrong. Нужно сообщить всем потокам чтобы они завершились самостоятельно. Прибивать потоки небезопасно и это ломает логику программы.

X512 ★★★★★
()
Ответ на: комментарий от pavlick

и не говорит, что я говнокодер

Как же не говорить если это правда? Изучайте condition variable. Они поддерживают в том числе и таймер. sleep - признак говнокода.

X512 ★★★★★
()
Последнее исправление: X512 (всего исправлений: 1)
Ответ на: комментарий от X512

Да я так и хотел, сделал пул потоков

class Thread_pool
{
...
public:
	~Thread_pool();
	std::stop_token get_token();
	void request_stop();
	// Signature of fn: fn(Run_tag, ...)
	template<typename Fn, typename ...Args>
		void run(Fn &&fn, Args &&...args);
};

любой поток может запросить остановку через request_stop(), каждый поток понимает это через get_token(). Но потоки заблокированы на сокетах, …, они просто не могут узнать состояние стоп_токена. Делать операции неблокирующими? Неэффективно.

pavlick ★★
() автор топика

Делать для всех потоков join() - потоки заблокированы и хз когда освободятся, отметаем.

а ты не блокируй.

я бы делал join()…

@X512

Вариант 1.

Согласен.

fsb4000 ★★★★★
()
Последнее исправление: fsb4000 (всего исправлений: 1)
Ответ на: комментарий от pavlick

Делать операции неблокирующими?

Да.

Неэффективно.

Ещё как эффективно. Высоконагруженные серверы именно так и работают.

X512 ★★★★★
()
Ответ на: комментарий от pavlick

Ну и Thread_pool автоматом делает join() при своем разрушении. Все красиво, НО блокирующие операции. Читать из сокет без блокировки? Это слишком много процессорного времени отнимет, получать данные нужно оперативно, таймер в 100 милисек не сделаешь.

pavlick ★★
() автор топика

Ребят, если кто подскажет как попросить ядро перестать выделять потокам процессорное время (кроме главного, естественно), то это будет охренительно.

pavlick ★★
() автор топика

Как тебе и сказали выше, стоит выбрать первый вариант, используя неблокирующие операции на сокетах. Либо еще вариант, если приемлемо использовать всякие select/poll/epoll/etc - создать pipe и заставить select/poll/epoll мониторить еще и его помимо твоих сокетов и консолей, обрабатывая случай, когда ввод случился в этот пайп. Хочешь заставить поток завершаться - запиши в pipe байт. Ну, все это применимо к соектам POSIX. Если у тебя что-то отличное от них, то подход может быть другой.

nvevg
()

Разобрался, можно послать процессу сингал SIGALARM все потоки в блокирующих операциях пробудятся. Просто для теста набросал, также будит и поток, который ожидает ввода с консоли.

#include <thread>
#include <signal.h>
#include <iostream>
#include <sys/socket.h>
#include <netinet/in.h>
#include <netinet/ip.h>
#include <endian.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <chrono>
using namespace std;

const char *iface_ip = "127.0.0.1";
unsigned short port = 10500;


void f() {
	int tcp_listener = socket(AF_INET, SOCK_STREAM, 0);
	if(tcp_listener == -1)
		return;

	in_addr fip{};
	if( inet_aton(iface_ip, &fip) != 1)
		return;
	sockaddr_in my_addr{};
	my_addr.sin_family = AF_INET;
	my_addr.sin_port = htobe16(port);
	my_addr.sin_addr = fip;
	if( bind(tcp_listener, (sockaddr*)(&my_addr), sizeof(my_addr) ) != 0)
		return;

	if( listen(tcp_listener, 1) != 0)
		return;

	int tcp_socket = accept(tcp_listener, NULL, NULL);
	if(tcp_socket == -1)
		return;

}
int main() {
	thread t(f);
	this_thread::sleep_for(2s);
	kill(getpid(), SIGALRM);
	t.join();
}

По поводу sleep_for() - естественно, что в боевом варианте не применяю (не впадаю в join() псле 2 сек), потоки завершаются и декрементируют счетчик, когда он равен нулю - все потоки завершились

class Run_tag  // передается аргументом в каждый созданный поток через Thread pool
{
	friend class Thread_pool;
	std::atomic_uint &m_counter;
	Run_tag &operator=(const Run_tag &) = delete;
	Run_tag(std::atomic_uint &counter): m_counter(counter)
	{
		this->m_counter.fetch_add(1, std::memory_order_relaxed);
	}
public:
	~Run_tag() {this->m_counter.fetch_sub(1, std::memory_order_release);}
	Run_tag(const Run_tag &other_tag): m_counter(other_tag.m_counter)
	{
		this->m_counter.fetch_add(1, std::memory_order_relaxed);
	}
};

inline Thread_pool::~Thread_pool()
{
	this->request_stop();
	while (this->m_counter.load(std::memory_order_acquire) != 0) {
                // здесь можно слать сигнал потокам хоть до посинения
		using namespace std;
		std::this_thread::sleep_for(50ms);
	}
	for (auto &t : this->m_th)
		t.join();
}

ЗЫ: спасибо.

pavlick ★★
() автор топика
Последнее исправление: pavlick (всего исправлений: 2)
Ответ на: комментарий от pavlick

Обычно не блокирующие операции считаются дюже быстрыми. Можешь пояснить в каком смысле «Неэффективно»?

zerhud
()
Ответ на: комментарий от zerhud

Если делать неблокирующие, то мне надо опрашивать сокеты и т.п. в цикле без таймера (нагрузка на цпу огромная), со sleep’ом() (оперативность уведомления о поступления данных страдает). А блокирующие операции - ядро ведь создаст внутри что-то вроде condition_variable для ождающего потока и разбудит оперативно. Тут epoll советовали - так это та же блокировка ведь. Ошибаюсь?

pavlick ★★
() автор топика
Последнее исправление: pavlick (всего исправлений: 1)
Ответ на: комментарий от pavlick

Тут epoll советовали - так это та же блокировка ведь.

Не совсем, обычно это относится к асинхронному вводу-выводу. Оно позволяет ожидать несколько файлов и событий ввода-вывода и разблокироваться при возникновении любого из событий. Вместе с этим можно ожидать событие завершения потока.

X512 ★★★★★
()

Вариант 1: каждому потоку послать сигнал kill(tid, SIGUSR2), и в обработчике сигнала может быть можно позвать pthread_exit.

2: каждому потоку сделать pthread_cancel. Смотри man pthread_cancel.

без всяких жалоб в консоль на мою криворукость

3: делаешь wrapper script для своей программы, подавляющий stderr. В самой программе, просто делаешь abort().

rupert ★★★★★
()
Ответ на: комментарий от rupert

Вариант 1: каждому потоку послать сигнал kill(tid, SIGUSR2), и в обработчике сигнала может быть можно позвать pthread_exit.

Что будет с деструкторами объектов на стеке?

2: каждому потоку сделать pthread_cancel. Смотри man pthread_cancel.

https://sourceware.org/bugzilla/show_bug.cgi?id=12683

X512 ★★★★★
()
Последнее исправление: X512 (всего исправлений: 1)

если есть возможность использовать boost, то не надо ничего придумывать, там уже реализовано все что нужно. Доки, см. функцию interrupt()

Lrrr ★★★★★
()
Ответ на: комментарий от X512

Что будет с деструкторами объектов на стеке?

Ничего хорошего. Но главная цель достигнута: никто не узнает, что у автора руки из жопы растут.

rupert ★★★★★
()
Ответ на: комментарий от X512

Что будет с деструкторами объектов на стеке?

Дело в том, что меня это не сильно то и волнует. Образ процесса сохраняется с помощью CRIU, после сего действа хоть потоп. Вообще нужна в стд функция аналогичная abort(), как-бы штатная аварийная остановка, но без дампа и гневных ругательства в stderr.

pavlick ★★
() автор топика
Ответ на: комментарий от pavlick

Кстати, посыл SIGALARM также будит потоки, которые ожидают condition_variable (та форма wait(), что без stop_token вызывается). Не знал до сегодня, что так можно через сигнала делать.

pavlick ★★
() автор топика
Ответ на: комментарий от pavlick

Никто ничего не будит, процесс просто ложится без выполнения каких-либо деструкторов статиков. Лучше даже слать SIGINT (такой же посылается при ctrl+c). В принципе, мне это подходит.

pavlick ★★
() автор топика
Ответ на: комментарий от pavlick

Прочитал вопрос, сразу подумал: щас точно какой-то идиот предложит вместо нормального решения (асинхронных операций или неблокирующих) влепить сигналы и сделать изначально нормальный кроссплатформенный код прибитым гвоздями к *nix.

Разобрался, можно послать процессу сингал SIGALARM…

И угадайте кто им оказался?

Pavval ★★★★★
()
Ответ на: комментарий от X512

Спасибо. Я в самом начале хотел делать через асио, потом подумал - а нафиг мне эта лишняя абстракция, теперь, наверное, заюзаю.

pavlick ★★
() автор топика
Ответ на: комментарий от pavlick

Образ процесса сохраняется с помощью CRIU

Вообще нужна в стд функция аналогичная abort()

В Linux есть системный вызов _exit.

anonymous
()
Ответ на: комментарий от pavlick

нет, в данном случае epoll будет быстрее, как я понимаю. unlike the older system calls, which operate in O(n) time, epoll operates in O(1) time. то есть только для одного объекта особой разницы нет, а для многих epoll выигрывает. также вот тут например можно посмотреть таблицу performance: select & poll vs epoll

ну а чтобы завершится, можно просто дергать ее в цикле, чтобы она возращала управление, скажем, каждые Х миллесекунд. после этого можно как в бусте, сделать функцию, которая кидает исключение, если установлен флаг соответствующий. тогда сработает стандартный механизм, все деструкторы выполнятся. где-нибудь наверху ловишь это исключение и спокойно завершаешься.

а вобще read скажет EINTR на сигнал, так что можно и этим воспользоватся.

ну и вот что я нагуглил

zerhud
()

exit() - начинает разрушать софтину деликатно - с разрушением ГВ, а потоки-то живут, в общем опять может случиться какой-нибудь «сег фолт».

Что такое «ГВ»? В мане exit(3) ничего ни про какой «ГВ» не сказано.

Harald ★★★★★
()
Ответ на: комментарий от X512

Смотря что потоки делают если что-то сиччтают без записи куда либо то можно смело прибивать и нечего их ждать, если они как то с внешними ресурсами файлами/прочем контактируют то либо дожидаться нормально либо думать над архитектурой и изолировать потоки от взаимодействия с ресурсами что-бы их можно было смело прибивать без разговоров.

anonymous
()
Ответ на: комментарий от Harald

А оно само грамотно всегда потушит потоки? Или как? вот не понимаю что оно делает конкретно.

anonymous
()
Ответ на: комментарий от anonymous

Если ядро ОС не прибивает болтающиеся потоки, то очевидно это плохое негодное и неправильное ядро. Ядро линукса не такое, поэтому очевидно оно всё разруливает как надо

Harald ★★★★★
()

void threadPrSignal(int sig)
{
    if(sig == SIGUSR1)
    {
        pthread_exit(0);
    }
}

void threadCleanup()
{
    \\
}

void* threadFunction(void* )
{
    signal(SIGUSR1,threadPrSignal);
    pthread_cleanup_push(threadCleanup,NULL);

    while(1)
    {
       pause();
       pthread_testcancel();
    }

    pthread_cleanup_pop(1);
    return NULL;
}

void startThread()
{
    sigset_t set;
    sigemptyset(&set);
    sigaddset(&set, SIGUSR1);
    pthread_sigmask(SIG_BLOCK,&set,NULL);
    pthread_attr_init(&threadAttr);
    pthread_create(&thread,&threadAttr,threadFunction,NULL);
}

void stopThread()
{
   pthread_cancel(thread);
   timespec ts;
   ts.tv_sec = 2;
   ts.tv_nsec = 0;
   int res = pthread_timedjoin_np(thread,NULL,&ts);
   if(res != 0)
   {
      pthread_kill(thread,SIGUSR1);
   }
}

anonymous
()
Ответ на: комментарий от Harald

никакие глобальные переменные exit не разрушает

Ещё как разрушает.

#include <stdlib.h>
#include <stdio.h>

class Object
{
public:
    Object() {printf("+Object\n");}
    ~Object() {printf("-Object\n");}
    void Do() {
        printf("+Object::Do()\n");
        exit(0);
        printf("-Object::Do()\n");
    }
};

Object obj;

int main()
{
  obj.Do();
  return 0;
}

+Object
+Object::Do()
-Object

exit сразу весь процесс разрушает

Вы перепутали с _exit.

X512 ★★★★★
()
Последнее исправление: X512 (всего исправлений: 1)
Ответ на: комментарий от Harald

exit() вызывает финализаторы в таблице dynamic ELF модуля в которых вызываются деструкторы.

X512 ★★★★★
()

Делать для всех потоков join() - потоки заблокированы и хз когда освободятся, отметаем.

Не отметаем, а только так и делаем. Потоки разблокировать можно сигналами или использовать блокирующие вызовы с таймаутами.

slovazap ★★★★★
()
Ответ на: комментарий от Harald

Под линуксом, если убить главный поток с помощью pthread_exit(), а не вызывать exit() или давать вывалиться из main(), то остальные потоки продолжат выполнение. Получится эдакий живой зомби.

i-rinat ★★★★★
()
Ответ на: комментарий от pavlick

Если делать неблокирующие, то мне надо опрашивать сокеты и т.п. в цикле без таймера (нагрузка на цпу огромная)

Чтобы сэмулировать блокирующие операции на неблокирующих сокетах, нужно не в цикле операцию вызывать, а использовать poll(). Каждому потоку выделяешь либо одну часть трубы (pipe), либо один сокет из пары сокетов. По этому каналу передаётся информация о завершении. Когда нужно выполнить операцию над неблокирующим дескриптором, готовишь массив из двух struct pollfd — собственно целевой дескриптор и дескриптор канала передачи информации о завершении. Потом зовёшь poll(), передав ему этот массив. Вызов poll() завершится либо по сигналу, либо по событию на любом из этой пары дескрипторов. Чтобы послать сообщение о завершении, достаточно в главной нити в цикле послать любой байт по всем сигнальным парам.

Вариант с kill() самого себя ненадёжен, потому что ты не можешь гарантировать, что все нити прервут выполнение системных вызовов. Где-то в мануале вроде даже упоминается, что поведение в многопоточной среде не определено.

i-rinat ★★★★★
()
Ответ на: комментарий от pavlick

Если делать неблокирующие, то мне надо опрашивать сокеты и т.п. в цикле без таймера (нагрузка на цпу огромная), со sleep’ом() (оперативность уведомления о поступления данных страдает)

посмотри в сторону io_uring. Там можно не только делать неблокирующие сисколлы, но и пытаться отменять их (в мане явно написано, что для ввода-вывода в сокеты это сработает), или например выставлять таймаут - таймаут сработает, если в заданном ринге не было выполнено заданное число сисколлов за заданное время.

Единственная проблема - этим api поддерживаются далеко не все сисколлы, ну и версия ядра должна быть достаточно свежей.

Lrrr ★★★★★
()
Ответ на: комментарий от Lrrr

посмотри в сторону io_uring. Там можно не только делать неблокирующие сисколлы, но и пытаться отменять их (в мане явно написано, что для ввода-вывода в сокеты это сработает), или например выставлять таймаут - таймаут сработает, если в заданном ринге не было выполнено заданное число сисколлов за заданное время

О, всего-лишь 25 лет нужно было для того, чтобы фичу из винды наконец реализовали в лине.

byko3y ★★★★
()
Последнее исправление: byko3y (всего исправлений: 1)
Ответ на: комментарий от pavlick

Можно отправить sigint, блокирующие операции вернут ошибку eintr. А дальше им надо проверить флаг и вместо повтора операции умереть.

KivApple ★★★★★
()
Ответ на: комментарий от KivApple

Я его слал, софтина после этого умирает без всякого продолжения, никакие инструкции после «выхода» из блокирующей операции не выполняются.

ЗЫ: спасисбо всем за накинутые варианты. Я пошел в сторону asio.

pavlick ★★
() автор топика
Ответ на: комментарий от pavlick

И правильно делает: по ctrl-c софт по-умолчанию должен прекращать работу. Но по получению этого сигнала можно и, например, попытаться сначала сохранить самое важное состояние программы перед непосредственным завершением работы. Для этого сигнал нужно ловить в обработчике, который в начале программы надо зарегистрировать. В обработчике устанавливают флаг типа want_exit в TRUE/единичку. И ошибки блокирующих операций проверяют на EINTR, и если при этом want_exit положителен, аккуратно выходят.

gag ★★★★★
()
Последнее исправление: gag (всего исправлений: 1)
Вы не можете добавлять комментарии в эту тему. Тема перемещена в архив.