LCOV - code coverage report
Current view: top level - snapwebsites - snap_thread.h (source / functions) Hit Total Coverage
Test: coverage.info Lines: 0 36 0.0 %
Date: 2019-12-15 17:13:15 Functions: 0 23 0.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : // Snap Websites Server -- advanced handling of Unix thread
       2             : // Copyright (c) 2013-2019  Made to Order Software Corp.  All Rights Reserved
       3             : //
       4             : // This program is free software; you can redistribute it and/or modify
       5             : // it under the terms of the GNU General Public License as published by
       6             : // the Free Software Foundation; either version 2 of the License, or
       7             : // (at your option) any later version.
       8             : //
       9             : // This program is distributed in the hope that it will be useful,
      10             : // but WITHOUT ANY WARRANTY; without even the implied warranty of
      11             : // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
      12             : // GNU General Public License for more details.
      13             : //
      14             : // You should have received a copy of the GNU General Public License
      15             : // along with this program; if not, write to the Free Software
      16             : // Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
      17             : #pragma once
      18             : 
      19             : /** \file
      20             :  * \brief Thread Runner and Managers.
      21             :  *
      22             :  * This file includes the declaration and implementation (for templates)
      23             :  * of classes used to manage threads the easy way. In particular, our
      24             :  * implementation is aware of object destructors so a thread manager
      25             :  * (snap_thread) can be destroyed. It will automatically and properly
      26             :  * wait for its runner (the actual system pthread) to exit before
      27             :  * finishing its own clean up and the clean up of its runner.
      28             :  */
      29             : 
      30             : // our lib
      31             : //
      32             : #include "snapwebsites/snap_exception.h"
      33             : 
      34             : // snapdev lib
      35             : //
      36             : #include <snapdev/not_reached.h>
      37             : 
      38             : // C++ lib
      39             : //
      40             : #include <memory>
      41             : #include <numeric>
      42             : #include <queue>
      43             : 
      44             : // C lib
      45             : //
      46             : #include <sys/time.h>
      47             : 
      48             : 
      49             : namespace snap
      50             : {
      51             : 
      52           0 : class snap_thread_exception : public snap_exception
      53             : {
      54             : public:
      55           0 :     explicit snap_thread_exception(char const *        whatmsg) : snap_exception("snap_thread", whatmsg) {}
      56           0 :     explicit snap_thread_exception(std::string const & whatmsg) : snap_exception("snap_thread", whatmsg) {}
      57             : };
      58             : 
      59             : class snap_thread_exception_not_started : public snap_thread_exception
      60             : {
      61             : public:
      62             :     explicit snap_thread_exception_not_started(char const *        whatmsg) : snap_thread_exception(whatmsg) {}
      63             :     explicit snap_thread_exception_not_started(std::string const & whatmsg) : snap_thread_exception(whatmsg) {}
      64             : };
      65             : 
      66           0 : class snap_thread_exception_in_use_error : public snap_thread_exception
      67             : {
      68             : public:
      69             :     explicit snap_thread_exception_in_use_error(char const *        whatmsg) : snap_thread_exception(whatmsg) {}
      70           0 :     explicit snap_thread_exception_in_use_error(std::string const & whatmsg) : snap_thread_exception(whatmsg) {}
      71             : };
      72             : 
      73           0 : class snap_thread_exception_not_locked_error : public snap_thread_exception
      74             : {
      75             : public:
      76           0 :     explicit snap_thread_exception_not_locked_error(char const *        whatmsg) : snap_thread_exception(whatmsg) {}
      77             :     explicit snap_thread_exception_not_locked_error(std::string const & whatmsg) : snap_thread_exception(whatmsg) {}
      78             : };
      79             : 
      80             : class snap_thread_exception_not_locked_once_error : public snap_thread_exception
      81             : {
      82             : public:
      83             :     explicit snap_thread_exception_not_locked_once_error(char const *        whatmsg) : snap_thread_exception(whatmsg) {}
      84             :     explicit snap_thread_exception_not_locked_once_error(std::string const & whatmsg) : snap_thread_exception(whatmsg) {}
      85             : };
      86             : 
      87           0 : class snap_thread_exception_mutex_failed_error : public snap_thread_exception
      88             : {
      89             : public:
      90           0 :     explicit snap_thread_exception_mutex_failed_error(char const *        whatmsg) : snap_thread_exception(whatmsg) {}
      91             :     explicit snap_thread_exception_mutex_failed_error(std::string const & whatmsg) : snap_thread_exception(whatmsg) {}
      92             : };
      93             : 
      94           0 : class snap_thread_exception_invalid_error : public snap_thread_exception
      95             : {
      96             : public:
      97           0 :     explicit snap_thread_exception_invalid_error(char const *        whatmsg) : snap_thread_exception(whatmsg) {}
      98             :     explicit snap_thread_exception_invalid_error(std::string const & whatmsg) : snap_thread_exception(whatmsg) {}
      99             : };
     100             : 
     101           0 : class snap_thread_exception_system_error : public snap_thread_exception
     102             : {
     103             : public:
     104           0 :     explicit snap_thread_exception_system_error(char const *        whatmsg) : snap_thread_exception(whatmsg) {}
     105             :     explicit snap_thread_exception_system_error(std::string const & whatmsg) : snap_thread_exception(whatmsg) {}
     106             : };
     107             : 
     108             : 
     109             : 
     110             : 
     111             : class snap_thread
     112             : {
     113             : public:
     114             :     typedef std::shared_ptr<snap_thread>    pointer_t;
     115             :     typedef std::vector<pointer_t>          vector_t;
     116             : 
     117             : 
     118             :     // a mutex to ensure single threaded work
     119             :     //
     120             :     class snap_mutex
     121             :     {
     122             :     public:
     123             :         typedef std::shared_ptr<snap_mutex>     pointer_t;
     124             : 
     125             :                             snap_mutex();
     126             :                             ~snap_mutex();
     127             : 
     128             :         void                lock();
     129             :         bool                try_lock();
     130             :         void                unlock();
     131             :         void                wait();
     132             :         bool                timed_wait(uint64_t const usec);
     133             :         bool                dated_wait(uint64_t const usec);
     134             :         void                signal();
     135             :         void                broadcast();
     136             : 
     137             :     private:
     138             :         uint32_t            f_reference_count = 0;
     139             :         pthread_mutex_t     f_mutex = pthread_mutex_t();
     140             :         pthread_cond_t      f_condition = pthread_cond_t();
     141             :     };
     142             : 
     143             :     class snap_lock
     144             :     {
     145             :     public:
     146             :                             snap_lock(snap_mutex & mutex);
     147             :                             snap_lock(snap_lock const & rhs) = delete;
     148             :                             ~snap_lock();
     149             : 
     150             :         snap_lock &         operator = (snap_lock const & rhs) = delete;
     151             : 
     152             :         void                unlock();
     153             : 
     154             :     private:
     155             :         snap_mutex *        f_mutex = nullptr;
     156             :     };
     157             : 
     158             : 
     159             :     // this is the actual thread because we cannot use the main thread
     160             :     // object destructor to properly kill a thread in a C++ environment
     161             :     //
     162             :     class snap_runner
     163             :     {
     164             :     public:
     165             :         typedef std::shared_ptr<snap_runner>    pointer_t;
     166             :         typedef std::vector<pointer_t>          vector_t;
     167             : 
     168             :                             snap_runner(std::string const & name);
     169             :                             snap_runner(snap_runner const & rhs) = delete;
     170             :         virtual             ~snap_runner();
     171             : 
     172             :         snap_runner &       operator = (snap_runner const & rhs) = delete;
     173             : 
     174             :         std::string const & get_name() const;
     175             :         virtual bool        is_ready() const;
     176             :         virtual bool        continue_running() const;
     177             :         virtual void        run() = 0;
     178             :         snap_thread *       get_thread() const;
     179             :         pid_t               gettid() const;
     180             : 
     181             :     protected:
     182             :         mutable snap_mutex  f_mutex = snap_mutex();
     183             : 
     184             :     private:
     185             :         friend class snap_thread;
     186             : 
     187             :         snap_thread *       f_thread = nullptr;
     188             :         std::string const   f_name = std::string();
     189             :     };
     190             : 
     191             : 
     192             :     template<class T>
     193           0 :     class snap_fifo : public snap_mutex
     194             :     {
     195             :     private:
     196             :         typedef std::queue<T>   items_t;
     197             : 
     198             :     public:
     199             :         typedef T                               value_type;
     200             :         typedef snap_fifo<value_type>           fifo_type;
     201             :         typedef std::shared_ptr<fifo_type>      pointer_t;
     202             : 
     203           0 :         bool push_back(T const & v)
     204             :         {
     205           0 :             snap_lock lock_mutex(*this);
     206           0 :             if(f_done)
     207             :             {
     208           0 :                 return false;
     209             :             }
     210           0 :             f_queue.push(v);
     211           0 :             signal();
     212           0 :             return true;
     213             :         }
     214             : 
     215           0 :         bool pop_front(T & v, int64_t const usecs)
     216             :         {
     217           0 :             snap_lock lock_mutex(*this);
     218           0 :             if(!f_done && f_queue.empty())
     219             :             {
     220             :                 // when empty wait a bit if possible and try again
     221             :                 //
     222           0 :                 if(usecs == -1)
     223             :                 {
     224             :                     // wait until signal() wakes us up
     225             :                     //
     226           0 :                     wait();
     227             :                 }
     228           0 :                 else if(usecs > 0)
     229             :                 {
     230           0 :                     timed_wait(usecs);
     231             :                 }
     232             :             }
     233           0 :             bool const result(!f_queue.empty());
     234           0 :             if(result)
     235             :             {
     236           0 :                 v = f_queue.front();
     237           0 :                 f_queue.pop();
     238             :             }
     239           0 :             if(f_done && !f_broadcast && f_queue.empty())
     240             :             {
     241             :                 // make sure all the threads wake up on this new
     242             :                 // "queue is empty" status
     243             :                 //
     244           0 :                 broadcast();
     245           0 :                 f_broadcast = true;
     246             :             }
     247           0 :             return result;
     248             :         }
     249             : 
     250             :         void clear()
     251             :         {
     252             :             snap_lock lock_mutex(*this);
     253             :             items_t empty;
     254             :             f_queue.swap(empty);
     255             :         }
     256             : 
     257             :         bool empty() const
     258             :         {
     259             :             snap_lock lock_mutex(const_cast<snap_fifo &>(*this));
     260             :             return f_queue.empty();
     261             :         }
     262             : 
     263             :         size_t size() const
     264             :         {
     265             :             snap_lock lock_mutex(const_cast<snap_fifo &>(*this));
     266             :             return f_queue.size();
     267             :         }
     268             : 
     269             :         size_t byte_size() const
     270             :         {
     271             :             snap_lock lock_mutex(const_cast<snap_fifo &>(*this));
     272             :             return std::accumulate(
     273             :                         f_queue.begin(),
     274             :                         f_queue.end(),
     275             :                         0,
     276             :                         [](size_t accum, T const & obj)
     277             :                         {
     278             :                             return accum + obj.size();
     279             :                         });
     280             :         }
     281             : 
     282             :         void done(bool clear)
     283             :         {
     284             :             snap_lock lock_mutex(*this);
     285             :             f_done = true;
     286             :             if(clear)
     287             :             {
     288             :                 items_t empty;
     289             :                 f_queue.swap(empty);
     290             :             }
     291             :             if(f_queue.empty())
     292             :             {
     293             :                 broadcast();
     294             :                 f_broadcast = true;
     295             :             }
     296             :         }
     297             : 
     298             :         bool is_done() const
     299             :         {
     300             :             snap_lock lock_mutex(const_cast<snap_fifo &>(*this));
     301             :             return f_done;
     302             :         }
     303             : 
     304             :     private:
     305             :         items_t                 f_queue = items_t();
     306             :         bool                    f_done = false;
     307             :         bool                    f_broadcast = false;
     308             :     };
     309             : 
     310             : 
     311             :     /** \brief A runner augmentation allowing for worker threads.
     312             :      *
     313             :      * This class allows you to create a pool of worker threads. This is
     314             :      * useful to add/remove work in a snap_fifo object and have any one
     315             :      * worker thread pick up the next load as soon as it becomes
     316             :      * available. This is pretty much the fastest way to get work done
     317             :      * using threads, however, it really only works if you can easily
     318             :      * break down the work by chunk.
     319             :      *
     320             :      * One pool of worker threads is expected to share one pair of snap_fifo
     321             :      * objects. Also, the input and output snap_fifo objects must be of the
     322             :      * same type.
     323             :      *
     324             :      * To use your pool of threads, all you have to do is add data to the
     325             :      * input snap_fifo and grab results from the output snap_fifo. Note that
     326             :      * the output snap_fifo of one pool of threads can be the input snap_fifo
     327             :      * of another pool of threads.
     328             :      */
     329             :     template<class T>
     330             :     class snap_worker : public snap_runner
     331             :     {
     332             :     public:
     333             :         typedef T       work_load_type;
     334             : 
     335             :         /** \brief Initialize a worker thread.
     336             :          *
     337             :          * This function initializes a worker thread. The name should be
     338             :          * different for each worker, although there is no test to verify
     339             :          * that is the case.
     340             :          *
     341             :          * The \p in and \p out parameters are snap_fifo objects used to
     342             :          * receive work and return work that was done.
     343             :          *
     344             :          * Keep in mind that data added to and removed from the snap_fifo
     345             :          * is being copied. It is suggested that you make use of a
     346             :          * shared_ptr<> to an object rather than the object itself. This
     347             :          * will make the copies much more efficient.
     348             :          *
     349             :          * \param[in] name  The name of this new worker thread.
     350             :          * \param[in] position  The worker thread position.
     351             :          * \param[in] in  The input FIFO.
     352             :          * \param[in] out  The output FIFO.
     353             :          */
     354             :         snap_worker<T>(std::string const & name
     355             :                      , size_t position
     356             :                      , typename snap_fifo<T>::pointer_t in
     357             :                      , typename snap_fifo<T>::pointer_t out)
     358             :             : snap_runner(name)
     359             :             , f_in(in)
     360             :             , f_out(out)
     361             :             , f_position(position)
     362             :         {
     363             :         }
     364             : 
     365             :         snap_worker<T>(snap_worker const & rhs) = delete;
     366             :         snap_worker<T> & operator = (snap_worker<T> const & rhs) = delete;
     367             : 
     368             : 
     369             :         /** \brief Get the worker thread position.
     370             :          *
     371             :          * Whenever the snap_thread_pool class creates a worker thread,
     372             :          * it assigns a position to it. The position is not used, but
     373             :          * it may help you when you try to debug the system.
     374             :          *
     375             :          * \return This worker thread position in the snap_thread_pool vector.
     376             :          */
     377             :         size_t position() const
     378             :         {
     379             :             return f_position;
     380             :         }
     381             : 
     382             : 
     383             :         /** \brief Check whether this specific worker thread is busy.
     384             :          *
     385             :          * This function lets you know whether this specific worker thread
     386             :          * picked a work load and is currently processing it. The processing
     387             :          * includes copying the data to the output FIFO. However, there is
     388             :          * a small window between the time a work load object is picked up
     389             :          * and the time the thread gets marked as working. In other words,
     390             :          * this function may return false even though a work load was
     391             :          * already picked up.
     392             :          *
     393             :          * \return Whether the thread is still working.
     394             :          */
     395             :         bool is_working() const
     396             :         {
     397             :             snap_lock lock(f_mutex);
     398             :             return f_working;
     399             :         }
     400             : 
     401             : 
     402             :         /** \brief Number of times this worker got used.
     403             :          *
     404             :          * This function returns the number of times this worker ended up
     405             :          * running against a work load.
     406             :          *
     407             :          * The function may return 0 if the worker never ran. If you create
     408             :          * a large pool of threads but do not have much work, this is not
     409             :          * unlikely to happen.
     410             :          *
     411             :          * \return Number of times this worker ran your do_work() function.
     412             :          */
     413             :         size_t runs() const
     414             :         {
     415             :             snap_lock lock(f_mutex);
     416             :             return f_runs;
     417             :         }
     418             : 
     419             : 
     420             :         /** \brief Implement the worker loop.
     421             :          *
     422             :          * This function is the override of the snap_runner run() function.
     423             :          * It takes care of waiting for more data and runs your process
     424             :          * by calling the do_work() function.
     425             :          *
     426             :          * You may reimplement this function if you need to do some
     427             :          * initialization or clean up as follows:
     428             :          *
     429             :          * \code
     430             :          *      virtual void run()
     431             :          *      {
     432             :          *          // initialize my variable
     433             :          *          m_my_var = allocate_object();
     434             :          *
     435             :          *          snap_worker::run();
     436             :          *
     437             :          *          // make sure to delete that resource
     438             :          *          delete_object(m_my_var);
     439             :          *      }
     440             :          * \endcode
     441             :          */
     442             :         virtual void run()
     443             :         {
     444             :             // on a re-run, f_working could be true
     445             :             {
     446             :                 snap_lock lock(f_mutex);
     447             :                 f_working = false;
     448             :             }
     449             : 
     450             :             while(continue_running())
     451             :             {
     452             :                 if(f_in->pop_front(f_workload, -1))
     453             :                 {
     454             :                     if(continue_running())
     455             :                     {
     456             :                         {
     457             :                             snap_lock lock(f_mutex);
     458             :                             f_working = true;
     459             :                             ++f_runs;
     460             :                         }
     461             : 
     462             :                         // note: if do_work() throws, then f_working remains
     463             :                         //       set to 'true' which should not matter
     464             :                         //
     465             :                         if(do_work())
     466             :                         {
     467             :                             f_out->push_back(f_workload);
     468             :                         }
     469             : 
     470             :                         {
     471             :                             snap_lock lock(f_mutex);
     472             :                             f_working = false;
     473             :                         }
     474             :                     }
     475             :                 }
     476             :                 else
     477             :                 {
     478             :                     // if the FIFO is empty and it is marked as done, we
     479             :                     // want to exit immediately
     480             :                     //
     481             :                     if(f_in->is_done())
     482             :                     {
     483             :                         break;
     484             :                     }
     485             :                 }
     486             :             }
     487             :         }
     488             : 
     489             :         /** \brief Worker Function.
     490             :          *
     491             :          * This function is your worker function which will perform work
     492             :          * against the work load automatically retrieved in the run()
     493             :          * function.
     494             :          *
     495             :          * Your load is available in the f_workload variable member.
     496             :          * You are free to modify it. The snap_worker object ignores
     497             :          * its content. It retrieves it from the input fifo (f_in) and
     498             :          * saves it in the output fifo (f_out) only if you return true.
     499             :          */
     500             :         virtual bool do_work() = 0;
     501             : 
     502             :     protected:
     503             :         T                       f_workload = T();
     504             :         typename snap_fifo<T>::pointer_t f_in;
     505             :         typename snap_fifo<T>::pointer_t f_out;
     506             : 
     507             :     private:
     508             :         size_t const            f_position;
     509             :         bool                    f_working = false;
     510             :         size_t                  f_runs = 0;
     511             :     };
     512             : 
     513             :     /** \brief Manage a pool of worker threads.
     514             :      *
     515             :      * This class manages a pool of worker threads. It allocates the
     516             :      * threads, accepts incoming data, and returns outgoing data. Everything
     517             :      * else is managed for you.
     518             :      *
     519             :      * To make use of this template, you need to derive from the snap_worker
     520             :      * template and implement your own do_work() function.
     521             :      *
     522             :      * Something like this:
     523             :      *
     524             :      * \code
     525             :      *      struct data
     526             :      *      {
     527             :      *          std::string f_func = "counter";
     528             :      *          int         f_counter = 0;
     529             :      *      };
     530             :      *
     531             :      *      class foo
     532             :      *          : public snap_worker<data>
     533             :      *      {
     534             :      *          bool do_work()
     535             :      *          {
     536             :      *              if(f_workload.f_func == "counter")
     537             :      *              {
     538             :      *                  ++f_workload.f_counter;
     539             :      *              }
     540             :      *              else if(f_workload.f_func == "odd-even")
     541             :      *              {
     542             :      *                  f_workload.f_counter ^= 1;
     543             :      *              }
     544             :      *              return true; // ...etc... above; true sends the load to the output FIFO
     545             :      *          }
     546             :      *      };
     547             :      *
     548             :      *      snap_thread_pool<foo> pool("my pool", 10);
     549             :      *
     550             :      *      // generate input
     551             :      *      data data_in;
     552             :      *      ...fill data_in...
     553             :      *      pool.push_back(data_in);
     554             :      *
     555             :      *      // retrieve output
     556             :      *      data data_out;
     557             :      *      if(pool.pop_front(data_out))
     558             :      *      {
     559             :      *          ...use data_out...
     560             :      *      }
     561             :      * \endcode
     562             :      *
     563             :      * You can do all the push_data() you need before doing any pop_front().
     564             :      * You may also want to consider looping with both interleaved or even
     565             :      * have two threads: one feeder which does the push_data() and one
     566             :      * consumer which does the pop_front().
     567             :      *
     568             :      * Note that the thread pool does not guarantee order of processing.
     569             :      * You must make sure that each individual chunk of data you pass in
     570             :      * the push_back() function can be processed in any order.
     571             :      */
     572             :     template<class W, class ...A>
     573             :     class snap_thread_pool
     574             :     {
     575             :     public:
     576             :         typedef std::shared_ptr<snap_thread_pool<W, A...>>  pointer_t;      // shared pointer to a pool of this exact type
     577             :         typedef typename W::work_load_type                  work_load_type; // payload type, taken from the worker class W
     578             :         typedef snap_fifo<work_load_type>                   worker_fifo_t;  // FIFO type used for both the input and the output
     579             : 
     580             :     private:
     581             :         /** \brief Class used to manage the worker and worker thread.
     582             :          *
     583             :          * This class creates a worker, attaches it to a new thread,
     584             :          * and starts that thread. It is here so we have a single list
     585             :          * of _worker threads_.
     586             :          *
     587             :          * \note
     588             :          * I had to allocate a snap_thread object because at this point
     589             :          * the snap_thread is not yet fully defined so it would not take
     590             :          * it otherwise. It certainly would be possible to move this
     591             :          * declaration and those depending on it to avoid this problem,
     592             :          * though.
     593             :          */
     594             :         class worker_thread_t
     595             :         {
     596             :         public:
     597             :             typedef std::shared_ptr<worker_thread_t>    pointer_t;
     598             :             typedef std::vector<pointer_t>              vector_t;
     599             : 
     600             :             worker_thread_t(std::string const & name
     601             :                           , size_t i
     602             :                           , typename worker_fifo_t::pointer_t in
     603             :                           , typename worker_fifo_t::pointer_t out
     604             :                           , A... args)
     605             :                 : f_worker(name + " (worker #" + std::to_string(i) + ")"  // the worker gets an indexed name
     606             :                          , i
     607             :                          , in
     608             :                          , out
     609             :                          , args...)
     610             :                 , f_thread(std::make_shared<snap_thread>(name, &f_worker))  // thread keeps the plain pool name and a bare pointer to f_worker
     611             :             {
     612             :                 f_thread->start();  // NOTE(review): start() returns a bool which is ignored here
     613             :             }
     614             : 
     615             :             W & get_worker()
     616             :             {
     617             :                 return f_worker;
     618             :             }
     619             : 
     620             :             W const & get_worker() const
     621             :             {
     622             :                 return f_worker;
     623             :             }
     624             : 
     625             :         private:
     626             :             W                       f_worker;   // runner before thread; this is safe (members are built in declaration order, destroyed in reverse)
     627             :             snap_thread::pointer_t  f_thread;   // declared last so it is released before f_worker goes away
     628             :         };
     629             : 
     630             : 
     631             :     public:
     632             :         /** \brief Initializes a pool of worker threads.
     633             :          *
     634             :          * This constructor is used to initialize the specified number
     635             :          * of worker threads in this pool.
     636             :          *
     637             :          * At this time, all the worker threads get allocated upfront.
     638             :          *
     639             :          * \todo
     640             :          * We may want to add more threads as the work load increases
     641             :          * which could allow for much less unused threads created. To
     642             :          * do so we want to (1) time the do_work() function to get
     643             :          * an idea of how long a thread takes to perform its work;
     644             :          * and (2) have a threshold so we know when the client wants
     645             :          * to create new worker threads if the load increases instead
     646             :          * of assuming it should happen as soon as data gets added
     647             :          * to the input FIFO. The \p pool_size parameter would then
     648             :          * become a \p max_pool_size.
     649             :          *
     650             :          * \param[in] name  The name of the pool.
     651             :          * \param[in] pool_size  The number of threads to create.
     652             :          * \param[in] in  The input FIFO (where workers receive workload.)
     653             :          * \param[in] out The output FIFO (where finished work is sent.)
     654             :          * \param[in] args  Extra arguments to initialize the workers.
     655             :          */
     656             :         snap_thread_pool(std::string const & name
     657             :                        , size_t pool_size
     658             :                        , typename worker_fifo_t::pointer_t in
     659             :                        , typename worker_fifo_t::pointer_t out
     660             :                        , A... args)
     661             :             : f_name(name)
     662             :             , f_in(in)
     663             :             , f_out(out)
     664             :         {
     665             :             // declared with the plain class name: a template-id is not a valid constructor name in C++20
     666             :             // raises snap_thread_exception_invalid_error unless 1 <= pool_size <= 1000
     667             :             if(pool_size == 0)
     668             :             {
     669             :                 throw snap_thread_exception_invalid_error("the pool size must be a positive number (1 or more)");
     670             :             }
     671             :             if(pool_size > 1000)
     672             :             {
     673             :                 throw snap_thread_exception_invalid_error("pool size too large (we accept up to 1000 at this time, which is already very very large!)");
     674             :             }
     675             :             f_workers.reserve(pool_size);   // final size is known, allocate once
     676             :             for(size_t i(0); i < pool_size; ++i)
     677             :             {
     678             :                 // each worker_thread_t starts its thread from its own constructor
     679             :                 f_workers.push_back(std::make_shared<worker_thread_t>(
     680             :                               f_name, i, f_in, f_out, args...));
     681             :             }
     682             :         }
     683             : 
     684             : 
     685             :         /** \brief Make sure that the thread pool is cleaned up.
     686             :          *
     687             :          * The destructor of the snap thread pool stops all the threads
     688             :          * and then waits on them. Assuming that your code doesn't loop
     689             :          * forever, the result is that all the threads are stopped, joined,
     690             :          * and all the incoming and outgoing data was cleared.
     691             :          *
     692             :          * If you need to get the output data, then make sure to call
     693             :          * stop(), wait(), and pop_front() until it returns false.
     694             :          * Only then can you call the destructor.
     695             :          *
     696             :          * You are safe to run that stop process in your own destructor.
     697             :          */
     698             :         ~snap_thread_pool()     // plain class name: a template-id is not a valid destructor name in C++20
     699             :         {
     700             :             stop(false);        // non-immediate: queued input is not discarded
     701             :             wait();             // drops every worker (clears f_workers)
     702             :         }
     703             : 
     704             : 
     705             :         /** \brief Retrieve the number of workers.
     706             :          *
     707             :          * This function returns the number of workers this thread pool
     708             :          * is handling. The number is at least 1 as a thread pool can't
     709             :          * currently be empty.
     710             :          *
     711             :          * \return The number of workers in this thread pool.
     712             :          */
     713             :         size_t size() const
     714             :         {
     715             :             return f_workers.size();    // note: becomes 0 once wait() has cleared the vector
     716             :         }
     717             : 
     718             : 
     719             :         /** \brief Get worker at index `i`.
     720             :          *
     721             :          * This function returns a reference to the worker at index `i`.
     722             :          *
     723             :          * \exception std::range_error
     724             :          * If the index is out of range (negative or larger or equal to
     725             :          * the number of workers) then this exception is raised.
     726             :          *
     727             :          * \param[in] i  The index of the worker to retrieve.
     728             :          *
     729             :          * \return A reference to worker `i`.
     730             :          */
     731             :         W & get_worker(int i)
     732             :         {
     733             :             if(static_cast<size_t>(i) < f_workers.size())   // a negative index wraps to a huge value and fails too
     734             :             {
     735             :                 return f_workers[static_cast<size_t>(i)]->get_worker();
     736             :             }
     737             :             throw std::range_error("snap::snap_thread::snap_thread_pool::get_worker() called with an index out of bounds.");
     738             :         }
     739             : 
     740             : 
     741             :         /** \brief Get worker at index `i` (constant version).
     742             :          *
     743             :          * This function returns a reference to the worker at index `i`.
     744             :          *
     745             :          * \exception std::range_error
     746             :          * If the index is out of range (negative or larger or equal to
     747             :          * the number of workers) then this exception is raised.
     748             :          *
     749             :          * \param[in] i  The index of the worker to retrieve.
     750             :          *
     751             :          * \return A reference to worker `i`.
     752             :          */
     753             :         W const & get_worker(int i) const
     754             :         {
     755             :             if(static_cast<size_t>(i) < f_workers.size())   // the unsigned cast also rejects negative indexes
     756             :             {
     757             :                 return f_workers[static_cast<size_t>(i)]->get_worker();
     758             :             }
     759             :             throw std::range_error("snap::snap_thread::snap_thread_pool::get_worker() called with an index out of bounds.");
     760             :         }
     761             : 
     762             : 
     763             :         /** \brief Push one work load of data.
     764             :          *
     765             :          * This function adds a work load of data to the input. One of the
     766             :          * worker threads will automatically pick up that work and have
     767             :          * its snap_worker::do_work() function called.
     768             :          *
     769             :          * \param[in] v  The work load of data to add to this pool.
     770             :          *
     771             :          * \sa pop_front()
     772             :          */
     773             :         void push_back(work_load_type const & v)
     774             :         {
     775             :             f_in->push_back(v);     // f_in is a pointer_t (dereferenced with -> in stop() as well)
     776             :         }
     777             : 
     778             : 
     779             :         /** \brief Retrieve one work load of processed data.
     780             :          *
     781             :          * This function retrieves one T object from the output FIFO.
     782             :          *
     783             :          * The \p usecs parameter can be set to -1 to wait until output
     784             :          * is received. Set it to 0 to avoid waiting (check for data,
     785             :          * if nothing return immediately) and a positive number to wait
     786             :          * up to that many microseconds.
     787             :          *
     788             :          * Make sure to check the function return value. If false, the
     789             :          * \p v parameter is unmodified.
     790             :          *
     791             :          * Once you called the stop() function, the pop_front() function
     792             :          * can still be called until the out queue is emptied. The proper
     793             :          * sequence is stop(), wait(), then pop_front() until it returns false.
     794             :          *
     795             :          * \param[in] v  A reference where the object gets copied.
     796             :          * \param[in] usecs  The number of microseconds to wait.
     797             :          *
     798             :          * \return true if an object was retrieved.
     799             :          *
     800             :          * \sa snap_fifo::pop_front()
     801             :          */
     802             :         bool pop_front(work_load_type & v, int64_t usecs)
     803             :         {
     804             :             if(f_in->is_done())     // FIFOs are held through pointer_t: use ->, like stop() does
     805             :             {
     806             :                 usecs = 0;          // input is closed: do not wait, only take what is already there
     807             :             }
     808             :             return f_out->pop_front(v, usecs);
     809             :         }
     810             : 
     811             : 
     812             :         /** \brief Stop the threads.
     813             :          *
     814             :          * This function is called to ask all the threads to stop.
     815             :          *
     816             :          * When the \p immediate parameter is set to true, whatever is
     817             :          * left in the queue gets removed. This means you are likely to
     818             :          * get invalid or at least incomplete results in the output.
     819             :          * It should only be used if the plan is to cancel the current
     820             :          * work process and trash everything away.
     821             :          *
     822             :          * After a call to the stop() function, you may want to retrieve
     823             :          * the last bits of data with the pop_front() function until
     824             :          * it returns false and then call the wait() function to wait
     825             :          * on all the threads and call pop_front() again to make sure
     826             :          * that you got all the output.
     827             :          *
     828             :          * You may also call the wait() function right after stop()
     829             :          * and then the pop_front().
     830             :          *
     831             :          * \note
     832             :          * It is not necessary to call pop_front() if you are cancelling
     833             :          * the processing anyway. You can just ignore that data and
     834             :          * it will be deleted as soon as the thread pool gets deleted.
     835             :          *
     836             :          * \note
     837             :          * The function can be called any number of times. It really only
     838             :          * has an effect on the first call, though.
     839             :          *
     840             :          * \param[in] immediate  Whether to clear the remaining items on
     841             :          *                       the queue.
     842             :          */
     843             :         void stop(bool immediate)
     844             :         {
     845             :             if(!f_in->is_done())    // skip when the input FIFO is already marked as done
     846             :             {
     847             :                 f_in->done(immediate);  // only the input FIFO is touched here; f_out is left alone
     848             :             }
     849             :         }
     850             : 
     851             : 
     852             :         /** \brief Wait on the threads to be done.
     853             :          *
     854             :          * This function waits on all the worker threads until they all
     855             :          * exited. This ensures that all the output (see the pop_front()
     856             :          * function) was generated and you can therefore end your
     857             :          * processing.
     858             :          *
     859             :          * The order of processing is as follow:
     860             :          *
     861             :          * \code
     862             :          *     snap_thread_pool pool;
     863             :          *
     864             :          *     ...initialization...
     865             :          *
     866             :          *     data_t in;
     867             :          *     data_t out;
     868             :          *
     869             :          *     while(load_data(in))
     870             :          *     {
     871             :          *         pool.push_back(in);
     872             :          *
     873             :          *         while(pool.pop_front(out, 0))
     874             :          *         {
     875             :          *             // handle out, maybe:
     876             :          *             save_data(out);
     877             :          *         }
     878             :          *     }
     879             :          *
     880             :          *     // no more input
     881             :          *     pool.stop(false);
     882             :          *
     883             :          *     // optionally, empty the output pipe before the wait()
     884             :          *     while(pool.pop_front(out, 0))
     885             :          *     {
     886             :          *         // handle out...
     887             :          *     }
     888             :          *
     889             :          *     // make sure the workers are done
     890             :          *     pool.wait();
     891             :          *
     892             :          *     // make sure the output is all managed
     893             :          *     while(pool.pop_front(out, 0))
     894             :          *     {
     895             :          *         // handle out...
     896             :          *     }
     897             :          *
     898             :          *     // now we're done
     899             :          * \endcode
     900             :          *
     901             :          * Emptying the output queue between the stop() and wait() is not
     902             :          * required. It may always be empty or you will anyway get only
     903             :          * a few small chunks that can wait in the buffer.
     904             :          *
     905             :          * \note
     906             :          * The function can be called any number of times. After the first
     907             :          * time, though, the vector of worker threads is empty so really
     908             :          * nothing happens.
     909             :          *
     910             :          * \attention
     911             :          * This function can't be called from one of the workers.
     912             :          */
     913             :         void wait()
     914             :         {
     915             :             f_workers.clear();  // releases every worker_thread_t; presumably ~snap_thread() does the join -- TODO confirm
     916             :         }
     917             : 
     918             : 
     919             :     private:
     920             :         typedef typename worker_thread_t::vector_t  workers_t;
     921             : 
     922             :         std::string const                   f_name;     // pool name, handed to every worker_thread_t
     923             :         typename worker_fifo_t::pointer_t   f_in;       // input FIFO: work loads waiting for a worker
     924             :         typename worker_fifo_t::pointer_t   f_out;      // output FIFO: read by pop_front()
     925             :         workers_t                           f_workers = workers_t();    // filled by the constructor, emptied by wait()
     926             :     };
     927             : 
     928             :     class snap_thread_life
     929             :     {
     930             :     public:
     931             :         /** \brief Initialize a "thread life" object.
     932             :          *
     933             :          * Objects of this type are used to record a thread and make sure
     934             :          * that it gets stopped once done with it.
     935             :          *
     936             :          * The constructor makes sure that the specified thread is not
     937             :          * a null pointer and it starts the thread. If the thread is
     938             :          * already running, then the constructor will throw.
     939             :          *
     940             :          * Once such an object was created, it is not possible to prevent
     941             :          * the thread life destructor from calling the stop() function and
     942             :          * waiting for the thread to be done.
     943             :          *
     944             :          * \note
     945             :          * If possible, you should consider using the snap_thread::pointer_t
     946             :          * instead of the snap_thread_life which expects a bare pointer.
     947             :          * There are situations, though, where this class is practical
     948             :          * because you have a thread as a variable member in a class.
     949             :          *
     950             :          * \param[in] thread  The thread which life is to be controlled.
     951             :          */
     952             :         snap_thread_life(snap_thread * const thread)
     953             :             : f_thread(thread)
     954             :         {
     955             :             if(nullptr == thread)
     956             :             {
     957             :                 throw snap_logic_exception("snap_thread_life pointer is nullptr");
     958             :             }
     959             :             // there is no channel to report a failed start to the caller,
     960             :             // so raising an exception is the only option for now
     961             :             bool const started(f_thread->start());
     962             :             if(!started)
     963             :             {
     964             :                 throw snap_thread_exception_not_started("somehow the thread was not started, an error should have been logged");
     965             :             }
     966             :         }
     967             : 
     968             :         snap_thread_life(snap_thread_life const & rhs) = delete;
     969             : 
     970             :         /** \brief Make sure the thread stops.
     971             :          *
     972             :          * This function requests that the attached thread stop. It will block
     973             :          * until such happens. You are responsible to make sure that the
     974             :          * stop happens early on if your own object needs to access the
     975             :          * thread while stopping.
     976             :          */
     977             :         ~snap_thread_life()
     978             :         {
     979             :             //SNAP_LOG_TRACE() << "stopping snap_thread_life...";
     980             :             f_thread->stop();   // f_thread is never nullptr here: the constructor throws on a null pointer
     981             :             //SNAP_LOG_TRACE() << "snap_thread_life stopped!";
     982             :         }
     983             : 
     984             :         snap_thread_life & operator = (snap_thread_life const & rhs) = delete;
     985             : 
     986             :     private:
     987             :         /** \brief The pointer to the thread being managed.
     988             :          *
     989             :          * This pointer is a pointer to the thread. Once a thread life
     990             :          * object is initialized, the pointer is never nullptr (we
     991             :          * throw before in the constructor if that is the case.)
     992             :          *
     993             :          * The user of the snap_thread_life class must view the
     994             :          * thread pointer as owned by the snap_thread_life object
     995             :          * (similar to a smart pointer.)
     996             :          */
     997             :         snap_thread *           f_thread = nullptr;
     998             :     };
     999             : 
    1000             :                                 snap_thread(std::string const & name, snap_runner * runner);
    1001             :                                 ~snap_thread();
    1002             :                                 snap_thread(snap_thread const & rhs) = delete;
    1003             :                                 snap_thread & operator = (snap_thread const & rhs) = delete;
    1004             : 
    1005             :     std::string const &         get_name() const;
    1006             :     snap_runner *               get_runner() const;
    1007             :     bool                        is_running() const;
    1008             :     bool                        is_stopping() const;
    1009             :     bool                        start();
    1010             :     void                        stop();
    1011             :     bool                        kill(int sig);
    1012             :     pid_t                       get_thread_tid() const;
    1013             :     snap_mutex &                get_thread_mutex() const;
    1014             : 
    1015             :     static int                  get_total_number_of_processors();
    1016             :     static int                  get_number_of_available_processors();
    1017             :     static pid_t                gettid();
    1018             : 
    1019             : private:
    1020             :     // internal function to start the runner
    1021             :     friend void *               func_internal_start(void * thread);
    1022             :     void                        internal_run();
    1023             : 
    1024             :     std::string const           f_name = std::string();
    1025             :     snap_runner *               f_runner = nullptr;
    1026             :     mutable snap_mutex          f_mutex = snap_mutex();
    1027             :     bool                        f_running = false;
    1028             :     bool                        f_started = false;
    1029             :     bool                        f_stopping = false;
    1030             :     pid_t                       f_tid = -1;
    1031             :     pthread_t                   f_thread_id = -1;
    1032             :     pthread_attr_t              f_thread_attr = pthread_attr_t();
    1033             :     std::exception_ptr          f_exception = std::exception_ptr();
    1034             : };
    1035             : 
    1036             : } // namespace snap
    1037             : // vim: ts=4 sw=4 et

Generated by: LCOV version 1.13