KISSCPP
a C++ library for rapid application development
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros Pages
threadsafe_persisted_priority_queue.hpp
Go to the documentation of this file.
1 // File : threadsafe_persisted_priority_queue.hpp
2 // Author: Dirk J. Botha <bothadj@gmail.com>
3 //
4 // This file is part of kisscpp library.
5 //
6 // The kisscpp library is free software: you can redistribute it and/or modify
7 // it under the terms of the GNU Lesser General Public License as published by
8 // the Free Software Foundation, either version 3 of the License, or
9 // (at your option) any later version.
10 //
11 // The kisscpp library is distributed in the hope that it will be useful,
12 // but WITHOUT ANY WARRANTY; without even the implied warranty of
13 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 // GNU Lesser General Public License for more details.
15 //
16 // You should have received a copy of the GNU Lesser General Public License
17 // along with the kisscpp library. If not, see <http://www.gnu.org/licenses/>.
18 
19 #ifndef _THREADSAFE_PERSISTED_PRIORITY_QUEUE_HPP_
20 #define _THREADSAFE_PERSISTED_PRIORITY_QUEUE_HPP_
21 
22 #include <vector>
23 #include <boost/scoped_ptr.hpp>
24 #include <boost/shared_ptr.hpp>
25 #include <boost/thread/mutex.hpp>
26 #include <boost/thread/locks.hpp>
27 #include "persisted_queue.hpp"
28 
29 //--------------------------------------------------------------------------------
30 namespace kisscpp
31 {
32 
33 template <class _qoT, class _sT>
34 class ThreadsafePersistedPriorityQueue : public boost::noncopyable
35 {
36  public:
37  ThreadsafePersistedPriorityQueue(const std::string& queueName,
38  const std::string& queueWorkingDir,
39  const unsigned priorityLevels,
40  const unsigned maxItemsPerPage) :
41  name(queueName),
42  workingDir(queueWorkingDir),
43  levels(priorityLevels),
44  maxPageSize(maxItemsPerPage),
45  lastPopLevel(-1)
46  {
47  lastPopedObject.reset();
48  createQueues();
49  }
50 
52  {
53  destroyQueues();
54  }
55 
56  void push(boost::shared_ptr<_qoT> p, unsigned priority)
57  {
58  boost::lock_guard<boost::mutex> guard(objectMutex);
59  if(priority >= levels) {
60  priority = levels-1;
61  }
62  persistedQList[priority]->push_back(p);
63  }
64 
65  boost::shared_ptr<_qoT> pop()
66  {
67  boost::lock_guard<boost::mutex> guard(objectMutex);
68  unsigned i = 0;
69  while(i < levels && persistedQList[i]->empty()) {
70  ++i;
71  }
72  lastPopLevel = i;
73  lastPopedObject = persistedQList[i]->pop_front();
74  return lastPopedObject;
75  }
76 
77  boost::shared_ptr<_qoT> last_pop_object()
78  {
79  return lastPopedObject;
80  }
81 
83  {
84  if(lastPopLevel >= 0) {
85  boost::lock_guard<boost::mutex> guard(objectMutex);
86  persistedQList[lastPopLevel]->push_back(lastPopedObject);
87  lastPopLevel = -1;
88  lastPopedObject.reset();
89  }
90  }
91 
92  bool empty()
93  {
94  boost::lock_guard<boost::mutex> guard(objectMutex);
95  unsigned i = 0;
96  while(i < levels && persistedQList[i]->empty()) {
97  ++i;
98  }
99  return (i == levels);
100  }
101 
102  void clear(unsigned level)
103  {
104  persistedQList[level]->clear();
105  }
106 
107  void clear()
108  {
109  for(unsigned i = 0; i < levels; ++i) {
110  persistedQList[i]->clear();
111  }
112  }
113 
114  protected:
115  private:
116  void createQueues()
117  {
118  boost::lock_guard<boost::mutex> guard(objectMutex);
119  for(unsigned i = 0; i < levels; ++i) {
120  std::stringstream ss;
121  std::string qn;
122 
123  ss << "priority_" << i;
124  qn = name + ss.str();
125 
126  boost::shared_ptr<PersistedQueue<_qoT, _sT> > persistedQ;
127  persistedQ.reset(new PersistedQueue<_qoT, _sT>(qn, workingDir, maxPageSize));
128  persistedQList.push_back(persistedQ);
129  }
130  }
131 
132  void destroyQueues()
133  {
134  boost::lock_guard<boost::mutex> guard(objectMutex);
135  for(unsigned i = 0; i < levels; ++i) {
136  persistedQList[i].reset();
137  }
138  }
139 
140  std::string name;
141  std::string workingDir;
142  unsigned levels;
143  unsigned maxPageSize;
144  int lastPopLevel;
145  boost::shared_ptr<_qoT> lastPopedObject;
146  std::vector<boost::shared_ptr<PersistedQueue<_qoT, _sT> > > persistedQList;
147  boost::mutex objectMutex;
148 };
149 
150 }
151 
152 #endif // _THREADSAFE_PERSISTED_PRIORITY_QUEUE_HPP_
153 
void clear()
Definition: threadsafe_persisted_priority_queue.hpp:107
kisscpp::PersistedQueue
Definition: persisted_queue.hpp:114
void clear(unsigned level)
Definition: threadsafe_persisted_priority_queue.hpp:102
kisscpp::ThreadsafePersistedPriorityQueue
Definition: threadsafe_persisted_priority_queue.hpp:34
ThreadsafePersistedPriorityQueue(const std::string &queueName, const std::string &queueWorkingDir, const unsigned priorityLevels, const unsigned maxItemsPerPage)
Definition: threadsafe_persisted_priority_queue.hpp:37
~ThreadsafePersistedPriorityQueue()
Definition: threadsafe_persisted_priority_queue.hpp:51
bool empty()
Definition: threadsafe_persisted_priority_queue.hpp:92
void push_back_last_pop()
Definition: threadsafe_persisted_priority_queue.hpp:82
boost::shared_ptr< _qoT > last_pop_object()
Definition: threadsafe_persisted_priority_queue.hpp:77
void push(boost::shared_ptr< _qoT > p, unsigned priority)
Definition: threadsafe_persisted_priority_queue.hpp:56
boost::shared_ptr< _qoT > pop()
Definition: threadsafe_persisted_priority_queue.hpp:65