LCOV - code coverage report
Current view: top level - eventdispatcher - local_stream_blocking_client_message_connection.cpp (source / functions) Hit Total Coverage
Test: coverage.info Lines: 1 133 0.8 %
Date: 2021-09-19 09:06:58 Functions: 2 6 33.3 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : // Copyright (c) 2012-2021  Made to Order Software Corp.  All Rights Reserved
       2             : //
       3             : // https://snapwebsites.org/project/eventdispatcher
       4             : // contact@m2osw.com
       5             : //
       6             : // This program is free software; you can redistribute it and/or modify
       7             : // it under the terms of the GNU General Public License as published by
       8             : // the Free Software Foundation; either version 2 of the License, or
       9             : // (at your option) any later version.
      10             : //
      11             : // This program is distributed in the hope that it will be useful,
      12             : // but WITHOUT ANY WARRANTY; without even the implied warranty of
      13             : // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
      14             : // GNU General Public License for more details.
      15             : //
      16             : // You should have received a copy of the GNU General Public License along
      17             : // with this program; if not, write to the Free Software Foundation, Inc.,
      18             : // 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
      19             : 
      20             : /** \file
      21             :  * \brief Implementation of the local stream blocking client message connection.
      22             :  *
      23             :  * This class wraps the C poll() interface in a C++ object with many types
      24             :  * of objects:
      25             :  *
      26             :  * \li Server Connections; for software that want to offer a port to
      27             :  *     which clients can connect to; the server will call accept()
      28             :  *     once a new client connection is ready; this results in a
      29             :  *     Server/Client connection object
      30             :  * \li Client Connections; for software that want to connect to
      31             :  *     a server; these expect the IP address and port to connect to
      32             :  * \li Server/Client Connections; for the server when it accepts a new
      33             :  *     connection; in this case the server gets a socket from accept()
      34             :  *     and creates one of these objects to handle the connection
      35             :  *
      36             :  * Using the poll() function is the easiest and allows us to listen
      37             :  * on pretty much any number of sockets (on my server it is limited
      38             :  * at 16,768 and frankly over 1,000 we probably will start to have
      39             :  * real slowness issues on small VPN servers.)
      40             :  */
      41             : 
      42             : // to get the POLLRDHUP definition
      43             : #ifndef _GNU_SOURCE
      44             : #define _GNU_SOURCE
      45             : #endif
      46             : 
      47             : 
      48             : // self
      49             : //
      50             : #include    "eventdispatcher/local_stream_blocking_client_message_connection.h"
      51             : 
      52             : #include    "eventdispatcher/exception.h"
      53             : 
      54             : 
      55             : // snaplogger lib
      56             : //
      57             : #include    <snaplogger/message.h>
      58             : 
      59             : 
      60             : // snapdev lib
      61             : //
      62             : #include    <snapdev/not_used.h>
      63             : 
      64             : 
      65             : // C++ lib
      66             : //
      67             : #include    <cstring>
      68             : 
      69             : 
      70             : // C lib
      71             : //
      72             : #include    <poll.h>
      73             : #include    <sys/resource.h>
      74             : 
      75             : 
      76             : // last include
      77             : //
      78             : #include    <snapdev/poison.h>
      79             : 
      80             : 
      81             : 
      82             : namespace ed
      83             : {
      84             : 
      85             : 
      86             : 
      87             : /** \brief Blocking client message connection.
      88             :  *
      89             :  * This object allows you to create a blocking, generally temporary
      90             :  * one message connection client. This is specifically used with
      91             :  * the snaplock daemon, but it can be used for other things too as
      92             :  * required.
      93             :  *
      94             :  * The connection is expected to be used as shown in the following
      95             :  * example which is how it is used to implement the LOCK through
      96             :  * our snaplock daemons.
      97             :  *
      98             :  * \code
      99             :  *      class my_blocking_connection
     100             :  *          : public ed::local_stream_blocking_client_message_connection
     101             :  *      {
     102             :  *      public:
     103             :  *          my_blocking_connection(addr::unix const & address, bool blocking, bool close_on_exec)
     104             :  *              : local_stream_blocking_client_message_connection(address, blocking, close_on_exec)
     105             :  *          {
     106             :  *              // need to register with communicator
     107             :  *              message register_message;
     108             :  *              register_message.set_command("REGISTER");
     109             :  *              ...
     110             :  *              send_message(register_message);
     111             :  *
     112             :  *              run();
     113             :  *          }
     114             :  *
     115             :  *          ~my_blocking_connection()
     116             :  *          {
     117             :  *              // done, send UNLOCK and then make sure to unregister
     118             :  *              message unlock_message;
     119             :  *              unlock_message.set_command("UNLOCK");
     120             :  *              ...
     121             :  *              send_message(unlock_message);
     122             :  *
     123             :  *              message unregister_message;
     124             :  *              unregister_message.set_command("UNREGISTER");
     125             :  *              ...
     126             :  *              send_message(unregister_message);
     127             :  *          }
     128             :  *
     129             :  *          // now that we have a dispatcher, this would probably use
     130             :  *          // that mechanism instead of a list of if()/else if()
     131             :  *          //
     132             :  *          // Please, consider using the dispatcher instead
     133             :  *          //
     134             :  *          virtual void process_message(message const & msg)
     135             :  *          {
     136             :  *              auto const command(msg.get_command());
     137             :  *              if(command == "LOCKED")
     138             :  *              {
     139             :  *                  // the lock worked, release hand back to the user
     140             :  *                  mark_done();
     141             :  *              }
     142             :  *              else if(command == "READY")
     143             :  *              {
     144             :  *                  // the REGISTER worked
     145             :  *                  // send the LOCK now
     146             :  *                  message lock_message;
     147             :  *                  lock_message.set_command("LOCK");
     148             :  *                  ...
     149             :  *                  send_message(lock_message);
     150             :  *              }
     151             :  *              else if(command == "HELP")
     152             :  *              {
     153             :  *                  // snapcommunicator wants us to tell it what commands
     154             :  *                  // we accept
     155             :  *                  message commands_message;
     156             :  *                  commands_message.set_command("COMMANDS");
     157             :  *                  ...
     158             :  *                  send_message(commands_message);
     159             :  *              }
     160             :  *          }
     161             :  *      };
     162             :  *      my_blocking_connection blocking_connection(address, blocking, close_on_exec);
     163             :  *
     164             :  *      // then we can send a message to the service we are interested in
     165             :  *      blocking_connection.send_message(my_message);
     166             :  *
     167             :  *      // now we call run() waiting for a reply
     168             :  *      blocking_connection.run();
     169             :  * \endcode
     170             :  *
     171             :  * \param[in] address  The Unix address to connect to.
     172             :  * \param[in] blocking  Blocking flag, passed as is to the base class.
     173             :  * \param[in] close_on_exec  Close-on-exec flag, passed as is to the base class.
     174             :  */
     175           0 : local_stream_blocking_client_message_connection::local_stream_blocking_client_message_connection(
     176             :               addr::unix const & address
     177             :             , bool const blocking
     178           0 :             , bool const close_on_exec)
     179             :     : local_stream_client_message_connection(
     180             :               address
     181             :             , blocking
     182           0 :             , close_on_exec)
     183             : {
     184           0 : }
     185             : 
     186             : 
     187             : /** \brief Blocking run on the connection.
     188             :  *
     189             :  * This function first calls mark_not_done(), so a previous mark_done()
     190             :  * is forgotten. It then reads the socket one byte at a time and calls
     191             :  * process_line() for each newline terminated line until is_done() is true.
     192             :  *
     193             :  * It also returns once process_error(), process_hup(), or process_invalid()
     194             :  * got called; it throws on a poll()/read() failure or an unhandled timeout.
     195             :  *
     196             :  * \note
     197             :  * Internally, process_line() is expected to transform the line into
     198             :  * a message and in turn call process_message().
     199             :  */
     200           0 : void local_stream_blocking_client_message_connection::run()
     201             : {
     202           0 :     mark_not_done();
     203             : 
     204           0 :     do
     205             :     {
     206             :         for(;;)
     207             :         {
     208             :             // TBD: can the socket become -1 within the read() loop?
     209             :             //      (i.e. should this test not be just outside of the for(;;)?)
     210             :             //
     211           0 :             struct pollfd fd;
     212           0 :             fd.events = POLLIN | POLLPRI | POLLRDHUP;
     213           0 :             fd.fd = get_socket();
     214           0 :             if(fd.fd < 0
     215           0 :             || !is_enabled())
     216             :             {
     217             :                 // invalid socket or connection currently disabled
     218           0 :                 process_error();
     219           0 :                 return;
     220             :             }
     221             : 
     222             :             // at this time, this class is used with the lock and the lock
     223             :             // has a timeout so we need to block at most for that amount of
     224             :             // time and not forever (presumably the snaplock would send us a
     225             :             // LOCKFAILED marked with a "timeout" parameter, but we cannot
     226             :             // rely on the snaplock being there and responding as expected.)
     227             :             //
     228             :             // the difference is divided by 1000 to get the milliseconds poll()
     229             :             // expects (presumably the timestamps are in microseconds)
     230             :             // NOTE(review): poll() takes an int, a huge timeout gets truncated
     231             :             //
     232           0 :             std::int64_t const next_timeout_timestamp(save_timeout_timestamp());
     233           0 :             std::int64_t const now(get_current_date());
     234           0 :             std::int64_t const timeout((next_timeout_timestamp - now) / 1000);
     235           0 :             if(timeout <= 0)
     236             :             {
     237             :                 // timed out; process_timeout() may mark us as done
     238             :                 //
     239           0 :                 process_timeout();
     240           0 :                 if(is_done())
     241             :                 {
     242           0 :                     return;
     243             :                 }
     244           0 :                 SNAP_LOG_FATAL
     245           0 :                     << "blocking connection timed out."
     246             :                     << SNAP_LOG_SEND;
     247             :                 throw event_dispatcher_runtime_error(
     248             :                     "local_stream_blocking_client_message_connection::run(): blocking"
     249           0 :                     " connection timed out.");
     250             :             }
     251           0 :             errno = 0;
     252           0 :             fd.revents = 0; // probably useless... (kernel should clear those)
     253           0 :             int const r(::poll(&fd, 1, timeout));
     254           0 :             if(r < 0)
     255             :             {
     256             :                 // r < 0 means an error occurred
     257             :                 //
     258           0 :                 if(errno == EINTR)
     259             :                 {
     260             :                     // Note: if the user wants to prevent this error, they
     261             :                     //       should use the signal with the Unix signals that
     262             :                     //       may happen while calling poll().
     263             :                     //
     264             :                     throw event_dispatcher_runtime_error(
     265             :                             "local_stream_blocking_client_message_connection::run():"
     266             :                             " EINTR occurred while in poll() -- interrupts"
     267           0 :                             " are not supported yet though.");
     268             :                 }
     269           0 :                 if(errno == EFAULT)
     270             :                 {
     271             :                     throw event_dispatcher_runtime_error(
     272             :                             "local_stream_blocking_client_message_connection::run():"
     273           0 :                             " buffer was moved out of our address space?");
     274             :                 }
     275           0 :                 if(errno == EINVAL)
     276             :                 {
     277             :                     // if this is really because nfds is too large then it may be
     278             :                     // a "soft" error that can be fixed; that being said, we
     279             :                     // always pass nfds = 1 in this function so reaching that
     280             :                     // limit here would mean we have a problem...
     281             :                     //
     282           0 :                     rlimit rl;
     283           0 :                     getrlimit(RLIMIT_NOFILE, &rl);
     284             :                     throw event_dispatcher_invalid_parameter(
     285             :                               "local_stream_blocking_client_message_connection::run():"
     286             :                               " too many file fds for poll, limit is"
     287             :                               " currently "
     288           0 :                             + std::to_string(rl.rlim_cur)
     289           0 :                             + ", your kernel top limit is "
     290           0 :                             + std::to_string(rl.rlim_max)
     291           0 :                             + ".");
     292             :                 }
     293           0 :                 if(errno == ENOMEM)
     294             :                 {
     295             :                     throw event_dispatcher_runtime_error(
     296             :                             "local_stream_blocking_client_message_connection::run():"
     297           0 :                             " poll() failed because of memory.");
     298             :                 }
     299           0 :                 int const e(errno);
     300             :                 throw event_dispatcher_invalid_parameter(
     301             :                           "local_stream_blocking_client_message_connection::run():"
     302             :                           " poll() failed with error "
     303           0 :                         + std::to_string(e)
     304           0 :                         + " -- "
     305           0 :                         + strerror(e));
     306             :             }
     307             : 
     308           0 :             if((fd.revents & (POLLIN | POLLPRI)) != 0)
     309             :             {
     310             :                 // read one character at a time otherwise we would be
     311             :                 // blocked forever
     312             :                 //
     313           0 :                 char buf[2];
     314           0 :                 int const size(::read(fd.fd, buf, 1));
     315           0 :                 if(size != 1)
     316             :                 {
     317             :                     // read() error or end of file (we did not get our 1 byte)
     318             :                     //
     319           0 :                     process_error();
     320             :                     throw event_dispatcher_invalid_parameter(
     321             :                               "local_stream_blocking_client_message_connection::run():"
     322             :                               " read() failed reading data from socket"
     323             :                               " (return value = "
     324           0 :                             + std::to_string(size)
     325           0 :                             + ").");
     326             :                 }
     327           0 :                 if(buf[0] == '\n')
     328             :                 {
     329             :                     // end of a line, we got a whole message in our buffer
     330             :                     // notice that we do not add the '\n' to f_line
     331             :                     //
     332           0 :                     break;
     333             :                 }
     334           0 :                 buf[1] = '\0';
     335           0 :                 f_line += buf;
     336             :             }
     337           0 :             if((fd.revents & POLLERR) != 0)
     338             :             {
     339           0 :                 process_error();
     340           0 :                 return;
     341             :             }
     342           0 :             if((fd.revents & (POLLHUP | POLLRDHUP)) != 0)
     343             :             {
     344           0 :                 process_hup();
     345           0 :                 return;
     346             :             }
     347           0 :             if((fd.revents & POLLNVAL) != 0)
     348             :             {
     349           0 :                 process_invalid();
     350           0 :                 return;
     351             :             }
     352           0 :         }
     353           0 :         process_line(f_line);
     354           0 :         f_line.clear();
     355             :     }
     356           0 :     while(!is_done());
     357             : }
     358             : 
     359             : 
     360             : /** \brief Quick peek on the connection.
     361             :  *
     362             :  * This function polls the socket with a timeout of zero and reads the
     363             :  * data already available, calling process_line() for each complete line.
     364             :  * It returns once poll() finds nothing to read or is_done() is true; an
     365             :  * incomplete line is left in f_line.
     366             :  *
     367             :  * \note
     368             :  * process_line() is expected to parse the line and call process_message().
     369             :  */
     370           0 : void local_stream_blocking_client_message_connection::peek()
     371             : {
     372           0 :     do
     373             :     {
     374             :         for(;;)
     375             :         {
     376           0 :             pollfd fd;
     377           0 :             fd.events = POLLIN | POLLPRI | POLLRDHUP;
     378           0 :             fd.fd = get_socket();
     379           0 :             if(fd.fd < 0
     380           0 :             || !is_enabled())
     381             :             {
     382             :                 // invalid socket or connection currently disabled
     383           0 :                 process_error();
     384           0 :                 return;
     385             :             }
     386             : 
     387           0 :             errno = 0;
     388           0 :             fd.revents = 0; // probably useless... (kernel should clear those)
     389           0 :             int const r(::poll(&fd, 1, 0));
     390           0 :             if(r < 0)
     391             :             {
     392             :                 // r < 0 means an error occurred
     393             :                 //
     394           0 :                 if(errno == EINTR)
     395             :                 {
     396             :                     // Note: if the user wants to prevent this error, they
     397             :                     //       should use the signal with the Unix signals that
     398             :                     //       may happen while calling poll().
     399             :                     //
     400             :                     throw event_dispatcher_runtime_error(
     401             :                             "local_stream_blocking_client_message_connection::peek():"
     402             :                             " EINTR occurred while in poll() -- interrupts"
     403           0 :                             " are not supported yet though");
     404             :                 }
     405           0 :                 if(errno == EFAULT)
     406             :                 {
     407             :                     throw event_dispatcher_invalid_parameter(
     408             :                             "local_stream_blocking_client_message_connection::peek():"
     409           0 :                             " buffer was moved out of our address space?");
     410             :                 }
     411           0 :                 if(errno == EINVAL)
     412             :                 {
     413             :                     // if this is really because nfds is too large then it may be
     414             :                     // a "soft" error that can be fixed; that being said, we
     415             :                     // always pass nfds = 1 in this function so reaching that
     416             :                     // limit here would mean we have a problem...
     417             :                     //
     418           0 :                     struct rlimit rl;
     419           0 :                     getrlimit(RLIMIT_NOFILE, &rl);
     420             :                     throw event_dispatcher_invalid_parameter(
     421             :                               "local_stream_blocking_client_message_connection::peek():"
     422             :                               " too many file fds for poll, limit is currently "
     423           0 :                             + std::to_string(rl.rlim_cur)
     424           0 :                             + ", your kernel top limit is "
     425           0 :                             + std::to_string(rl.rlim_max));
     426             :                 }
     427           0 :                 if(errno == ENOMEM)
     428             :                 {
     429             :                     throw event_dispatcher_runtime_error(
     430             :                             "local_stream_blocking_client_message_connection::peek():"
     431           0 :                             " poll() failed because of memory");
     432             :                 }
     433           0 :                 int const e(errno);
     434             :                 throw event_dispatcher_runtime_error(
     435             :                           "local_stream_blocking_client_message_connection::peek():"
     436             :                           " poll() failed with error "
     437           0 :                         + std::to_string(e)
     438           0 :                         + " -- "
     439           0 :                         + strerror(e));
     440             :             }
     441             : 
     442           0 :             if(r == 0)
     443             :             {
     444           0 :                 return;
     445             :             }
     446             : 
     447           0 :             if((fd.revents & (POLLIN | POLLPRI)) != 0)
     448             :             {
     449             :                 // read one character at a time otherwise we would be
     450             :                 // blocked forever
     451             :                 //
     452           0 :                 char buf[2];
     453           0 :                 int const size(::read(fd.fd, buf, 1));
     454           0 :                 if(size != 1)
     455             :                 {
     456             :                     // read() error or end of file (we did not get our 1 byte)
     457           0 :                     process_error();
     458             :                     throw event_dispatcher_runtime_error(
     459             :                               "local_stream_blocking_client_message_connection::peek():"
     460             :                               " read() failed reading data from socket (return"
     461             :                               " value = "
     462           0 :                             + std::to_string(size)
     463           0 :                             + ")");
     464             :                 }
     465           0 :                 if(buf[0] == '\n')
     466             :                 {
     467             :                     // end of a line, we got a whole message in our buffer
     468             :                     // notice that we do not add the '\n' to f_line
     469           0 :                     break;
     470             :                 }
     471           0 :                 buf[1] = '\0';
     472           0 :                 f_line += buf;
     473             :             }
     474           0 :             if((fd.revents & POLLERR) != 0)
     475             :             {
     476           0 :                 process_error();
     477           0 :                 return;
     478             :             }
     479           0 :             if((fd.revents & (POLLHUP | POLLRDHUP)) != 0)
     480             :             {
     481           0 :                 process_hup();
     482           0 :                 return;
     483             :             }
     484           0 :             if((fd.revents & POLLNVAL) != 0)
     485             :             {
     486           0 :                 process_invalid();
     487           0 :                 return;
     488             :             }
     489           0 :         }
     490           0 :         process_line(f_line);
     491           0 :         f_line.clear();
     492             :     }
     493           0 :     while(!is_done());
     494             : }
     495             : 
     496             : 
     497             : /** \brief Send the specified message to the connection on the other end.
     498             :  *
     499             :  * This function converts the message to a string, appends a newline,
     500             :  * and writes the result to the socket with a single call to write().
     501             :  *
     502             :  * The function returns false if the socket is not valid (negative) or
     503             :  * if write() did not write the whole buffer. A short write is not
     504             :  * retried.
     505             :  *
     506             :  * \param[in] msg  The message to send to the connection.
     507             :  * \param[in] cache  Whether to cache the message if it cannot be sent
     508             :  *                   immediately (ignored at the moment.)
     509             :  *
     510             :  * \return true if the message was sent successfully, false otherwise.
     511             :  */
     512           0 : bool local_stream_blocking_client_message_connection::send_message(message const & msg, bool cache)
     513             : {
     514           0 :     snap::NOT_USED(cache);
     515             : 
     516           0 :     int const s(get_socket());
     517           0 :     if(s >= 0)
     518             :     {
     519             :         // transform the message to a string and write to the socket;
     520             :         // the socket is presumably blocking, in which case write() is
     521             :         // expected to return only once the message was sent (or failed)
     522             :         //
     523             :         // WARNING: we cannot use f_connection.write() because that one
     524             :         //          is asynchronous (at least, it writes to a buffer
     525             :         //          and not directly to the socket!)
     526             :         //
     527           0 :         std::string buf(msg.to_message());
     528           0 :         buf += '\n';
     529           0 :         return ::write(s, buf.c_str(), buf.length()) == static_cast<ssize_t>(buf.length());
     530             :     }
     531             : 
     532           0 :     return false;
     533             : }
     534             : 
     535             : 
     536             : 
     537             : 
     538             : 
     539           6 : } // namespace ed
     540             : // vim: ts=4 sw=4 et

Generated by: LCOV version 1.13