LCOV - code coverage report
Current view: top level - eventdispatcher - communicator.cpp (source / functions) Hit Total Coverage
Test: coverage.info Lines: 119 173 68.8 %
Date: 2022-06-18 10:10:36 Functions: 8 13 61.5 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : // Copyright (c) 2012-2022  Made to Order Software Corp.  All Rights Reserved
       2             : //
       3             : // https://snapwebsites.org/project/eventdispatcher
       4             : // contact@m2osw.com
       5             : //
       6             : // This program is free software; you can redistribute it and/or modify
       7             : // it under the terms of the GNU General Public License as published by
       8             : // the Free Software Foundation; either version 2 of the License, or
       9             : // (at your option) any later version.
      10             : //
      11             : // This program is distributed in the hope that it will be useful,
      12             : // but WITHOUT ANY WARRANTY; without even the implied warranty of
      13             : // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
      14             : // GNU General Public License for more details.
      15             : //
      16             : // You should have received a copy of the GNU General Public License
      17             : // along with this program; if not, write to the Free Software
      18             : // Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
      19             : 
      20             : /** \file
      21             :  * \brief Implementation of the Snap Communicator class.
      22             :  *
      23             :  * This class wraps the C poll() interface in a C++ object with many types
      24             :  * of objects:
      25             :  *
      26             :  * \li Server Connections; for software that wants to offer a port to
      27             :  *     which clients can connect; the server will call accept()
      28             :  *     once a new client connection is ready; this results in a
      29             :  *     Server/Client connection object
      30             :  * \li Client Connections; for software that wants to connect to
      31             :  *     a server; these expect the IP address and port to connect to
      32             :  * \li Server/Client Connections; for the server when it accepts a new
      33             :  *     connection; in this case the server gets a socket from accept()
      34             :  *     and creates one of these objects to handle the connection
      35             :  *
      36             :  * Using the poll() function is the easiest and allows us to listen
      37             :  * on pretty much any number of sockets (on my server it is limited
      38             :  * at 16,768 and frankly over 1,000 we probably will start to have
      39             :  * real slowness issues on small VPN servers.)
      40             :  */
      41             : 
      42             : // to get the POLLRDHUP definition
      43             : #ifndef _GNU_SOURCE
      44             : #define _GNU_SOURCE
      45             : #endif
      46             : 
      47             : 
      48             : // self
      49             : //
      50             : #include    "eventdispatcher/communicator.h"
      51             : 
      52             : #include    "eventdispatcher/exception.h"
      53             : #include    "eventdispatcher/signal.h"
      54             : #include    "eventdispatcher/utils.h"
      55             : 
      56             : 
      57             : // cppthread lib
      58             : //
      59             : #include    <cppthread/guard.h>
      60             : #include    <cppthread/mutex.h>
      61             : #include    <cppthread/thread.h>
      62             : 
      63             : 
      64             : // snapdev lib
      65             : //
      66             : #include    <snapdev/safe_variable.h>
      67             : 
      68             : 
      69             : // snaplogger lib
      70             : //
      71             : #include    <snaplogger/message.h>
      72             : 
      73             : 
      74             : // C++ lib
      75             : //
      76             : #include    <algorithm>
      77             : #include    <cstring>
      78             : #include    <limits>
      79             : 
      80             : 
      81             : // C lib
      82             : //
      83             : #include    <poll.h>
      84             : #include    <sys/resource.h>
      85             : 
      86             : 
      87             : // last include
      88             : //
      89             : #include    <snapdev/poison.h>
      90             : 
      91             : 
      92             : 
      93             : namespace ed
      94             : {
      95             : namespace
      96             : {
      97             : 
      98             : 
      99             : /** \brief The instance of the communicator singleton.
     100             :  *
     101             :  * This pointer is the one instance of the communicator
     102             :  * we create to run an event loop.
     103             :  */
     104             : communicator::pointer_t *           g_instance = nullptr;
     105             : 
     106             : 
     107             : } // no name namespace
     108             : 
     109             : 
     110             : 
     111             : 
     112             : 
     113             : /** \brief Initialize a communicator object.
     114             :  *
     115             :  * This function initializes the communicator object.
     116             :  */
     117           1 : communicator::communicator()
     118             : {
     119           1 : }
     120             : 
     121             : 
     122             : /** \brief Retrieve the instance() of the communicator.
     123             :  *
     124             :  * This function returns the instance of the communicator.
     125             :  * There is really no reason and it could also create all sorts
     126             :  * of problems to have more than one instance hence we created
     127             :  * the communicator as a singleton. It also means you cannot
     128             :  * actually delete the communicator.
     129             :  *
     130             :  * The initialization of the communicator instance is thread
     131             :  * safe.
     132             :  *
     133             :  * \return The communicator pointer.
     134             :  */
     135          48 : communicator::pointer_t communicator::instance()
     136             : {
     137          96 :     cppthread::guard g(*cppthread::g_system_mutex);
     138             : 
     139          48 :     if(g_instance == nullptr)
     140             :     {
     141             :         // `communicator` constructor is private so we can't use
     142             :         // the std::make_shared<>
     143             :         //
     144           1 :         g_instance = new communicator::pointer_t();
     145           1 :         g_instance->reset(new communicator());
     146             :     }
     147             : 
     148          96 :     return *g_instance;
     149             : }
     150             : 
     151             : 
     152             : /** \brief Retrieve a reference to the vector of connections.
     153             :  *
     154             :  * This function returns a reference to all the connections that are
     155             :  * currently attached to the communicator system.
     156             :  *
     157             :  * This is useful to search the array.
     158             :  *
     159             :  * \return The vector of connections.
     160             :  */
     161           0 : connection::vector_t const & communicator::get_connections() const
     162             : {
     163           0 :     return f_connections;
     164             : }
     165             : 
     166             : 
     167             : /** \brief Attach a connection to the communicator.
     168             :  *
     169             :  * This function attaches a connection to the communicator. This allows
     170             :  * us to execute code for that connection by having the process_signal()
     171             :  * function called.
     172             :  *
     173             :  * Connections are kept in the order in which they are added. This may
     174             :  * change the order in which connection callbacks are called. However,
     175             :  * events are received asynchronously so do not expect callbacks to be
     176             :  * called in any specific order.
     177             :  *
     178             :  * You may call this function with a null pointer. It simply returns
     179             :  * false immediately. This makes it easy to eventually allocate a
     180             :  * new connection and then use the return value of this function
     181             :  * to know whether the two step process worked or not.
     182             :  *
     183             :  * \note
     184             :  * A connection can only be added once to a communicator object.
     185             :  * Also it cannot be shared between multiple communicator objects.
     186             :  *
     187             :  * \param[in] connection  The connection being added.
     188             :  *
     189             :  * \return true if the connection was added, false if the connection is
     190             :  *         null or was already present in the communicator list of connections.
     191             :  */
     192          20 : bool communicator::add_connection(connection::pointer_t connection)
     193             : {
     194          20 :     if(connection == nullptr)
     195             :     {
     196           0 :         return false;
     197             :     }
     198             : 
     199          20 :     if(!connection->valid_socket())
     200             :     {
     201             :         throw invalid_parameter(
     202             :             "communicator::add_connection(): connection without a socket"
     203           0 :             " cannot be added to a communicator object.");
     204             :     }
     205             : 
     206          20 :     auto const it(std::find(f_connections.begin(), f_connections.end(), connection));
     207          20 :     if(it != f_connections.end())
     208             :     {
     209             :         // already added, can be added only once but we allow multiple
     210             :         // calls (however, we do not count those calls, so first call
     211             :         // to the remove_connection() does remove it!)
     212             :         //
     213             : 
     214           0 :         SNAP_LOG_TRACE
     215             :             << "connection, \""
     216           0 :             << connection->get_name()
     217             :             << "\" not readded (already present in f_connections)."
     218             :             << SNAP_LOG_SEND;
     219             : 
     220           0 :         return false;
     221             :     }
     222             : 
     223          20 :     f_connections.push_back(connection);
     224             : 
     225          20 :     connection->connection_added();
     226             : 
     227          40 :     SNAP_LOG_TRACE
     228             :         << "added 1 connection, \""
     229          20 :         << connection->get_name()
     230          40 :         << "\", there is now "
     231          20 :         << f_connections.size()
     232             :         << " connections (including this one)."
     233             :         << SNAP_LOG_SEND;
     234             : 
     235          20 :     return true;
     236             : }
     237             : 
     238             : 
     239             : /** \brief Remove a connection from a communicator object.
     240             :  *
     241             :  * This function removes a connection from this communicator object.
     242             :  * Note that any one connection can only be added once.
     243             :  *
     244             :  * \param[in] connection  The connection to remove from this communicator.
     245             :  *
     246             :  * \return true if the connection was removed, false if it was not found.
     247             :  */
     248          22 : bool communicator::remove_connection(connection::pointer_t connection)
     249             : {
     250          22 :     auto it(std::find(f_connections.begin(), f_connections.end(), connection));
     251          22 :     if(it == f_connections.end())
     252             :     {
     253           2 :         return false;
     254             :     }
     255             : 
     256          40 :     SNAP_LOG_TRACE
     257             :         << "removing 1 connection, \""
     258          20 :         << connection->get_name()
     259          40 :         << "\", of "
     260          20 :         << f_connections.size()
     261             :         << " connections (including this one)."
     262             :         << SNAP_LOG_SEND;
     263             : 
     264          20 :     f_connections.erase(it);
     265             : 
     266          20 :     connection->connection_removed();
     267             : 
     268          20 :     if(f_debug_connections != snaplogger::severity_t::SEVERITY_OFF)
     269             :     {
     270           0 :         log_connections();
     271             :     }
     272             : 
     273          20 :     return true;
     274             : }
     275             : 
     276             : 
     277             : /** \brief Log the list of connections.
     278             :  *
     279             :  * This function prints out the name of each existing connection to your
     280             :  * logs at the specified log level (severity).
     281             :  *
     282             :  * The function can automatically be called when you remove a connection
     283             :  * when the debug connections flag is turned on. This is done by calling
     284             :  * the debug_connections() function.
     285             :  *
     286             :  * \param[in] severity  The logger severity level.
     287             :  *
     288             :  * \sa debug_connections()
     289             :  */
     290           0 : void communicator::log_connections(snaplogger::severity_t severity)
     291             : {
     292           0 :     std::for_each(
     293             :               f_connections.begin()
     294             :             , f_connections.end()
     295           0 :             , [severity](auto const & c)
     296           0 :             {
     297           0 :                 snaplogger::message msg(severity, __FILE__, __func__, __LINE__);
     298           0 :                 msg << "communicator remaining connection: \""
     299           0 :                     << c->get_name()
     300           0 :                     << "\"";
     301           0 :                 snaplogger::send_message(msg);
     302           0 :             });
     303           0 : }
     304             : 
     305             : 
     306             : /** \brief Set the Force Sort flag to \p status.
     307             :  *
     308             :  * This function can be called to force the run() function to sort (or not
     309             :  * sort) the list of connections.
     310             :  *
     311             :  * Since the sort function is somewhat expensive, the sort changes the
     312             :  * vector of connections in place. Then only a change of priority
     313             :  * triggers a request for the vector to be sorted again.
     314             :  *
     315             :  * This function can be used in the event you need to force a trigger.
     316             :  * It would be unlikely that you would call this function with false.
     317             :  *
     318             :  * \param[in] status  The new status of the force sort flag.
     319             :  */
     320           0 : void communicator::set_force_sort(bool status)
     321             : {
     322           0 :     f_force_sort = status;
     323           0 : }
     324             : 
     325             : 
     326             : /** \brief Check whether the run() function is still going.
     327             :  *
     328             :  * The f_running internal flag is set to true while within the run()
     329             :  * function. This function tells you whether you already called the
     330             :  * run() function and are running within a callback or you are before
     331             :  * or after the call.
     332             :  *
     333             :  * \return true if the run() function is still running.
     334             :  */
     335           6 : bool communicator::is_running() const
     336             : {
     337           6 :     return f_running;
     338             : }
     339             : 
     340             : 
     341             : /** \brief Debug connections being removed.
     342             :  *
     343             :  * Whenever one of your processes is stuck on a QUIT, it most likely
     344             :  * is because you have one or more connections still defined in your
     345             :  * communicator.
     346             :  *
     347             :  * The communicator has a list of connections and it is possible to
     348             :  * automatically get that list in your logs whenever you remove a
     349             :  * connection. This is often very helpful even while running because
     350             :  * that way you can see what is still in your communicator at a given
     351             :  * moment.
     352             :  *
     353             :  * By default, though, this list does not get printed in the logs to
     354             :  * avoid wasting disk space and processing time. In a debug setup, it
     355             :  * is really helpful to call this function.
     356             :  *
     357             :  * Note that the feature is available in the release version of the
     358             :  * library. It will output the data at the DEBUG level.
     359             :  *
     360             :  * For the list to appear, you need to call this function with
     361             :  * the \p severity parameter set to a value other than
     362             :  * snaplogger::severity_t::SEVERITY_OFF.
     363             :  *
     364             :  * \note
     365             :  * To optimize this feature as much as possible, you can turn it on
     366             :  * only at the time you are calling the very last remove_connection()
     367             :  * (or at least what you think is the very last connection). That
     368             :  * way you avoid having the list appear all over the place.
     369             :  *
     370             :  * You can also directly call the log_connections() function which
     371             :  * is what we use internally.
     372             :  *
     373             :  * \param[in] severity  The severity level at which to log lists of connections.
     374             :  *
     375             :  * \sa log_connections()
     376             :  * \sa remove_connection()
     377             :  */
     378           0 : void communicator::debug_connections(snaplogger::severity_t severity)
     379             : {
     380           0 :     f_debug_connections = severity;
     381           0 : }
     382             : 
     383             : 
     384             : /** \brief Run until all connections are removed.
     385             :  *
     386             :  * This function "blocks" until all the connections added to this
     387             :  * communicator instance are removed. Until then, it wakes
     388             :  * up and runs callback functions whenever an event occurs.
     389             :  *
     390             :  * In other words, you want to add_connection() before you call
     391             :  * this function otherwise the function returns immediately.
     392             :  *
     393             :  * Note that you can include timeout events so if you need to
     394             :  * run some code once in a while, you may just use a timeout
     395             :  * event and process your repetitive events that way.
     396             :  *
     397             :  * \note
     398             :  * Calling exit() or a similar function from within a callback
     399             :  * is not advised; although it may work in most cases, it is
     400             :  * much better/cleaner to go through your list of connections
     401             :  * and remove them all once you are ready to quit. This also
     402             :  * allows for a 100% valid shutdown procedure.
     403             :  *
     404             :  * \return true if the loop exits because the list of connections is empty.
     405             :  */
     406           8 : bool communicator::run()
     407             : {
     408           8 :     if(f_running)
     409             :     {
     410           0 :         SNAP_LOG_FATAL
     411             :             << "communicator::run(): recursively called from within a callback."
     412             :             << SNAP_LOG_SEND;
     413           0 :         throw recursive_call("communicator::run(): recursively called from within a callback.");
     414             :     }
     415             : 
     416          16 :     snapdev::safe_variable running(f_running, true);
     417             : 
     418          16 :     std::vector<bool> enabled;
     419          16 :     std::vector<struct pollfd> fds;
     420           8 :     f_force_sort = true;
     421             :     for(;;)
     422             :     {
     423             :         // any connections?
     424          36 :         if(f_connections.empty())
     425             :         {
     426           8 :             return true;
     427             :         }
     428             : 
     429          28 :         if(f_force_sort)
     430             :         {
     431             :             // sort the connections by priority
     432             :             //
     433           8 :             std::stable_sort(f_connections.begin(), f_connections.end(), connection::compare);
     434           8 :             f_force_sort = false;
     435             :         }
     436             : 
     437             :         // make a copy because the callbacks may end up making
     438             :         // changes to the main list and we would have problems
     439             :         // with that here...
     440             :         //
     441          56 :         connection::vector_t connections(f_connections);
     442          28 :         size_t max_connections(connections.size());
     443             : 
     444             :         // by default, do not time out
     445             :         //
     446          28 :         std::int64_t next_timeout_timestamp(std::numeric_limits<std::int64_t>::max());
     447             : 
     448             :         // clear() is not supposed to delete the buffer of vectors
     449             :         //
     450          28 :         enabled.clear();
     451          28 :         fds.clear();
     452          28 :         fds.reserve(max_connections); // avoid more than 1 allocation
     453          91 :         for(size_t idx(0); idx < max_connections; ++idx)
     454             :         {
     455         118 :             connection::pointer_t c(connections[idx]);
     456          63 :             c->f_fds_position = -1;
     457             : 
     458             :             // is the connection enabled?
     459             :             //
     460             :             // note that we save that value for later use in our loop
     461             :             // below because otherwise we will miss many events and
     462             :             // it tends to break things; that means you may get your
     463             :             // callback called even while disabled
     464             :             //
     465          63 :             enabled.push_back(c->is_enabled());
     466          63 :             if(!enabled[idx])
     467             :             {
     468             :                 //SNAP_LOG_TRACE
     469             :                 //    << "communicator::run(): connection '"
     470             :                 //    << c->get_name()
     471             :                 //    << "' has been disabled, so ignored."
     472             :                 //    << SNAP_LOG_SEND;
     473           7 :                 continue;
     474             :             }
     475             : //SNAP_LOG_TRACE
     476             : //    << "communicator::run(): handling connection "
     477             : //    << idx
     478             : //    << "/"
     479             : //    << max_connections
     480             : //    << ". '"
     481             : //    << c->get_name()
     482             : //    << "' since it is enabled..."
     483             : //    << SNAP_LOG_SEND;
     484             : 
     485             :             // check whether a timeout is defined in this connection
     486             :             //
     487          56 :             int64_t const timestamp(c->save_timeout_timestamp());
     488          56 :             if(timestamp != -1)
     489             :             {
     490             :                 // the timeout event gives us a time when to tick
     491             :                 //
     492           1 :                 if(timestamp < next_timeout_timestamp)
     493             :                 {
     494           1 :                     next_timeout_timestamp = timestamp;
     495             :                 }
     496             :             }
     497             : 
     498             :             // are there any events to listen on?
     499          56 :             int e(0);
     500          56 :             if(c->is_listener() || c->is_signal())
     501             :             {
     502          25 :                 e |= POLLIN;
     503             :             }
     504          56 :             if(c->is_reader())
     505             :             {
     506          27 :                 e |= POLLIN | POLLPRI | POLLRDHUP;
     507             :             }
     508          56 :             if(c->is_writer())
     509             :             {
     510           6 :                 e |= POLLOUT | POLLRDHUP;
     511             :             }
     512          56 :             if(e == 0)
     513             :             {
     514             :                 // this should only happen on timer objects
     515             :                 //
     516           1 :                 continue;
     517             :             }
     518             : 
     519             :             // do we have a currently valid socket? (i.e. the connection
     520             :             // may have been closed or we may be handling a timer or
     521             :             // signal object)
     522             :             //
     523          55 :             if(c->get_socket() < 0)
     524             :             {
     525           0 :                 continue;
     526             :             }
     527             : 
     528             :             // this is considered valid, add this connection to the list
     529             :             //
     530             :             // save the position since we may skip some entries...
     531             :             // (otherwise we would have to use -1 as the socket to
     532             :             // allow for such dead entries, but avoiding such entries
     533             :             // saves time)
     534             :             //
     535          55 :             c->f_fds_position = fds.size();
     536             : 
     537             : //SNAP_LOG_ERROR
     538             : //    << "*** still waiting on \""
     539             : //    << c->get_name()
     540             : //    << "\"."
     541             : //    << SNAP_LOG_SEND;
     542             : 
     543          55 :             struct pollfd fd;
     544          55 :             fd.fd = c->get_socket();
     545          55 :             fd.events = e;
     546          55 :             fd.revents = 0; // probably useless... (kernel should clear those)
     547          55 :             fds.push_back(fd);
     548             :         }
     549             : 
     550             :         // compute the right timeout
     551          28 :         std::int64_t timeout(-1);
     552          28 :         if(next_timeout_timestamp != std::numeric_limits<int64_t>::max())
     553             :         {
     554           1 :             std::int64_t const now(get_current_date());
     555           1 :             timeout = next_timeout_timestamp - now;
     556           1 :             if(timeout < 0)
     557             :             {
     558             :                 // timeout is in the past so timeout immediately, but
     559             :                 // still check for events if any
     560           1 :                 timeout = 0;
     561             :             }
     562             :             else
     563             :             {
     564             :                 // convert microseconds to milliseconds for poll()
     565           0 :                 timeout /= 1000;
     566           0 :                 if(timeout == 0)
     567             :                 {
     568             :                     // less than one is a waste of time (CPU intensive
     569             :                     // until the time is reached, we can be 1 ms off
     570             :                     // instead...)
     571           0 :                     timeout = 1;
     572             :                 }
     573             :             }
     574             :         }
     575          27 :         else if(fds.empty())
     576             :         {
     577           0 :             SNAP_LOG_FATAL
     578           0 :                 << "communicator::run(): nothing to poll() on. All connections are disabled? (Ignoring "
     579           0 :                 << max_connections
     580             :                 << " and exiting the run() loop anyway.)"
     581             :                 << SNAP_LOG_SEND;
     582           0 :             return false;
     583             :         }
     584             : 
     585             : //SNAP_LOG_TRACE << "communicator::run(): ready to poll(); "
     586             : //               << "count " << fds.size()
     587             : //               << " timeout " << timeout
     588             : //               << " (next was: " << next_timeout_timestamp
     589             : //               << ", current ~ " << get_current_date()
     590             : //               << ")"
     591             : //               << SNAP_LOG_SEND;
     592             : 
     593             :         // TODO: add support for ppoll() so we can support signals cleanly
     594             :         //       with nearly no additional work from us
     595             :         //
     596          28 :         errno = 0;
     597          28 :         int const r(poll(fds.empty() ? nullptr : &fds[0], fds.size(), timeout));
     598          28 :         if(r >= 0)
     599             :         {
     600             :             // quick sanity check
     601             :             //
     602          28 :             if(static_cast<size_t>(r) > connections.size())
     603             :             {
     604           0 :                 throw runtime_error("communicator::run(): poll() returned a number of events to handle larger than the input allows");
     605             :             }
     606             : //SNAP_LOG_TRACE
     607             : //    <<"tid="
     608             : //    << cppthread::gettid()
     609             : //    << ", communicator::run(): ------------------- new set of "
     610             : //    << r
     611             : //    << " events to handle"
     612             : //    << SNAP_LOG_SEND;
     613             : 
     614             :             // check each connection one by one for:
     615             :             //
     616             :             // 1) fds events, including signals
     617             :             // 2) timeouts
     618             :             //
     619             :             // and execute the corresponding callbacks
     620             :             //
     621          91 :             for(size_t idx(0); idx < connections.size(); ++idx)
     622             :             {
     623         119 :                 connection::pointer_t c(connections[idx]);
     624             : 
     625             :                 // is the connection enabled?
     626             :                 //
     627             :                 // note that we check whether that connection was enabled
     628             :                 // before poll() was called; this is very important because
     629             :                 // the last poll() events must be run even if a previous
     630             :                 // callback call just disabled this very connection
     631             :                 // (i.e. at the time we called poll() the connection was
     632             :                 // still enabled and therefore we are expected to call
     633             :                 // their callbacks even if it just got disabled by an
     634             :                 // earlier callback)
     635             :                 //
     636          63 :                 if(!enabled[idx])
     637             :                 {
     638             :                     //SNAP_LOG_TRACE
     639             :                     //    << "communicator::run(): in loop, connection '"
     640             :                     //    << c->get_name()
     641             :                     //    << "' has been disabled, so ignored!"
     642             :                     //    << SNAP_LOG_SEND;
     643           7 :                     continue;
     644             :                 }
     645             : 
     646             :                 // if we have a valid fds position then an event other
     647             :                 // than a timeout occurred on that connection
     648             :                 //
     649          56 :                 if(c->f_fds_position >= 0)
     650             :                 {
     651          55 :                     struct pollfd * fd(&fds[c->f_fds_position]);
     652             : 
     653             :                     // if any events were found by poll(), process them now
     654             :                     //
     655             : //SNAP_LOG_TRACE
     656             : //    <<"tid="
     657             : //    << cppthread::gettid()
     658             : //    << ", communicator::run(): events for "
     659             : //    << c->get_name()
     660             : //    << " = "
     661             : //    << fd->revents
     662             : //    << SNAP_LOG_SEND;
     663          55 :                     if(fd->revents != 0)
     664             :                     {
     665             :                         // an event happened on this one
     666             :                         //
     667          27 :                         if((fd->revents & (POLLIN | POLLPRI)) != 0)
     668             :                         {
     669             :                             // we consider that Unix signals have the greater priority
     670             :                             // and thus handle them first
     671             :                             //
     672          20 :                             if(c->is_signal())
     673             :                             {
     674           8 :                                 signal * ss(dynamic_cast<signal *>(c.get()));
     675           8 :                                 if(ss != nullptr)
     676             :                                 {
     677           8 :                                     ss->process();
     678             :                                 }
     679             :                             }
     680          12 :                             else if(c->is_listener())
     681             :                             {
     682             :                                 // a listener is a special case and we want
     683             :                                 // to call process_accept() instead
     684             :                                 //
     685           1 :                                 c->process_accept();
     686             :                             }
     687             :                             else
     688             :                             {
     689          11 :                                 c->process_read();
     690             :                             }
     691             :                         }
     692          27 :                         if((fd->revents & POLLOUT) != 0)
     693             :                         {
     694           6 :                             c->process_write();
     695             :                         }
     696          27 :                         if((fd->revents & POLLERR) != 0)
     697             :                         {
     698           0 :                             c->process_error();
     699             :                         }
     700          27 :                         if((fd->revents & (POLLHUP | POLLRDHUP)) != 0)
     701             :                         {
     702           5 :                             c->process_hup();
     703             :                         }
     704          27 :                         if((fd->revents & POLLNVAL) != 0)
     705             :                         {
     706           0 :                             c->process_invalid();
     707             :                         }
     708             :                     }
     709             :                 }
     710             : 
     711             :                 // now check whether we have a timeout on this connection
     712             :                 //
     713          56 :                 int64_t const timestamp(c->get_saved_timeout_timestamp());
     714          56 :                 if(timestamp != -1)
     715             :                 {
     716           1 :                     int64_t const now(get_current_date());
     717           1 :                     if(now >= timestamp)
     718             :                     {
     719             : //SNAP_LOG_TRACE
     720             : //    << "communicator::run(): timer of connection = '"<< c->get_name()
     721             : //    << "', timestamp = " << timestamp
     722             : //    << ", now = " << now
     723             : //    << ", now >= timestamp --> " << (now >= timestamp ? "TRUE (timed out!)" : "FALSE")
     724             : //    << SNAP_LOG_SEND;
     725             : 
     726             :                         // move the timeout as required first
     727             :                         // (because the callback may move it again)
     728             :                         //
     729           1 :                         c->calculate_next_tick();
     730             : 
     731             :                         // the timeout date needs to be reset if the tick
     732             :                         // happened for that date
     733             :                         //
     734           1 :                         if(now >= c->get_timeout_date())
     735             :                         {
     736           1 :                             c->set_timeout_date(-1);
     737             :                         }
     738             : 
     739             :                         // then run the callback
     740             :                         //
     741           1 :                         c->process_timeout();
     742             :                     }
     743             :                 }
     744             :             }
     745             :         }
     746             :         else
     747             :         {
     748             :             // r < 0 means an error occurred
     749             :             //
     750           0 :             if(errno == EINTR)
     751             :             {
     752             :                 // Note: if the user wants to prevent this error, he should
     753             :                 //       use the signal with the Unix signals that may
     754             :                 //       happen while calling poll().
     755             :                 //
     756           0 :                 throw runtime_error("communicator::run(): EINTR occurred while in poll() -- interrupts are not supported yet though");
     757             :             }
     758           0 :             if(errno == EFAULT)
     759             :             {
     760           0 :                 throw invalid_parameter("communicator::run(): buffer was moved out of our address space?");
     761             :             }
     762           0 :             if(errno == EINVAL)
     763             :             {
     764             :                 // if this is really because nfds is too large then it may be
     765             :                 // a "soft" error that can be fixed; that being said, my
     766             :                 // current version is 16K files which frankly when we reach
     767             :                 // that level we have a problem...
     768             :                 //
     769           0 :                 struct rlimit rl;
     770           0 :                 getrlimit(RLIMIT_NOFILE, &rl);
     771             :                 throw invalid_parameter(
     772             :                             "communicator::run(): too many file fds for poll, limit is currently "
     773           0 :                           + std::to_string(rl.rlim_cur)
     774           0 :                           + ", your kernel top limit is "
     775           0 :                           + std::to_string(rl.rlim_max));
     776             :             }
     777           0 :             if(errno == ENOMEM)
     778             :             {
     779           0 :                 throw runtime_error("communicator::run(): poll() failed because of memory");
     780             :             }
     781           0 :             int const e(errno);
     782             :             throw runtime_error(
     783             :                         "communicator::run(): poll() failed with error "
     784           0 :                       + std::to_string(e)
     785           0 :                       + " -- "
     786           0 :                       + strerror(e));
     787             :         }
     788          28 :     }
     789             : }
     790             : 
     791             : 
     792             : 
     793           6 : } // namespace ed
     794             : // vim: ts=4 sw=4 et

Generated by: LCOV version 1.13