ewmscp  ..
waitQueues.h
Go to the documentation of this file.
1 #ifndef __waitQueues_h_
2 #define __waitQueues_h_
3 /*
4  template functions to handle wait queues
5  Copyright (C) 2018 Juergen Hannappel
6 
7  This program is free software: you can redistribute it and/or modify
8  it under the terms of the GNU General Public License as published by
9  the Free Software Foundation, either version 3 of the License, or
10  (at your option) any later version.
11 
12  This program is distributed in the hope that it will be useful,
13  but WITHOUT ANY WARRANTY; without even the implied warranty of
14  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15  GNU General Public License for more details.
16 
17  You should have received a copy of the GNU General Public License
18  along with this program. If not, see <https://www.gnu.org/licenses/>.
19 */
20 #include <deque>
21 #include <map>
22 #include <set>
23 #include <mutex>
24 #include <condition_variable>
25 #include <stdexcept>
26 #include <thread>
27 #include <memory>
28 #include <iostream>
29 
30 namespace waitQueues {
31  template <typename T> class simple {
32  private:
33  std::deque<std::unique_ptr<T>> queue;
34  std::mutex queue_mutex;
35  std::condition_variable cv;
36  public:
37  void enqueue(std::unique_ptr<T>& item) {
38  std::unique_lock<decltype(queue_mutex)> lock(queue_mutex); //(NOLINT clang-analyzer-alpha.core.CastToStruct)
39  if (!queue.empty() && queue.back() == nullptr) {
40  throw std::logic_error("after adding a nullptr no further stuff may be added to a wait queue");
41  }
42  queue.emplace_back(std::move(item));
43  cv.notify_all();
44  }
45  template <class ... Types> void emplace(Types ... args) {
46  std::unique_ptr<T> item(new T(args...));
47  enqueue(item);
48  }
49  void signalDone() {
50  std::unique_ptr<T> end(nullptr);
51  enqueue(end);
52  }
53  void resetDone() {
54  std::unique_lock<decltype(queue_mutex)> lock(queue_mutex); //(NOLINT clang-analyzer-alpha.core.CastToStruct)
55  while (!queue.empty() && queue.back() == nullptr) {
56  queue.pop_back();
57  }
58  }
59  template <class ... Types> std::unique_ptr<T> dequeue(bool mayCreateNew, Types ... args) {
60  std::unique_lock<decltype(queue_mutex)> lock(queue_mutex); //(NOLINT clang-analyzer-alpha.core.CastToStruct)
61  while (queue.empty()) {
62  if (mayCreateNew) {
63  return std::unique_ptr<T>(new T(args...));
64  }
65  cv.wait(lock);
66  }
67  auto item = std::move(queue.front());
68  if (item) { // nullptrs are end markers, multiple readers need many of them...
69  queue.pop_front();
70  }
71  return item;
72  }
73  template <typename durationType> std::unique_ptr<T> dequeue(durationType timeout, bool &timedOut) {
74  std::unique_lock<decltype(queue_mutex)> lock(queue_mutex);
75  while (queue.empty()) {
76  if (cv.wait_for(lock, timeout) == std::cv_status::timeout) {
77  timedOut = true;
78  return nullptr;
79  }
80  }
81  std::unique_ptr<T> item(std::move(queue.front()));
82  if (item) { // nullptrs are end markers, multiple readers need many of them...
83  queue.pop_front();
84  }
85  timedOut = false;
86  return item;
87  }
88  std::unique_ptr<T> dequeue() {
89  std::unique_lock<decltype(queue_mutex)> lock(queue_mutex);
90  while (queue.empty()) {
91  cv.wait(lock);
92  }
93  auto item = std::move(queue.front());
94  if (item) { // nullptrs are end markers, multiple readers need many of them...
95  queue.pop_front();
96  }
97  return item;
98  }
99  decltype(queue.size()) size() const {
100  return queue.size();
101  }
102  decltype(queue.empty()) empty() const {
103  return queue.empty();
104  }
105  template <typename iterType> void signalDone(iterType begin,
106  iterType end) {
107  for (auto it = begin; it != end; ++it) {
108  it->join();
109  }
110  signalDone();
111  }
112  template <typename iterType> std::thread signalDoneThread(iterType begin,
113  iterType end) {
114  return std::thread(&simple::signalDone<iterType>, this, begin, end);
115  }
116  };
117 
118  template <typename T, bool ascending = true> class itemComparator {
119  public:
120  bool operator () (const T* const& lhs, const T* const& rhs) const {
121  if (lhs == nullptr) { // ensure nullptrs always go at the end of the container
122  return false;
123  }
124  if (rhs == nullptr) {
125  return true;
126  }
127  return ascending ? (*lhs < *rhs) : (*rhs < *lhs);
128  }
129  };
130 
131  template <typename T,
132  bool ascending = true,
133  typename Container = std::multiset<T*, itemComparator<T, ascending>>> class ordered {
134  private:
135  Container queue;
136  std::mutex queue_mutex;
137  std::condition_variable cv;
138  public:
139  void enqueue(std::unique_ptr<T>& item) {
140  std::unique_lock<decltype(queue_mutex)> lock(queue_mutex); //(NOLINT clang-analyzer-alpha.core.CastToStruct)
141  if (!queue.empty() && *(queue.crbegin()) == nullptr) {
142  throw std::logic_error("after adding a nullptr no further stuff may be added to a wait queue");
143  }
144  queue.emplace(item.get());
145  item.release();
146  cv.notify_all();
147  }
148  template <class ... Types> void emplace(Types ... args) {
149  std::unique_ptr<T> item(new T(args...));
150  enqueue(item);
151  }
152  void signalDone() {
153  std::unique_ptr<T> end(nullptr);
154  enqueue(end);
155  }
156  void resetDone() {
157  std::unique_lock<decltype(queue_mutex)> lock(queue_mutex); //(NOLINT clang-analyzer-alpha.core.CastToStruct)
158  while (!queue.empty() && *(queue.crbegin()) == nullptr) {
159  queue.erase(queue.rbegin());
160  }
161  }
162  template <typename durationType> std::unique_ptr<T> dequeue(durationType timeout, bool &timedOut) {
163  std::unique_lock<decltype(queue_mutex)> lock(queue_mutex);
164  while (queue.empty()) {
165  if (cv.wait_for(lock, timeout) == std::cv_status::timeout) {
166  timedOut = true;
167  return nullptr;
168  }
169  }
170  auto it = queue.begin();
171  auto item = *it;
172  if (item != nullptr) { // nullptrs are end markers, multiple readers need many of them...
173  queue.erase(it);
174  }
175  return std::unique_ptr<T>(item);
176  }
177 
178  std::unique_ptr<T> dequeue() {
179  std::unique_lock<decltype(queue_mutex)> lock(queue_mutex); //(NOLINT clang-analyzer-alpha.core.CastToStruct)
180  while (queue.empty()) {
181  cv.wait(lock);//(NOLINT clang-analyzer-alpha.core.CastToStruct)
182  }
183  auto it = queue.begin();
184  auto item = *it;
185  if (item != nullptr) { // nullptrs are end markers, multiple readers need many of them...
186  queue.erase(it);
187  }
188  return std::unique_ptr<T>(item);
189  }
190  template <typename iterType> void signalDone(iterType begin,
191  iterType end) {
192  for (auto it = begin; it != end; ++it) {
193  it->join();
194  }
195  enqueue(nullptr);
196  }
197  template <typename iterType> std::thread signalDoneThread(iterType begin,
198  iterType end) {
199  return std::thread(&ordered::signalDone<iterType>, this, begin, end);
200  }
201  decltype(queue.size()) size() const {
202  return queue.size();
203  }
204  decltype(queue.empty()) empty() const {
205  return queue.empty();
206  }
207  };
208 
209 
210  template <typename T, typename clock_type = std::chrono::steady_clock> class timed {
211  private:
212  std::multimap<typename clock_type::time_point, std::unique_ptr<T>> queue;
213  std::mutex queue_mutex;
214  std::condition_variable cv;
215  public:
216  void enqueue(std::unique_ptr<T>& item, typename clock_type::time_point when) {
217  std::unique_lock<decltype(queue_mutex)> lock(queue_mutex); //(NOLINT clang-analyzer-alpha.core.CastToStruct)
218  if (!queue.empty() && queue.crbegin()->second == nullptr) {
219  throw std::logic_error("after adding a nullptr no further stuff may be added to a wait queue");
220  }
221  if (item == nullptr) {
222  when = clock_type::time_point::max();
223  }
224  queue.emplace(when, std::move(item));
225  cv.notify_all();
226  }
227  template <typename delayType> void enqueue(std::unique_ptr<T>& item, delayType delay) {
228  enqueue(item, clock_type::now() + delay);
229  }
230  void signalDone() {
231  std::unique_ptr<T> end(nullptr);
232  enqueue(end, clock_type::time_point::max());
233  }
234  void resetDone() {
235  std::unique_lock<decltype(queue_mutex)> lock(queue_mutex); //(NOLINT clang-analyzer-alpha.core.CastToStruct)
236  while (!queue.empty() && *(queue.crbegin()) == nullptr) {
237  queue.erase(queue.rbegin());
238  }
239  }
240  std::unique_ptr<T> dequeue() {
241  std::unique_lock<decltype(queue_mutex)> lock(queue_mutex); //(NOLINT clang-analyzer-alpha.core.CastToStruct)
242  while (true) {
243  while (queue.empty()) {
244  cv.wait(lock);
245  }
246  auto it = queue.begin();
247  if (! it->second) {
248  return nullptr;
249  }
250  if (it->first < clock_type::now()) {
251  std::unique_ptr<T>item(std::move(it->second));
252  queue.erase(it);
253  return item;
254  }
255  auto status = cv.wait_until(lock, it->first);
256  if (status == std::cv_status::timeout) {
257  auto item = std::move(it->second);
258  queue.erase(it);
259  return item;
260  }
261  }
262  }
263  decltype(queue.size()) size() const {
264  return queue.size();
265  }
266  decltype(queue.empty()) empty() const {
267  return queue.empty();
268  }
269 
270  };
271 
272 } // namespace waitQueues
273 
274 #endif
waitQueues::ordered::empty
decltype(queue.empty()) empty() const
Definition: waitQueues.h:204
waitQueues::ordered::size
decltype(queue.size()) size() const
Definition: waitQueues.h:201
waitQueues::timed::enqueue
void enqueue(std::unique_ptr< T > &item, delayType delay)
Definition: waitQueues.h:227
waitQueues::timed::size
decltype(queue.size()) size() const
Definition: waitQueues.h:263
waitQueues::ordered::queue_mutex
std::mutex queue_mutex
Definition: waitQueues.h:136
waitQueues::timed
Definition: waitQueues.h:210
waitQueues::simple::resetDone
void resetDone()
Definition: waitQueues.h:53
waitQueues::simple::size
decltype(queue.size()) size() const
Definition: waitQueues.h:99
waitQueues::ordered::enqueue
void enqueue(std::unique_ptr< T > &item)
Definition: waitQueues.h:139
waitQueues::itemComparator::operator()
bool operator()(const T *const &lhs, const T *const &rhs) const
Definition: waitQueues.h:120
waitQueues::ordered::dequeue
std::unique_ptr< T > dequeue()
Definition: waitQueues.h:178
waitQueues::simple::queue_mutex
std::mutex queue_mutex
Definition: waitQueues.h:34
waitQueues::simple::dequeue
std::unique_ptr< T > dequeue()
Definition: waitQueues.h:88
waitQueues::simple::signalDoneThread
std::thread signalDoneThread(iterType begin, iterType end)
Definition: waitQueues.h:112
waitQueues::ordered::dequeue
std::unique_ptr< T > dequeue(durationType timeout, bool &timedOut)
Definition: waitQueues.h:162
waitQueues::simple::signalDone
void signalDone(iterType begin, iterType end)
Definition: waitQueues.h:105
waitQueues::ordered::cv
std::condition_variable cv
Definition: waitQueues.h:137
waitQueues::ordered::signalDoneThread
std::thread signalDoneThread(iterType begin, iterType end)
Definition: waitQueues.h:197
waitQueues::simple::empty
decltype(queue.empty()) empty() const
Definition: waitQueues.h:102
waitQueues::simple::emplace
void emplace(Types ... args)
Definition: waitQueues.h:45
waitQueues::timed::queue_mutex
std::mutex queue_mutex
Definition: waitQueues.h:213
waitQueues::ordered::signalDone
void signalDone(iterType begin, iterType end)
Definition: waitQueues.h:190
waitQueues::simple::enqueue
void enqueue(std::unique_ptr< T > &item)
Definition: waitQueues.h:37
waitQueues::timed::signalDone
void signalDone()
Definition: waitQueues.h:230
waitQueues::simple::signalDone
void signalDone()
Definition: waitQueues.h:49
waitQueues::timed::enqueue
void enqueue(std::unique_ptr< T > &item, typename clock_type::time_point when)
Definition: waitQueues.h:216
waitQueues::itemComparator
Definition: waitQueues.h:118
waitQueues::simple
Definition: waitQueues.h:31
waitQueues::timed::dequeue
std::unique_ptr< T > dequeue()
Definition: waitQueues.h:240
waitQueues::simple::cv
std::condition_variable cv
Definition: waitQueues.h:35
waitQueues
Definition: waitQueues.h:30
waitQueues::timed::queue
std::multimap< typename clock_type::time_point, std::unique_ptr< T > > queue
Definition: waitQueues.h:212
waitQueues::timed::empty
decltype(queue.empty()) empty() const
Definition: waitQueues.h:266
waitQueues::simple::dequeue
std::unique_ptr< T > dequeue(durationType timeout, bool &timedOut)
Definition: waitQueues.h:73
waitQueues::ordered::signalDone
void signalDone()
Definition: waitQueues.h:152
waitQueues::ordered::emplace
void emplace(Types ... args)
Definition: waitQueues.h:148
waitQueues::ordered::resetDone
void resetDone()
Definition: waitQueues.h:156
waitQueues::simple::dequeue
std::unique_ptr< T > dequeue(bool mayCreateNew, Types ... args)
Definition: waitQueues.h:59
waitQueues::simple::queue
std::deque< std::unique_ptr< T > > queue
Definition: waitQueues.h:33
waitQueues::timed::cv
std::condition_variable cv
Definition: waitQueues.h:214
waitQueues::timed::resetDone
void resetDone()
Definition: waitQueues.h:234
waitQueues::ordered::queue
Container queue
Definition: waitQueues.h:135
waitQueues::ordered
Definition: waitQueues.h:133