MLIB
Loading...
Searching...
No Matches
syncque.h
Go to the documentation of this file.
1/*
2 Copyright (c) Mircea Neacsu (2014-2025) Licensed under MIT License.
3 This file is part of MLIB project. See LICENSE file for full license terms.
4*/
5
7
8#pragma once
9
10#if __has_include("defs.h")
11#include "defs.h"
12#endif
13
14#include <queue>
15#include "semaphore.h"
16#include "critsect.h"
17#include "stopwatch.h"
18
19namespace mlib {
34template <class M>
36{
37public:
40 : message (nullptr)
41 {}
42
44 void produce (const M& obj)
45 {
46 update.enter ();
47 while (message)
48 {
49 update.leave ();
50 prod_sema.wait ();
51 update.enter ();
52 }
53 cons_sema.signal ();
54 message = &obj;
55 update.leave ();
56 }
57
58 void consume (M& obj)
59 {
60 update.enter ();
61 while (!message)
62 {
63 update.leave ();
64 cons_sema.wait ();
65 update.enter ();
66 }
67 prod_sema.signal ();
68 obj = *message;
69 message = nullptr;
70 update.leave ();
71 }
72
73private:
74 criticalsection update;
75 semaphore prod_sema;
76 semaphore cons_sema;
77 const M* message;
78};
79
89template <class M, class C = std::deque<M>>
90class async_queue : protected std::queue<M, C>
91{
92public:
94 async_queue (size_t limit_ = INFINITE)
95 : limit (limit_)
96 {
97 if (limit > 0 && limit < INFINITE)
98 prod_sema.signal ((int)limit);
99 }
100
103 virtual bool produce (const M& obj, DWORD timeout = INFINITE)
104 {
105 if (limit < INFINITE)
106 {
107 // Bounded queue. See if there is enough space to produce
108 stopwatch t; // need a stopwatch to count cumulated wait time
109 if (timeout != INFINITE)
110 t.start ();
111 update.enter ();
112 while (std::queue<M, C>::size () >= limit)
113 {
114 update.leave ();
115 if (timeout != INFINITE)
116 {
117 int t_wait = timeout - (int)t.msecLap ();
118 if (t_wait < 0 || prod_sema.wait (t_wait) == WAIT_TIMEOUT)
119 return false;
120 }
121 else
122 prod_sema.wait ();
123 update.enter ();
124 }
125 std::queue<M, C>::push (obj);
126 cons_sema.signal ();
127 update.leave ();
128 }
129 else
130 {
131 // Unbounded queue. Producer will always be successful
132 lock l (update); // take control of the queue
133 std::queue<M, C>::push (obj); // put a copy of the object at the end
134 cons_sema.signal (); // and signal the semaphore
135 }
136 return true;
137 }
138
141 virtual bool consume (M& result, int timeout = INFINITE)
142 {
143 if (timeout != INFINITE)
144 {
145 stopwatch t;
146 int t_wait;
147 t.start ();
148 update.enter ();
149 while (std::queue<M, C>::empty ())
150 {
151 update.leave ();
152 t_wait = timeout - (int)t.msecLap ();
153 // give up or wait for a producer
154 if (t_wait <= 0 || cons_sema.wait (t_wait) == WAIT_TIMEOUT)
155 return false;
156 update.enter ();
157 }
158 }
159 else
160 {
161 update.enter ();
162 if (std::queue<M, C>::empty ())
163 {
164 while (1)
165 {
166 update.leave ();
167 cons_sema.wait (); // wait for a producer
168 update.enter ();
169 if (!std::queue<M, C>::empty ())
170 break;
171 }
172 }
173 }
174 result = std::queue<M, C>::front (); // get the message
175 std::queue<M, C>::pop ();
176 if (limit != INFINITE)
177 prod_sema.signal (); // signal producers there is space available
178
179 update.leave ();
180 return true;
181 }
182
184 bool empty ()
185 {
186 lock l (update);
187 return std::queue<M, C>::empty ();
188 }
189
191 bool full ()
192 {
193 lock l (this->update);
194 return (std::queue<M, C>::size () == limit);
195 }
196
198 size_t size ()
199 {
200 lock l (update);
201 return std::queue<M, C>::size ();
202 }
203
204protected:
205 size_t limit;
209};
210
211} // namespace mlib
bool empty()
Return true if queue is empty.
Definition syncque.h:184
virtual bool consume(M &result, int timeout=INFINITE)
Definition syncque.h:141
bool full()
Return true if queue is at capacity.
Definition syncque.h:191
semaphore prod_sema
producers' semaphore counts down until queue is full
Definition syncque.h:206
semaphore cons_sema
consumers' semaphore counts down until queue is empty
Definition syncque.h:207
size_t size()
Return queue size.
Definition syncque.h:198
criticalsection update
critical section protects queue's integrity
Definition syncque.h:208
size_t limit
maximum queue size
Definition syncque.h:205
virtual bool produce(const M &obj, DWORD timeout=INFINITE)
Definition syncque.h:103
async_queue(size_t limit_=INFINITE)
Crates a queue with the given maximum size.
Definition syncque.h:94
Lightweight inter-thread synchronization.
Definition critsect.h:27
virtual void enter()
Enter critical section.
Definition critsect.h:99
virtual void leave()
Leave critical section.
Definition critsect.h:111
Automatic wrapper for critical sections.
Definition critsect.h:71
Wrapper for Windows semaphore objects.
Definition semaphore.h:16
int signal(int count=1)
Signal a semaphore object.
Definition semaphore.cpp:40
Simple stopwatch timer (yet another one!)
Definition stopwatch.h:22
double msecLap()
Return number of milliseconds elapsed from start.
Definition stopwatch.cpp:41
void start()
Start the stopwatch.
Definition stopwatch.cpp:24
sync_queue()
Create a synchronous queue.
Definition syncque.h:39
void produce(const M &obj)
Put new element in queue.
Definition syncque.h:44
virtual void wait()
Wait for object to become signaled.
Definition syncbase.h:85
Definitions of mlib::criticalsection and mlib::lock classes.
Definition of mlib::semaphore class.
Definition of mlib::stopwatch class.