19 #ifndef _THREADSAFE_PERSISTED_PRIORITY_QUEUE_HPP_
20 #define _THREADSAFE_PERSISTED_PRIORITY_QUEUE_HPP_
23 #include <boost/scoped_ptr.hpp>
24 #include <boost/shared_ptr.hpp>
25 #include <boost/thread/mutex.hpp>
26 #include <boost/thread/locks.hpp>
// Template header and constructor of ThreadsafePersistedPriorityQueue.
// This listing is a fragment: the class declaration line and the first
// constructor parameter (queueName) are not visible here.
// _qoT is the object type handled through boost::shared_ptr<_qoT>;
// _sT is passed through as the second template argument of PersistedQueue.
33 template <
class _qoT,
class _sT>
// Remaining constructor parameters: working directory, number of priority
// levels, and the per-page item limit.
38 const std::string& queueWorkingDir,
39 const unsigned priorityLevels,
40 const unsigned maxItemsPerPage) :
// Member initializers copy the constructor arguments into the object.
42 workingDir(queueWorkingDir),
43 levels(priorityLevels),
44 maxPageSize(maxItemsPerPage),
// Start with no remembered "last popped" object.
47 lastPopedObject.reset();
// push(p, priority): while holding objectMutex, appends p to the queue stored
// at persistedQList[priority].
56 void push(boost::shared_ptr<_qoT> p,
unsigned priority)
// The lock_guard keeps objectMutex locked until the end of the enclosing scope.
58 boost::lock_guard<boost::mutex> guard(objectMutex);
// Check for a priority at or beyond the number of levels.
// NOTE(review): the body of this branch is not visible in this listing;
// confirm how an out-of-range priority is handled before the indexing below.
59 if(priority >= levels) {
62 persistedQList[priority]->push_back(p);
// pop(): while holding objectMutex, looks for a level whose queue is not empty,
// takes the front element of the queue at index i, stores it in
// lastPopedObject and returns it.
65 boost::shared_ptr<_qoT>
pop()
67 boost::lock_guard<boost::mutex> guard(objectMutex);
// Loop continues while i is a valid level and that level's queue is empty.
// NOTE(review): the declaration of i and the loop body are not visible here;
// presumably i starts at 0 and is incremented -- confirm.
69 while(i < levels && persistedQList[i]->
empty()) {
// NOTE(review): the lines between the loop and this statement are not visible;
// confirm that i < levels is checked before persistedQList[i] is used.
73 lastPopedObject = persistedQList[i]->pop_front();
74 return lastPopedObject;
// last_pop_object(): returns the shared_ptr currently held in lastPopedObject
// (the signature line is not visible in this listing).
79 return lastPopedObject;
// push_back_last_pop(): re-appends lastPopedObject to the queue at index
// lastPopLevel and then drops the stored pointer (the signature line is not
// visible in this listing).
// Only acts when lastPopLevel is non-negative.
// NOTE(review): the declaration of lastPopLevel is not visible; if it is an
// unsigned type this comparison is always true -- confirm its type.
84 if(lastPopLevel >= 0) {
// objectMutex is acquired inside the branch, after lastPopLevel has been read.
85 boost::lock_guard<boost::mutex> guard(objectMutex);
86 persistedQList[lastPopLevel]->push_back(lastPopedObject);
// Release this object's reference to the re-queued element.
88 lastPopedObject.reset();
// empty(): while holding objectMutex, scans the per-level queues for one that
// is not empty (the signature line is not visible in this listing).
// NOTE(review): the declaration of i, the loop body and the return statement
// are not visible here; confirm the returned expression.
94 boost::lock_guard<boost::mutex> guard(objectMutex);
96 while(i < levels && persistedQList[i]->
empty()) {
// clear(unsigned level): calls clear() on the queue at persistedQList[level]
// (the signature line is not visible in this listing).
// NOTE(review): no lock_guard on objectMutex and no range check on level appear
// in the visible lines, unlike push(); confirm against the full source.
104 persistedQList[level]->clear();
// clear(): calls clear() on the queue of every level, from index 0 up to
// levels - 1 (the signature line is not visible in this listing).
// NOTE(review): no lock_guard on objectMutex appears in the visible lines,
// unlike push() and pop(); confirm against the full source.
109 for(
unsigned i = 0; i < levels; ++i) {
110 persistedQList[i]->clear();
// Queue-creation helper (its signature is not visible in this listing): while
// holding objectMutex, appends one PersistedQueue pointer per level to
// persistedQList.
118 boost::lock_guard<boost::mutex> guard(objectMutex);
119 for(
unsigned i = 0; i < levels; ++i) {
// Build the per-level queue name: name followed by "priority_<i>".
120 std::stringstream ss;
123 ss <<
"priority_" << i;
124 qn = name + ss.str();
// NOTE(review): the line that constructs the PersistedQueue and assigns it to
// persistedQ is not visible here; as shown, persistedQ is default-constructed.
126 boost::shared_ptr<PersistedQueue<_qoT, _sT> > persistedQ;
128 persistedQList.push_back(persistedQ);
// Queue-release helper (its signature is not visible in this listing): while
// holding objectMutex, resets the shared_ptr stored for every level, dropping
// this object's reference to each PersistedQueue.
134 boost::lock_guard<boost::mutex> guard(objectMutex);
135 for(
unsigned i = 0; i < levels; ++i) {
136 persistedQList[i].reset();
// Data members (fragment: some declarations are not visible in this listing).
// Initialized from the constructor argument queueWorkingDir.
141 std::string workingDir;
// Initialized from the constructor argument maxItemsPerPage.
143 unsigned maxPageSize;
// Element most recently returned by pop(); re-queued and reset by
// push_back_last_pop().
145 boost::shared_ptr<_qoT> lastPopedObject;
// One PersistedQueue pointer per level, indexed by priority level.
146 std::vector<boost::shared_ptr<PersistedQueue<_qoT, _sT> > > persistedQList;
// Mutex locked through boost::lock_guard by the member functions above.
147 boost::mutex objectMutex;
152 #endif // _THREADSAFE_PERSISTED_PRIORITY_QUEUE_HPP_
void clear()
Definition: threadsafe_persisted_priority_queue.hpp:107
Definition: persisted_queue.hpp:114
void clear(unsigned level)
Definition: threadsafe_persisted_priority_queue.hpp:102
Definition: threadsafe_persisted_priority_queue.hpp:34
ThreadsafePersistedPriorityQueue(const std::string &queueName, const std::string &queueWorkingDir, const unsigned priorityLevels, const unsigned maxItemsPerPage)
Definition: threadsafe_persisted_priority_queue.hpp:37
~ThreadsafePersistedPriorityQueue()
Definition: threadsafe_persisted_priority_queue.hpp:51
bool empty()
Definition: threadsafe_persisted_priority_queue.hpp:92
void push_back_last_pop()
Definition: threadsafe_persisted_priority_queue.hpp:82
boost::shared_ptr< _qoT > last_pop_object()
Definition: threadsafe_persisted_priority_queue.hpp:77
void push(boost::shared_ptr< _qoT > p, unsigned priority)
Definition: threadsafe_persisted_priority_queue.hpp:56
boost::shared_ptr< _qoT > pop()
Definition: threadsafe_persisted_priority_queue.hpp:65