LINUX.ORG.RU

[Threads] Реализация неблокирующего чтения на мьютексах


0

1

Суть такова. Есть два класса объектов - читатели и писатели. Необходимо осуществить их совместный доступ к разделяемому ресурсу по следующей схеме: читатели могут одновременно читать ресурс до тех пор, пока не появился хотя бы один писатель. При появлении писателя все вновь прибывающие читатели разбегаются в ужасе блокируются до тех пор, пока писатель не закончит работу. Если на момент появления писателя уже происходило чтение несколькими читателями, то сам писатель блокируется до тех пор, пока последний незаблокированный читатель не закончит читать. Писать одновременно имеет право только один писатель (логично). Важное условие: синхронизацию необходимо осущетвить с помощью мьютексов.

Решение на Си, в котором (где-то) определен тип mutex и функции void lock(mutex *), void unlock(mutex *):



static mutex WR;  /* где-нибудь инициализируется заранее */
static mutex R;   /* аналогично */

static int locked = 0;
static int countR = 0;

void Read(void){
  /* читатели */
  int flag = 0;
  lock(R);
  countR++;
  if(!locked){ lock(RW); flag=1; }
  unlock(R);
   
  /* ... некое чтение ресурса ... */

  lock(R);
  countR--;
  if(flag){ locked = 0; unlock(RW); }
  unlock(R);
}



void Write(void){
  /* писатели */
  int weHaveReaders;
  do{
    lock(R);
    if(countR > 0) weHaveReaders = 1;
    else { 
       locked = 0;
       unlock(R); 
       lock(RW); 
       weHaveReaders = 0; 
    }
  }while(weHaveReaders);


  /* ... некая запись того же ресурса ... */

  lock(R);
  unlock(RW);
  locked = 0;
  unlock(R);
}

Вопросы: корректно ли решение? Как бы вы решили эту задачу?

Спасибо за внимание.



Последнее исправление: gandjubas (всего исправлений: 5)

Ответ на: комментарий от LamerOk

> RW-lock

Это тоже не прокатит, нужен примитив - мьютекс (по условию задачи).

gandjubas
() автор топика
Ответ на: комментарий от Vamp

Нет, требуется железобетонное кросплатформенное решение (не зависящее ни от pthreads, ни от виндовсапи, ни от чего вообще, лок/унлок мьютексов предполагается вообще через колбэки сделать).

gandjubas
() автор топика
Ответ на: комментарий от gandjubas

Выбраны мьютексы, потому что они есть более или менее везде.

gandjubas
() автор топика
Ответ на: комментарий от gandjubas

> А я бы сначала условие задачи внимательно прочитал.

Лучше бы ты его сформулировал чётко с первого раза.

tailgunner ★★★★★
()
Ответ на: комментарий от tailgunner

Лучше бы вы конкретно указали что не понятно, потому что я достаточно подробно всё описал.

gandjubas
() автор топика

Неправильно. Нужно считать кол-во активных читателей и разрешать запись только когда оно станет равным нулю. Сейчас у тебя блокирует разрешение на запись только первый из активных читателей. Остальные просто читают данные безо всяких блокировок. Уехать на одних мьютексах не получится, потому что мьютекс должен освобождаться строго тем же тредом, который его захватил. Нужны условия типа pthread_cond.

erDiZz
()
Ответ на: комментарий от gandjubas

неужели на мьютексах не получится сделать?

на одних мьютексах/семафорах эта задача решается только при введении максимального количества читающих. в общем случае нужны условные переменные - ну или готовый rw-lock

jtootf ★★★★★
()

> корректно ли решение?

Нет. На лицо типичный race condition: что будет, если в треде_1 сразу перед /* ... некое чтение ресурса ... */ тред_2, который до того захватил RW, его освободит?

kemm
()
Ответ на: комментарий от erDiZz

Самые суровые, желательно так же ни от каких библиотек не зависеть. Например, если начихать на требование одновременного чтения (видимо, так и придётся сделать, т.к. пишется подсистемка для клиента, а там не так критична производительность, главное, чтобы кросплатформенно было и не глючило), то можно вообще абстрагироваться от мьютексов и инициализировать систему вот такими колбками:

static CRITICAL_SECTION x; /* где-то у клиента подсистемы (это может быть реализовано уже в виде неких макросов, зависящих от дефайнов, которыми можно управлять при компиляции) */
void my_mutex_lock(void){
  EnterCriticalSection(&x);
}

void my_mutex_unlock(void){
  EnterCriticalSection(&x);
}

/* инициализация подсистемы */
ini_subsys(my_mutex_lock, my_mutex_unlock);
gandjubas
() автор топика
Ответ на: комментарий от jtootf

> на одних мьютексах/семафорах эта задача решается только при введении максимального количества читающих

Ну, собственно, ТС такое решение вполне должно устроить, кстати. Это для чего-то общего назначения никуда не годится...

kemm
()
Ответ на: комментарий от kemm

Хрень будет. Райтер запорет ридеров (которые проскочили мимо спящей вахтёрши locked). И вот как от этого избавиться?

gandjubas
() автор топика
Ответ на: комментарий от gandjubas

См. сообщение jtootf. Ограничиваем максимальное кол-во одновременно читающих и используем семафоры (которые поверх мьютексов прекрасно реализуются).

kemm
()
Ответ на: комментарий от kemm

Да ладно, это же в отдельном треде будет.

gandjubas
() автор топика
Ответ на: комментарий от gandjubas

> желательно так же ни от каких библиотек не зависеть.

Ты б действительно учился корректно вопросы задавать. Сейчас высокая облачность и телепатические волны толкуют вопросы в разделе как вопросы нормальных программистов, решающих конкретные задачи, а не школьников, делающих сферических коней в вакууме.

Правильный ответ на твой вопрос - используй sig_atomic_t из <signal.h> для эмуляции rw-lock'ов. Можно будет сделать и на С++, но только на имплементации 0х. В обеих случаях будет использована стандартная библиотека. Тормозить всё будет феерично.

И да, я надеюсь, ты понимаешь, что никаких потоков выполнения у тебя быть не может по твоему же определению задачи?

LamerOk ★★★★★
()
Ответ на: комментарий от LamerOk

> Ты б действительно учился корректно вопросы задавать.

Что не так в вопросе? По-русски же написал что нужно сделать на голых мьютексах.

И да, я надеюсь, ты понимаешь, что никаких потоков выполнения у тебя быть не может по твоему же определению задачи?

А мне не нужны потоки, это будет dll-ка/so-шка, в которой надо будет контролировать одновременный доступ к неким ресурсам внутри длл-ки.

gandjubas
() автор топика

есть довольно простые в использовании команды для подобного случая: atomic_cmpxchg и atomic_inc_not_zero:

читатели юзают atomic_inc_not_zero, чтобы регистрировать периоды чтения, а пишущий тред перед записью проверяет есть ли читатели с помощью atomic_cmpxchg.

в совокупности с процессорной паузой эти команды могут разрулить ситуацию быстро и правильно.

единственное ограничение - это современные процессоры.

mrs
()
Ответ на: комментарий от mrs

> единственное ограничение - это современные процессоры.

Нет, уж лучше блокировать наглухо и читателей и писателей.

gandjubas
() автор топика
Ответ на: комментарий от gandjubas

> По-русски же написал что нужно сделать на голых мьютексах.

«Голые мьютексы» и «без библиотек» - взаимоисключащие параграфы.

Что не так в вопросе?


То, что вопрос озаглавлен как:

[Threads] Реализация неблокирующего чтения на мьютексах


А выясняеется, что тебе ни подсистема потоков выполнения, ни мьютексы не досупны «по условиям задачи».

Твой вопрос на самом деле звучит так: «Как эмулировать RW-lock'и в C/C++?»

Теперь сравни с тем, что ты понаписал.

LamerOk ★★★★★
()
Ответ на: комментарий от LamerOk

> ни мьютексы не досупны «по условиям задачи».

Ты хорошо понимаешь что такое мьютекс? Судя по тому что ты пишешь, для тебя одна фигня что семафоры, что мьютексы, что RW-локи. Знание вумных слов мало ещё кого спасало.

gandjubas
() автор топика
Ответ на: комментарий от gandjubas

> Судя по тому что ты пишешь, для тебя одна фигня

Ну, показывай.

Когда закончишь съезжать с темы и шланговать в стилистике «лучшая защита - нападение, пусть даже бессмысленое и заведомо проигрышное», можешь рассказать, что такое мьютекс, и как он связан со стандартами С89/99 и С++98/03. Вдруг я и вправду не знаю чего-то об одном или другом?

LamerOk ★★★★★
()
Ответ на: комментарий от LamerOk

Когда научишься отвечать по теме, а не кидать дешёвые понты а ля «почитайте 3 тома Кнута», приходи, послушаем.

gandjubas
() автор топика
Ответ на: комментарий от gandjubas

Ты лишний раз меня убедил, что нет смысла отвечать идиотам, не способным правильно задать вопрос.

LamerOk ★★★★★
()

Ладно, раз не сессия, тогда расскажу. Не слушай тех, кто считает, что не получится без rw-lock'ов. rw-lock, сам по себе, абстракция более высокого уровня - на уровне команд процессора имеются только atomic-функции (Interlocked) и все. Все системные блокировки реализованы на них, и следовательно, их все можно реализовать самому по определению. Кроме того, не забываем, что наши процессоры гарантируют нужные барьеры в памяти, и чтение/запись простых данных атомарно само по себе, и в специальных огородках не нуждается.

Правда, не факт, что именно на мютексах - и все наши блокировки будут либо разновидностями спинлоков, либо тупо засыпать на таймаут, потому что заснуть и проснуться по сигналу без поддержки ОС не выйдет.
Итак, пример. (псевдокод в части атомарных функций).

[code=cpp]
bool writer_active = false;
int readers = 0;

void Read() {
while (writer_active) sleep(10);
atomic_increment(readers);
do_reading();
atomic_decreament(readers);
}

void Write() {
writer_active = true;
while (readers > 0) sleep(10);
do_writing();
writer_active = false;

[/code]

На самом деле, твоя задача очень простая. Ты разрешаешь писателю блокироваться, пока читатели не закончат свои чтения. Это очень роскошно и кучеряво. Я лично имел дело с системой, в которой райтеру спать было нельзя - он обязан был немедленно записать данные (они приходили с высокой нагрузкой извне) и хоть трава не расти.
А ридеры должны были понимать, что данные были изменены в процессе чтения и перечитать. И тоже все было сделано без каких бы то ни было системных примитивов синхронизации. Кстати, даже без атомарных функций.

Vamp
()
Ответ на: комментарий от Vamp

> А ридеры должны были понимать, что данные были изменены в процессе чтения и перечитать.

Хм, это интересная мысль.

gandjubas
() автор топика
Ответ на: комментарий от kemm

А так?


typedef struct Msg Msg;
typedef void (*action)(void *pData);
typedef void (*decrement)(void);

struct Msg{
  void      *pData;
  action     fpRun; 
  decrement  fpDecrement;
   
  Msg *pNext;
  Msg *pPrev;
}

void
enque(Msg *pMsg){
   /* ... */
}

Msg *
deque(void){
   /* ... */
}
  
static Msg  msgHead;
static mutex Q /* = ... */;
static mutex W /* = ... */;
static int countW = 0;
static int countR = 0;
static int isRunning = 0;

void iniQue(){
   msgHead->pNext = &msgHead;
   msgHead->pPrev = &msgHead;
}

void 
writeNonSin(void *pData){
   /* некое чтение общего ресурса pData */
}

void 
readNonSin(void *pData){
   /* некая запись общего ресурса pData */
}

void DecrementCountW(){
   lock(&W);
   countW--;
   unlock(&W);
}
void DecrementCountR(){
   lock(&W);
   countR--;
   unlock(&W);

}


void runActions(){
   Msg *pNext;
   lock(&Q);
   if(!isRunning){
    isRunning = 1;
    while( NULL != (pNext = deque()) ){
       unlock(&Q);
       pNext->fpRun(pNext->pData);
       pNext->fpDecrement();
       lock(&Q);
    }
    isRunning = 0;
   }
   unlock(&Q);
}


void 
readSin(void *pData){
   int fRunNonSin = 1, fRunMsgQue = 1;
   
   lock(&W);
   countR++;
   if(countW){
       Msg msg;
       msg.pData = pData;
       msg.fpRun = readNonSin;
       msg.fpDecrement = DecrementCountR;
       
       lock(&Q);
       enque(&msg);
       fRunNonSin = 0;
       unlock(&Q);
   }
   unlock(&W);

   if(fRunNonSin){
      
      readNonSin(pData);
      
      lock(&W);
      countR--;
      if(countW){
         unlock(&W);
         lock(&Q);
         if(isRunning) fRunMsgQue = 0;
         unlock(&Q);
         if(fRunMsgQue)   runActions();
      }
      else{
         unlock(&W);
      }
   }
}


void 
writeSin(void *pData){
   int fRunMsgQue = 0;
   Msg msg;
   msg.pData = pData;
   msg.fpRun = writeNonSin;
   msg.fpDecrement = DecrementCountW;
   
   lock(&W);
   countW++;
   lock(&Q);
   enque(&msg);
   if(0 == countR && 1 == countW) fRunMsgQue = 1;
   unlock(&Q);
   unlock(&W);
   
   if(fRunMsgQue) runActions();
}

gandjubas
() автор топика
Ответ на: комментарий от Vamp

А разве не может возникнуть такой ситуации:

void Read() {
while (writer_active) sleep(10);

// Вот здесь происходит переключение контекста
// А мы еще не инкрементировали число ридеров
// Запускается Writer - видит, что число ридеров = 0 и начинает выполнять операцию записи
// А потом контекст возвращается и...

atomic_increment(readers);
// Мы читаем неконсистентные данные :((
do_reading();
atomic_decreament(readers);
}

Slader
()
Ответ на: комментарий от jtootf

на одних мьютексах/семафорах эта задача решается только при введении максимального количества читающих. в общем случае нужны условные переменные - ну или готовый rw-lock

Ха! А вот и не угадали:


#include <stdio.h>
#include <stdlib.h>
#include <time.h>

#define WINDOWS
#ifdef WINDOWS
	#include <Windows.h>
	typedef CRITICAL_SECTION pthread_mutex_t;
	static int pthread_mutex_lock(pthread_mutex_t *m)
	{
		EnterCriticalSection(m);
		return 0;
	}

	static int pthread_mutex_unlock(pthread_mutex_t *m)
	{	
		LeaveCriticalSection(m);
		return 0;
	}

	static void iniMtx(pthread_mutex_t *m){
		InitializeCriticalSection(m);
	}
	static void releaseMtx(pthread_mutex_t *m){
		DeleteCriticalSection(m);
	}
#else
	#include <pthread.h>
	static void iniMtx(pthread_mutex_t *m){
		*m = PTHREAD_MUTEX_INITIALIZER;
	}
	static void releaseMtx(pthread_mutex_t *m){
	}
#endif

#define MAX_THREADS 20
#define BUF_SIZE 255

typedef struct MyData {
    int val1;
    int val2;
} MYDATA, *PMYDATA;

static pthread_mutex_t mtx;

void 
readSin(void *pData);
void 
writeSin(void *pData);

DWORD WINAPI MyThreadFunction( LPVOID lpParam ) 
{
	PMYDATA pDataArray = (PMYDATA)lpParam;
	if(pDataArray->val2)
		readSin(lpParam);
	else
		writeSin(lpParam);
	return 0;
}

void 
iniQue();
void
releaseQue();

int main(void)
{
	 int i;
	PMYDATA pDataArray[MAX_THREADS];
    DWORD   dwThreadIdArray[MAX_THREADS];
    HANDLE  hThreadArray[MAX_THREADS]; 

	iniMtx(&mtx);
	iniQue();

	for( i=0; i<MAX_THREADS; i++ )
    {
        // Allocate memory for thread data.

        pDataArray[i] = (PMYDATA) HeapAlloc(GetProcessHeap(), HEAP_ZERO_MEMORY,
                sizeof(MYDATA));

        if( pDataArray[i] == NULL )
        {
            ExitProcess(2);
        }

        // Generate unique data for each thread to work with.

        pDataArray[i]->val1 = i;
        pDataArray[i]->val2 = i % 3;

        // Create the thread to begin execution on its own.

        hThreadArray[i] = CreateThread( 
            NULL,                   // default security attributes
            0,                      // use default stack size  
            MyThreadFunction,       // thread function name
            pDataArray[i],          // argument to thread function 
            0,                      // use default creation flags 
            &dwThreadIdArray[i]);   // returns the thread identifier 


        // Check the return value for success.
        // If CreateThread fails, terminate execution. 
        // This will automatically clean up threads and memory. 

        if (hThreadArray[i] == NULL) 
        {
           //ErrorHandler(TEXT("CreateThread"));
           ExitProcess(3);
        }
    } // End of main thread creation loop.

    // Wait until all threads have terminated.

    WaitForMultipleObjects(MAX_THREADS, hThreadArray, TRUE, INFINITE);

    // Close all thread handles and free memory allocations.

    for(i=0; i<MAX_THREADS; i++)
    {
        CloseHandle(hThreadArray[i]);
        if(pDataArray[i] != NULL)
        {
            HeapFree(GetProcessHeap(), 0, pDataArray[i]);
            pDataArray[i] = NULL;    // Ensure address is not reused.
        }
    }

	getchar();
	releaseMtx(&mtx);
	releaseQue();
	return 0;
}


typedef struct Msg Msg;
typedef void (*action)(void *pData);
typedef void (*decrement)(void);

struct Msg{
  void      *pData;
  action     fpRun; 
  decrement  fpDecrement;
   
  Msg *pNext;
  Msg *pPrev;
};

typedef struct MsgQue MsgQue;
struct MsgQue{
	pthread_mutex_t Q;
	int isRunning;
	Msg  msgHead;
};

static MsgQue que;
void
enque(Msg *pMsg){
	Msg *pNew = (Msg *)malloc(sizeof(Msg)), *pP;
	if(NULL == pNew) goto fail;
	memcpy(pNew, pMsg, sizeof(Msg));
	pP = que.msgHead.pPrev;
	pP->pNext = pNew;
	que.msgHead.pPrev = pNew;
	pNew->pNext = &(que.msgHead);
	pNew->pPrev = pP;
   /* ... */
fail:
;
}

Msg *
deque(void){
	static Msg m;
	Msg *pN = que.msgHead.pNext;
	if(pN == &(que.msgHead)) return NULL;
	que.msgHead.pNext = pN->pNext;
	pN->pNext->pPrev = &(que.msgHead);
	memcpy(&m, pN, sizeof(m));
	free(pN);
   /* ... */
	return &m;
}
  
static pthread_mutex_t W /* = ... */;
static int countW = 0;
static int countR = 0;
static int countR_NonSin = 0;

void 
iniQue(){
   que.msgHead.pNext = &(que.msgHead);
   que.msgHead.pPrev = &(que.msgHead);
   iniMtx(&W);
   iniMtx(&que.Q);
   que.isRunning = 0;
}
void
releaseQue(){
	releaseMtx(&W);
	releaseMtx(&que.Q);
}

#define info(FMT, ...) do{\
	pthread_mutex_lock(&mtx); \
	printf(FMT, __VA_ARGS__); \
	pthread_mutex_unlock(&mtx); \
}while(0)

void 
writeNonSin(void *pData){
   /* некое чтение общего ресурса pData */
	int i;
	PMYDATA pDataArray = (PMYDATA)pData;
	info("Writer >> %d starting writing!\n", pDataArray->val1);
	//printf("Writer >> %d starting writing!\n", pDataArray->val1);

	for(i=0; i<10; i++)
		info("Writer >> %d is writing...\n", pDataArray->val1);

	info("Writer >> %d finished writing!\n", pDataArray->val1);
}

void 
readNonSin(void *pData){
	int i;
   /* некая запись общего ресурса pData */
	PMYDATA pDataArray = (PMYDATA)pData;
	info("Reader %d starting reading!\n", pDataArray->val1);

	for(i=0; i<10; i++)
		info("Reader %d is reading...\n", pDataArray->val1);

	info("Reader %d finished reading!\n", pDataArray->val1);

}

void 
DecrementCountW(void){
   pthread_mutex_lock(&W);
   countW--;
   pthread_mutex_unlock(&W);
}
void 
DecrementCountR(void){
   pthread_mutex_lock(&W);
   countR--;
   pthread_mutex_unlock(&W);

}


void 
runActions(){
   Msg *pNext;
   pthread_mutex_lock(&W);
   pthread_mutex_lock(&que.Q);
//   if(!countR_NonSin && !que.isRunning) 
//	   info("-- runActions() countR_NonSin == %d que.isRunning == %d\n", countR_NonSin, que.isRunning);
   if(!que.isRunning && !countR_NonSin){
//	   info("-- Actual run \n");
    que.isRunning = 1;
	pthread_mutex_unlock(&W);
    while( NULL != (pNext = deque()) ){
       pthread_mutex_unlock(&que.Q);
	   pthread_mutex_lock(&W);
	   if(!countR_NonSin || pNext->fpRun == readNonSin){
		   pthread_mutex_unlock(&W);
		   pNext->fpRun(pNext->pData);
           pNext->fpDecrement();
		   pthread_mutex_lock(&que.Q);
	   }else{
		   pthread_mutex_unlock(&W);
           pthread_mutex_lock(&que.Q);
		   enque(pNext);
	   }
    }
    que.isRunning = 0;
   }else{
	   pthread_mutex_unlock(&W);
   }
   pthread_mutex_unlock(&que.Q);
}


void 
readSin(void *pData){
	PMYDATA pDataArray = (PMYDATA)pData;
   pthread_mutex_lock(&W);
   countR++;
   //info("<-------------- reader # %d countR == %d , countW == %d ...\n", 
	  // pDataArray->val1,
	  // countR, countW);
   if(countW){
       Msg msg;
       msg.pData = pData;
       msg.fpRun = readNonSin;
       msg.fpDecrement = DecrementCountR;
       
       pthread_mutex_lock(&que.Q);
       enque(&msg);
       pthread_mutex_unlock(&que.Q);
	   pthread_mutex_unlock(&W);
   }
   else{
//	   info("<<<<<<<<<< reader # %d fRunNonSin...\n", pDataArray->val1);
	   countR--; 
	   countR_NonSin++;
	   pthread_mutex_unlock(&W);
	   readNonSin(pData);

	   pthread_mutex_lock(&W);
	   countR_NonSin--;
      if(countW){
         pthread_mutex_unlock(&W);
         runActions();
      }
      else{
         pthread_mutex_unlock(&W);
      }
   }
}


void 
writeSin(void *pData){
   PMYDATA pDataArray = (PMYDATA)pData;
   Msg msg;
   msg.pData = pData;
   msg.fpRun = writeNonSin;
   msg.fpDecrement = DecrementCountW;

   pthread_mutex_lock(&W);
   countW++;
 //  info("<------------ writer # %d countW == %d ...\n", pDataArray->val1, countW);

   pthread_mutex_lock(&que.Q);
   enque(&msg);
   pthread_mutex_unlock(&que.Q);
   pthread_mutex_unlock(&W);
   
   runActions();
}

gandjubas
() автор топика
Ответ на: комментарий от gandjubas

Тут многовато виндувса, но суть ясна в общем. Это решение посл обработки напильником можно и в Линуксе запустить.

gandjubas
() автор топика
Ответ на: комментарий от Vamp

> Не слушай тех, кто считает, что не получится без rw-lock'ов

А кто тут говорил, что без них не получится?

LamerOk ★★★★★
()
Ответ на: комментарий от LamerOk

> Да-да, без библиотек, кроссплатформенно. Мы верим, верим.

Увидел знакомые слова и не смог удержаться чтоб не сморозить очередную чушь? Сочувствую.

gandjubas
() автор топика
Ответ на: комментарий от Slader

Да, был неправ. Поздно писал вчера.
Тогда так (читатель/писатель):

int volatile lock = 0;

[code=c]
bool Read() {
int begin_lock = lock;

read_data(tmp_buf);

int end_lock = lock;

if (begin_lock == end_lock && is_even(lock)) {
process_data(tmp_buf);
return true;
}
return false;
}

void Write() {
atomic_increment(lock);

write_data();

atomic_increment(lock);
}
[/code]

Vamp
()
Ответ на: комментарий от Vamp

Схема интересная, но она, увы, малоэффективна, когда под чтением понимаются какие-нибудь дикие мат. вычисления над независимыми объектами (lockless), а под записью - редкие изменения структуры этих объектов.

gandjubas
() автор топика
Ответ на: комментарий от gandjubas

Смотря как организуешь. Никто не мешает сначала прочесть данные, верифицировать их, а потом уже приступать к диким вычислениям.

Vamp
()
Вы не можете добавлять комментарии в эту тему. Тема перемещена в архив.