LINUX.ORG.RU

Thread safe Queue + delete object's with O(1)

 


0

1

Нужно реализовать вот такую незамысловатую штуку. Обычная thread safe очередь но с возможность удаления поставленных в нее объектов.

При этом хотелось бы что бы «опцию» удаления объектов из очереди можно было бы отключить безболезненно если она не нужна.

Реализация сводится к тому что бы в очереди хранить не сам объкт, а некий подставной хендл. Соотвественно при добавлении в очередь очередного объекта возвращать на него хендел через который его можно удалить.

Если пораскинуть мозгами, то появляются вот такие требования к реализации:

  • Сторона которая делает push получает хендел через который можно удалить объект
  • Если кто то сделал pop этого объекта, то удалить его через хендл полученных из push уже нельзя
  • Если объект был удален, то pop возвращает пустой хендл, поэтому перед использованием объекта полученного из pop нужно его проверить (хотя эту проверку можно делать сразу в pop)

У меня получилась вот такая реализация:

template<class T, class H = ::Holder<T> >
class Queue
{
	typedef typename H::Ptr Ptr;

public:
	typedef H Holder;

	object():stop_(false){}

	Holder push(T* t)
	{
		std::unique_lock<std::mutex> lock(mtx_);
		Holder holder(t);
		q_.push_back(holder.ptr());
		cond_.notify_one();
		return holder;
	}
	Holder pop()
	{
		std::unique_lock<std::mutex> lock(mtx_);
		while(q_.empty() && !stop_) cond_.wait(lock);
		if(stop_ && q_.empty())
			throw std::runtime_error("stopped");
		Ptr ptr = q_.front();
		q_.pop_front();
		return Holder(ptr);
	}

	void stop()
	{
		std::unique_lock<std::mutex> lock(mtx_);
		stop_ = true;
		cond_.notify_one();
	}
private:
	std::deque<Ptr> q_;
	bool stop_;
	std::mutex mtx_;
	std::condition_variable cond_;
};
Сразу бросается в глаза, что очередь может работать только с указателями. Но, в противном случае, как бы мы могли удалить объект в очереди (ниже будет понятнее).

Первый тип хендлера объекта, просто замещает обычный указатель. Нужен из за поддержки единиго кода в самой очереди

template<class T>
class Holder
{
public:
	typedef T* Ptr;

public:
	Holder(T* t): t_(t){}

	void reset() {  }

	operator bool(){ return static_cast<bool>(t_); }
	T* operator->() {return t_;}
	Ptr ptr() { return t_; }

private:
	T* t_;
};

Второй тип хендлера, подразумевает что настоящим владельцем (хранителем объекта) будет сам pusher

template<class T>
class RemovableHolder
{
public:
	typedef std::weak_ptr<T> Ptr;

public:
	RemovableHolder(T* t):t_(t){}
	RemovableHolder(const Ptr& ptr):t_(ptr.lock()){}

	void reset() { t_.reset(); }

	operator bool(){ return static_cast<bool>(t_); }
	T* operator->() {return t_.get();}
	Ptr ptr() { return std::weak_ptr<T>(t_); }

private:
	std::shared_ptr<T> t_;
};
Проще говоря в самой очереди мы храним лишь weak_ptr, первый shared_ptr мы отдаем push'ру и он собственно может в любой момент удалить реальный объект. После операции pop у нас появляется еще один шаред, и тем самым мы добиваемся требуемого. Но тут есть одна неприятность. Pusher должен хранить все объекты. Это не очень удобно, вернее это вообще не удобно.

Третий холдер, решает эту проблему

template<class T>
class ExRemovableHolder
{
	struct Impl
	{
	public:
		Impl(T* t):t_(t), is_popped_(false){}
		void destroy(bool from_popper)
		{
			std::unique_lock<std::mutex> lock(mtx_);
			if(is_popped_ && !from_popper)
			{
				return;
			}
			if(t_)
			{
				delete t_;
				t_ = 0;
			}
		}
		~Impl()
		{
			delete t_;
		}

	public:
		std::mutex mtx_;
		T* t_;
		bool is_popped_;
	};

public:
	typedef std::shared_ptr<Impl> Ptr;

public:
	ExRemovableHolder(T* t):impl_(new Impl(t)), popper_(false){}
	ExRemovableHolder(Ptr& ptr):popper_(true)
	{
		std::unique_lock<std::mutex> lock(ptr->mtx_);
		if(ptr->t_)
		{
			impl_ = ptr;
			impl_->is_popped_ = true;
		}
	}

	void reset() { impl_->destroy(popper_); }

	operator bool(){ return static_cast<bool>(impl_); }
	T* operator->() {return impl_->t_;}
	Ptr ptr() { return impl_; }

private:
	std::shared_ptr<Impl> impl_;
	bool popper_;
};
Теперь у нас pusher,popper и в самой очереди хранится shared_ptr. Pusher может его в ручном порядке удалить. Притом только если этот объект еще не был pop'нут. Так же нам в принципе нет необходимости вообще хранить объекты из pushra. Так же объект может быть удален в ручную из pop'ra. И так же объект будет удален автоматически если его вообще никто не удалял.

Вот пример как это используется

typedef Queue<Person, ExRemovableHolder<Person> > TSQueue;
typedef simple_object<TSQueue::Holder> DestroyQueue;

void popper(TSQueue& q)
{
	try
	{
		while(true)
		{
			TSQueue::Holder tmp = q.pop();
			if(tmp)
			{
				tmp->name();
				tmp.reset();
			}
		}
	}
	catch(...)
	{
		std::cout << "popper finished\n";
	}
}
void destroyer(DestroyQueue& q)
{
	try
	{
		while(true)
		{
			TSQueue::Holder tmp = q.pop();
			tmp.reset();
		}
	}
	catch(...)
	{
		std::cout << "destroyer finished\n";
	}
}

int main()
{
	TSQueue q;
	std::thread th(popper, std::ref(q));

	DestroyQueue d;
	std::thread thd(destroyer, std::ref(d));

	const int repeat = 9999999;
	for(int i=1; i<=repeat; i++)
	{
		TSQueue::Holder h = q.push(new Person("Vasya"));
		d.push(h);
	}

	q.stop();
	d.stop();
	th.join();
	thd.join();
}

Зачем вся эта котовасия понадобилась. Это вроде как ActiveObject где активные объекты должны исполнятся в пуле тредов. Т.е. очередь это интерфейс для асинхронного выполнения задач. Притом логично, что после постановки задачи в очередь, может произойти так, что эта задача станет уже не актуальной до момента, когда до нее дойдет очередь выполнения (например клиент отключился), и тогда ее выполнения будет бессмысленным.

Если система слобосвязана, то это по сути единственнй способ отменить выполнения таски.

А теперь я хотел бы выслушать вашу критику. И в плане идеи и в плане реализации.


Почему не сет удаленных и не синхронизированный с ним поп, который при обнаружении объекта в сете удалит его и попнет следующий?

Код не читал.

arturpub ★★
()

(далеко не читал - многа букаф)

1. ThreadSafe - это что? Сколько читателей, сколько пейсателей? Кто из может удолять?

2. Ограничение размера Queue какой-то величиной допустимо?

Pavval ★★★★★
()
Последнее исправление: Pavval (всего исправлений: 1)
Ответ на: комментарий от arturpub

Потому что эта мысль не посетила меня первой :)

Есть один недочет. По сути сет удаленных объектов не нужен. Вот почему: если мы пробуем удалить объект в момент когда его попнули и выполняютБ а мы его положили в очередь удаляемых, то этот объект навсегда останется в очереди на удаление. Для исключения этой ситуации, перед тем как положить элемент в очередь на удаление, его нужно чекнуть в основной очереди. Если мы его там нашли, что нам мешает его сразу удалить? Вот :)

Cupper
() автор топика
Ответ на: комментарий от Pavval

2. Ограничение размера Queue какой-то величиной допустимо?

это не относится к основной задаче, поэтому было опущено.

1. ThreadSafe - это что? Сколько читателей, сколько пейсателей? Кто из может удолять?

не ограничено. Писатель может удалить если читатель еще не успел забрать. Если читатель успел забрать, то объект удаляется либо читателем, либо после того как его все отпустят.

Cupper
() автор топика

Почему бы не заюзать unordered_map <handle, object> + deque <object> например?

nanoolinux ★★★★
()

очередь это интерфейс для асинхронного выполнения задач

В таком случае у хранимых элементов вполне может быть общий базовый класс. А если есть общий базовый класс, то твоё fifo можно построить вокруг boost::intrusive::slist<linear, cache_last>. Преимущество в том, что не будет лишних аллокаций под счетчики шаред-/вик-поинтеров. Мютексы на каждый элемент тоже можно не заводить.

Manhunt ★★★★★
()
Ответ на: комментарий от Cupper

Дисклаймер: это оверинжиниринг, лучше эксклюзивно лочить очередь и удалять из нее объекты, неважно сколько в ней их. То же с попом.

Из спортивного интереса: делаешь мап объекта на число, где число — серийник в очереди. Серийник это абсолютный индекс добавления (все время растет). При изъятии увеличиваешь Поправку. Теперь любой объект в очереди имеет серийник = свой индекс + поправка. Т.о. упомянутый мап всегда однозначно указывает на индекс объекта в очереди. Если этот индекс за пределами очереди, значит задача уже вышла из нее. Если внутри, значит ее надо выкинуть при изъятии.

arturpub ★★
()
Ответ на: комментарий от arturpub

Две мысли в одну затупил. Надо просто выкидывать из мапа изъятые задачи и помещать в него добавляемые. Получаешь очередь-сет. Лукап моментальный, очередность сохраняется, твоя задача решается за О(1) без потерь в других местах.

arturpub ★★
()
Ответ на: комментарий от Manhunt

а вы могли бы примерно описать как тогда должны выполнятся требования описанные в начале поста, про хендлен который пашеру, который попперу, о порядке и ограничениях на удаление объекта. В впервые сталкиваюсь с интрусивными контейнерами, и пока толком не понял как можно избавится от хотябы мютексов для доступа к объекту. Ведь он же шарится между пашером, поппером да еще и самим контейнером.

Cupper
() автор топика
Ответ на: комментарий от asaw

это опечаточка :) исторически сложилось что очередь у нас называется object. Я решил этот бред не выносит на ружу, да бы не путать людей, неполучилось :(

Cupper
() автор топика
Ответ на: комментарий от dzidzitop

Вы про ExRemovableHolder наверно? Выглядит странно, но щас попробую объяснить. объект ExRemovableHolder::Impl находится под shared_ptr, каждый кто владете хендлером на реальный объект хранит этот shared_pt<Impl>.

Если у нас существует, более чем один shared_pt<Impl> то Impl::destroy() и Impl::~Impl() не могут быть вызваны одновременно.

Если у нас остался только один shared_pt<Impl> то они также не могут быть вызваны одновременно. Т.к. один shared_pt<Impl> на один поток выполнения.

Вполне нормальная ситуация когда у нас pusher вызывает Impl::destroy(), а затем вызывается Impl::~Impl() -> delete 0 корректная операция.

Еще раз Impl::destroy() и Impl::~Impl() не могут быть вызваны параллельно.

Cupper
() автор топика
Ответ на: комментарий от Cupper

Управление временем жизни таски осталось полностью за скобками. Код не тестировал, это просто иллюстрация идеи.

#include <boost/intrusive/slist.hpp>
#include <boost/utility.hpp>

#include <condition_variable>
#include <mutex>
#include <cassert>

using namespace boost::intrusive;

class Queue;

class TaskBase : public slist_base_hook<link_mode<normal_link> >, boost::noncopyable
{
	friend class Queue;
	typedef slist<TaskBase, linear<true>, cache_last<true> > List;

	bool m_isInQueue;
public:

	TaskBase() : m_isInQueue(false) {}
	virtual ~TaskBase()  { assert(!m_isInQueue); }

	virtual void Execute() = 0;
};

class Queue
{
	TaskBase::List m_list;
	std::mutex m_mutex;
	std::condition_variable m_condition;
public:
	void Push(TaskBase &task)
	{
		std::unique_lock<std::mutex> lock(m_mutex);

		assert(!task.m_isInQueue);

		m_list.push_back(task);
		task.m_isInQueue = true;

		m_condition.notify_one();
	}
	bool Cancel(TaskBase &task)
	{
		std::unique_lock<std::mutex> lock(m_mutex);

		if(!task.m_isInQueue)
			return false;

		TaskBase::List::iterator it = m_list.iterator_to(task);
		m_list.erase(it);
		task.m_isInQueue = false;

		return true;
	}
	TaskBase &Pop()
	{
		std::unique_lock<std::mutex> lock(m_mutex);

		while(m_list.empty())
			m_condition.wait(lock);

		TaskBase &task = m_list.front();
		m_list.pop_front();
		task.m_isInQueue = false;
		return task;
	}
};
Manhunt ★★★★★
()
Последнее исправление: Manhunt (всего исправлений: 2)
Ответ на: комментарий от Cupper

Вы про ExRemovableHolder наверно?

я про Queue. Он должен синхронизовать память с тем что делали constructor/push/pop

Проблема не в паралельном доступе, но в visibility (memory order).

Про остальные - нужно смотреть юзкейсы. Скорее всего то же самое.

dzidzitop ★★
()
Ответ на: комментарий от dzidzitop

Проблема не в паралельном доступе, но в visibility (memory order).

Поясните. В момент когда Queue разрушается ею владее только один поток, о каком memory order идет речь?

Cupper
() автор топика
Ответ на: комментарий от dzidzitop

Э... ну вобщето T1 не должен удалять Queue пока ей кто то пользуется. Queue как бы сама по себе не threadsafe, она лишь предоставляет threadsafe доступ к хранимым объектам.

А представьте что T2 весит на мютексте, пока T1 удаляет очередь (в деструкторе заблокировали мютекс), и что будет после того как деструктор завершит свою работу?)

Cupper
() автор топика
Ответ на: комментарий от Cupper

повторю: без синхронизации памяти T1 в деструкторе может не увидеть что T2 что-то делал (пусть даже и с мутексом). В результате - undefined behaviour.

Правда и то, что даже если T2 гарантированно не будет ничего делать с Queue в момент вызова деструктора, то с точки зрения Стандарта и современных процессоров меняется мало. Хотя это не будет возможно для x86 & amd64 с их sequental memory model.

dzidzitop ★★
()
Вы не можете добавлять комментарии в эту тему. Тема перемещена в архив.