Line data Source code
1 : // Copyright (c) 2013-2021 Made to Order Software Corp. All Rights Reserved
2 : //
3 : // https://snapwebsites.org/project/cppthread
4 : // contact@m2osw.com
5 : //
6 : // This program is free software; you can redistribute it and/or modify
7 : // it under the terms of the GNU General Public License as published by
8 : // the Free Software Foundation; either version 2 of the License, or
9 : // (at your option) any later version.
10 : //
11 : // This program is distributed in the hope that it will be useful,
12 : // but WITHOUT ANY WARRANTY; without even the implied warranty of
13 : // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 : // GNU General Public License for more details.
15 : //
16 : // You should have received a copy of the GNU General Public License along
17 : // with this program; if not, write to the Free Software Foundation, Inc.,
18 : // 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
19 : #pragma once
20 :
21 : /** \file
22 : * \brief Thread Runner and Managers.
23 : *
24 : * This file includes the declaration and implementation (For templates)
25 : * of classes used to manage threads the easy way. Especially, our
26 : * implementation is aware of object destructors so a thread manager
27 : * (snap_thread) can be destroyed. It will automatically and properly
28 : * wait for its runner (the actual system pthread) to exit before
29 : * finishing up its and its runner clean up.
30 : */
31 :
32 : // self
33 : //
34 : #include "cppthread/guard.h"
35 : #include "cppthread/mutex.h"
36 :
37 :
38 : // snapdev lib
39 : //
40 : #include <snapdev/not_used.h>
41 :
42 :
43 : // C++ lib
44 : //
45 : #include <numeric>
46 : #include <deque>
47 :
48 :
49 :
50 : namespace cppthread
51 : {
52 :
53 :
54 :
55 : template<class T>
56 6 : class fifo
57 : : public mutex
58 : {
59 : private:
60 : typedef std::deque<T> items_t;
61 :
62 : // void_t is C++17 so to compile on more system, we have our own definition
63 : //
64 : template<typename ...> using void_t = void;
65 :
66 :
67 : // the following templates are used to know whether class T has a
68 : // valid_workload() function returning a bool and if so, we'll use
69 : // it to know whether an item is ready to be popped.
70 : //
71 : template<typename, typename, typename = void_t<>>
72 : struct item_has_predicate
73 : : public std::false_type
74 : {
75 : };
76 :
77 : template<typename C, typename R, typename... A>
78 : struct item_has_predicate<C, R(A...),
79 : void_t<decltype(std::declval<C>().valid_workload(std::declval<A>()...))>>
80 : : public std::is_same<decltype(std::declval<C>().valid_workload(std::declval<A>()...)), R>
81 : {
82 : };
83 :
84 : template<typename C>
85 : struct is_shared_ptr
86 : : std::false_type
87 : {
88 : };
89 :
90 : template<typename C>
91 : struct is_shared_ptr<std::shared_ptr<C>>
92 : : std::true_type
93 : {
94 : };
95 :
96 : /** \brief Validate item.
97 : *
98 : * This function checks whether the T::valid_workload() function
99 : * says the item can be processed now or not.
100 : *
101 : * In this case, the class C is not a shared pointer.
102 : *
103 : * \tparam C The type of the item.
104 : * \param[in] item The item to verify.
105 : *
106 : * \return true if the valid_workload() returns true, false otherwise.
107 : */
108 : template<typename C>
109 : typename std::enable_if<!is_shared_ptr<C>::value
110 : && item_has_predicate<C, bool()>::value
111 : , bool>::type
112 : validate_item(C const & item)
113 : {
114 : return item.valid_workload();
115 : }
116 :
117 : /** \brief Validate item.
118 : *
119 : * This function always returns true. It is used when the item does
120 : * not have a valid_workload() function defined.
121 : *
122 : * \tparam C The type of the item.
123 : * \param[in] item The item to verify.
124 : *
125 : * \return Always true.
126 : */
127 : template<typename C>
128 : typename std::enable_if<!is_shared_ptr<C>::value
129 : && !item_has_predicate<C, bool()>::value
130 : , bool>::type
131 : validate_item(C const & item)
132 : {
133 : snap::NOT_USED(item);
134 : return true;
135 : }
136 :
137 : /** \brief Validate item.
138 : *
139 : * This function checks whether the T::valid_workload() function
140 : * says the item can be processed now or not.
141 : *
142 : * In this case, the class C is a shared pointer to an item T.
143 : *
144 : * \tparam C The type of the item.
145 : * \param[in] item The item to verify.
146 : *
147 : * \return Always true.
148 : */
149 : template<typename C>
150 : typename std::enable_if<is_shared_ptr<C>::value
151 : && item_has_predicate<typename C::element_type, bool()>::value
152 : , bool>::type
153 65 : validate_item(C const & item)
154 : {
155 65 : return item->valid_workload();
156 : }
157 :
158 : /** \brief Validate item.
159 : *
160 : * This function always returns true. It is used when the item is a
161 : * shared pointer and does not have a valid_workload() function defined.
162 : *
163 : * \tparam C The type of the item.
164 : * \param[in] item The item to verify.
165 : *
166 : * \return Always true.
167 : */
168 : template<typename C>
169 : typename std::enable_if<is_shared_ptr<C>::value
170 : && !item_has_predicate<typename C::element_type, bool()>::value
171 : , bool>::type
172 : validate_item(C const & item)
173 : {
174 : snap::NOT_USED(item);
175 : return true;
176 : }
177 :
178 : public:
179 : typedef T value_type;
180 : typedef fifo<value_type> fifo_type;
181 : typedef std::shared_ptr<fifo_type> pointer_t;
182 :
183 32 : bool push_back(T const & v)
184 : {
185 64 : guard lock(*this);
186 32 : if(f_done)
187 : {
188 0 : return false;
189 : }
190 32 : f_queue.push_back(v);
191 32 : signal();
192 32 : return true;
193 : }
194 :
195 33 : bool pop_front(T & v, int64_t const usecs)
196 : {
197 66 : guard lock(*this);
198 :
199 66 : auto cleanup = [&]()
200 : {
201 33 : if(f_done && !f_broadcast && f_queue.empty())
202 : {
203 : // make sure all the threads wake up on this new
204 : // "queue is empty" status
205 : //
206 0 : broadcast();
207 0 : f_broadcast = true;
208 : }
209 33 : };
210 :
211 0 : for(;;)
212 : {
213 : // search for an item we can pop now
214 : //
215 66 : for(auto it(f_queue.begin()); it != f_queue.end(); ++it)
216 : {
217 65 : bool const result(validate_item<T>(*it));
218 65 : if(result)
219 : {
220 32 : v = *it;
221 32 : f_queue.erase(it);
222 32 : cleanup();
223 32 : return true;
224 : }
225 : }
226 :
227 1 : if(f_done)
228 : {
229 0 : break;
230 : }
231 :
232 : // when no items can be returned, wait a bit if possible
233 : // and try again
234 : //
235 1 : if(usecs == -1)
236 : {
237 : // wait until signal() wakes us up
238 : //
239 0 : wait();
240 : }
241 1 : else if(usecs > 0)
242 : {
243 0 : timed_wait(usecs);
244 : }
245 : else // if(usecs == 0)
246 : {
247 : // do not wait
248 : //
249 1 : break;
250 : }
251 : }
252 1 : cleanup();
253 1 : return false;
254 : }
255 :
256 : void clear()
257 : {
258 : guard lock(*this);
259 : items_t empty;
260 : f_queue.swap(empty);
261 : }
262 :
263 : bool empty() const
264 : {
265 : guard lock(const_cast<fifo &>(*this));
266 : return f_queue.empty();
267 : }
268 :
269 : size_t size() const
270 : {
271 : guard lock(const_cast<fifo &>(*this));
272 : return f_queue.size();
273 : }
274 :
275 : size_t byte_size() const
276 : {
277 : guard lock(const_cast<fifo &>(*this));
278 : return std::accumulate(
279 : f_queue.begin(),
280 : f_queue.end(),
281 : 0,
282 : [](size_t accum, T const & obj)
283 : {
284 : return accum + obj.size();
285 : });
286 : }
287 :
288 : void done(bool clear)
289 : {
290 : guard lock(*this);
291 : f_done = true;
292 : if(clear)
293 : {
294 : items_t empty;
295 : f_queue.swap(empty);
296 : }
297 : if(f_queue.empty())
298 : {
299 : broadcast();
300 : f_broadcast = true;
301 : }
302 : }
303 :
304 : bool is_done() const
305 : {
306 : guard lock(const_cast<fifo &>(*this));
307 : return f_done;
308 : }
309 :
310 : private:
311 : items_t f_queue = items_t();
312 : bool f_done = false;
313 : bool f_broadcast = false;
314 : };
315 :
316 :
317 :
318 : } // namespace cppthread
319 : // vim: ts=4 sw=4 et
|