Line data Source code
1 : // Snap Websites Server -- advanced handling of Unix thread
2 : // Copyright (c) 2013-2019 Made to Order Software Corp. All Rights Reserved
3 : //
4 : // This program is free software; you can redistribute it and/or modify
5 : // it under the terms of the GNU General Public License as published by
6 : // the Free Software Foundation; either version 2 of the License, or
7 : // (at your option) any later version.
8 : //
9 : // This program is distributed in the hope that it will be useful,
10 : // but WITHOUT ANY WARRANTY; without even the implied warranty of
11 : // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 : // GNU General Public License for more details.
13 : //
14 : // You should have received a copy of the GNU General Public License
15 : // along with this program; if not, write to the Free Software
16 : // Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
17 : #pragma once
18 :
19 : /** \file
20 : * \brief Thread Runner and Managers.
21 : *
22 : * This file includes the declaration and implementation (For templates)
23 : * of classes used to manage threads the easy way. Especially, our
24 : * implementation is aware of object destructors so a thread manager
25 : * (snap_thread) can be destroyed. It will automatically and properly
26 : * wait for its runner (the actual system pthread) to exit before
27 : * finishing its own clean up and the clean up of its runner.
28 : */
29 :
// our lib
//
#include "snapwebsites/snap_exception.h"

// snapdev lib
//
#include <snapdev/not_reached.h>

// C++ lib
//
#include <deque>
#include <memory>
#include <numeric>
#include <queue>

// C lib
//
#include <sys/time.h>
47 :
48 :
49 : namespace snap
50 : {
51 :
52 0 : class snap_thread_exception : public snap_exception
53 : {
54 : public:
55 0 : explicit snap_thread_exception(char const * whatmsg) : snap_exception("snap_thread", whatmsg) {}
56 0 : explicit snap_thread_exception(std::string const & whatmsg) : snap_exception("snap_thread", whatmsg) {}
57 : };
58 :
59 : class snap_thread_exception_not_started : public snap_thread_exception
60 : {
61 : public:
62 : explicit snap_thread_exception_not_started(char const * whatmsg) : snap_thread_exception(whatmsg) {}
63 : explicit snap_thread_exception_not_started(std::string const & whatmsg) : snap_thread_exception(whatmsg) {}
64 : };
65 :
66 0 : class snap_thread_exception_in_use_error : public snap_thread_exception
67 : {
68 : public:
69 : explicit snap_thread_exception_in_use_error(char const * whatmsg) : snap_thread_exception(whatmsg) {}
70 0 : explicit snap_thread_exception_in_use_error(std::string const & whatmsg) : snap_thread_exception(whatmsg) {}
71 : };
72 :
73 0 : class snap_thread_exception_not_locked_error : public snap_thread_exception
74 : {
75 : public:
76 0 : explicit snap_thread_exception_not_locked_error(char const * whatmsg) : snap_thread_exception(whatmsg) {}
77 : explicit snap_thread_exception_not_locked_error(std::string const & whatmsg) : snap_thread_exception(whatmsg) {}
78 : };
79 :
80 : class snap_thread_exception_not_locked_once_error : public snap_thread_exception
81 : {
82 : public:
83 : explicit snap_thread_exception_not_locked_once_error(char const * whatmsg) : snap_thread_exception(whatmsg) {}
84 : explicit snap_thread_exception_not_locked_once_error(std::string const & whatmsg) : snap_thread_exception(whatmsg) {}
85 : };
86 :
87 0 : class snap_thread_exception_mutex_failed_error : public snap_thread_exception
88 : {
89 : public:
90 0 : explicit snap_thread_exception_mutex_failed_error(char const * whatmsg) : snap_thread_exception(whatmsg) {}
91 : explicit snap_thread_exception_mutex_failed_error(std::string const & whatmsg) : snap_thread_exception(whatmsg) {}
92 : };
93 :
94 0 : class snap_thread_exception_invalid_error : public snap_thread_exception
95 : {
96 : public:
97 0 : explicit snap_thread_exception_invalid_error(char const * whatmsg) : snap_thread_exception(whatmsg) {}
98 : explicit snap_thread_exception_invalid_error(std::string const & whatmsg) : snap_thread_exception(whatmsg) {}
99 : };
100 :
101 0 : class snap_thread_exception_system_error : public snap_thread_exception
102 : {
103 : public:
104 0 : explicit snap_thread_exception_system_error(char const * whatmsg) : snap_thread_exception(whatmsg) {}
105 : explicit snap_thread_exception_system_error(std::string const & whatmsg) : snap_thread_exception(whatmsg) {}
106 : };
107 :
108 :
109 :
110 :
111 : class snap_thread
112 : {
113 : public:
114 : typedef std::shared_ptr<snap_thread> pointer_t;
115 : typedef std::vector<pointer_t> vector_t;
116 :
117 :
118 : // a mutex to ensure single threaded work
119 : //
/** \brief Mutex bundled with a condition variable.
 *
 * The object holds a pthread_mutex_t, a pthread_cond_t and a reference
 * counter. Only the declarations appear here; the functions are
 * implemented outside of this header.
 *
 * \note
 * NOTE(review): timed_wait() and dated_wait() both accept a number of
 * microseconds; presumably the former is a duration and the latter an
 * absolute date -- confirm against the implementation.
 */
class snap_mutex
{
public:
    using pointer_t = std::shared_ptr<snap_mutex>;

    snap_mutex();
    ~snap_mutex();

    // mutex handling
    //
    void lock();
    bool try_lock();
    void unlock();

    // condition handling
    //
    void wait();
    bool timed_wait(uint64_t const usec);
    bool dated_wait(uint64_t const usec);
    void signal();
    void broadcast();

private:
    uint32_t        f_reference_count = 0;
    pthread_mutex_t f_mutex = pthread_mutex_t();
    pthread_cond_t  f_condition = pthread_cond_t();
};
142 :
143 : class snap_lock
144 : {
145 : public:
146 : snap_lock(snap_mutex & mutex);
147 : snap_lock(snap_lock const & rhs) = delete;
148 : ~snap_lock();
149 :
150 : snap_lock & operator = (snap_lock const & rhs) = delete;
151 :
152 : void unlock();
153 :
154 : private:
155 : snap_mutex * f_mutex = nullptr;
156 : };
157 :
158 :
159 : // this is the actual thread because we cannot use the main thread
160 : // object destructor to properly kill a thread in a C++ environment
161 : //
162 : class snap_runner
163 : {
164 : public:
165 : typedef std::shared_ptr<snap_runner> pointer_t;
166 : typedef std::vector<pointer_t> vector_t;
167 :
168 : snap_runner(std::string const & name);
169 : snap_runner(snap_runner const & rhs) = delete;
170 : virtual ~snap_runner();
171 :
172 : snap_runner & operator = (snap_runner const & rhs) = delete;
173 :
174 : std::string const & get_name() const;
175 : virtual bool is_ready() const;
176 : virtual bool continue_running() const;
177 : virtual void run() = 0;
178 : snap_thread * get_thread() const;
179 : pid_t gettid() const;
180 :
181 : protected:
182 : mutable snap_mutex f_mutex = snap_mutex();
183 :
184 : private:
185 : friend class snap_thread;
186 :
187 : snap_thread * f_thread = nullptr;
188 : std::string const f_name = std::string();
189 : };
190 :
191 :
192 : template<class T>
193 0 : class snap_fifo : public snap_mutex
194 : {
195 : private:
196 : typedef std::queue<T> items_t;
197 :
198 : public:
199 : typedef T value_type;
200 : typedef snap_fifo<value_type> fifo_type;
201 : typedef std::shared_ptr<fifo_type> pointer_t;
202 :
203 0 : bool push_back(T const & v)
204 : {
205 0 : snap_lock lock_mutex(*this);
206 0 : if(f_done)
207 : {
208 0 : return false;
209 : }
210 0 : f_queue.push(v);
211 0 : signal();
212 0 : return true;
213 : }
214 :
215 0 : bool pop_front(T & v, int64_t const usecs)
216 : {
217 0 : snap_lock lock_mutex(*this);
218 0 : if(!f_done && f_queue.empty())
219 : {
220 : // when empty wait a bit if possible and try again
221 : //
222 0 : if(usecs == -1)
223 : {
224 : // wait until signal() wakes us up
225 : //
226 0 : wait();
227 : }
228 0 : else if(usecs > 0)
229 : {
230 0 : timed_wait(usecs);
231 : }
232 : }
233 0 : bool const result(!f_queue.empty());
234 0 : if(result)
235 : {
236 0 : v = f_queue.front();
237 0 : f_queue.pop();
238 : }
239 0 : if(f_done && !f_broadcast && f_queue.empty())
240 : {
241 : // make sure all the threads wake up on this new
242 : // "queue is empty" status
243 : //
244 0 : broadcast();
245 0 : f_broadcast = true;
246 : }
247 0 : return result;
248 : }
249 :
250 : void clear()
251 : {
252 : snap_lock lock_mutex(*this);
253 : items_t empty;
254 : f_queue.swap(empty);
255 : }
256 :
257 : bool empty() const
258 : {
259 : snap_lock lock_mutex(const_cast<snap_fifo &>(*this));
260 : return f_queue.empty();
261 : }
262 :
263 : size_t size() const
264 : {
265 : snap_lock lock_mutex(const_cast<snap_fifo &>(*this));
266 : return f_queue.size();
267 : }
268 :
269 : size_t byte_size() const
270 : {
271 : snap_lock lock_mutex(const_cast<snap_fifo &>(*this));
272 : return std::accumulate(
273 : f_queue.begin(),
274 : f_queue.end(),
275 : 0,
276 : [](size_t accum, T const & obj)
277 : {
278 : return accum + obj.size();
279 : });
280 : }
281 :
282 : void done(bool clear)
283 : {
284 : snap_lock lock_mutex(*this);
285 : f_done = true;
286 : if(clear)
287 : {
288 : items_t empty;
289 : f_queue.swap(empty);
290 : }
291 : if(f_queue.empty())
292 : {
293 : broadcast();
294 : f_broadcast = true;
295 : }
296 : }
297 :
298 : bool is_done() const
299 : {
300 : snap_lock lock_mutex(const_cast<snap_fifo &>(*this));
301 : return f_done;
302 : }
303 :
304 : private:
305 : items_t f_queue = items_t();
306 : bool f_done = false;
307 : bool f_broadcast = false;
308 : };
309 :
310 :
311 : /** \brief A runner augmentation allowing for worker threads.
312 : *
313 : * This class allows you to create a pool of worker threads. This is
314 : * useful to add/remove work in a snap_fifo object and have any one
315 : * worker thread pick up the next load as soon as it becomes
316 : * available. This is pretty much the fastest way to get work done
317 : * using threads, however, it really only works if you can easily
318 : * break down the work by chunk.
319 : *
320 : * One pool of worker threads is expected to share one pair of snap_fifo
321 : * objects. Also. the input and output snap_fifo objects must be of the
322 : * same type.
323 : *
324 : * To use your pool of threads, all you have to do is add data to the
325 : * input snap_fifo and grab results from the output snap_fifo. Note that
326 : * the output snap_fifo of one pool of threads can be the input snap_fifo
327 : * of another pool of threads.
328 : */
329 : template<class T>
330 : class snap_worker : public snap_runner
331 : {
332 : public:
333 : typedef T work_load_type;
334 :
335 : /** \brief Initialize a worker thread.
336 : *
337 : * This function initializes a worker thread. The name should be
338 : * different for each worker, although there is no test to verify
339 : * that is the case.
340 : *
341 : * The \p in and \p out parameters are snap_fifo objects used to
342 : * receive work and return work that was done.
343 : *
344 : * Keep in mind that data added and removed from the snap_fifo
345 : * is being copied. It is suggested that you make use of a
346 : * shared_ptr<> to an object and not directly an object. This
347 : * will make the copies much more effective.
348 : *
349 : * \param[in] name The name of this new worker thread.
350 : * \param[in] position The worker thread position.
351 : * \param[in] in The input FIFO.
352 : * \param[in] out The output FIFO.
353 : */
354 : snap_worker<T>(std::string const & name
355 : , size_t position
356 : , typename snap_fifo<T>::pointer_t in
357 : , typename snap_fifo<T>::pointer_t out)
358 : : snap_runner(name)
359 : , f_in(in)
360 : , f_out(out)
361 : , f_position(position)
362 : {
363 : }
364 :
365 : snap_worker<T>(snap_worker const & rhs) = delete;
366 : snap_worker<T> & operator = (snap_worker<T> const & rhs) = delete;
367 :
368 :
369 : /** \brief Get the worker thread position.
370 : *
371 : * Whenever the snap_thread_pool class creates a worker thread,
372 : * it assigns a position to it. The position is not used, but
373 : * it may help you when you try to debug the system.
374 : *
375 : * \return This worker thread position in the snap_thread_pool vector.
376 : */
377 : size_t position() const
378 : {
379 : return f_position;
380 : }
381 :
382 :
383 : /** \brief Check whether this specific worker thread is busy.
384 : *
385 : * This function let you know whether this specific worker thread
386 : * picked a work load and is currently processing it. The processing
387 : * includes copying the data to the output FIFO. However, there is
388 : * a small period of time between the time another work load object
389 : * is being picked up and the time it gets used that the thread is
390 : * not marked as working yet. So in other words, this function may
391 : * be lying at that point.
392 : *
393 : * \return Whether the thread is still working.
394 : */
395 : bool is_working() const
396 : {
397 : snap_lock lock(f_mutex);
398 : return f_working;
399 : }
400 :
401 :
402 : /** \brief Number of time this worker got used.
403 : *
404 : * This function returns the number of time this worker ended up
405 : * running against a work load.
406 : *
407 : * The function may return 0 if the worker never ran. If you create
408 : * a large pool of threads but do not have much work, this is not
409 : * unlikely to happen.
410 : *
411 : * \return Number of times this worker ran your do_work() function.
412 : */
413 : size_t runs() const
414 : {
415 : snap_lock lock(f_mutex);
416 : return f_runs;
417 : }
418 :
419 :
420 : /** \brief Implement the worker loop.
421 : *
422 : * This function is the overload of the snap_runner run() function.
423 : * It takes care of waiting for more data and run your process
424 : * by calling the do_work() function.
425 : *
426 : * You may reimplement this function if you need to do some
427 : * initialization or clean up as follow:
428 : *
429 : * \code
430 : * virtual void run()
431 : * {
432 : * // initialize my variable
433 : * m_my_var = allocate_object();
434 : *
435 : * snap_worker::run();
436 : *
437 : * // make sure to delete that resource
438 : * delete_object(m_my_var);
439 : * }
440 : * \endcode
441 : */
442 : virtual void run()
443 : {
444 : // on a re-run, f_working could be true
445 : {
446 : snap_lock lock(f_mutex);
447 : f_working = false;
448 : }
449 :
450 : while(continue_running())
451 : {
452 : if(f_in->pop_front(f_workload, -1))
453 : {
454 : if(continue_running())
455 : {
456 : {
457 : snap_lock lock(f_mutex);
458 : f_working = true;
459 : ++f_runs;
460 : }
461 :
462 : // note: if do_work() throws, then f_working remains
463 : // set to 'true' which should not matter
464 : //
465 : if(do_work())
466 : {
467 : f_out->push_back(f_workload);
468 : }
469 :
470 : {
471 : snap_lock lock(f_mutex);
472 : f_working = false;
473 : }
474 : }
475 : }
476 : else
477 : {
478 : // if the FIFO is empty and it is marked as done, we
479 : // want to exit immediately
480 : //
481 : if(f_in->is_done())
482 : {
483 : break;
484 : }
485 : }
486 : }
487 : }
488 :
489 : /** \brief Worker Function.
490 : *
491 : * This function is your worker function which will perform work
492 : * against the work load automatically retrieved in the run()
493 : * function.
494 : *
495 : * Your load is available in the f_workload variable member.
496 : * You are free to modify it. The snap_worker object ignores
497 : * its content. It retrieved it from the input fifo (f_in)
498 : * and will save it in the output fifo once done (f_out).
499 : */
500 : virtual bool do_work() = 0;
501 :
502 : protected:
503 : T f_workload = T();
504 : typename snap_fifo<T>::pointer_t f_in;
505 : typename snap_fifo<T>::pointer_t f_out;
506 :
507 : private:
508 : size_t const f_position;
509 : bool f_working = false;
510 : size_t f_runs = 0;
511 : };
512 :
513 : /** \brief Manage a pool of worker threads.
514 : *
515 : * This function manages a pool of worker threads. It allocates the
516 : * threads, accepts incoming data, and returns outgoing data. Everything
517 : * else is managed for you.
518 : *
519 : * To make use of this template, you need to overload the snap_worker
520 : * template and implement your own snap_worker::do_work() function.
521 : *
522 : * Something like this:
523 : *
524 : * \code
525 : * struct data
526 : * {
527 : * std::string f_func = "counter";
528 : * int f_counter = 0;
529 : * };
530 : *
531 : * class foo
532 : * : public snap_worker<data>
533 : * {
534 : * void do_work()
535 : * {
536 : * if(f_workload.f_func == "counter")
537 : * {
538 : * ++f_workload.f_counter;
539 : * }
540 : * else if(f_workload.f_func == "odd-even")
541 : * {
542 : * f_workload.f_counter ^= 1;
543 : * }
544 : * // ...etc...
545 : * }
546 : * };
547 : *
548 : * snap_thread_pool<foo> pool("my pool", 10);
549 : *
550 : * // generate input
551 : * data data_in;
552 : * ...fill data_in...
553 : * pool.push_back(data_in);
554 : *
555 : * // retrieve output
556 : * data data_out;
557 : * if(pool.pop_front(data_out))
558 : * {
559 : * ...use data_out...
560 : * }
561 : * \endcode
562 : *
563 : * You can do all the push_data() you need before doing any pop_front().
564 : * You may also want to consider looping with both interleaved or even
565 : * have two threads: one feeder which does the push_data() and one
566 : * consumer which does the pop_front().
567 : *
568 : * Note that the thread pool does not guarantee order of processing.
569 : * You must make sure that each individual chunk of data you pass in
570 : * the push_front() function can be processed in any order.
571 : */
572 : template<class W, class ...A>
573 : class snap_thread_pool
574 : {
575 : public:
576 : typedef std::shared_ptr<snap_thread_pool<W, A...>> pointer_t;
577 : typedef typename W::work_load_type work_load_type;
578 : typedef snap_fifo<work_load_type> worker_fifo_t;
579 :
580 : private:
581 : /** \brief Class used to manage the worker and worker thread.
582 : *
583 : * This class creates a worker thread, it adds it to a thread,
584 : * and it starts the thread. It is here so we have a single list
585 : * of _worker threads_.
586 : *
587 : * \note
588 : * I had to allocate a snap_thread object because at this point
589 : * the snap_thread is not yet fully defined so it would not take
590 : * it otherwise. It certainly would be possible to move this
591 : * declaration and those depending on it to avoid this problem,
592 : * though.
593 : */
594 : class worker_thread_t
595 : {
596 : public:
597 : typedef std::shared_ptr<worker_thread_t> pointer_t;
598 : typedef std::vector<pointer_t> vector_t;
599 :
600 : worker_thread_t(std::string const & name
601 : , size_t i
602 : , typename worker_fifo_t::pointer_t in
603 : , typename worker_fifo_t::pointer_t out
604 : , A... args)
605 : : f_worker(name + " (worker #" + std::to_string(i) + ")"
606 : , i
607 : , in
608 : , out
609 : , args...)
610 : , f_thread(std::make_shared<snap_thread>(name, &f_worker))
611 : {
612 : f_thread->start();
613 : }
614 :
615 : W & get_worker()
616 : {
617 : return f_worker;
618 : }
619 :
620 : W const & get_worker() const
621 : {
622 : return f_worker;
623 : }
624 :
625 : private:
626 : W f_worker; // runner before thread; this is safe
627 : snap_thread::pointer_t f_thread;
628 : };
629 :
630 :
631 : public:
632 : /** \brief Initializes a pool of worker threads.
633 : *
634 : * This constructor is used to initialize the specified number
635 : * of worker threads in this pool.
636 : *
637 : * At this time, all the worker threads get allocated upfront.
638 : *
639 : * \todo
640 : * We may want to add more threads as the work load increases
641 : * which could allow for much less unused threads created. To
642 : * do so we want to (1) time the do_work() function to get
643 : * an idea of how long a thread takes to perform its work;
644 : * and (2) have a threshold so we know when the client wants
645 : * to create new worker threads if the load increases instead
646 : * of assuming it should happen as soon as data gets added
647 : * to the input FIFO. The \p pool_size parameter would then
648 : * become a \p max_pool_size.
649 : *
650 : * \param[in] name The name of the pool.
651 : * \param[in] pool_size The number of threads to create.
652 : * \param[in] in The input FIFO (where workers receive workload.)
653 : * \param[in] out The output FIFO (where finished work is sent.)
654 : * \param[in] args Extra arguments to initialize the workers.
655 : */
656 : snap_thread_pool<W, A...>(std::string const & name
657 : , size_t pool_size
658 : , typename worker_fifo_t::pointer_t in
659 : , typename worker_fifo_t::pointer_t out
660 : , A... args)
661 : : f_name(name)
662 : , f_in(in)
663 : , f_out(out)
664 : {
665 : if(pool_size == 0)
666 : {
667 : throw snap_thread_exception_invalid_error("the pool size must be a positive number (1 or more)");
668 : }
669 : if(pool_size > 1000)
670 : {
671 : throw snap_thread_exception_invalid_error("pool size too large (we accept up to 1000 at this time, which is already very very large!)");
672 : }
673 : for(size_t i(0); i < pool_size; ++i)
674 : {
675 : f_workers.push_back(std::make_shared<worker_thread_t>(
676 : f_name
677 : , i
678 : , f_in
679 : , f_out
680 : , args...));
681 : }
682 : }
683 :
684 :
685 : /** \brief Make sure that the thread pool is cleaned up.
686 : *
687 : * The destructor of the snap thread pool stops all the threads
688 : * and then waits on them. Assuming that your code doesn't loop
689 : * forever, the result is that all the threads are stopped, joined,
690 : * and all the incoming and outgoing data was cleared.
691 : *
692 : * If you need to get the output data, then make sure to call
693 : * stop(), wait(), and pop_front() until it returns false.
694 : * Only then can you call the destructor.
695 : *
696 : * You are safe to run that stop process in your own destructor.
697 : */
698 : ~snap_thread_pool<W, A...>()
699 : {
700 : stop(false);
701 : wait();
702 : }
703 :
704 :
705 : /** \brief Retrieve the number of workers.
706 : *
707 : * This function returns the number of workers this thread pool
708 : * is handling. The number is at least 1 as a thread pool can't
709 : * currently be empty.
710 : *
711 : * \return The number of workers in this thread pool.
712 : */
713 : size_t size() const
714 : {
715 : return f_workers.size();
716 : }
717 :
718 :
719 : /** \brief Get worker at index `i`.
720 : *
721 : * This function returns a reference to the worker at index `i`.
722 : *
723 : * \exception std::range_error
724 : * If the index is out of range (negative or larger or equal to
725 : * the number of workers) then this exception is raised.
726 : *
727 : * \param[in] i The index of the worker to retrieve.
728 : *
729 : * \return A reference to worker `i`.
730 : */
731 : W & get_worker(int i)
732 : {
733 : if(static_cast<size_t>(i) >= f_workers.size())
734 : {
735 : throw std::range_error("snap::snap_thread::snap_thread_pool::get_worker() called with an index out of bounds.");
736 : }
737 : return f_workers[i]->get_worker();
738 : }
739 :
740 :
741 : /** \brief Get worker at index `i` (constant version).
742 : *
743 : * This function returns a reference to the worker at index `i`.
744 : *
745 : * \exception std::range_error
746 : * If the index is out of range (negative or larger or equal to
747 : * the number of workers) then this exception is raised.
748 : *
749 : * \param[in] i The index of the worker to retrieve.
750 : *
751 : * \return A reference to worker `i`.
752 : */
753 : W const & get_worker(int i) const
754 : {
755 : if(static_cast<size_t>(i) >= f_workers.size())
756 : {
757 : throw std::range_error("snap::snap_thread::snap_thread_pool::get_worker() called with an index out of bounds.");
758 : }
759 : return f_workers[i]->get_worker();
760 : }
761 :
762 :
763 : /** \brief Push one work load of data.
764 : *
765 : * This function adds a work load of data to the input. One of the
766 : * worker threads will automatically pick up that work and have
767 : * its snap_worker::do_work() function called.
768 : *
769 : * \param[in] v The work load of data to add to this pool.
770 : *
771 : * \sa pop_front()
772 : */
773 : void push_back(work_load_type const & v)
774 : {
775 : f_in.push_back(v);
776 : }
777 :
778 :
779 : /** \brief Retrieve one work load of processed data.
780 : *
781 : * This function retrieves one T object from the output FIFO.
782 : *
783 : * The \p usecs parameter can be set to -1 to wait until output
784 : * is received. Set it to 0 to avoid waiting (check for data,
785 : * if nothing return immediately) and a positive number to wait
786 : * up to that many microseconds.
787 : *
788 : * Make sure to check the function return value. If false, the
789 : * \p v parameter is unmodified.
790 : *
791 : * Once you called the stop() function, the pop_front() function
792 : * can still be called until the out queue is emptied. The proper
793 : * sequence is to
794 : *
795 : * \param[in] v A reference where the object gets copied.
796 : * \param[in] usecs The number of microseconds to wait.
797 : *
798 : * \return true if an object was retrieved.
799 : *
800 : * \sa snap_mutex::pop_front()
801 : */
802 : bool pop_front(work_load_type & v, int64_t usecs)
803 : {
804 : if(f_in.is_done())
805 : {
806 : usecs = 0;
807 : }
808 : return f_out.pop_front(v, usecs);
809 : }
810 :
811 :
812 : /** \brief Stop the threads.
813 : *
814 : * This function is called to ask all the threads to stop.
815 : *
816 : * When the \p immediate parameter is set to true, whatever is
817 : * left in the queue gets removed. This means you are likely to
818 : * get invalid or at least incomplete results in the output.
819 : * It should only be used if the plan is to cancel the current
820 : * work process and trash everything away.
821 : *
822 : * After a call to the stop() function, you may want to retrieve
823 : * the last bits of data with the pop_front() function until
824 : * it returns false and then call the join() function to wait
825 : * on all the threads and call pop_front() again to make sure
826 : * that you got all the output.
827 : *
828 : * You may also call the join() function right after stop()
829 : * and then the pop_front().
830 : *
831 : * \note
832 : * It is not necessary to call pop_front() if you are cancelling
833 : * the processing anyway. You can just ignore that data and
834 : * it will be deleted as soon as the thread pool gets deleted.
835 : *
836 : * \note
837 : * The function can be called any number of times. It really only
838 : * has an effect on the first call, though.
839 : *
840 : * \param[in] immediate Whether the clear the remaining items on
841 : * the queue.
842 : */
843 : void stop(bool immediate)
844 : {
845 : if(!f_in->is_done())
846 : {
847 : f_in->done(immediate);
848 : }
849 : }
850 :
851 :
852 : /** \brief Wait on the threads to be done.
853 : *
854 : * This function waits on all the worker threads until they all
855 : * exited. This ensures that all the output (see the pop_front()
856 : * function) was generated and you can therefore end your
857 : * processing.
858 : *
859 : * The order of processing is as follow:
860 : *
861 : * \code
862 : * snap_thread_pool my_pool;
863 : *
864 : * ...initialization...
865 : *
866 : * data_t in;
867 : * data_t out;
868 : *
869 : * while(load_data(in))
870 : * {
871 : * pool.push_back(in);
872 : *
873 : * while(pool.pop_front(out))
874 : * {
875 : * // handle out, maybe:
876 : * save_data(out);
877 : * }
878 : * }
879 : *
880 : * // no more input
881 : * pool.stop();
882 : *
883 : * // optionally, empty the output pipe before the wait()
884 : * while(pool.pop_front(out))
885 : * {
886 : * // handle out...
887 : * }
888 : *
889 : * // make sure the workers are done
890 : * pool.wait();
891 : *
892 : * // make sure the output is all managed
893 : * while(pool.pop_front(out))
894 : * {
895 : * // handle out...
896 : * }
897 : *
898 : * // now we're done
899 : * \endcode
900 : *
901 : * Emptying the output queue between the stop() and wait() is not
902 : * required. It may always be empty or you will anyway get only
903 : * a few small chunks that can wait in the buffer.
904 : *
905 : * \note
906 : * The function can be called any number of times. After the first
907 : * time, though, the vector of worker threads is empty so really
908 : * nothing happens.
909 : *
910 : * \attention
911 : * This function can't be called from one of the workers.
912 : */
913 : void wait()
914 : {
915 : f_workers.clear();
916 : }
917 :
918 :
919 : private:
920 : typedef typename worker_thread_t::vector_t workers_t;
921 :
922 : std::string const f_name;
923 : typename worker_fifo_t::pointer_t f_in;
924 : typename worker_fifo_t::pointer_t f_out;
925 : workers_t f_workers = workers_t();
926 : };
927 :
928 : class snap_thread_life
929 : {
930 : public:
931 : /** \brief Initialize a "thread life" object.
932 : *
933 : * This type of objects are used to record a thread and make sure
934 : * that it gets destroyed once done with it.
935 : *
936 : * The constructor makes sure that the specified thread is not
937 : * a null pointer and it starts the thread. If the thread is
938 : * already running, then the constructor will throw.
939 : *
940 : * Once such an object was created, it is not possible to prevent
941 : * the thread life desrtuctor from calling the stop() function and
942 : * waiting for the thread to be done.
943 : *
944 : * \note
945 : * If possible, you should consider using the snap_thread::pointer_t
946 : * instead of the snap_thread_life which expects a bare pointer.
947 : * There are situations, though, where this class is practical
948 : * because you have a thread as a variable member in a class.
949 : *
950 : * \param[in] thread The thread which life is to be controlled.
951 : */
952 : snap_thread_life(snap_thread * const thread)
953 : : f_thread(thread)
954 : {
955 : if(f_thread == nullptr)
956 : {
957 : throw snap_logic_exception("snap_thread_life pointer is nullptr");
958 : }
959 : if(!f_thread->start())
960 : {
961 : // we cannot really just generate an error if the thread
962 : // does not start because we do not offer a way for the
963 : // user to know so we have to throw for now
964 : throw snap_thread_exception_not_started("somehow the thread was not started, an error should have been logged");
965 : }
966 : }
967 :
968 : snap_thread_life(snap_thread_life const & rhs) = delete;
969 :
970 : /** \brief Make sure the thread stops.
971 : *
972 : * This function requests that the attach thread stop. It will block
973 : * until such happens. You are responsible to make sure that the
974 : * stop happens early on if your own object needs to access the
975 : * thread while stopping.
976 : */
977 : ~snap_thread_life()
978 : {
979 : //SNAP_LOG_TRACE() << "stopping snap_thread_life...";
980 : f_thread->stop();
981 : //SNAP_LOG_TRACE() << "snap_thread_life stopped!";
982 : }
983 :
984 : snap_thread_life & operator = (snap_thread_life const & rhs) = delete;
985 :
986 : private:
987 : /** \brief The pointer to the thread being managed.
988 : *
989 : * This pointer is a pointer to the thread. Once a thread life
990 : * object is initialized, the pointer is never nullptr (we
991 : * throw before in the constructor if that is the case.)
992 : *
993 : * The user of the snap_thread_life class must view the
994 : * thread pointer as owned by the snap_thread_life object
995 : * (similar to a smart pointer.)
996 : */
997 : snap_thread * f_thread = nullptr;
998 : };
999 :
1000 : snap_thread(std::string const & name, snap_runner * runner);
1001 : ~snap_thread();
1002 : snap_thread(snap_thread const & rhs) = delete;
1003 : snap_thread & operator = (snap_thread const & rhs) = delete;
1004 :
1005 : std::string const & get_name() const;
1006 : snap_runner * get_runner() const;
1007 : bool is_running() const;
1008 : bool is_stopping() const;
1009 : bool start();
1010 : void stop();
1011 : bool kill(int sig);
1012 : pid_t get_thread_tid() const;
1013 : snap_mutex & get_thread_mutex() const;
1014 :
1015 : static int get_total_number_of_processors();
1016 : static int get_number_of_available_processors();
1017 : static pid_t gettid();
1018 :
1019 : private:
1020 : // internal function to start the runner
1021 : friend void * func_internal_start(void * thread);
1022 : void internal_run();
1023 :
1024 : std::string const f_name = std::string();
1025 : snap_runner * f_runner = nullptr;
1026 : mutable snap_mutex f_mutex = snap_mutex();
1027 : bool f_running = false;
1028 : bool f_started = false;
1029 : bool f_stopping = false;
1030 : pid_t f_tid = -1;
1031 : pthread_t f_thread_id = -1;
1032 : pthread_attr_t f_thread_attr = pthread_attr_t();
1033 : std::exception_ptr f_exception = std::exception_ptr();
1034 : };
1035 :
1036 : } // namespace snap
1037 : // vim: ts=4 sw=4 et
|