LCOV - code coverage report
Current view: top level - eventdispatcher - tcp_blocking_client_message_connection.cpp (source / functions) Hit Total Coverage
Test: coverage.info Lines: 1 127 0.8 %
Date: 2019-08-08 02:52:36 Functions: 2 7 28.6 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : // Copyright (c) 2012-2019  Made to Order Software Corp.  All Rights Reserved
       2             : //
       3             : // This program is free software; you can redistribute it and/or modify
       4             : // it under the terms of the GNU General Public License as published by
       5             : // the Free Software Foundation; either version 2 of the License, or
       6             : // (at your option) any later version.
       7             : //
       8             : // This program is distributed in the hope that it will be useful,
       9             : // but WITHOUT ANY WARRANTY; without even the implied warranty of
      10             : // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
      11             : // GNU General Public License for more details.
      12             : //
      13             : // You should have received a copy of the GNU General Public License
      14             : // along with this program; if not, write to the Free Software
      15             : // Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
      16             : 
      17             : /** \file
      18             :  * \brief Implementation of the Snap Communicator class.
      19             :  *
      20             :  * This class wraps the C poll() interface in a C++ object with many types
      21             :  * of objects:
      22             :  *
      23             :  * \li Server Connections; for software that want to offer a port to
      24             :  *     which clients can connect to; the server will call accept()
      25             :  *     once a new client connection is ready; this results in a
      26             :  *     Server/Client connection object
      27             :  * \li Client Connections; for software that want to connect to
      28             :  *     a server; these expect the IP address and port to connect to
      29             :  * \li Server/Client Connections; for the server when it accepts a new
      30             :  *     connection; in this case the server gets a socket from accept()
      31             :  *     and creates one of these objects to handle the connection
      32             :  *
      33             :  * Using the poll() function is the easiest and allows us to listen
      34             :  * on pretty much any number of sockets (on my server it is limited
      35             :  * at 16,768 and frankly over 1,000 we probably will start to have
      36             :  * real slowness issues on small VPN servers.)
      37             :  */
      38             : 
      39             : // to get the POLLRDHUP definition
      40             : #ifndef _GNU_SOURCE
      41             : #define _GNU_SOURCE
      42             : #endif
      43             : 
      44             : 
      45             : // self
      46             : //
      47             : #include "eventdispatcher/tcp_blocking_client_message_connection.h"
      48             : 
      49             : #include "eventdispatcher/exception.h"
      50             : 
      51             : 
      52             : // snaplogger lib
      53             : //
      54             : #include "snaplogger/message.h"
      55             : 
      56             : 
      57             : // snapdev lib
      58             : //
      59             : //#include "snapdev/not_reached.h"
      60             : #include "snapdev/not_used.h"
      61             : //#include "snapdev/string_replace_many.h"
      62             : 
      63             : 
      64             : //// libaddr lib
      65             : ////
      66             : //#include "libaddr/addr_parser.h"
      67             : //
      68             : //
      69             : //// C++ lib
      70             : ////
      71             : //#include <sstream>
      72             : //#include <limits>
      73             : //#include <atomic>
      74             : 
      75             : 
      76             : // C lib
      77             : //
      78             : //#include <fcntl.h>
      79             : #include <poll.h>
      80             : //#include <unistd.h>
      81             : //#include <sys/eventfd.h>
      82             : //#include <sys/inotify.h>
      83             : //#include <sys/ioctl.h>
      84             : #include <sys/resource.h>
      85             : //#include <sys/syscall.h>
      86             : //#include <sys/time.h>
      87             : 
      88             : 
      89             : // last include
      90             : //
      91             : #include <snapdev/poison.h>
      92             : 
      93             : 
      94             : 
      95             : namespace ed
      96             : {
      97             : 
      98             : 
      99             : 
     100             : /** \brief Blocking client message connection.
     101             :  *
     102             :  * This object allows you to create a blocking, generally temporary,
     103             :  * one-message connection client. This is specifically used with
     104             :  * the snaplock daemon, but it can be used for other things too as
     105             :  * required.
     106             :  *
     107             :  * The connection is expected to be used as shown in the following
     108             :  * example which is how it is used to implement the LOCK through
     109             :  * our snaplock daemons.
     110             :  *
     111             :  * \code
     112             :  *      class my_blocking_connection
     113             :  *          : public ed::tcp_blocking_client_message_connection
     114             :  *      {
     115             :  *      public:
     116             :  *          my_blocking_connection(std::string const & addr, int port, mode_t mode)
     117             :  *              : tcp_blocking_client_message_connection(addr, port, mode)
     118             :  *          {
     119             :  *              // need to register with snap communicator
     120             :  *              snap_communicator_message register_message;
     121             :  *              register_message.set_command("REGISTER");
     122             :  *              ...
     123             :  *              blocking_connection.send_message(register_message);
     124             :  *
     125             :  *              run();
     126             :  *          }
     127             :  *
     128             :  *          ~my_blocking_connection()
     129             :  *          {
     130             :  *              // done, send UNLOCK and then make sure to unregister
     131             :  *              snap_communicator_message unlock_message;
     132             :  *              unlock_message.set_command("UNLOCK");
     133             :  *              ...
     134             :  *              blocking_connection.send_message(unlock_message);
     135             :  *
     136             :  *              snap_communicator_message unregister_message;
     137             :  *              unregister_message.set_command("UNREGISTER");
     138             :  *              ...
     139             :  *              blocking_connection.send_message(unregister_message);
     140             :  *          }
     141             :  *
     142             :  *          // now that we have a dispatcher, this  would probably use
     143             :  *          // that mechanism instead of a list of if()/else if()
     144             :  *          //
     145             :  *          // Please, consider using the dispatcher instead
     146             :  *          //
     147             :  *          virtual void process_message(snap_communicator_message const & message)
     148             :  *          {
     149             :  *              QString const command(message.get_command());
     150             :  *              if(command == "LOCKED")
     151             :  *              {
     152             :  *                  // the lock worked, release hand back to the user
     153             :  *                  done();
     154             :  *              }
     155             :  *              else if(command == "READY")
     156             :  *              {
     157             :  *                  // the REGISTER worked
     158             :  *                  // send the LOCK now
     159             :  *                  snap_communicator_message lock_message;
     160             :  *                  lock_message.set_command("LOCK");
     161             :  *                  ...
     162             :  *                  blocking_connection.send_message(lock_message);
     163             :  *              }
     164             :  *              else if(command == "HELP")
     165             :  *              {
     166             :  *                  // snapcommunicator wants us to tell it what commands
     167             :  *                  // we accept
     168             :  *                  snap_communicator_message commands_message;
     169             :  *                  commands_message.set_command("COMMANDS");
     170             :  *                  ...
     171             :  *                  blocking_connection.send_message(commands_message);
     172             :  *              }
     173             :  *          }
     174             :  *      };
     175             :  *      my_blocking_connection blocking_connection("127.0.0.1", 4040);
     176             :  *
     177             :  *      // then we can send a message to the service we are interested in
     178             :  *      blocking_connection.send_message(my_message);
     179             :  *
     180             :  *      // now we call run() waiting for a reply
     181             :  *      blocking_connection.run();
     182             :  * \endcode
     183             :  *
     184             :  * \param[in] addr  The address to connect to.
     185             :  * \param[in] port  The port to connect at.
     186             :  * \param[in] mode  The mode used to connect.
     187             :  */
     188           0 : tcp_blocking_client_message_connection::tcp_blocking_client_message_connection(
     189             :               std::string const & addr
     190             :             , int const port
     191             :             , mode_t const mode)
     192           0 :     : tcp_client_message_connection(addr, port, mode, true)
     193             : {
     194           0 : }
     195             : 
     196             : 
     197             : /** \brief Blocking run on the connection.
     198             :  *
     199             :  * This function reads the incoming messages and calls process_message()
     200             :  * on each one of them, in a blocking manner.
     201             :  *
     202             :  * If you called mark_done() before, the done flag is reset back to false.
     203             :  * You will have to call mark_done() again if you again receive a message
     204             :  * that is expected to end the loop.
     205             :  *
     206             :  * \note
     207             :  * Internally, the function actually calls process_line() which transforms
     208             :  * the line into a message and in turn calls process_message().
     209             :  */
     210           0 : void tcp_blocking_client_message_connection::run()
     211             : {
     212           0 :     mark_not_done();
     213             : 
     214           0 :     do
     215             :     {
     216           0 :         for(;;)
     217             :         {
     218             :             // TBD: can the socket become -1 within the read() loop?
     219             :             //      (i.e. should not that be just outside of the for(;;)?)
     220             :             //
     221             :             struct pollfd fd;
     222           0 :             fd.events = POLLIN | POLLPRI | POLLRDHUP;
     223           0 :             fd.fd = get_socket();
     224           0 :             if(fd.fd < 0
     225           0 :             || !is_enabled())
     226             :             {
     227             :                 // invalid socket
     228           0 :                 process_error();
     229           0 :                 return;
     230             :             }
     231             : 
     232             :             // at this time, this class is used with the lock and
     233             :             // the lock has a timeout so we need to block at most
     234             :             // for that amount of time and not forever (presumably
     235             :             // the snaplock would send us a LOCKFAILED marked with
     236             :             // a "timeout" parameter, but we cannot rely on the
     237             :             // snaplock being there and responding as expected.)
     238             :             //
     239             :             // calculate the number of microseconds and then convert
     240             :             // them to milliseconds for poll()
     241             :             //
     242           0 :             std::int64_t const next_timeout_timestamp(save_timeout_timestamp());
     243           0 :             std::int64_t const now(get_current_date());
     244           0 :             std::int64_t const timeout((next_timeout_timestamp - now) / 1000);
     245           0 :             if(timeout <= 0)
     246             :             {
     247             :                 // timed out
     248             :                 //
     249           0 :                 process_timeout();
     250           0 :                 if(is_done())
     251             :                 {
     252           0 :                     return;
     253             :                 }
     254             :                 SNAP_LOG_FATAL
     255           0 :                     << "blocking connection timed out.";
     256             :                 throw event_dispatcher_runtime_error(
     257             :                     "tcp_blocking_client_message_connection::run(): blocking"
     258           0 :                     " connection timed out.");
     259             :             }
     260           0 :             errno = 0;
     261           0 :             fd.revents = 0; // probably useless... (kernel should clear those)
     262           0 :             int const r(::poll(&fd, 1, timeout));
     263           0 :             if(r < 0)
     264             :             {
     265             :                 // r < 0 means an error occurred
     266             :                 //
     267           0 :                 if(errno == EINTR)
     268             :                 {
     269             :                     // Note: if the user wants to prevent this error, he should
     270             :                     //       use the snap_signal with the Unix signals that may
     271             :                     //       happen while calling poll().
     272             :                     //
     273             :                     throw event_dispatcher_runtime_error(
     274             :                             "tcp_blocking_client_message_connection::run():"
     275             :                             " EINTR occurred while in poll() -- interrupts"
     276           0 :                             " are not supported yet though.");
     277             :                 }
     278           0 :                 if(errno == EFAULT)
     279             :                 {
     280             :                     throw event_dispatcher_runtime_error(
     281             :                             "tcp_blocking_client_message_connection::run():"
     282           0 :                             " buffer was moved out of our address space?");
     283             :                 }
     284           0 :                 if(errno == EINVAL)
     285             :                 {
     286             :                     // if this is really because nfds is too large then it may be
     287             :                     // a "soft" error that can be fixed; that being said, my
     288             :                     // current version is 16K files which frankly when we reach
     289             :                     // that level we have a problem...
     290             :                     //
     291             :                     rlimit rl;
     292           0 :                     getrlimit(RLIMIT_NOFILE, &rl);
     293             :                     throw event_dispatcher_invalid_parameter(
     294             :                               "tcp_blocking_client_message_connection::run():"
     295             :                               " too many file fds for poll, limit is"
     296             :                               " currently "
     297           0 :                             + std::to_string(rl.rlim_cur)
     298           0 :                             + ", your kernel top limit is "
     299           0 :                             + std::to_string(rl.rlim_max)
     300           0 :                             + ".");
     301             :                 }
     302           0 :                 if(errno == ENOMEM)
     303             :                 {
     304             :                     throw event_dispatcher_runtime_error(
     305             :                             "tcp_blocking_client_message_connection::run():"
     306           0 :                             " poll() failed because of memory.");
     307             :                 }
     308           0 :                 int const e(errno);
     309             :                 throw event_dispatcher_invalid_parameter(
     310             :                           "tcp_blocking_client_message_connection::run():"
     311             :                           " poll() failed with error "
     312           0 :                         + std::to_string(e)
     313           0 :                         + " -- "
     314           0 :                         + strerror(e));
     315             :             }
     316             : 
     317           0 :             if((fd.revents & (POLLIN | POLLPRI)) != 0)
     318             :             {
     319             :                 // read one character at a time otherwise we would be
     320             :                 // blocked forever
     321             :                 //
     322             :                 char buf[2];
     323           0 :                 int const size(::read(fd.fd, buf, 1));
     324           0 :                 if(size != 1)
     325             :                 {
     326             :                     // invalid read
     327             :                     //
     328           0 :                     process_error();
     329             :                     throw event_dispatcher_invalid_parameter(
     330             :                               "tcp_blocking_client_message_connection::run():"
     331             :                               " read() failed reading data from socket"
     332             :                               " (return value = "
     333           0 :                             + std::to_string(size)
     334           0 :                             + ").");
     335             :                 }
     336           0 :                 if(buf[0] == '\n')
     337             :                 {
     338             :                     // end of a line, we got a whole message in our buffer
     339             :                     // notice that we do not add the '\n' to line
     340             :                     //
     341           0 :                     break;
     342             :                 }
     343           0 :                 buf[1] = '\0';
     344           0 :                 f_line += buf;
     345             :             }
     346           0 :             if((fd.revents & POLLERR) != 0)
     347             :             {
     348           0 :                 process_error();
     349           0 :                 return;
     350             :             }
     351           0 :             if((fd.revents & (POLLHUP | POLLRDHUP)) != 0)
     352             :             {
     353           0 :                 process_hup();
     354           0 :                 return;
     355             :             }
     356           0 :             if((fd.revents & POLLNVAL) != 0)
     357             :             {
     358           0 :                 process_invalid();
     359           0 :                 return;
     360             :             }
     361             :         }
     362           0 :         process_line(f_line);
     363           0 :         f_line.clear();
     364             :     }
     365           0 :     while(!is_done());
     366             : }
     367             : 
     368             : 
     369             : /** \brief Quick peek on the connection.
     370             :  *
     371             :  * This function checks for incoming messages and calls process_message()
     372             :  * on each one of them. If no messages are found on the pipe, then the
     373             :  * function returns immediately.
     374             :  *
     375             :  * \note
     376             :  * Internally, the function actually calls process_line() which transforms
     377             :  * the line into a message and in turn calls process_message().
     378             :  */
     379           0 : void tcp_blocking_client_message_connection::peek()
     380             : {
     381           0 :     do
     382             :     {
     383           0 :         for(;;)
     384             :         {
     385             :             pollfd fd;
     386           0 :             fd.events = POLLIN | POLLPRI | POLLRDHUP;
     387           0 :             fd.fd = get_socket();
     388           0 :             if(fd.fd < 0
     389           0 :             || !is_enabled())
     390             :             {
     391             :                 // invalid socket
     392           0 :                 process_error();
     393           0 :                 return;
     394             :             }
     395             : 
     396           0 :             errno = 0;
     397           0 :             fd.revents = 0; // probably useless... (kernel should clear those)
     398           0 :             int const r(::poll(&fd, 1, 0));
     399           0 :             if(r < 0)
     400             :             {
     401             :                 // r < 0 means an error occurred
     402             :                 //
     403           0 :                 if(errno == EINTR)
     404             :                 {
     405             :                     // Note: if the user wants to prevent this error, he should
     406             :                     //       use the snap_signal with the Unix signals that may
     407             :                     //       happen while calling poll().
     408             :                     //
     409             :                     throw event_dispatcher_runtime_error(
     410             :                             "tcp_blocking_client_message_connection::run():"
     411             :                             " EINTR occurred while in poll() -- interrupts"
     412           0 :                             " are not supported yet though");
     413             :                 }
     414           0 :                 if(errno == EFAULT)
     415             :                 {
     416             :                     throw event_dispatcher_invalid_parameter(
     417             :                             "tcp_blocking_client_message_connection::run():"
     418           0 :                             " buffer was moved out of our address space?");
     419             :                 }
     420           0 :                 if(errno == EINVAL)
     421             :                 {
     422             :                     // if this is really because nfds is too large then it may be
     423             :                     // a "soft" error that can be fixed; that being said, my
     424             :                     // current version is 16K files which frankly when we reach
     425             :                     // that level we have a problem...
     426             :                     //
     427             :                     struct rlimit rl;
     428           0 :                     getrlimit(RLIMIT_NOFILE, &rl);
     429             :                     throw event_dispatcher_invalid_parameter(
     430             :                               "tcp_blocking_client_message_connection::run():"
     431             :                               " too many file fds for poll, limit is currently "
     432           0 :                             + std::to_string(rl.rlim_cur)
     433           0 :                             + ", your kernel top limit is "
     434           0 :                             + std::to_string(rl.rlim_max));
     435             :                 }
     436           0 :                 if(errno == ENOMEM)
     437             :                 {
     438             :                     throw event_dispatcher_runtime_error(
     439             :                             "tcp_blocking_client_message_connection::run():"
     440           0 :                             " poll() failed because of memory");
     441             :                 }
     442           0 :                 int const e(errno);
     443             :                 throw event_dispatcher_runtime_error(
     444             :                           "tcp_blocking_client_message_connection::run():"
     445             :                           " poll() failed with error "
     446           0 :                         + std::to_string(e)
     447           0 :                         + " -- "
     448           0 :                         + strerror(e));
     449             :             }
     450             : 
     451           0 :             if(r == 0)
     452             :             {
     453           0 :                 return;
     454             :             }
     455             : 
     456           0 :             if((fd.revents & (POLLIN | POLLPRI)) != 0)
     457             :             {
     458             :                 // read one character at a time otherwise we would be
     459             :                 // blocked forever
     460             :                 //
     461             :                 char buf[2];
     462           0 :                 int const size(::read(fd.fd, buf, 1));
     463           0 :                 if(size != 1)
     464             :                 {
     465             :                     // invalid read
     466           0 :                     process_error();
     467             :                     throw event_dispatcher_runtime_error(
     468             :                               "tcp_blocking_client_message_connection::run():"
     469             :                               " read() failed reading data from socket (return"
     470             :                               " value = "
     471           0 :                             + std::to_string(size)
     472           0 :                             + ")");
     473             :                 }
     474           0 :                 if(buf[0] == '\n')
     475             :                 {
     476             :                     // end of a line, we got a whole message in our buffer
     477             :                     // notice that we do not add the '\n' to line
     478           0 :                     break;
     479             :                 }
     480           0 :                 buf[1] = '\0';
     481           0 :                 f_line += buf;
     482             :             }
     483           0 :             if((fd.revents & POLLERR) != 0)
     484             :             {
     485           0 :                 process_error();
     486           0 :                 return;
     487             :             }
     488           0 :             if((fd.revents & (POLLHUP | POLLRDHUP)) != 0)
     489             :             {
     490           0 :                 process_hup();
     491           0 :                 return;
     492             :             }
     493           0 :             if((fd.revents & POLLNVAL) != 0)
     494             :             {
     495           0 :                 process_invalid();
     496           0 :                 return;
     497             :             }
     498             :         }
     499           0 :         process_line(f_line);
     500           0 :         f_line.clear();
     501             :     }
     502           0 :     while(!is_done());
     503             : }
     504             : 
     505             : 
     506             : /** \brief Send the specified message to the connection on the other end.
     507             :  *
     508             :  * This function sends the specified message to the other side of the
     509             :  * socket connection. If the write somehow fails, then the function
     510             :  * returns false.
     511             :  *
     512             :  * The function blocks until the entire message was written to the
     513             :  * socket.
     514             :  *
     515             :  * \param[in] msg  The message to send to the connection.
     516             :  * \param[in] cache  Whether to cache the message if it cannot be sent
     517             :  *                   immediately (ignored at the moment.)
     518             :  *
     519             :  * \return true if the message was sent successfully, false otherwise.
     520             :  */
     521           0 : bool tcp_blocking_client_message_connection::send_message(message const & msg, bool cache)
     522             : {
     523           0 :     snap::NOTUSED(cache);
     524             : 
     525           0 :     int const s(get_socket());
     526           0 :     if(s >= 0)
     527             :     {
     528             :         // transform the message to a string and write to the socket
     529             :         // the writing is blocking and thus fully synchronous so the
     530             :         // function blocks until the message gets fully sent
     531             :         //
     532             :         // WARNING: we cannot use f_connection.write() because that one
     533             :         //          is asynchronous (at least, it writes to a buffer
     534             :         //          and not directly to the socket!)
     535             :         //
     536           0 :         std::string buf(msg.to_message());
     537           0 :         buf += '\n';
     538           0 :         return ::write(s, buf.c_str(), buf.length()) == static_cast<ssize_t>(buf.length());
     539             :     }
     540             : 
     541           0 :     return false;
     542             : }
     543             : 
     544             : 
     545             : /** \brief Overridden callback.
     546             :  *
     547             :  * This function is overriding the lower level process_error() to make
     548             :  * (mostly) sure that the remove_from_communicator() function does not
     549             :  * get called because that would generate the creation of a
     550             :  * snap_communicator object which we do not want with blocking
     551             :  * clients.
     552             :  */
     553           0 : void tcp_blocking_client_message_connection::process_error()
     554             : {
     555           0 : }
     556             : 
     557             : 
     558             : 
     559           6 : } // namespace ed
     560             : // vim: ts=4 sw=4 et

Generated by: LCOV version 1.12