MLIB
Loading...
Searching...
No Matches
syncque.h
Go to the documentation of this file.
1
6#pragma once
7
8#if __has_include("defs.h")
9#include "defs.h"
10#endif
11
12#include <queue>
13#include "semaphore.h"
14#include "critsect.h"
15#include "stopwatch.h"
16
17namespace mlib {
31template <class M>
33{
34public:
37 : message (nullptr)
38 {}
39
41 void produce (const M& obj)
42 {
43 update.enter ();
44 while (message)
45 {
46 update.leave ();
47 prod_sema.wait ();
48 update.enter ();
49 }
50 cons_sema.signal ();
51 message = &obj;
52 update.leave ();
53 }
54
55 void consume (M& obj)
56 {
57 update.enter ();
58 while (!message)
59 {
60 update.leave ();
61 cons_sema.wait ();
62 update.enter ();
63 }
64 prod_sema.signal ();
65 obj = *message;
66 message = nullptr;
67 update.leave ();
68 }
69
70private:
71 criticalsection update;
72 semaphore prod_sema;
73 semaphore cons_sema;
74 const M* message;
75};
76
86template <class M, class C = std::deque<M>>
87class async_queue : protected std::queue<M, C>
88{
89public:
91 async_queue (size_t limit_ = INFINITE)
92 : limit (limit_)
93 {
94 if (limit > 0 && limit < INFINITE)
95 prod_sema.signal ((int)limit);
96 }
97
100 virtual bool produce (const M& obj, DWORD timeout = INFINITE)
101 {
102 if (limit < INFINITE)
103 {
104 // Bounded queue. See if there is enough space to produce
105 stopwatch t; // need a stopwatch to count cumulated wait time
106 if (timeout != INFINITE)
107 t.start ();
108 update.enter ();
109 while (std::queue<M, C>::size () >= limit)
110 {
111 update.leave ();
112 if (timeout != INFINITE)
113 {
114 int t_wait = timeout - (int)t.msecLap ();
115 if (t_wait < 0 || prod_sema.wait (t_wait) == WAIT_TIMEOUT)
116 return false;
117 }
118 else
119 prod_sema.wait ();
120 update.enter ();
121 }
122 std::queue<M, C>::push (obj);
123 cons_sema.signal ();
124 update.leave ();
125 }
126 else
127 {
128 // Unbounded queue. Producer will always be successful
129 lock l (update); // take control of the queue
130 std::queue<M, C>::push (obj); // put a copy of the object at the end
131 cons_sema.signal (); // and signal the semaphore
132 }
133 return true;
134 }
135
138 virtual bool consume (M& result, int timeout = INFINITE)
139 {
140 if (timeout != INFINITE)
141 {
142 stopwatch t;
143 int t_wait;
144 t.start ();
145 update.enter ();
146 while (std::queue<M, C>::empty ())
147 {
148 update.leave ();
149 t_wait = timeout - (int)t.msecLap ();
150 // give up or wait for a producer
151 if (t_wait <= 0 || cons_sema.wait (t_wait) == WAIT_TIMEOUT)
152 return false;
153 update.enter ();
154 }
155 }
156 else
157 {
158 update.enter ();
159 if (std::queue<M, C>::empty ())
160 {
161 while (1)
162 {
163 update.leave ();
164 cons_sema.wait (); // wait for a producer
165 update.enter ();
166 if (!std::queue<M, C>::empty ())
167 break;
168 }
169 }
170 }
171 result = std::queue<M, C>::front (); // get the message
172 std::queue<M, C>::pop ();
173 if (limit != INFINITE)
174 prod_sema.signal (); // signal producers there is space available
175
176 update.leave ();
177 return true;
178 }
179
181 bool empty ()
182 {
183 lock l (update);
184 return std::queue<M, C>::empty ();
185 }
186
188 bool full ()
189 {
190 lock l (this->update);
191 return (std::queue<M, C>::size () == limit);
192 }
193
195 size_t size ()
196 {
197 lock l (update);
198 return std::queue<M, C>::size ();
199 }
200
201protected:
202 size_t limit;
206};
207
208} // namespace mlib
A template class that implements "asynchronous queues".
Definition syncque.h:88
bool empty()
Return true if queue is empty.
Definition syncque.h:181
virtual bool consume(M &result, int timeout=INFINITE)
Definition syncque.h:138
bool full()
Return true if queue is at capacity.
Definition syncque.h:188
semaphore prod_sema
producers' semaphore counts down until queue is full
Definition syncque.h:203
semaphore cons_sema
consumers' semaphore counts down until queue is empty
Definition syncque.h:204
size_t size()
Return queue size.
Definition syncque.h:195
criticalsection update
critical section protects queue's integrity
Definition syncque.h:205
virtual bool produce(const M &obj, DWORD timeout=INFINITE)
Definition syncque.h:100
async_queue(size_t limit_=INFINITE)
Crates a queue with the given maximum size.
Definition syncque.h:91
Lightweight inter-thread synchronization.
Definition critsect.h:25
virtual void enter()
Enter critical section.
Definition critsect.h:97
virtual void leave()
Leave critical section.
Definition critsect.h:109
Automatic wrapper for critical sections.
Definition critsect.h:69
Wrapper for Windows semaphore objects.
Definition semaphore.h:16
int signal(int count=1)
Signal a semaphore object.
Definition semaphore.cpp:45
Simple stopwatch timer (yet another one!)
Definition stopwatch.h:20
double msecLap()
Return number of milliseconds elapsed from start.
Definition stopwatch.cpp:42
void start()
Start the stopwatch.
Definition stopwatch.cpp:25
A template class that implements a "synchronous queue", in effect a mailbox that can store one "messa...
Definition syncque.h:33
sync_queue()
Create a synchronous queue.
Definition syncque.h:36
void produce(const M &obj)
Put new element in queue.
Definition syncque.h:41
virtual void wait()
Wait for object to become signaled.
Definition syncbase.h:86
criticalsection and lock classes
semaphore class definition
Definition of stopwatch class.