LINUX.ORG.RU

Ожидание любого завершившегося потока

 , ,


0

1

Здравствуйте! В ходе реализации программы (Алгоритм программы состоит в том, чтобы количество созданных потоков не превышало заданного количества) возникла необходимость в функции, которая приостанавливала бы «родительский» поток, пока не завершится любой из созданный «дочерних». (что-то наподобие wait() в процессах). Смотрел в сторону pthread_join, но она ожидает завершения потока с определенным tid, что не подходит. Может кто-нибудь сталкивался с подобного рода ситуацией и может подсказать что-нибудь по этому поводу?

привожу кусок кода:

if (thread_str_data.pN == config.Nprocess)
{
          printf("Ждем\n");
          
}
else
{
          ++thread_str_data.pN;
         /* данных для потока*/
         data.pathfile1 = top -> path;
         data.pathfile2 = top1 -> path;
         data.size = top -> size;
         /*запускаем поток*/
         pthread_create(&tid, NULL, &thread_func, &data);
}


Для этого можно использовать семафоры. Семафор создаётся в родительском потоке со значением, равным количеству потоков, которые должны быть запущены.

При каждом создании потока значение семафора уменьшается на 1, и когда достигнет 0, родительский поток заблокируется на нём до тех пор, пока значение семафора не увеличится.

В свою очередь дочерний поток, перед завершением, увеличивает значение семафора на единицу, чтобы дать возомжность создать другой поток. В этот момент заблокированный родительский поток просыпается и создаёт ещё один поток.

roy ★★★★★ ()

enjoy your absence of WaitForMultipleObjects

anonymous ()

Погуглил на темы «thread pool implementation c», «posix threads monitor», в обоих случаях наткнулся на использование condition, который посоветовал Reset (pthread_cond_wait и т.д.). :)

zhuravlik ★★★★ ()
Ответ на: комментарий от zhuravlik
roy, спасибо, сейчас пробую реализовать ваш вариант.


Спасибо всем за ответы.
Да, пробовал реализовать через сигнальную переменную, но главный поток зависает после того, как потоковая функция обработала половину моих данных. Ниже привожу участки кода, где реализован мютекс и сигнальная переменная. Буду признателен, если кто-нибудь покажет, что я делаю не так... 


struct strm
{
	int 		pN;
	pthread_cond_t   pcond;
	pthread_mutex_t 	pmutex;

} thread_str_data;
.....

void *thread_func(void *arguments)

{

	struct parametrs *sdata = (struct parametrs *) arguments;

	//tid = pthread_self();



	pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);

	

	pthread_cleanup_push(&cond_signal, NULL);

	usleep(10000);

	pthread_cleanup_pop(1);

	pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);

	return NULL;

}



static void cond_signal(void *data)

{

	pthread_mutex_lock (&thread_str_data.pmutex);

	--thread_str_data.pN;

	pthread_cond_signal(&thread_str_data.pcond);

	pthread_mutex_unlock(&thread_str_data.pmutex);

}
.....

pthread_mutex_init(&thread_str_data.pmutex, NULL);
pthread_cond_init(&thread_str_data.pcond, NULL);
....
if ( ((top -> size) == (top1 -> size)) && (top -> size != 0) && (top1 -> size != 0))
{
   pthread_mutex_lock(&thread_str_data.pmutex);
   if (thread_str_data.pN == config.Nprocess)
   {
       printf(" %d Ждем\n", thread_str_data.pN);
       pthread_cond_wait(&thread_str_data.pcond, &thread_str_data.pmutex);
   }
   else
   {
       ++thread_str_data.pN;
       data.pathfile1 = top -> path;
       data.pathfile2 = top1 -> path;
       data.size = top -> size;
       pthread_create(&tid, NULL, &thread_func, &data);
   }
   pthread_mutex_unlock(&thread_str_data.pmutex);

...
}
xin ()
Ответ на: комментарий от Reset

Чем это семафоры сложнее счётчика, который ещё надо синхронизировать, и condition-а? С семафорами нам надо выполнять только две операции - увеличивать и уменьшать, со счётчиком и condition-ом - увеличивать и уменьшать счётчик, дёргать condition и ещё проверять значение счётчика.

К тому же, описанная задача есть по сути прямое назначение семафора.

roy ★★★★★ ()
Ответ на: комментарий от roy

Нет, если бы речь шла не о потоках, а о потомках (отфоркнутых процессах), то так оно и было бы, да и то даже в этом случае можно решить задачу без семафоров. Самафор (posix) это _глобальный_ _системный_ объект синхронизации _процессов_. При использовании в рамках _одного_ процесса будут слишком большие накладные расходы.

Reset ★★★★★ ()
Ответ на: комментарий от Reset

Ну про эффективность я ничего не говорил, ты же вроде сложность упоминал. Возможно что семафор дороже, но это же всё от задачи зависит.

roy ★★★★★ ()
Ответ на: комментарий от Reset

Ничего, что у _POSIX_ семафоров есть локальный для процесса режим?

А вот и пруфы:

#include <stdio.h>
#include <semaphore.h>
#include <pthread.h>
#include <unistd.h>

sem_t semaphore;
pthread_t thread;

void *thread_func(void *p) {

	while(1) {
		sem_wait(&semaphore);
		printf("signal\n");
	}
}

int main() {
	int i;
	sem_init(&semaphore, 0, 0);

	pthread_create(&thread, NULL, thread_func, NULL);

	for (i = 0; i < 10; i++) {
		printf("post\n");
		sem_post(&semaphore);
		sleep(2);
	}

	return 0;
}
...
nanosleep({2, 0}, 0xbeeba288)           = 0
write(1, "post\n", 5post
)                   = 5
futex(0x11040, FUTEX_WAKE_PRIVATE, 1signal
)   = 1
rt_sigprocmask(SIG_BLOCK, [CHLD], [], 8) = 0
rt_sigaction(SIGCHLD, NULL, {SIG_DFL, [], 0}, 8) = 0
rt_sigprocmask(SIG_SETMASK, [], NULL, 8) = 0
nanosleep({2, 0}, ^C <unfinished ...>
...

futex() - ЧТД.

AptGet ★★★ ()

возникла необходимость в функции, которая приостанавливала бы «родительский» поток, пока не завершится любой из созданный «дочерних». (что-то наподобие wait() в процессах). Смотрел в сторону...

а кроме как смотреть, пробовал ? конечно архитектуро-зависимо, но wait должен срабатывать.

MKuznetsov ★★★★★ ()
Ответ на: комментарий от roy

Здравствуйте еще раз! Вообщем реализовал posix-семафор.

/*создаем семафор*/
	if ( (sem = sem_open(SEMAPHORE_NAME, O_CREAT, 0777, 0)) == SEM_FAILED )
	{
		perror("sem_open");
		return -1;
	}

	sem_init(sem, 0, 5);

	while (top != NULL)
	{
		top1 = top1buf;
		while (top1 != NULL)
		{
			if ( ((top -> size) == (top1 -> size)) && (top -> size != 0) && (top1 -> size != 0))
			{
				sem_wait(sem);
				data.pathfile1 = top -> path;
				data.pathfile2 = top1 -> path;
				data.size = top -> size;
				pthread_create(&tid, NULL, &thread_func, &data);
			}
			top1 = top1 -> next;
		}
		top = top -> next;
	}

	void *thread_func(void *arguments)
	{
		struct parametrs *sdata = (struct parametrs *) arguments;
	
		usleep(10000);
		printf("%s\n", sdata->pathfile1);
		sem_post(sem);
		return NULL;
	}

Как видно, потоки создаются в цикле, входными данными являются имена файлов. При небольшом объеме входных данных(80-90 итераций в цикле) семафор справляется на ура, однако при большем объеме семафор блокирует главный поток и программа не завершается. Знаю, это очень глупо, но ошибку в упор не вижу. Может я все же неправильно задам семафор?

xin ()
Ответ на: комментарий от xin

А что в это время с потоками происходит? Они работают или все завершаются и главный поток висит на семафоре? Если так - то надо смотреть, какое значение семафора. Возможно, что потоки завершаются, не увеличив семафор.

data.pathfile1 = top -> path;
data.pathfile2 = top1 -> path;
data.size = top -> size;

Где эта data лежит? В стеке и перезаписывается в каждом цикле?

roy ★★★★★ ()
Ответ на: комментарий от roy

Все потоки завершаются, висит только главный. Data лежит не в стеке, но перезаписывается с каждым циклом. Перед sem_wait поместил sem_getvalue(sem, &sem_value); Получается , что перед sem_wait значение семафора 0.

Возможно, что потоки завершаются, не увеличив семафор.

Такое возможно?

xin ()
Ответ на: комментарий от xin

Data лежит не в стеке, но перезаписывается с каждым циклом.

Не думаешь, что здесь может быть проблема? Получается ты положил значение в data, и передал её новому потоку. Затем поменял значение data, и отдал следующему потоку. И при этом не факт, что первый поток успел закончить работу с ней. Получается, что во время работы потока данные, которые он использует, меняются без синхронизации. Если я правильно понял, то это не есть гуд.

Перед sem_post(sem); сделай вывод дебага. Возможно, не доходит до этого оператора выполнение. Почему он может упасть и что при этом будет отображено не скажу - давно уже на С не писал.

roy ★★★★★ ()
Ответ на: комментарий от deterok

Поток 1: 1. Проверяешь условие, например, что счётчик имеет такое-то значение. 2. Если условие не проходит, вешаешься на condition и ждёшь сигнала.

Поток 2: 1. После изменения состояние, например, изменение значение счётчика, дёргаешь condition.

Дёргание condition-а приводит к тому, что потоки, ждущие на нём, просыпаются, и должны проверить условие, в ЦИКЛЕ. Если условие стало истинным - работаем дальше, если нет - снова вешаемся на condition.

roy ★★★★★ ()

До join() в C++ ещё не доросли разве?

iZEN ★★★★★ ()
Ответ на: комментарий от roy

Спасибо вам за хорошие советы.

Профиксил зависание потока на семафоре. Проблема была в том, что я нигде не завершал потоки. В результате этого, после N - ого потока их создание прекращалось и главный поток «зависал» на семафоре. Решил эту проблему путем создания отсоединенных потоков, которые не требуют вызова pthread_join для их нормального завершения.

pthread_attr_init (&attr);
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);

Получается, что во время работы потока данные, которые он использует, меняются без синхронизации. Если я правильно понял, то это не есть гуд.

Да.Получается, что несколько потоков успевают захватить одни и те же данные. Проблему решил довольно топорным путем - задержка (usleep).

Может у вас есть мысли по поводу того, как можно сделать синхронизацию потоков для этих данных?

xin ()
Ответ на: комментарий от roy

Поток 1: 1. Проверяешь условие, например, что счётчик имеет такое-то значение. >2. Если условие не проходит, вешаешься на condition и ждёшь сигнала.

Поток 2: 1. После изменения состояние, например, изменение значение счётчика, >дёргаешь condition.

Дёргание condition-а приводит к тому, что потоки, ждущие на нём, просыпаются, и >должны проверить условие, в ЦИКЛЕ. Если условие стало истинным - работаем >дальше, если нет - снова вешаемся на condition.

Да, такое тоже реализовал:

struct strm
{
	int 		pN;
	pthread_cond_t   pcond;
	pthread_mutex_t 	pmutex;

} thread_str_data;
.....

void *thread_func(void *arguments)

{

	struct parametrs *sdata = (struct parametrs *) arguments;

	//tid = pthread_self();



	pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);

	

	pthread_cleanup_push(&cond_signal, NULL);

	usleep(10000);

	pthread_cleanup_pop(1);

	pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);

	return NULL;

}



static void cond_signal(void *data)

{

	pthread_mutex_lock (&thread_str_data.pmutex);

	--thread_str_data.pN;

	pthread_cond_signal(&thread_str_data.pcond);

	pthread_mutex_unlock(&thread_str_data.pmutex);

}
.....

pthread_mutex_init(&thread_str_data.pmutex, NULL);
pthread_cond_init(&thread_str_data.pcond, NULL);
....
if ( ((top -> size) == (top1 -> size)) && (top -> size != 0) && (top1 -> size != 0))
{
   pthread_mutex_lock(&thread_str_data.pmutex);
   if (thread_str_data.pN == config.Nprocess)
   {
       printf(" %d Ждем\n", thread_str_data.pN);
       pthread_cond_wait(&thread_str_data.pcond, &thread_str_data.pmutex);
   }
   else
   {
       ++thread_str_data.pN;
       data.pathfile1 = top -> path;
       data.pathfile2 = top1 -> path;
       data.size = top -> size;
       pthread_create(&tid, NULL, &thread_func, &data);
   }
   pthread_mutex_unlock(&thread_str_data.pmutex);

...
}

xin ()
Ответ на: комментарий от xin

Самый простой и правильный вариант - отдавать каждому потоку новые данные. Выделяешь динамическую память (malloc), записываешь туда данные, отдаёшь потоку. Для следующих данных так же выделяешь память, записываешь данные, отдаёшь. Главное эту память потом освобождать. В твоём случае освобождать её лучше в потоке, которому их передал.

Другой вариант использовать блокировки, например мьютекс, но я не вижу смысла, зачем.

roy ★★★★★ ()
Ответ на: комментарий от roy

Сделал! Все работает как часы)

Благодарю всех, особенно Вас (roy), за дельные советы и помощь.

xin ()
Вы не можете добавлять комментарии в эту тему. Тема перемещена в архив.