LCOV - code coverage report
Current view: top level - eventdispatcher - communicator.cpp (source / functions) Hit Total Coverage
Test: coverage.info Lines: 1 144 0.7 %
Date: 2019-08-08 02:52:36 Functions: 2 9 22.2 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : // Copyright (c) 2012-2019  Made to Order Software Corp.  All Rights Reserved
       2             : //
       3             : // This program is free software; you can redistribute it and/or modify
       4             : // it under the terms of the GNU General Public License as published by
       5             : // the Free Software Foundation; either version 2 of the License, or
       6             : // (at your option) any later version.
       7             : //
       8             : // This program is distributed in the hope that it will be useful,
       9             : // but WITHOUT ANY WARRANTY; without even the implied warranty of
      10             : // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
      11             : // GNU General Public License for more details.
      12             : //
      13             : // You should have received a copy of the GNU General Public License
      14             : // along with this program; if not, write to the Free Software
      15             : // Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
      16             : 
      17             : /** \file
      18             :  * \brief Implementation of the Snap Communicator class.
      19             :  *
      20             :  * This class wraps the C poll() interface in a C++ object with many types
      21             :  * of objects:
      22             :  *
      23             :  * \li Server Connections; for software that wants to offer a port to
      24             :  *     which clients can connect; the server will call accept()
      25             :  *     once a new client connection is ready; this results in a
      26             :  *     Server/Client connection object
      27             :  * \li Client Connections; for software that wants to connect to
      28             :  *     a server; these expect the IP address and port to connect to
      29             :  * \li Server/Client Connections; for the server when it accepts a new
      30             :  *     connection; in this case the server gets a socket from accept()
      31             :  *     and creates one of these objects to handle the connection
      32             :  *
      33             :  * Using the poll() function is the easiest approach and allows us to
      34             :  * listen on pretty much any number of sockets (on my server it is limited
      35             :  * to 16,768 and, frankly, over 1,000 we will probably start to have
      36             :  * real slowness issues on small VPN servers.)
      37             :  */
      38             : 
      39             : // to get the POLLRDHUP definition
      40             : #ifndef _GNU_SOURCE
      41             : #define _GNU_SOURCE
      42             : #endif
      43             : 
      44             : 
      45             : // self
      46             : //
      47             : #include    "eventdispatcher/communicator.h"
      48             : 
      49             : #include    "eventdispatcher/exception.h"
      50             : #include    "eventdispatcher/signal.h"
      51             : #include    "eventdispatcher/utils.h"
      52             : 
      53             : 
      54             : // cppthread lib
      55             : //
      56             : #include    <cppthread/guard.h>
      57             : #include    <cppthread/mutex.h>
      58             : 
      59             : 
      60             : // snaplogger lib
      61             : //
      62             : #include    <snaplogger/message.h>
      63             : 
      64             : 
      65             : //// snapdev lib
      66             : ////
      67             : //#include "snapdev/not_reached.h"
      68             : //#include "snapdev/not_used.h"
      69             : //#include "snapdev/string_replace_many.h"
      70             : //
      71             : //
      72             : //// libaddr lib
      73             : ////
      74             : //#include "libaddr/addr_parser.h"
      75             : 
      76             : 
      77             : // C++ lib
      78             : //
      79             : #include <algorithm>
      80             : //#include <limits>
      81             : //#include <atomic>
      82             : 
      83             : // C lib
      84             : //
      85             : //#include <fcntl.h>
      86             : #include <poll.h>
      87             : //#include <unistd.h>
      88             : //#include <sys/eventfd.h>
      89             : //#include <sys/inotify.h>
      90             : //#include <sys/ioctl.h>
      91             : #include <sys/resource.h>
      92             : //#include <sys/syscall.h>
      93             : //#include <sys/time.h>
      94             : 
      95             : 
      96             : // last include
      97             : //
      98             : #include <snapdev/poison.h>
      99             : 
     100             : 
     101             : 
     102             : namespace ed
     103             : {
     104             : namespace
     105             : {
     106             : 
     107             : 
     108             : /** \brief The instance of the communicator singleton.
     109             :  *
     110             :  * This pointer is the one instance of the communicator
     111             :  * we create to run an event loop.
     112             :  */
     113             : communicator::pointer_t *           g_instance = nullptr;
     114             : 
     115             : 
     116             : } // no name namespace
     117             : 
     118             : 
     119             : 
     120             : 
     121             : 
     122             : /** \brief Initialize a snap communicator object.
     123             :  *
     124             :  * This function initializes the communicator object.
     125             :  */
     126           0 : communicator::communicator()
     127             :     //: f_connections() -- auto-init
     128             :     //, f_force_sort(true) -- auto-init
     129             : {
     130           0 : }
     131             : 
     132             : 
     133             : /** \brief Retrieve the instance() of the communicator.
     134             :  *
     135             :  * This function returns the instance of the communicator.
     136             :  * There is really no reason to have more than one instance, and
     137             :  * doing so could create all sorts of problems, hence we created
     138             :  * the communicator as a singleton. It also means you cannot
     139             :  * actually delete the communicator.
     140             :  *
     141             :  * The initialization of the communicator instance is thread
     142             :  * safe.
     143             :  */
     144           0 : communicator::pointer_t communicator::instance()
     145             : {
     146           0 :     cppthread::guard g(*cppthread::g_system_mutex);
     147             : 
     148           0 :     if(g_instance == nullptr)
     149             :     {
     150           0 :         g_instance = new communicator::pointer_t();
     151           0 :         g_instance->reset(new communicator());
     152             :     }
     153             : 
     154           0 :     return *g_instance;
     155             : }
     156             : 
     157             : 
     158             : /** \brief Retrieve a reference to the vector of connections.
     159             :  *
     160             :  * This function returns a reference to all the connections that are
     161             :  * currently attached to the communicator system.
     162             :  *
     163             :  * This is useful to search the array.
     164             :  *
     165             :  * \return The vector of connections.
     166             :  */
     167           0 : connection::vector_t const & communicator::get_connections() const
     168             : {
     169           0 :     return f_connections;
     170             : }
     171             : 
     172             : 
     173             : /** \brief Attach a connection to the communicator.
     174             :  *
     175             :  * This function attaches a connection to the communicator. This allows
     176             :  * us to execute code for that connection by having the process_signal()
     177             :  * function called.
     178             :  *
     179             :  * Connections are kept in the order in which they are added. This may
     180             :  * change the order in which connection callbacks are called. However,
     181             :  * events are received asynchronously so do not expect callbacks to be
     182             :  * called in any specific order.
     183             :  *
     184             :  * You may call this function with a null pointer. It simply returns
     185             :  * false immediately. This makes it easy to eventually allocate a
     186             :  * new connection and then use the return value of this function
     187             :  * to know whether the two step process worked or not.
     188             :  *
     189             :  * \note
     190             :  * A connection can only be added once to a communicator object.
     191             :  * Also it cannot be shared between multiple communicator objects.
     192             :  *
     193             :  * \param[in] connection  The connection being added.
     194             :  *
     195             :  * \return true if the connection was added, false if the connection is
     196             :  *         null or was already present in the communicator list of connections.
     197             :  */
     198           0 : bool communicator::add_connection(connection::pointer_t connection)
     199             : {
     200           0 :     if(connection == nullptr)
     201             :     {
     202           0 :         return false;
     203             :     }
     204             : 
     205           0 :     if(!connection->valid_socket())
     206             :     {
     207             :         throw event_dispatcher_invalid_parameter(
     208             :             "communicator::add_connection(): connection without a socket"
     209           0 :             " cannot be added to a communicator object.");
     210             :     }
     211             : 
     212           0 :     auto const it(std::find(f_connections.begin(), f_connections.end(), connection));
     213           0 :     if(it != f_connections.end())
     214             :     {
     215             :         // already added, can be added only once but we allow multiple
     216             :         // calls (however, we do not count those calls, so the first call
     217             :         // to remove_connection() does remove it!)
     218             :         //
     219           0 :         return false;
     220             :     }
     221             : 
     222           0 :     f_connections.push_back(connection);
     223             : 
     224           0 :     connection->connection_added();
     225             : 
     226           0 :     return true;
     227             : }
     228             : 
     229             : 
     230             : /** \brief Remove a connection from a communicator object.
     231             :  *
     232             :  * This function removes a connection from this communicator object.
     233             :  * Note that any one connection can only be added once.
     234             :  *
     235             :  * \param[in] connection  The connection to remove from this communicator.
     236             :  *
     237             :  * \return true if the connection was removed, false if it was not found.
     238             :  */
     239           0 : bool communicator::remove_connection(connection::pointer_t connection)
     240             : {
     241           0 :     auto it(std::find(f_connections.begin(), f_connections.end(), connection));
     242           0 :     if(it == f_connections.end())
     243             :     {
     244           0 :         return false;
     245             :     }
     246             : 
     247             :     SNAP_LOG_TRACE
     248           0 :         << "removing 1 connection, \""
     249           0 :         << connection->get_name()
     250           0 :         << "\", of "
     251           0 :         << f_connections.size()
     252           0 :         << " connections (including this one.)";
     253             : 
     254           0 :     f_connections.erase(it);
     255             : 
     256           0 :     connection->connection_removed();
     257             : 
     258             : #if 0
     259             : #ifdef _DEBUG
     260             : std::for_each(
     261             :           f_connections.begin()
     262             :         , f_connections.end()
     263             :         , [](auto const & c)
     264             :         {
     265             :             SNAP_LOG_TRACE("communicator::remove_connection(): remaining connection: \"")(c->get_name())("\"");
     266             :         });
     267             : #endif
     268             : #endif
     269             : 
     270           0 :     return true;
     271             : }
     272             : 
     273             : 
     274             : /** \brief Set the Force Sort flag to \p status.
     275             :  *
     276             :  * This function can be called to force the run() function to sort (or not
     277             :  * sort) the list of connections.
     278             :  *
     279             :  * Since the sort function is somewhat expensive, the sort changes the
     280             :  * vector of connections in place. Then only a change of priority
     281             :  * triggers a request for the vector to be sorted again.
     282             :  *
     283             :  * This function can be used in the event you need to force a trigger.
     284             :  * It would be unlikely that you would call this function with false.
     285             :  *
     286             :  * \param[in] status  The new status of the force sort flag.
     287             :  */
     288           0 : void communicator::set_force_sort(bool status)
     289             : {
     290           0 :     f_force_sort = status;
     291           0 : }
     292             : 
     293             : 
     294             : /** \brief Run until all connections are removed.
     295             :  *
     296             :  * This function "blocks" until all the events added to this
     297             :  * communicator instance are removed. Until then, it
     298             :  * wakes up and runs callback functions whenever an event occurs.
     299             :  *
     300             :  * In other words, you want to add_connection() before you call
     301             :  * this function otherwise the function returns immediately.
     302             :  *
     303             :  * Note that you can include timeout events so if you need to
     304             :  * run some code once in a while, you may just use a timeout
     305             :  * event and process your repetitive events that way.
     306             :  *
     307             :  * \return true if the loop exits because the list of connections is empty.
     308             :  */
     309           0 : bool communicator::run()
     310             : {
     311             :     // the loop promises to exit once the communicator object has no
     312             :     // more connections attached to it
     313             :     //
     314           0 :     std::vector<bool> enabled;
     315           0 :     std::vector<struct pollfd> fds;
     316           0 :     f_force_sort = true;
     317           0 :     for(;;)
     318             :     {
     319             :         // any connections?
     320           0 :         if(f_connections.empty())
     321             :         {
     322           0 :             return true;
     323             :         }
     324             : 
     325           0 :         if(f_force_sort)
     326             :         {
     327             :             // sort the connections by priority
     328             :             //
     329           0 :             std::stable_sort(f_connections.begin(), f_connections.end(), connection::compare);
     330           0 :             f_force_sort = false;
     331             :         }
     332             : 
     333             :         // make a copy because the callbacks may end up making
     334             :         // changes to the main list and we would have problems
     335             :         // with that here...
     336             :         //
     337           0 :         connection::vector_t connections(f_connections);
     338           0 :         size_t max_connections(connections.size());
     339             : 
     340             :         // by default, the timeout is "do not time out"
     341             :         //
     342           0 :         std::int64_t next_timeout_timestamp(std::numeric_limits<std::int64_t>::max());
     343             : 
     344             :         // clear() is not supposed to delete the buffer of vectors
     345             :         //
     346           0 :         enabled.clear();
     347           0 :         fds.clear();
     348           0 :         fds.reserve(max_connections); // avoid more than 1 allocation
     349           0 :         for(size_t idx(0); idx < max_connections; ++idx)
     350             :         {
     351           0 :             connection::pointer_t c(connections[idx]);
     352           0 :             c->f_fds_position = -1;
     353             : 
     354             :             // is the connection enabled?
     355             :             //
     356             :             // note that we save that value for later use in our loop
     357             :             // below because otherwise we will miss many events and
     358             :             // it tends to break things; that means you may get your
     359             :             // callback called even while disabled
     360             :             //
     361           0 :             enabled.push_back(c->is_enabled());
     362           0 :             if(!enabled[idx])
     363             :             {
     364             :                 //SNAP_LOG_TRACE("communicator::run(): connection '")(c->get_name())("' has been disabled, so ignored.");
     365           0 :                 continue;
     366             :             }
     367             : //SNAP_LOG_TRACE("communicator::run(): handling connection ")(idx)("/")(max_connections)(". '")(c->get_name())("' since it is enabled...");
     368             : 
     369             :             // check whether a timeout is defined in this connection
     370             :             //
     371           0 :             int64_t const timestamp(c->save_timeout_timestamp());
     372           0 :             if(timestamp != -1)
     373             :             {
     374             :                 // the timeout event gives us a time when to tick
     375             :                 //
     376           0 :                 if(timestamp < next_timeout_timestamp)
     377             :                 {
     378           0 :                     next_timeout_timestamp = timestamp;
     379             :                 }
     380             :             }
     381             : 
     382             :             // are there any events to listen on?
     383           0 :             int e(0);
     384           0 :             if(c->is_listener() || c->is_signal())
     385             :             {
     386           0 :                 e |= POLLIN;
     387             :             }
     388           0 :             if(c->is_reader())
     389             :             {
     390           0 :                 e |= POLLIN | POLLPRI | POLLRDHUP;
     391             :             }
     392           0 :             if(c->is_writer())
     393             :             {
     394           0 :                 e |= POLLOUT | POLLRDHUP;
     395             :             }
     396           0 :             if(e == 0)
     397             :             {
     398             :                 // this should only happen on snap_timer objects
     399             :                 //
     400           0 :                 continue;
     401             :             }
     402             : 
     403             :             // do we have a currently valid socket? (i.e. the connection
     404             :             // may have been closed or we may be handling a timer or
     405             :             // signal object)
     406             :             //
     407           0 :             if(c->get_socket() < 0)
     408             :             {
     409           0 :                 continue;
     410             :             }
     411             : 
     412             :             // this is considered valid, add this connection to the list
     413             :             //
     414             :             // save the position since we may skip some entries...
     415             :             // (otherwise we would have to use -1 as the socket to
     416             :             // allow for such dead entries, but avoiding such entries
     417             :             // saves time)
     418             :             //
     419           0 :             c->f_fds_position = fds.size();
     420             : 
     421             : //SNAP_LOG_ERROR("*** still waiting on \"")(c->get_name())("\".");
     422             :             struct pollfd fd;
     423           0 :             fd.fd = c->get_socket();
     424           0 :             fd.events = e;
     425           0 :             fd.revents = 0; // probably useless... (kernel should clear those)
     426           0 :             fds.push_back(fd);
     427             :         }
     428             : 
     429             :         // compute the right timeout
     430           0 :         std::int64_t timeout(-1);
     431           0 :         if(next_timeout_timestamp != std::numeric_limits<int64_t>::max())
     432             :         {
     433           0 :             std::int64_t const now(get_current_date());
     434           0 :             timeout = next_timeout_timestamp - now;
     435           0 :             if(timeout < 0)
     436             :             {
     437             :                 // timeout is in the past so timeout immediately, but
     438             :                 // still check for events if any
     439           0 :                 timeout = 0;
     440             :             }
     441             :             else
     442             :             {
     443             :                 // convert microseconds to milliseconds for poll()
     444           0 :                 timeout /= 1000;
     445           0 :                 if(timeout == 0)
     446             :                 {
     447             :                     // less than one is a waste of time (CPU intensive
     448             :                     // until the time is reached; we can be 1 ms off
     449             :                     // instead...)
     450           0 :                     timeout = 1;
     451             :                 }
     452             :             }
     453             :         }
     454           0 :         else if(fds.empty())
     455             :         {
     456             :             SNAP_LOG_FATAL
     457           0 :                 << "communicator::run(): nothing to poll() on. All connections are disabled? (Ignoring "
     458           0 :                 << max_connections
     459           0 :                 << " and exiting the run() loop anyway.)";
     460           0 :             return false;
     461             :         }
     462             : 
     463             : //SNAP_LOG_TRACE << "communicator::run(): "
     464             : //               << "count " << fds.size()
     465             : //               << "timeout " << timeout
     466             : //               << " (next was: " << next_timeout_timestamp
     467             : //               << ", current ~ " << get_current_date()
     468             : //               << ")";
     469             : 
     470             :         // TODO: add support for ppoll() so we can support signals cleanly
     471             :         //       with nearly no additional work from us
     472             :         //
     473           0 :         errno = 0;
     474           0 :         int const r(poll(&fds[0], fds.size(), timeout));
     475           0 :         if(r >= 0)
     476             :         {
     477             :             // quick sanity check
     478             :             //
     479           0 :             if(static_cast<size_t>(r) > connections.size())
     480             :             {
     481           0 :                 throw event_dispatcher_runtime_error("communicator::run(): poll() returned a number of events to handle larger than the input allows");
     482             :             }
     483             :             //SNAP_LOG_TRACE
     484             :             //    <<"tid="
     485             :             //    << gettid()
     486             :             //    << ", communicator::run(): ------------------- new set of "
     487             :             //    << r
     488             :             //    << " events to handle";
     489             : 
     490             :             // check each connection one by one for:
     491             :             //
     492             :             // 1) fds events, including signals
     493             :             // 2) timeouts
     494             :             //
     495             :             // and execute the corresponding callbacks
     496             :             //
     497           0 :             for(size_t idx(0); idx < connections.size(); ++idx)
     498             :             {
     499           0 :                 connection::pointer_t c(connections[idx]);
     500             : 
     501             :                 // is the connection enabled?
     502             :                 //
     503             :                 // note that we check whether that connection was enabled
     504             :                 // before poll() was called; this is very important because
     505             :                 // the last poll() events must be run even if a previous
     506             :                 // callback call just disabled this very connection
     507             :                 // (i.e. at the time we called poll() the connection was
     508             :                 // still enabled and therefore we are expected to call
     509             :                 // their callbacks even if it just got disabled by an
     510             :                 // earlier callback)
     511             :                 //
     512           0 :                 if(!enabled[idx])
     513             :                 {
     514             :                     //SNAP_LOG_TRACE
     515             :                     //    << "communicator::run(): in loop, connection '"
     516             :                     //    << c->get_name()
     517             :                     //    << "' has been disabled, so ignored!";
     518           0 :                     continue;
     519             :                 }
     520             : 
     521             :                 // if we have a valid fds position then an event other
     522             :                 // than a timeout occurred on that connection
     523             :                 //
     524           0 :                 if(c->f_fds_position >= 0)
     525             :                 {
     526           0 :                     struct pollfd * fd(&fds[c->f_fds_position]);
     527             : 
     528             :                     // if any events were found by poll(), process them now
     529             :                     //
     530           0 :                     if(fd->revents != 0)
     531             :                     {
     532             :                         // an event happened on this one
     533             :                         //
     534           0 :                         if((fd->revents & (POLLIN | POLLPRI)) != 0)
     535             :                         {
     536             :                             // we consider that Unix signals have the greater priority
     537             :                             // and thus handle them first
     538             :                             //
     539           0 :                             if(c->is_signal())
     540             :                             {
     541           0 :                                 signal * ss(dynamic_cast<signal *>(c.get()));
     542           0 :                                 if(ss)
     543             :                                 {
     544           0 :                                     ss->process();
     545             :                                 }
     546             :                             }
     547           0 :                             else if(c->is_listener())
     548             :                             {
     549             :                                 // a listener is a special case and we want
     550             :                                 // to call process_accept() instead
     551             :                                 //
     552           0 :                                 c->process_accept();
     553             :                             }
     554             :                             else
     555             :                             {
     556           0 :                                 c->process_read();
     557             :                             }
     558             :                         }
     559           0 :                         if((fd->revents & POLLOUT) != 0)
     560             :                         {
     561           0 :                             c->process_write();
     562             :                         }
     563           0 :                         if((fd->revents & POLLERR) != 0)
     564             :                         {
     565           0 :                             c->process_error();
     566             :                         }
     567           0 :                         if((fd->revents & (POLLHUP | POLLRDHUP)) != 0)
     568             :                         {
     569           0 :                             c->process_hup();
     570             :                         }
     571           0 :                         if((fd->revents & POLLNVAL) != 0)
     572             :                         {
     573           0 :                             c->process_invalid();
     574             :                         }
     575             :                     }
     576             :                 }
     577             : 
     578             :                 // now check whether we have a timeout on this connection
     579             :                 //
     580           0 :                 int64_t const timestamp(c->get_saved_timeout_timestamp());
     581           0 :                 if(timestamp != -1)
     582             :                 {
     583           0 :                     int64_t const now(get_current_date());
     584           0 :                     if(now >= timestamp)
     585             :                     {
     586             : //SNAP_LOG_TRACE
     587             : //    << "communicator::run(): timer of connection = '"<< c->get_name()
     588             : //    << "', timestamp = " << timestamp
     589             : //    << ", now = " << now
     590             : //    << ", now >= timestamp --> " << (now >= timestamp ? "TRUE (timed out!)" : "FALSE");
     591             : 
     592             :                         // move the timeout as required first
     593             :                         // (because the callback may move it again)
     594             :                         //
     595           0 :                         c->calculate_next_tick();
     596             : 
     597             :                         // the timeout date needs to be reset if the tick
     598             :                         // happened for that date
     599             :                         //
     600           0 :                         if(now >= c->get_timeout_date())
     601             :                         {
     602           0 :                             c->set_timeout_date(-1);
     603             :                         }
     604             : 
     605             :                         // then run the callback
     606             :                         //
     607           0 :                         c->process_timeout();
     608             :                     }
     609             :                 }
     610             :             }
     611             :         }
     612             :         else
     613             :         {
     614             :             // r < 0 means an error occurred
     615             :             //
     616           0 :             if(errno == EINTR)
     617             :             {
     618             :                 // Note: if the user wants to prevent this error, he should
     619             :                 //       use the snap_signal with the Unix signals that may
     620             :                 //       happen while calling poll().
     621             :                 //
     622           0 :                 throw event_dispatcher_runtime_error("communicator::run(): EINTR occurred while in poll() -- interrupts are not supported yet though");
     623             :             }
     624           0 :             if(errno == EFAULT)
     625             :             {
     626           0 :                 throw event_dispatcher_invalid_parameter("communicator::run(): buffer was moved out of our address space?");
     627             :             }
     628           0 :             if(errno == EINVAL)
     629             :             {
     630             :                 // if this is really because nfds is too large then it may be
     631             :                 // a "soft" error that can be fixed; that being said, my
     632             :                 // current version is 16K files which frankly when we reach
     633             :                 // that level we have a problem...
     634             :                 //
     635             :                 struct rlimit rl;
     636           0 :                 getrlimit(RLIMIT_NOFILE, &rl);
     637             :                 throw event_dispatcher_invalid_parameter(
     638             :                             "communicator::run(): too many file fds for poll, limit is currently "
     639           0 :                           + std::to_string(rl.rlim_cur)
     640           0 :                           + ", your kernel top limit is "
     641           0 :                           + std::to_string(rl.rlim_max));
     642             :             }
     643           0 :             if(errno == ENOMEM)
     644             :             {
     645           0 :                 throw event_dispatcher_runtime_error("communicator::run(): poll() failed because of memory");
     646             :             }
     647           0 :             int const e(errno);
     648             :             throw event_dispatcher_runtime_error(
     649             :                       "communicator::run(): poll() failed with error "
     650           0 :                       + std::to_string(e)
     651           0 :                       + " -- "
     652           0 :                       + strerror(e));
     653             :         }
     654             :     }
     655             : }
     656             : 
     657             : 
     658             : 
     659           6 : } // namespace ed
     660             : // vim: ts=4 sw=4 et

Generated by: LCOV version 1.12