1 #ifndef __waitQueues_h_
2 #define __waitQueues_h_
24 #include <condition_variable>
// NOTE: this view of the `simple` wait queue is an excerpt. The class header,
// the mutex/lock declarations, several function signatures and most closing
// braces are not shown; comments describe only the statements that are visible.

// FIFO storage; every element owns its item through a std::unique_ptr.
33 std::deque<std::unique_ptr<T>>
queue;
// Condition variable the timed dequeue below waits on.
35 std::condition_variable
cv;
// Guard (enclosing function signature not visible): once the last queued
// element is a nullptr, any further addition is rejected with
// std::logic_error.
39 if (!
queue.empty() &&
queue.back() ==
nullptr) {
40 throw std::logic_error(
"after adding a nullptr no further stuff may be added to a wait queue");
// Ownership of `item` is moved into the back of the queue.
42 queue.emplace_back(std::move(item));
// emplace: builds a T from `args` on the heap and wraps it in a unique_ptr
// (what happens to `item` afterwards is not visible in this excerpt).
// NOTE(review): `args` are received by value and passed on as lvalues, so each
// argument is copied at least once and move-only arguments generally cannot be
// consumed by T's constructor; forwarding references would avoid this.
45 template <
class ... Types>
void emplace(Types ... args) {
46 std::unique_ptr<T> item(
new T(args...));
// A null unique_ptr named `end`. The logic_error text above indicates that a
// queued nullptr is terminal; how `end` is used is not visible here.
50 std::unique_ptr<T> end(
nullptr);
// Loops while the queue is non-empty and its last element is nullptr
// (loop body and enclosing function are not visible).
55 while (!
queue.empty() &&
queue.back() ==
nullptr) {
// dequeue(mayCreateNew, args...): the visible parts loop while the queue is
// empty, can return a freshly constructed T(args...), and otherwise move the
// front element out of the queue.
59 template <
class ... Types> std::unique_ptr<T>
dequeue(
bool mayCreateNew, Types ... args) {
61 while (
queue.empty()) {
// presumably only reached when `mayCreateNew` is true — the condition is not
// visible in this excerpt; confirm.
63 return std::unique_ptr<T>(
new T(args...));
// Takes ownership of the oldest element (its removal from the deque is not
// visible here).
67 auto item = std::move(
queue.front());
// dequeue(timeout, timedOut): while the queue is empty, waits on `cv` for at
// most `timeout` per iteration.
// presumably `timedOut` reports whether the wait expired — the assignment is
// not visible; confirm.
73 template <
typename durationType> std::unique_ptr<T>
dequeue(durationType timeout,
bool &timedOut) {
75 while (
queue.empty()) {
// NOTE(review): wait_for is called with the full `timeout` on every pass of
// the loop, so a wake-up that still finds the queue empty restarts the wait
// and the total time can exceed `timeout` — confirm this is intended.
76 if (
cv.wait_for(lock, timeout) == std::cv_status::timeout) {
// Takes ownership of the oldest element.
81 std::unique_ptr<T> item(std::move(
queue.front()));
// Another empty-queue loop followed by moving out the front element; the
// enclosing function's signature is not visible in this excerpt.
90 while (
queue.empty()) {
93 auto item = std::move(
queue.front());
// Reports whether the queue currently holds no elements (enclosing function
// not visible).
103 return queue.empty();
// Iterates over the caller-supplied range [begin, end); the loop body is not
// visible in this excerpt.
107 for (
auto it = begin; it != end; ++it) {
// Starts a std::thread that runs simple::signalDone<iterType>(begin, end) on
// this object and returns the thread to the caller.
// NOTE(review): the thread is handed `this`, so the queue object has to stay
// alive until that thread has finished — confirm callers join it.
114 return std::thread(&simple::signalDone<iterType>,
this, begin, end);
// Call operator of the comparison functor: orders two pointers to T.
// Null pointers are checked for before anything is dereferenced; the values
// returned from those two branches are not visible in this excerpt.
120 bool operator () (
const T*
const& lhs,
const T*
const& rhs)
const {
121 if (lhs ==
nullptr) {
124 if (rhs ==
nullptr) {
// presumably both pointers are non-null at this point (the branches above are
// assumed to return — confirm). Compares the pointees with T's operator<,
// swapping the operands when `ascending` is false.
127 return ascending ? (*lhs < *rhs) : (*rhs < *lhs);
// Wait queue whose elements are kept in a sorted container instead of FIFO
// order. By default the container is a std::multiset of raw T* ordered by
// itemComparator<T, ascending>.
// NOTE: this view is an excerpt; the `queue` member, the mutex/lock, several
// function signatures and most closing braces are not shown.
131 template <
typename T,
132 bool ascending =
true,
133 typename Container = std::multiset<T*, itemComparator<T, ascending>>>
class ordered {
// Condition variable the timed dequeue below waits on.
137 std::condition_variable
cv;
// Guard (enclosing function signature not visible): if the element that sorts
// last is a nullptr, any further addition is rejected with std::logic_error.
141 if (!
queue.empty() && *(
queue.crbegin()) ==
nullptr) {
142 throw std::logic_error(
"after adding a nullptr no further stuff may be added to a wait queue");
// Stores the raw pointer obtained from `item`.
// NOTE(review): `item` still owns the object after get(); it has to give up
// ownership afterwards or the object is deleted while still queued — the
// following statement is not visible here, confirm.
144 queue.emplace(item.get());
// emplace: builds a T from `args` on the heap and wraps it in a unique_ptr
// (what happens to `item` afterwards is not visible in this excerpt).
// NOTE(review): `args` are received by value and passed on as lvalues, so each
// argument is copied at least once.
148 template <
class ... Types>
void emplace(Types ... args) {
149 std::unique_ptr<T> item(
new T(args...));
// A null unique_ptr named `end`; how it is used is not visible here.
153 std::unique_ptr<T> end(
nullptr);
// Loops while the queue is non-empty and the element that sorts last is
// nullptr (loop body and enclosing function are not visible).
158 while (!
queue.empty() && *(
queue.crbegin()) ==
nullptr) {
// dequeue(timeout, timedOut): while the queue is empty, waits on `cv` for at
// most `timeout` per iteration, then works on the first element in sort order.
// presumably `timedOut` reports whether the wait expired — the assignment is
// not visible; confirm.
162 template <
typename durationType> std::unique_ptr<T>
dequeue(durationType timeout,
bool &timedOut) {
164 while (
queue.empty()) {
// NOTE(review): wait_for is called with the full `timeout` on every pass of
// the loop, so the total wait can exceed `timeout` — confirm this is intended.
165 if (
cv.wait_for(lock, timeout) == std::cv_status::timeout) {
// Iterator to the first element in sort order.
170 auto it =
queue.begin();
// `item` is declared in a line that is not visible; presumably it is the raw
// pointer read from `it` — confirm.
172 if (item !=
nullptr) {
// Hands the raw pointer back to the caller wrapped in a unique_ptr, i.e. the
// caller becomes the owner.
175 return std::unique_ptr<T>(item);
// Second dequeue path (signature not visible): same steps as above without a
// visible timeout.
180 while (
queue.empty()) {
183 auto it =
queue.begin();
185 if (item !=
nullptr) {
188 return std::unique_ptr<T>(item);
// Iterates over the caller-supplied range [begin, end); the loop body is not
// visible in this excerpt.
192 for (
auto it = begin; it != end; ++it) {
// Starts a std::thread that runs ordered::signalDone<iterType>(begin, end) on
// this object and returns the thread to the caller.
// NOTE(review): the thread is handed `this`, so the queue object has to stay
// alive until that thread has finished — confirm callers join it.
199 return std::thread(&ordered::signalDone<iterType>,
this, begin, end);
// Reports whether the queue currently holds no elements (enclosing function
// not visible).
205 return queue.empty();
// Wait queue whose items become available at a given point in time of
// `clock_type` (std::chrono::steady_clock by default).
// NOTE: this view is an excerpt; the mutex/lock, several function signatures
// and most closing braces are not shown. Comments describe only the visible
// statements.
210 template <
typename T,
typename clock_type = std::chrono::steady_clock>
class timed {
// Items keyed by their time point; the multimap keeps them sorted by time and
// allows several items with the same time point.
212 std::multimap<typename clock_type::time_point, std::unique_ptr<T>>
queue;
// Condition variable the dequeue path below waits on.
214 std::condition_variable
cv;
// enqueue(item, when): stores `item` under the time point `when`.
// `item` is an lvalue reference and is moved from, so the caller's unique_ptr
// is empty afterwards.
// Throws std::logic_error if the entry with the latest time point already
// holds a nullptr.
216 void enqueue(std::unique_ptr<T>& item,
typename clock_type::time_point when) {
218 if (!
queue.empty() &&
queue.crbegin()->second ==
nullptr) {
219 throw std::logic_error(
"after adding a nullptr no further stuff may be added to a wait queue");
// A nullptr item is always keyed with the largest representable time point,
// whatever `when` the caller passed, so it sorts after every other entry.
221 if (item ==
nullptr) {
222 when = clock_type::time_point::max();
224 queue.emplace(when, std::move(item));
// enqueue(item, delay): convenience overload that converts a delay relative
// to clock_type::now() into an absolute time point and delegates to enqueue().
227 template <
typename delayType>
void enqueue(std::unique_ptr<T>& item, delayType delay) {
228 enqueue(item, clock_type::now() + delay);
// Enqueues a null unique_ptr at the largest representable time point
// (enclosing function signature not visible).
231 std::unique_ptr<T> end(
nullptr);
232 enqueue(end, clock_type::time_point::max());
// Loops while the queue is non-empty and the entry with the latest time point
// holds a nullptr (loop body and enclosing function are not visible).
// The mapped unique_ptr (`->second`) is what gets tested: the multimap element
// itself is a key/value pair and cannot be compared with nullptr.
236 while (!
queue.empty() &&
queue.crbegin()->second ==
nullptr) {
// Dequeue path (signature not visible): loops while the queue is empty, then
// looks at the entry with the earliest time point.
243 while (
queue.empty()) {
246 auto it =
queue.begin();
// Earliest entry is already in the past: take its item right away.
250 if (it->first < clock_type::now()) {
251 std::unique_ptr<T>item(std::move(it->second));
// Otherwise wait until that entry's time point is reached.
// NOTE(review): `it` was obtained before wait_until released the lock; if
// another thread can erase that entry, or insert an earlier one, during the
// wait, `it` may be dangling or no longer the earliest entry when it is used
// below — confirm single-consumer use or re-fetch begin() after waking.
255 auto status =
cv.wait_until(lock, it->first);
// The wait ran into the entry's time point: take its item.
256 if (status == std::cv_status::timeout) {
257 auto item = std::move(it->second);
// Reports whether the queue currently holds no entries (enclosing function
// not visible).
267 return queue.empty();