Line data Source code
1 : // Copyright (c) 2013-2021 Made to Order Software Corp. All Rights Reserved
2 : //
3 : // https://snapwebsites.org/project/cppthread
4 : // contact@m2osw.com
5 : //
6 : // This program is free software; you can redistribute it and/or modify
7 : // it under the terms of the GNU General Public License as published by
8 : // the Free Software Foundation; either version 2 of the License, or
9 : // (at your option) any later version.
10 : //
11 : // This program is distributed in the hope that it will be useful,
12 : // but WITHOUT ANY WARRANTY; without even the implied warranty of
13 : // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 : // GNU General Public License for more details.
15 : //
16 : // You should have received a copy of the GNU General Public License along
17 : // with this program; if not, write to the Free Software Foundation, Inc.,
18 : // 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
19 :
20 : /** \file
21 : * \brief Implementation & documentation of the item_with_predicate.h file.
22 : *
23 : * The item_with_predicate.h file declares the item_with_predicate class,
24 : * which we implement and document here.
25 : *
26 : * The class is used to add sets of items in a FIFO of a pool of workers,
27 : * items which depend on each other. If a dependency is still defined,
28 : * then the item is skipped (left in the FIFO) until the valid_workload()
29 : * returns true, which happens once all the dependencies were worked on.
30 : *
31 : * \note
32 : * We consider this base class as an example. Your own version of the
33 : * valid_workload() function could use all sorts of predicates. For
34 : * example, a workload may become valid only after a given date and time,
35 : * when a file appears, etc.
36 : */
37 :
38 :
39 : // self
40 : //
41 : #include "cppthread/item_with_predicate.h"
42 :
43 : #include "cppthread/exception.h"
44 : #include "cppthread/guard.h"
45 :
46 :
47 : // C++ lib
48 : //
49 : #include <iostream>
50 :
51 :
52 :
53 : namespace cppthread
54 : {
55 :
56 :
57 :
58 : /** \class item_with_predicate
59 : * \brief A workload item which waits on a set of dependencies.
60 : *
61 : * This class represents an item of work which can be added to the
62 : * FIFO of a pool of worker threads and which depends on other items.
63 : * As long as at least one of its dependencies still exists, the
64 : * item is skipped (left in the FIFO). It gets picked up by a worker
65 : * thread only once its valid_workload() function returns true,
66 : * which happens once all of its dependencies were worked on.
67 : *
68 : * The dependencies are held as weak pointers. A dependency is
69 : * considered done when its weak pointer has expired, at which point
70 : * valid_workload() removes it from the list.
71 : *
72 : * To use this class, create your items, attach their dependencies
73 : * with the constructors, add_dependency(), or add_dependencies(),
74 : * and then add the items to the FIFO. Dependencies cannot be added
75 : * anymore once valid_workload() returned true.
76 : */
77 :
78 :
79 : /** \brief Initialize the item with one dependency.
80 : *
81 : * This constructor initializes the list of dependencies of this item
82 : * with the single \p dependency. If you do not yet have the list of
83 : * dependencies for this item, then you can instead use the add_dependency()
84 : * and add_dependencies() functions to add them later.
85 : *
86 : * As long as the processing of the item has not started yet, you can
87 : * add additional dependencies.
88 : *
89 : * \param[in] dependency The dependency to add to this item on creation.
90 : */
91 20 : item_with_predicate::item_with_predicate(pointer_t dependency)
92 20 : : f_dependencies({ dependency })
93 : {
94 20 : }
95 :
96 :
97 : /** \brief Initialize the item with a list of dependencies.
98 : *
99 : * This constructor initializes the item with a copy of the specified
100 : * list of dependencies. If you do not yet have the list of dependencies,
101 : * then you can instead use the add_dependency() or add_dependencies()
102 : * functions to add dependencies later.
103 : *
104 : * As long as the processing of the item has not started yet, you can
105 : * add additional dependencies.
106 : *
107 : * \note
108 : * Since C++11 we can call this function with a list of items as in:
109 : *
110 : * std::make_shared<item_with_predicate>(item_with_predicate::dependencies_t{ a, b, c, d, ... });
111 : *
112 : * which makes this constructor particularly practical (the list type has to be named because std::make_shared() cannot deduce a bare braced list).
113 : *
114 : * \param[in] dependencies The dependencies to add to this item.
115 : */
116 0 : item_with_predicate::item_with_predicate(dependencies_t const & dependencies)
117 0 : : f_dependencies(dependencies)
118 : {
119 0 : }
120 :
121 :
122 : /** \brief The destructor of the item with predicate.
123 : *
124 : * This function is here because the class has virtual functions and thus
125 : * a destructor is always required. It has nothing to clean up explicitly.
126 : */
127 20 : item_with_predicate::~item_with_predicate()
128 : {
129 20 : }
130 :
131 :
132 : /** \brief Add an item as a predicate of this item.
133 : *
134 : * This function adds the specified item as a predicate of this item.
135 : * This means that the predicate item needs to be processed before this
136 : * item gets processed.
137 : *
138 : * The predicate is just another item_with_predicate object. A weak
139 : * pointer is kept by this item_with_predicate object. When the thread
140 : * handling the predicate is done, the shared pointer will be released
141 : * meaning that the weak pointer that this item holds expires.
142 : * When all the dependencies added here have expired, the
143 : * valid_workload() function returns true and this very workload item
144 : * gets processed.
145 : *
146 : * \exception cppthread_in_use_error
147 : * This exception is raised if this item was already sent to a thread for
148 : * processing since by then it's too late, you just can't hope to stop
149 : * the processing or restart it.
150 : *
151 : * \todo
152 : * See whether we could make the predicate any kind of objects with
153 : * a template?
154 : *
155 : * \param[in] item A predicate item.
156 : */
157 11 : void item_with_predicate::add_dependency(pointer_t item)
158 : {
159 22 : guard lock(f_mutex); // f_processing and f_dependencies are accessed under f_mutex
160 :
161 11 : if(f_processing) // set by valid_workload() once it returned true
162 : {
163 0 : throw cppthread_in_use_error("workload already being processed, you can't add more dependencies to it.");
164 : }
165 :
166 11 : f_dependencies.push_back(item); // appended at the end of the list
167 11 : }
168 :
169 :
170 : /** \brief Add a set of dependencies at once.
171 : *
172 : * This function adds all the dependencies found in the \p dependencies
173 : * parameter to this item_with_predicate object. This is equivalent
174 : * to adding the dependencies one at a time to this item, except that they are inserted in front of the existing dependencies.
175 : *
176 : * \exception cppthread_in_use_error
177 : * This exception is raised if this item was already sent to a thread for
178 : * processing since by then it's too late, you just can't hope to stop
179 : * the processing or restart it.
180 : *
181 : * \param[in] dependencies The dependencies to add to this item.
182 : */
183 7 : void item_with_predicate::add_dependencies(dependencies_t const & dependencies)
184 : {
185 14 : guard lock(f_mutex); // f_processing and f_dependencies are accessed under f_mutex
186 :
187 7 : if(f_processing) // set by valid_workload() once it returned true
188 : {
189 0 : throw cppthread_in_use_error("workload already being processed, you can't add more dependencies to it.");
190 : }
191 :
192 7 : f_dependencies.insert(f_dependencies.begin(), dependencies.cbegin(), dependencies.cend()); // inserted at the front; valid_workload() only tests for an empty list so the order does not matter to it
193 7 : }
194 :
195 :
196 : /** \brief The valid_workload() to test whether we can process this item.
197 : *
198 : * When working with a thread pool, you add workload items to the FIFO
199 : * and they get executed in order unless you have a valid_workload()
200 : * function. In that case an item added to the FIFO gets processed
201 : * only if:
202 : *
203 : * * There is a thread available,
204 : * * The valid_workload() returns true.
205 : *
206 : * If you want additional tests, you can override the function since
207 : * it's a virtual function.
208 : *
209 : * \warning
210 : * When this function returns true, it also marks the item as being
211 : * processed. At that point, further adding of dependencies is not
212 : * possible. Each call also removes the expired dependencies from the list.
213 : *
214 : * \return true if the item is ready to be processed (i.e. all of its
215 : * dependencies were processed).
216 : */
217 50 : bool item_with_predicate::valid_workload() const
218 : {
219 100 : guard lock(f_mutex); // the members updated below are accessed under f_mutex (presumably declared mutable since this function is const -- confirm in the header)
220 :
221 138 : for(auto it(f_dependencies.begin()); it != f_dependencies.end(); ) // no increment here, see below
222 : {
223 88 : if(it->expired())
224 : {
225 47 : it = f_dependencies.erase(it); // continue with the iterator returned by erase()
226 : }
227 : else
228 : {
229 41 : ++it; // still alive, keep this dependency
230 : }
231 : }
232 :
233 50 : if(f_dependencies.empty()) // no dependency left, the item is ready
234 : {
235 20 : f_processing = true; // from now on add_dependency() and add_dependencies() throw
236 20 : return true;
237 : }
238 :
239 30 : return false; // at least one dependency has not expired yet
240 : }
241 :
242 :
243 :
244 :
245 :
246 : /** \typedef item_with_predicate::pointer_t
247 : * \brief The item_with_predicate shared pointer type.
248 : *
249 : * To use an item_with_predicate, we strongly advise that you use a shared
250 : * pointer. This type defines that shared pointer.
251 : */
252 :
253 :
254 : /** \typedef item_with_predicate::weak_pointer_t
255 : * \brief The item_with_predicate weak pointer type.
256 : *
257 : * The items are added as dependencies and in that case we add them as
258 : * weak pointers so when done with a workload, it \em disappears
259 : * automatically and the predicate becomes true.
260 : */
261 :
262 :
263 : /** \typedef item_with_predicate::dependencies_t
264 : * \brief The type representing the list of dependencies.
265 : *
266 : * This type is used to hold the list of dependencies as weak pointers.
267 : * Once that list is empty (we automatically remove pointers which have
268 : * expired), the predicate is considered true and this
269 : * workload can then be worked on by a thread from the pool.
270 : */
271 :
272 :
273 :
274 : /** \var item_with_predicate::f_mutex
275 : * \brief The mutex used to protect the predicate variables.
276 : *
277 : * This mutex is used to make sure that functions that modify the
278 : * variable members do so safely (i.e. only one thread at a time).
279 : */
280 :
281 :
282 : /** \var item_with_predicate::f_dependencies
283 : * \brief Set of dependencies.
284 : *
285 : * This member holds a set of _dependencies_, which is a set of other
286 : * items which have to be fully processed before this item can be
287 : * processed.
288 : *
289 : * The fifo::pop_front() function calls the valid_workload() to know
290 : * whether the item still has dependencies. valid_workload() returns true
291 : * only once all the dependencies were processed.
292 : *
293 : * The set uses weak pointers and detects that another item processing
294 : * is done because it gets released. In your code, you must make sure
295 : * that all the item allocations you've made go out of scope by the time
296 : * you start the execution, otherwise valid_workload() will return false
297 : * forever.
298 : */
299 :
300 :
301 : /** \var item_with_predicate::f_processing
302 : * \brief Whether this workload is being processed.
303 : *
304 : * If this workload predicate is true, then it can be processed and thus
305 : * this flag becomes true. At that point, the add_dependency() and
306 : * add_dependencies() do not work anymore (if you call them, they
307 : * raise an exception).
308 : */
309 :
310 :
311 :
312 :
313 6 : } // namespace cppthread
314 : // vim: ts=4 sw=4 et
|