LCOV - code coverage report
Current view: top level - cppthread - fifo.h (source / functions) Hit Total Coverage
Test: coverage.info Lines: 27 34 79.4 %
Date: 2021-08-21 09:27:22 Functions: 18 18 100.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : // Copyright (c) 2013-2021  Made to Order Software Corp.  All Rights Reserved
       2             : //
       3             : // https://snapwebsites.org/project/cppthread
       4             : // contact@m2osw.com
       5             : //
       6             : // This program is free software; you can redistribute it and/or modify
       7             : // it under the terms of the GNU General Public License as published by
       8             : // the Free Software Foundation; either version 2 of the License, or
       9             : // (at your option) any later version.
      10             : //
      11             : // This program is distributed in the hope that it will be useful,
      12             : // but WITHOUT ANY WARRANTY; without even the implied warranty of
      13             : // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
      14             : // GNU General Public License for more details.
      15             : //
      16             : // You should have received a copy of the GNU General Public License along
      17             : // with this program; if not, write to the Free Software Foundation, Inc.,
      18             : // 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
      19             : #pragma once
      20             : 
      21             : /** \file
      22             :  * \brief Thread Runner and Managers.
      23             :  *
      24             :  * This file includes the declaration and implementation (For templates)
      25             :  * of classes used to manage threads the easy way. Especially, our
      26             :  * implementation is aware of object destructors so a thread manager
      27             :  * (snap_thread) can be destroyed. It will automatically and properly
      28             :  * wait for its runner (the actual system pthread) to exit before
      29             :  * finishing up its and its runner clean up.
      30             :  */
      31             : 
      32             : // self
      33             : //
      34             : #include    "cppthread/guard.h"
      35             : #include    "cppthread/mutex.h"
      36             : 
      37             : 
      38             : // snapdev lib
      39             : //
      40             : #include    <snapdev/not_used.h>
      41             : 
      42             : 
      43             : // C++ lib
      44             : //
      45             : #include    <numeric>
      46             : #include    <deque>
      47             : 
      48             : 
      49             : 
      50             : namespace cppthread
      51             : {
      52             : 
      53             : 
      54             : 
      55             : template<class T>
      56           6 : class fifo
      57             :     : public mutex
      58             : {
      59             : private:
      60             :     typedef std::deque<T>   items_t;
      61             : 
      62             :     // void_t is C++17 so to compile on more system, we have our own definition
      63             :     //
      64             :     template<typename ...> using void_t = void;
      65             : 
      66             : 
      67             :     // the following templates are used to know whether class T has a
      68             :     // valid_workload() function returning a bool and if so, we'll use
      69             :     // it to know whether an item is ready to be popped.
      70             :     //
      71             :     template<typename, typename, typename = void_t<>>
      72             :         struct item_has_predicate
      73             :             : public std::false_type
      74             :     {
      75             :     };
      76             : 
      77             :     template<typename C, typename R, typename... A>
      78             :         struct item_has_predicate<C, R(A...),
      79             :                 void_t<decltype(std::declval<C>().valid_workload(std::declval<A>()...))>>
      80             :             : public std::is_same<decltype(std::declval<C>().valid_workload(std::declval<A>()...)), R>
      81             :     {
      82             :     };
      83             : 
      84             :     template<typename C>
      85             :         struct is_shared_ptr
      86             :             : std::false_type
      87             :     {
      88             :     };
      89             : 
      90             :     template<typename C>
      91             :         struct is_shared_ptr<std::shared_ptr<C>>
      92             :             : std::true_type
      93             :     {
      94             :     };
      95             : 
      96             :     /** \brief Validate item.
      97             :      *
      98             :      * This function checks whether the T::valid_workload() function
      99             :      * says the item can be processed now or not.
     100             :      *
     101             :      * In this case, the class C is not a shared pointer.
     102             :      *
     103             :      * \tparam C  The type of the item.
     104             :      * \param[in] item  The item to verify.
     105             :      *
     106             :      * \return true if the valid_workload() returns true, false otherwise.
     107             :      */
     108             :     template<typename C>
     109             :     typename std::enable_if<!is_shared_ptr<C>::value
     110             :                         && item_has_predicate<C, bool()>::value
     111             :                 , bool>::type
     112             :         validate_item(C const & item)
     113             :     {
     114             :         return item.valid_workload();
     115             :     }
     116             : 
     117             :     /** \brief Validate item.
     118             :      *
     119             :      * This function always returns true. It is used when the item does
     120             :      * not have a valid_workload() function defined.
     121             :      *
     122             :      * \tparam C  The type of the item.
     123             :      * \param[in] item  The item to verify.
     124             :      *
     125             :      * \return Always true.
     126             :      */
     127             :     template<typename C>
     128             :     typename std::enable_if<!is_shared_ptr<C>::value
     129             :                          && !item_has_predicate<C, bool()>::value
     130             :                 , bool>::type
     131             :         validate_item(C const & item)
     132             :     {
     133             :         snap::NOT_USED(item);
     134             :         return true;
     135             :     }
     136             : 
     137             :     /** \brief Validate item.
     138             :      *
     139             :      * This function checks whether the T::valid_workload() function
     140             :      * says the item can be processed now or not.
     141             :      *
     142             :      * In this case, the class C is a shared pointer to an item T.
     143             :      *
     144             :      * \tparam C  The type of the item.
     145             :      * \param[in] item  The item to verify.
     146             :      *
     147             :      * \return Always true.
     148             :      */
     149             :     template<typename C>
     150             :     typename std::enable_if<is_shared_ptr<C>::value
     151             :                         && item_has_predicate<typename C::element_type, bool()>::value
     152             :                 , bool>::type
     153          65 :         validate_item(C const & item)
     154             :     {
     155          65 :         return item->valid_workload();
     156             :     }
     157             : 
     158             :     /** \brief Validate item.
     159             :      *
     160             :      * This function always returns true. It is used when the item is a
     161             :      * shared pointer and does not have a valid_workload() function defined.
     162             :      *
     163             :      * \tparam C  The type of the item.
     164             :      * \param[in] item  The item to verify.
     165             :      *
     166             :      * \return Always true.
     167             :      */
     168             :     template<typename C>
     169             :     typename std::enable_if<is_shared_ptr<C>::value
     170             :                          && !item_has_predicate<typename C::element_type, bool()>::value
     171             :                 , bool>::type
     172             :         validate_item(C const & item)
     173             :     {
     174             :         snap::NOT_USED(item);
     175             :         return true;
     176             :     }
     177             : 
     178             : public:
     179             :     typedef T                               value_type;
     180             :     typedef fifo<value_type>                fifo_type;
     181             :     typedef std::shared_ptr<fifo_type>      pointer_t;
     182             : 
     183          32 :     bool push_back(T const & v)
     184             :     {
     185          64 :         guard lock(*this);
     186          32 :         if(f_done)
     187             :         {
     188           0 :             return false;
     189             :         }
     190          32 :         f_queue.push_back(v);
     191          32 :         signal();
     192          32 :         return true;
     193             :     }
     194             : 
     195          33 :     bool pop_front(T & v, int64_t const usecs)
     196             :     {
     197          66 :         guard lock(*this);
     198             : 
     199          66 :         auto cleanup = [&]()
     200             :             {
     201          33 :                 if(f_done && !f_broadcast && f_queue.empty())
     202             :                 {
     203             :                     // make sure all the threads wake up on this new
     204             :                     // "queue is empty" status
     205             :                     //
     206           0 :                     broadcast();
     207           0 :                     f_broadcast = true;
     208             :                 }
     209          33 :             };
     210             : 
     211           0 :         for(;;)
     212             :         {
     213             :             // search for an item we can pop now
     214             :             //
     215          66 :             for(auto it(f_queue.begin()); it != f_queue.end(); ++it)
     216             :             {
     217          65 :                 bool const result(validate_item<T>(*it));
     218          65 :                 if(result)
     219             :                 {
     220          32 :                     v = *it;
     221          32 :                     f_queue.erase(it);
     222          32 :                     cleanup();
     223          32 :                     return true;
     224             :                 }
     225             :             }
     226             : 
     227           1 :             if(f_done)
     228             :             {
     229           0 :                 break;
     230             :             }
     231             : 
     232             :             // when no items can be returned, wait a bit if possible
     233             :             // and try again
     234             :             //
     235           1 :             if(usecs == -1)
     236             :             {
     237             :                 // wait until signal() wakes us up
     238             :                 //
     239           0 :                 wait();
     240             :             }
     241           1 :             else if(usecs > 0)
     242             :             {
     243           0 :                 timed_wait(usecs);
     244             :             }
     245             :             else // if(usecs == 0)
     246             :             {
     247             :                 // do not wait
     248             :                 //
     249           1 :                 break;
     250             :             }
     251             :         }
     252           1 :         cleanup();
     253           1 :         return false;
     254             :     }
     255             : 
     256             :     void clear()
     257             :     {
     258             :         guard lock(*this);
     259             :         items_t empty;
     260             :         f_queue.swap(empty);
     261             :     }
     262             : 
     263             :     bool empty() const
     264             :     {
     265             :         guard lock(const_cast<fifo &>(*this));
     266             :         return f_queue.empty();
     267             :     }
     268             : 
     269             :     size_t size() const
     270             :     {
     271             :         guard lock(const_cast<fifo &>(*this));
     272             :         return f_queue.size();
     273             :     }
     274             : 
     275             :     size_t byte_size() const
     276             :     {
     277             :         guard lock(const_cast<fifo &>(*this));
     278             :         return std::accumulate(
     279             :                     f_queue.begin(),
     280             :                     f_queue.end(),
     281             :                     0,
     282             :                     [](size_t accum, T const & obj)
     283             :                     {
     284             :                         return accum + obj.size();
     285             :                     });
     286             :     }
     287             : 
     288             :     void done(bool clear)
     289             :     {
     290             :         guard lock(*this);
     291             :         f_done = true;
     292             :         if(clear)
     293             :         {
     294             :             items_t empty;
     295             :             f_queue.swap(empty);
     296             :         }
     297             :         if(f_queue.empty())
     298             :         {
     299             :             broadcast();
     300             :             f_broadcast = true;
     301             :         }
     302             :     }
     303             : 
     304             :     bool is_done() const
     305             :     {
     306             :         guard lock(const_cast<fifo &>(*this));
     307             :         return f_done;
     308             :     }
     309             : 
     310             : private:
     311             :     items_t                 f_queue = items_t();
     312             :     bool                    f_done = false;
     313             :     bool                    f_broadcast = false;
     314             : };
     315             : 
     316             : 
     317             : 
     318             : } // namespace cppthread
     319             : // vim: ts=4 sw=4 et

Generated by: LCOV version 1.13