cppthread 1.1.16
C++ Thread Library
fifo.h
Go to the documentation of this file.
1// Copyright (c) 2013-2025 Made to Order Software Corp. All Rights Reserved
2//
3// https://snapwebsites.org/project/cppthread
4// contact@m2osw.com
5//
6// This program is free software; you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation; either version 2 of the License, or
9// (at your option) any later version.
10//
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15//
16// You should have received a copy of the GNU General Public License along
17// with this program; if not, write to the Free Software Foundation, Inc.,
18// 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
19#pragma once
20
32// self
33//
34#include <cppthread/guard.h>
35#include <cppthread/mutex.h>
36
37
38// snapdev
39//
40#include <snapdev/not_used.h>
41#include <snapdev/is_smart_pointer.h>
42
43
44// C++
45//
46#include <numeric>
47#include <deque>
48
49
50
51namespace cppthread
52{
53
54
55
56template<class T>
57class fifo
58 : public mutex
59{
60private:
61 typedef std::deque<T> items_t;
62
63 // void_t is C++17 so to compile on more systems, we have our own definition
64 //
65 template<typename ...> using void_t = void;
66
67
68 // the following templates are used to know whether class T has a
69 // valid_workload() function returning a bool and if so, we'll use
70 // it to know whether an item is ready to be popped.
71 //
72 template<typename, typename, typename = void_t<>>
74 : public std::false_type
75 {
76 };
77
78 template<typename C, typename R, typename... A>
79 struct item_has_predicate<C, R(A...),
80 void_t<decltype(std::declval<C>().valid_workload(std::declval<A>()...))>>
81 : public std::is_same<decltype(std::declval<C>().valid_workload(std::declval<A>()...)), R>
82 {
83 };
84
97 template<typename C>
98 typename std::enable_if<!snapdev::is_shared_ptr<C>::value
99 && item_has_predicate<C, bool()>::value
100 , bool>::type
101 validate_item(C const & item)
102 {
103 return item.valid_workload();
104 }
105
116 template<typename C>
117 typename std::enable_if<!snapdev::is_shared_ptr<C>::value
118 && !item_has_predicate<C, bool()>::value
119 , bool>::type
120 validate_item(C const & item)
121 {
122 snapdev::NOT_USED(item);
123 return true;
124 }
125
138 template<typename C>
139 typename std::enable_if<snapdev::is_shared_ptr<C>::value
140 && item_has_predicate<typename C::element_type, bool()>::value
141 , bool>::type
142 validate_item(C const & item)
143 {
144 return item->valid_workload();
145 }
146
157 template<typename C>
158 typename std::enable_if<snapdev::is_shared_ptr<C>::value
159 && !item_has_predicate<typename C::element_type, bool()>::value
160 , bool>::type
161 validate_item(C const & item)
162 {
163 snapdev::NOT_USED(item);
164 return true;
165 }
166
167public:
168 typedef T value_type;
170 typedef std::shared_ptr<fifo_type> pointer_t;
171
172 bool push_back(T const & v)
173 {
174 guard lock(*this);
175 if(f_done)
176 {
177 return false;
178 }
179 f_queue.push_back(v);
180 signal();
181 return true;
182 }
183
184 bool pop_front(T & v, int64_t const usecs)
185 {
186 guard lock(*this);
187
188 auto cleanup = [&]()
189 {
190 if(f_done && !f_broadcast && f_queue.empty())
191 {
192 // make sure all the threads wake up on this new
193 // "queue is empty" status
194 //
195 broadcast();
196 f_broadcast = true;
197 }
198 };
199
200 for(;;)
201 {
202 // search for an item we can pop now
203 //
204 for(auto it(f_queue.begin()); it != f_queue.end(); ++it)
205 {
206 bool const result(validate_item<T>(*it));
207 if(result)
208 {
209 v = *it;
210 f_queue.erase(it);
211 cleanup();
212 return true;
213 }
214 }
215
216 if(f_done)
217 {
218 break;
219 }
220
221 // when no items can be returned, wait a bit if possible
222 // and try again
223 //
224 if(usecs == -1)
225 {
226 // wait until signal() wakes us up
227 //
228 wait();
229 }
230 else if(usecs > 0)
231 {
232 if(!timed_wait(usecs))
233 {
234 break;
235 }
236 }
237 else // if(usecs == 0)
238 {
239 // do not wait
240 //
241 break;
242 }
243 }
244 cleanup();
245 return false;
246 }
247
248 void clear()
249 {
250 guard lock(*this);
252 f_queue.swap(empty);
253 }
254
255 bool empty() const
256 {
257 guard lock(const_cast<fifo &>(*this));
258 return f_queue.empty();
259 }
260
261 size_t size() const
262 {
263 guard lock(const_cast<fifo &>(*this));
264 return f_queue.size();
265 }
266
267 size_t byte_size() const
268 {
269 guard lock(const_cast<fifo &>(*this));
270 return std::accumulate(
271 f_queue.begin(),
272 f_queue.end(),
273 0,
274 [](size_t accum, T const & obj)
275 {
276 return accum + obj.size();
277 });
278 }
279
280 void done(bool clear)
281 {
282 guard lock(*this);
283 f_done = true;
284 if(clear)
285 {
287 f_queue.swap(empty);
288 }
289 if(f_queue.empty())
290 {
291 broadcast();
292 f_broadcast = true;
293 }
294 }
295
296 bool is_done() const
297 {
298 guard lock(const_cast<fifo &>(*this));
299 return f_done;
300 }
301
302private:
304 bool f_done = false;
305 bool f_broadcast = false;
306};
307
308
309
310} // namespace cppthread
311// vim: ts=4 sw=4 et
Create a thread safe FIFO.
Definition fifo.h:59
std::enable_if< snapdev::is_shared_ptr< C >::value &&!item_has_predicate< typename C::element_type, bool()>::value, bool >::type validate_item(C const &item)
Validate item.
Definition fifo.h:161
bool f_broadcast
Whether the done() function called broadcast().
Definition fifo.h:305
size_t size() const
Return the number of items in the FIFO.
Definition fifo.h:261
T value_type
The type of value to push and pop from the FIFO.
Definition fifo.h:168
bool empty() const
Test whether the FIFO is empty.
Definition fifo.h:255
bool f_done
Whether the FIFO is done.
Definition fifo.h:304
std::deque< T > items_t
The container type of our items.
Definition fifo.h:61
void done(bool clear)
Mark the FIFO as done.
Definition fifo.h:280
bool push_back(T const &v)
Push data on this FIFO.
Definition fifo.h:172
bool is_done() const
Check whether the FIFO was marked as done.
Definition fifo.h:296
items_t f_queue
The actual FIFO.
Definition fifo.h:303
void clear()
Clear the current FIFO.
Definition fifo.h:248
std::shared_ptr< fifo_type > pointer_t
A smart pointer to the FIFO.
Definition fifo.h:170
std::enable_if< snapdev::is_shared_ptr< C >::value &&item_has_predicate< typename C::element_type, bool()>::value, bool >::type validate_item(C const &item)
Validate item.
Definition fifo.h:142
bool pop_front(T &v, int64_t const usecs)
Retrieve one value from the FIFO.
Definition fifo.h:184
std::enable_if<!snapdev::is_shared_ptr< C >::value &&item_has_predicate< C, bool()>::value, bool >::type validate_item(C const &item)
Validate item.
Definition fifo.h:101
fifo< value_type > fifo_type
The type of the FIFO as a typedef.
Definition fifo.h:169
size_t byte_size() const
Return the total size the FIFO uses in memory.
Definition fifo.h:267
std::enable_if<!snapdev::is_shared_ptr< C >::value &&!item_has_predicate< C, bool()>::value, bool >::type validate_item(C const &item)
Validate item.
Definition fifo.h:120
Lock a mutex in an RAII manner.
Definition guard.h:42
A mutex object to ensure atomicity.
Definition mutex.h:55
void lock()
Lock a mutex.
Definition mutex.cpp:369
bool timed_wait(std::uint64_t const usec)
Wait on a mutex condition with a time limit.
Definition mutex.cpp:549
void wait()
Wait on a mutex condition.
Definition mutex.cpp:494
void signal()
Signal at least one mutex.
Definition mutex.cpp:737
void broadcast()
Broadcast a mutex signal.
Definition mutex.cpp:838
Thread Runner and Managers.
Thread Runner and Managers.

This document is part of the Snap! Websites Project.

Copyright by Made to Order Software Corp.