LINUX.ORG.RU

Удаление и блокировка двунаправленных списков.


0

1

Доброй ночи!
Столкнулся с проблемой реализации работы с двунаправленными списками в ядре linux. А точнее с одновременной работой нескольких потоков ядра с одним списком.

Имеем один список LIST_HEAD(main_head_list);
Он будет содержать список данных структуру:

struct main_struct 
{
  int number_1;
  int number_2;
}

В ядре существует несколько потоков (в моём случае их 16).
Все они обращаются к списку main_head_list, создают, осуществляют поиск а также удаляют записи.
Не знаю, важно ли это, но упомяну. До начала взаимодействия с этим списком потоки, как правило уже удерживают некую спинблокировку.

Вот пример работы одного потока.
1. Выделяет память для структуры main_struct и заносит её в двунаправленный список с помощью list_add.
2. Этот поток производит некие действия, которые для меня являются «чёрным ящиком», т.е. я не знаю как дальше он себя поведёт, будет выгружен, поставлен в очередь на выполнение или что ещё.
3. Тут потоку следует найти в списке main_head_list ту самую структуру которую он занёс в этот список на этапе 1.


Как осуществить корректную работу с этим списком?
При попытках обойтись без синхронизации, с поиском и удалением структуры из списка на этапе 3 при использовании list_for_each_entry происходит зависание системы. (примерно при 80 000 итерации добавления и удаления данных из списка.)

Deleted

Тут только блокировки. Сам подумай: если во время for_each другой тред убирает элемент из списка, то here, there be dragons!

Не берусь судить, но ихмо тут нужен или supervisor этого списка — только один тред, который имеет право писать/читать список или блокировки или другая структура данных (может быть примитивный hashmap).

Т.е. наподобие (на правах бредовой идеи):

struct main_struct (*main_list_map)[NUMBER_OF_THREADS];
и каждый пишет в свой кусок. А кому надо смотрит, что там не NULL.

beastie ★★★★★ ()
Ответ на: комментарий от beastie

С помощью каких блокировок лучше это реализовать?
Что блокировать? сам список или доступ к элементу списка?

Deleted ()
Ответ на: комментарий от Deleted

С помощью каких блокировок лучше это реализовать?

http://stackoverflow.com/questions/5869825/when-should-one-use-a-spinlock-ins...

от себя добавлю - если код может быть вызван из контекста который не может засыпать - обработчики прерываний, softirq (повсеместно в сетевой подсистеме) то без вариантов spinlock.

anonymous ()

Пора прикручивать GC. Методы уже изобретены.

blexey ★★★★ ()
DEFINE_SPINLOCK(main_head_list_spinlock);


/* добавление элемента списка */
int func_add(int num_1, int num_2)
{
  struct main_struct *main_struct_ptr = vmalloc(sizeof(struct main_struct));
  if(main_struct_ptr == NULL) return 0;
  memset(main_struct_ptr, 0, sizeof(struct main_struct));

  /* сохраняем числа структуры */
  main_struct_ptr->number_1 = num_1;
  main_struct_ptr->number_2 = num_2;


  spin_lock(&main_head_list_spinlock);
    list_add(&main_struct_ptr->list, &main_head_list);
  spin_unlock(&main_head_list_spinlock);
}




/* удаление элемента списка */
void func_del(int num_1, int num_2)
{
  struct main_struct *main_struct_ptr = NULL; 
  struct list_head *list_ptr, *tmp;

 spin_lock(&main_head_list_spinlock);
  list_for_each_safe(list_ptr, tmp, &main_head_list)
  {
    main_struct_ptr = list_entry(list_ptr, struct main_struct, list);
    
    if( main_struct_ptr->number_1 == num_1 && main_struct_ptr->number_2 == num_2)
    {
      /* удаляем найденый элемент */
      list_del(&main_struct_ptr->list);
      if(main_struct_ptr) vfree(main_struct_ptr);
    }
  }
 spin_unlock(&main_head_list_spinlock);
return; 
}









/* действия потоков выполнения */
запуск потока.
-----
(тут поток делает всё что угодно)
--------
-----------
int num_1 = любое число;
int num_2 = любое число;



/* заносим в список два числа */
if(func_add(int num_1, int num_2) == 0 ) return ERROR;

--------
(тут снова процесс делает всё что угодно)
----
-------
----------

/* производим удаление элемента из двунаправленного списка */
func_del(num_1, num_2);

При таком подходе наблюдается проблема с синхронизацией. Примерно черерез 10к итераций , происходит зависание.

Deleted ()
Ответ на: комментарий от beastie

там есть list_for_each_safe() и list_for_each_entry_safe() если элементы списка могут удаляться в процессе

geks ()
Ответ на: комментарий от geks

Это немного другое. Safe реализован так, что да, элемет может быть убран и это не поломает loop. Но это ничего не значит, если через список будут «loop-иться» сразу несколько тредов.

beastie ★★★★★ ()
Ответ на: комментарий от beastie

Вся разница (псевдокод):

foreach:

for (p = list_head; p != list_end; p = p->next)
    /* if we remove p here, we brake the loop, since p->next is gone */

foreach_safe:

for (p = list_head; tmp = p->next, p != list_end; p = tmp)
    /* it's safe to remove p now, since we saved p->next for next iteration */

beastie ★★★★★ ()
Ответ на: комментарий от Deleted

Методы те же, что у сборщиков мусора, которому придётся заниматься реальным удалением элементов вместо рабочих тредов.

blexey ★★★★ ()
Ответ на: комментарий от blexey

А теперь уточню. Поменял я valloc на kmalloc с GFP_ATOMIC (и оставил спин блокировки)и теперь система не падает. Работает нормально. Именно нормально, а не как ей следует. Происходит утечка памяти! kfree видимо не освобождает структуру. (Возможно значения num_1 и num_2 в потоках одинаковы.) Думаю изза чего может происходить утечка...

Deleted ()
Ответ на: комментарий от Deleted

Я бы на твоем месте не делал постоянно alloc/free а сразу выделил память под пул и создал в этой памяти список, нужно новое элемент - перемещаешь элемент из списка свободной памяти в свой список, надо освободить - перемещаешь обратно.

http://lxr.free-electrons.com/source/include/linux/list.h#L148

хотя вряд ли это поможет - то что у тебя в ядре 16 потоков создается намекает что без канабиса тут не обошлось.

anonymous ()

Народ, проблемы с синхронизацией всёже возникают. Помогите примером, как ограничить доступ к списку для потоков.

Deleted ()
Ответ на: комментарий от Deleted

Помогите примером, как ограничить доступ к списку для потоков.

В твоем коде при работе со списками видимых ошибок нет - выставил блокировку, поработал со списком, снял блокировку - никакой магии тут нет. Единственное что я вижу - если пара num_1,num_2 не уникальна - удалаются все элементы удовлетворяющие условию, а не первый совпаший элемент спиcка.

Засада с блокировками - это неочевидные ошибки с взаимной блокировкой.

1)

Не знаю, важно ли это, но упомяну. До начала взаимодействия с этим списком потоки, как правило уже удерживают некую спинблокировку.

конечно это важно, скорей всего и является источником ошибки

2) http://www.kernel.org/doc/Documentation/spinlocks.txt

static DEFINE_SPINLOCK(xxx_lock);

	unsigned long flags;

	spin_lock_irqsave(&xxx_lock, flags);
	... critical section here ..
	spin_unlock_irqrestore(&xxx_lock, flags);

The above is always safe но не отменяет возможность ошибки 1), а safe это для случая

The reasons you mustn't use these versions if you have interrupts that
play with the spinlock is that >>> you can get deadlocks <<<:

	spin_lock(&lock);
	...
		<- interrupt comes in:
			spin_lock(&lock);

что может быть еще одним источником ошибки

anonymous ()
Ответ на: комментарий от Deleted

вот пример, где с одним списком работают несколько потоков - прошу прощения за портянку:

#include	<stdio.h>
#include	<stdlib.h>
#include	<pthread.h>

#define	MAXNITEMS 		1000000
#define	MAXNTHREADS			100

int		nitems;			

struct node* List ;

struct {
  pthread_mutex_t	mutex;
  pthread_cond_t	cond;
  int				nready;	
  int	count;
  long producer_sum  ;
  struct node* List ;
} put = { PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER };

struct {
  pthread_mutex_t	mutex;
  pthread_cond_t	cond;
  int				nready;	
  int	count;
  long consumer_sum  ;
  struct node* List ;
} get = { PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER };

void	*produce(void *), *consume(void *);
int  Length(char * name , struct node* head);
int  Pop(struct node** headRef);
void Add(struct node** headRef, int num) ;
 
int main(int argc, char **argv)
{
	int			i, nthreads, count[MAXNTHREADS] , count2[MAXNTHREADS];
	pthread_t	tid_produce[MAXNTHREADS], tid_consume[MAXNTHREADS];

	List = NULL ;

	if (argc != 3)
	{
		printf("usage: prodcons <#items> <#threads>\n");
		exit(0);
	}
	nitems = atoi(argv[1]);
	nthreads = atoi(argv[2]);

	for (i = 0; i < nthreads; i++) 
	{
		count[i] = 0;
		pthread_create(&tid_produce[i], NULL, produce, &count[i]);
		count2[i] = 0;
		pthread_create(&tid_consume[i], NULL, consume, &count2[i]);
	}

	for (i = 0; i < nthreads; i++) 
	{
		pthread_join(tid_produce[i], NULL);
		pthread_join(tid_consume[i], NULL);
		printf("count[%d] = %d  count2[%d] = %d\n", i, count[i], i, count2[i]);	
	}

	printf("producer_sum =%d  consumer_sum =%d\n" , put.producer_sum , get.consumer_sum);  // results in len == 3
	printf("producer counter =%d  consumer counter =%d\n" , put.count , get.count);  // results in len == 3
    printf("queue length =%d\n",Length("List",List));  // results in len == 3

	exit(0);
}

void * produce(void *arg)
{
	for ( ; ; ) {
		pthread_mutex_lock(&put.mutex);
		if (put.count >= nitems) {
			pthread_mutex_unlock(&put.mutex);
			return(NULL);		/* array is full, we're done */
		}
		Add(&List, put.count ); 
		put.producer_sum +=put.count;
		put.count++;
		printf("producer =%d  consumer =%d\n" , put.count , get.count);  // results in len == 3
		pthread_mutex_unlock(&put.mutex);

		pthread_mutex_lock(&get.mutex);
		if (get.nready == 0)
			pthread_cond_signal(&get.cond);
		get.nready++;
		pthread_mutex_unlock(&get.mutex);

		*((int *) arg) += 1;
	}
	return(NULL);		/* array is full, we're done */
}

void * consume(void *arg)
{
	for ( ; ; ) {
			pthread_mutex_lock(&get.mutex);
			if (get.count >= nitems ) 
			{
				pthread_mutex_unlock(&get.mutex);
				return(NULL);		/* array is full, we're done */
			}
			if(get.count < put.count)
			{
				int queue = Pop(&List ); 
				get.consumer_sum += queue;
				get.count ++;
				printf("producer =%d  consumer =%d\n" , put.count , get.count);  // results in len == 3
				*((int *) arg) += 1;
			}
			pthread_mutex_unlock(&get.mutex);
	
			pthread_mutex_lock(&put.mutex);
			if (put.nready == 0)
				pthread_cond_signal(&put.cond);
			put.nready++;
			pthread_mutex_unlock(&put.mutex);
	}
	return(NULL);
}



kto_tama ★★★★★ ()
Последнее исправление: kto_tama (всего исправлений: 4)
Ответ на: комментарий от kto_tama

Разве реализация на уровне ядра такаяже как и в пользовательском пространстве?

Deleted ()
Ответ на: комментарий от kto_tama

Но всёравно спасибо за пример кода)

Deleted ()
Вы не можете добавлять комментарии в эту тему. Тема перемещена в архив.