LCOV - code coverage report
Current view: top level - eventdispatcher - local_stream_client_permanent_message_connection.cpp (source / functions) Hit Total Coverage
Test: coverage.info Lines: 128 216 59.3 %
Date: 2022-02-12 12:27:47 Functions: 31 51 60.8 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : // Copyright (c) 2012-2022  Made to Order Software Corp.  All Rights Reserved
       2             : //
       3             : // https://snapwebsites.org/project/eventdispatcher
       4             : // contact@m2osw.com
       5             : //
       6             : // This program is free software; you can redistribute it and/or modify
       7             : // it under the terms of the GNU General Public License as published by
       8             : // the Free Software Foundation; either version 2 of the License, or
       9             : // (at your option) any later version.
      10             : //
      11             : // This program is distributed in the hope that it will be useful,
      12             : // but WITHOUT ANY WARRANTY; without even the implied warranty of
      13             : // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
      14             : // GNU General Public License for more details.
      15             : //
      16             : // You should have received a copy of the GNU General Public License along
      17             : // with this program; if not, write to the Free Software Foundation, Inc.,
      18             : // 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
      19             : 
      20             : /** \file
      21             :  * \brief Implementation of the client permanent message connection.
      22             :  *
      23             :  * This class implements a permanent connection to a Unix socket. This
      24             :  * means if the server is restarted, this class is capable of automatically
      25             :  * reconnecting under the hood. This can be done using a thread so if the
      26             :  * connect() command is slow (needs to time out), it won't block the
      27             :  * rest of your event loop.
      28             :  */
      29             : 
      30             : 
      31             : // self
      32             : //
      33             : #include    "eventdispatcher/local_stream_client_permanent_message_connection.h"
      34             : 
      35             : #include    "eventdispatcher/communicator.h"
      36             : #include    "eventdispatcher/exception.h"
      37             : //#include    "eventdispatcher/local_stream_server_client_message_connection.h"
      38             : #include    "eventdispatcher/local_stream_client_message_connection.h"
      39             : #include    "eventdispatcher/thread_done_signal.h"
      40             : 
      41             : 
      42             : // snaplogger lib
      43             : //
      44             : #include    <snaplogger/message.h>
      45             : 
      46             : 
      47             : // snapdev lib
      48             : //
      49             : //#include    <snapdev/not_used.h>
      50             : 
      51             : 
      52             : // cppthread lib
      53             : //
      54             : #include    <cppthread/exception.h>
      55             : #include    <cppthread/guard.h>
      56             : #include    <cppthread/runner.h>
      57             : #include    <cppthread/thread.h>
      58             : 
      59             : 
      60             : // C++ lib
      61             : //
      62             : //#include    <cstring>
      63             : 
      64             : 
      65             : // C lib
      66             : //
      67             : //#include    <sys/socket.h>
      68             : 
      69             : 
      70             : // last include
      71             : //
      72             : #include    <snapdev/poison.h>
      73             : 
      74             : 
      75             : 
      76             : namespace ed
      77             : {
      78             : 
      79             : 
      80             : 
      81             : namespace detail
      82             : {
      83             : 
      84             : 
      85             : /** \brief Internal implementation of the local_stream_client_permanent_message_connection class.
      86             :  *
      87             :  * This class is used to handle a thread that will process a connection for
      88             :  * us. This allows us to connect in any amount of time required by the
      89             :  * Unix system to obtain the connection with the remote server.
      90             :  *
      91             :  * \todo
      92             :  * Having threads at the time we do a fork() is not safe. We may
      93             :  * want to reconsider offering this functionality here. Because at
      94             :  * this time we would have no control of when the thread is created
      95             :  * and thus a way to make sure that no such thread is running when
      96             :  * we call fork().
      97             :  */
      98             : class local_stream_client_permanent_message_connection_impl
      99             : {
     100             : public:
     101           1 :     class messenger
     102             :         : public local_stream_client_message_connection
     103             :     {
     104             :     public:
     105             :         typedef std::shared_ptr<messenger>      pointer_t;
     106             : 
     107           1 :         messenger(
     108             :                   local_stream_client_permanent_message_connection * parent
     109             :                 , addr::unix const & address
     110             :                 , bool const blocking
     111             :                 , bool const close_on_exec)
     112           1 :             : local_stream_client_message_connection(
     113             :                       address
     114             :                     , blocking
     115             :                     , close_on_exec)
     116           1 :             , f_parent(parent)
     117             :         {
     118           1 :             set_name("local_stream_client_permanent_message_connection_impl::messenger");
     119           1 :         }
     120             : 
     121             :         messenger(messenger const & rhs) = delete;
     122             :         messenger & operator = (messenger const & rhs) = delete;
     123             : 
     124             :         // connection implementation
     125           2 :         virtual void process_empty_buffer()
     126             :         {
     127           2 :             local_stream_client_message_connection::process_empty_buffer();
     128           2 :             f_parent->process_empty_buffer();
     129           2 :         }
     130             : 
     131             :         // connection implementation
     132           0 :         virtual void process_error()
     133             :         {
     134           0 :             local_stream_client_message_connection::process_error();
     135           0 :             f_parent->process_error();
     136           0 :         }
     137             : 
     138             :         // connection implementation
     139           0 :         virtual void process_hup()
     140             :         {
     141           0 :             local_stream_client_message_connection::process_hup();
     142           0 :             f_parent->process_hup();
     143           0 :         }
     144             : 
     145             :         // connection implementation
     146           0 :         virtual void process_invalid()
     147             :         {
     148           0 :             local_stream_client_message_connection::process_invalid();
     149           0 :             f_parent->process_invalid();
     150           0 :         }
     151             : 
     152             :         // local_stream_server_client_message_connection implementation
     153           1 :         virtual void process_message(message const & msg)
     154             :         {
     155             :             // We call the dispatcher from our parent since the child
     156             :             // (this messenger) is not given a dispatcher
     157             :             //
     158           2 :             message copy(msg);
     159           1 :             f_parent->dispatch_message(copy);
     160           1 :         }
     161             : 
     162             :     private:
     163             :         local_stream_client_permanent_message_connection *  f_parent = nullptr;
     164             :     };
     165             : 
     166           1 :     class thread_signal_handler
     167             :         : public thread_done_signal
     168             :     {
     169             :     public:
     170             :         typedef std::shared_ptr<thread_signal_handler>   pointer_t;
     171             : 
     172           1 :         thread_signal_handler(local_stream_client_permanent_message_connection_impl * parent_impl)
     173           1 :             : f_parent_impl(parent_impl)
     174             :         {
     175           1 :             set_name("local_stream_client_permanent_message_connection_impl::thread_signal_handler");
     176           1 :         }
     177             : 
     178             :         thread_signal_handler(thread_signal_handler const & rhs) = delete;
     179             :         thread_signal_handler & operator = (thread_signal_handler const & rhs) = delete;
     180             : 
     181             :         /** \brief This signal was emitted.
     182             :          *
     183             :          * This function gets called whenever the thread is just about to
     184             :          * quit. Calling f_thread.is_running() may still return true when
     185             :          * you get in the 'thread_done()' callback. However, an
     186             :          * f_thread.stop() will return very quickly.
     187             :          */
     188           1 :         virtual void process_read()
     189             :         {
     190           1 :             thread_done_signal::process_read();
     191             : 
     192           1 :             f_parent_impl->thread_done();
     193           1 :         }
     194             : 
     195             :     private:
     196             :         local_stream_client_permanent_message_connection_impl *  f_parent_impl = nullptr;
     197             :     };
     198             : 
     199           1 :     class runner
     200             :         : public cppthread::runner
     201             :     {
     202             :     public:
     203           1 :         runner(
     204             :                       local_stream_client_permanent_message_connection_impl * parent_impl
     205             :                     , addr::unix const & address
     206             :                     , bool const blocking = false
     207             :                     , bool const close_on_exec = true)
     208           1 :             : cppthread::runner("background local_stream_client_permanent_message_connection for asynchronous connections")
     209             :             , f_parent_impl(parent_impl)
     210             :             , f_address(address)
     211             :             , f_blocking(blocking)
     212           1 :             , f_close_on_exec(close_on_exec)
     213             :         {
     214           1 :         }
     215             : 
     216             :         runner(runner const & rhs) = delete;
     217             :         runner & operator = (runner const & rhs) = delete;
     218             : 
     219             : 
     220             :         /** \brief This is the actual function run by the thread.
     221             :          *
     222             :          * This function calls the connect() function and then
     223             :          * tells the main thread we are done.
     224             :          */
     225           1 :         virtual void run()
     226             :         {
     227           1 :             connect();
     228             : 
     229             :             // tell the main thread that we are done
     230             :             //
     231           1 :             f_parent_impl->trigger_thread_done();
     232           1 :         }
     233             : 
     234             : 
     235             :         /** \brief This function attempts to connect.
     236             :          *
     237             :          * This function attempts a connection to the specified Unix address
     238             :          * using the blocking and close-on-exec flags given on creation.
     239             :          *
     240             :          * The function may take a long time to succeed connecting with
     241             :          * the server. The main thread will be awakened whenever this
     242             :          * thread dies.
     243             :          *
     244             :          * If an error occurs, then the f_messenger variable member is
     245             :          * reset to a null pointer. Otherwise it holds the messenger that we
     246             :          * just connected with.
     247             :          */
     248           1 :         void connect()
     249             :         {
     250           1 :             char const * error_name(nullptr);
     251             :             try
     252             :             {
     253           3 :                 f_messenger = std::make_shared<messenger>(
     254           2 :                               f_parent_impl->parent()
     255             :                             , f_address
     256             :                             , f_blocking
     257           1 :                             , f_close_on_exec);
     258           1 :                 return;
     259             :             }
     260           0 :             catch(event_dispatcher_initialization_error const & e)
     261             :             {
     262           0 :                 error_name = "event_dispatcher_initialization_error";
     263           0 :                 f_last_error = e.what();
     264             :             }
     265           0 :             catch(event_dispatcher_runtime_error const & e)
     266             :             {
     267           0 :                 error_name = "event_dispatcher_runtime_error";
     268           0 :                 f_last_error = e.what();
     269             :             }
     270           0 :             catch(std::exception const & e)
     271             :             {
     272           0 :                 error_name = "std::exception";
     273           0 :                 f_last_error = e.what();
     274             :             }
     275           0 :             catch(...)
     276             :             {
     277           0 :                 error_name = "... (any other exception)";
     278           0 :                 f_last_error = "Unknown exception";
     279             :             }
     280           0 :             f_messenger.reset();
     281             : 
     282             :             // connection failed... we will have to try again later
     283             :             //
     284             :             // WARNING: our logger is not multi-thread safe quite yet
     285             :             //SNAP_LOG_ERROR
     286             :             //    << "connection to "
     287             :             //    << f_address
     288             :             //    << ":"
     289             :             //    << f_port
     290             :             //    << " failed with: "
     291             :             //    << f_last_error
     292             :             //    << " ("
     293             :             //    << error_name
     294             :             //    << ")"
     295             :             //    << SNAP_LOG_SEND;
     296             :         }
     297             : 
     298             : 
     299             :         /** \brief Retrieve the address to connect to.
     300             :          *
     301             :          * This function returns the address passed in on creation.
     302             :          *
     303             :          * \note
     304             :          * Since the variable is constant, it is likely to never change.
     305             :          * However, the c_str() function may change the buffer pointer.
     306             :          * Hence, to be 100% safe, you cannot call this function until
     307             :          * you make sure that the thread is fully stopped.
     308             :          *
     309             :          * \return The destination address.
     310             :          */
     311           0 :         addr::unix get_address() const
     312             :         {
     313           0 :             return f_address;
     314             :         }
     315             : 
     316             : 
     317             :         /** \brief Retrieve the client allocated and connected by the thread.
     318             :          *
     319             :          * This function returns the messenger connection object resulting from
     320             :          * connection attempts of the background thread.
     321             :          *
     322             :          * If the pointer is null, then you may get the corresponding
     323             :          * error message using the get_last_error() function.
     324             :          *
     325             :          * You can get the client connection pointer once. After that
     326             :          * you always get a null pointer.
     327             :          *
     328             :          * \note
     329             :          * This function is guarded so the pointer and the object it
     330             :          * points to will be valid in another thread that retrieves it.
     331             :          *
     332             :          * \return The connection pointer.
     333             :          */
     334           1 :         messenger::pointer_t release_client()
     335             :         {
     336           2 :             cppthread::guard g(f_mutex);
     337           1 :             messenger::pointer_t release;
     338           1 :             release.swap(f_messenger);
     339           2 :             return release;
     340             :         }
     341             : 
     342             : 
     343             :         /** \brief Retrieve the last error message that happened.
     344             :          *
     345             :          * This function returns the last error message that was captured
     346             :          * when trying to connect to the socket. The message is the
     347             :          * e.what() message from the exception we captured.
     348             :          *
     349             :          * The message does not get cleared so the function can be called
     350             :          * any number of times. To know whether an error was generated
     351             :          * on the last attempt, make sure to first call release_client()
     352             :          * and if it returns a null pointer, then this message is significant,
     353             :          * otherwise it is from a previous error.
     354             :          *
     355             :          * \warning
     356             :          * Remember that if the background thread was used the error will
     357             :          * NOT be available in the main thread until a full memory barrier
     358             :          * was executed. For that reason we make sure that the thread
     359             :          * was stopped when we detect an error.
     360             :          *
     361             :          * \return The last error message.
     362             :          */
     363           0 :         std::string const & get_last_error() const
     364             :         {
     365           0 :             return f_last_error;
     366             :         }
     367             : 
     368             : 
     369             :         /** \brief Close the connection.
     370             :          *
     371             :          * This function closes the connection. Since the f_messenger
     372             :          * holds the connection to the server, we have to get this function
     373             :          * called in order to completely disconnect.
     374             :          *
     375             :          * \note
     376             :          * This function does not clear the f_last_error parameter so it
     377             :          * can be read later.
     378             :          */
     379           1 :         void close()
     380             :         {
     381           1 :             f_messenger.reset();
     382           1 :         }
     383             : 
     384             : 
     385             :     private:
     386             :         local_stream_client_permanent_message_connection_impl *
     387             :                                 f_parent_impl = nullptr;
     388             :         addr::unix const        f_address;
     389             :         bool const              f_blocking;
     390             :         bool const              f_close_on_exec;
     391             :         messenger::pointer_t    f_messenger = messenger::pointer_t();
     392             :         std::string             f_last_error = std::string();
     393             :     };
     394             : 
     395             : 
     396             :     /** \brief Initialize a permanent message connection implementation object.
     397             :      *
     398             :      * This object manages the thread used to asynchronously connect to
     399             :      * the specified Unix address.
     400             :      *
     401             :      * This class and its sub-classes may end up executing callbacks
     402             :      * of the local_stream_client_permanent_message_connection object.
     403             :      * However, in all cases these are never run from the thread.
     404             :      *
     405             :      * \param[in] parent  A pointer to the owner of this
     406             :      * local_stream_client_permanent_message_connection_impl object.
     407             :      * \param[in] address  The address we are to connect to.
     408             :      * \param[in] blocking  Whether to open in blocking mode or not.
     409             :      * \param[in] close_on_exec  Whether to mark the socket as requiring to
     410             :      * be closed on an exec() call.
     411             :      */
     412           1 :     local_stream_client_permanent_message_connection_impl(
     413             :                   local_stream_client_permanent_message_connection * parent
     414             :                 , addr::unix const & address
     415             :                 , bool const blocking
     416             :                 , bool const close_on_exec)
     417           1 :         : f_parent(parent)
     418             :         , f_thread_runner(this, address, blocking, close_on_exec)
     419           1 :         , f_thread("background connection handler thread", &f_thread_runner)
     420             :     {
     421           1 :     }
     422             : 
     423             : 
     424             :     local_stream_client_permanent_message_connection_impl(local_stream_client_permanent_message_connection_impl const & rhs) = delete;
     425             :     local_stream_client_permanent_message_connection_impl & operator = (local_stream_client_permanent_message_connection_impl const & rhs) = delete;
     426             : 
     427             :     /** \brief Destroy the permanent message connection.
     428             :      *
     429             :      * This function makes sure that the messenger was lost.
     430             :      */
     431           1 :     ~local_stream_client_permanent_message_connection_impl()
     432           1 :     {
     433             :         // to make sure we can lose the messenger, first we want to be sure
     434             :         // that we do not have a thread running
     435             :         //
     436             :         try
     437             :         {
     438           1 :             f_thread.stop();
     439             :         }
     440           0 :         catch(cppthread::cppthread_mutex_failed_error const &)
     441             :         {
     442             :         }
     443           0 :         catch(cppthread::cppthread_invalid_error const &)
     444             :         {
     445             :         }
     446             : 
     447             :         // in this case we may still have an instance of the f_thread_done
     448             :         // which lingers around; we want it out
     449             :         //
     450             :         // Note: the call is safe even if the f_thread_done is null
     451             :         //
     452           1 :         communicator::instance()->remove_connection(f_thread_done);
     453             : 
     454             :         // although the f_messenger variable gets reset automatically in
     455             :         // the destructor, it would not get removed from the
     456             :         // communicator instance if we were not doing it explicitly
     457             :         //
     458           1 :         disconnect();
     459           1 :     }
     460             : 
     461             : 
     462             :     /** \brief Direct connect to the messenger.
     463             :      *
     464             :      * In this case we try to connect without the thread. This allows
     465             :      * us to avoid the thread problems, but we are blocked until the
     466             :      * OS decides to time out or the connection worked.
     467             :      */
     468           0 :     void connect()
     469             :     {
     470           0 :         if(f_done)
     471             :         {
     472           0 :             SNAP_LOG_ERROR
     473             :                 << "Permanent connection marked done. Cannot attempt to reconnect."
     474             :                 << SNAP_LOG_SEND;
     475           0 :             return;
     476             :         }
     477             : 
     478             :         // call the thread connect() function from the main thread
     479             :         //
     480           0 :         f_thread_runner.connect();
     481             : 
     482             :         // simulate receiving the thread_done() signal
     483             :         //
     484           0 :         thread_done();
     485             :     }
     486             : 
     487             : 
     488             :     /** \brief Check whether the permanent connection is currently connected.
     489             :      *
     490             :      * This function returns true if the messenger exists, which means that
     491             :      * the connection is up.
     492             :      *
     493             :      * \return true if the connection is up.
     494             :      */
     495           0 :     bool is_connected()
     496             :     {
     497           0 :         return f_messenger != nullptr;
     498             :     }
     499             : 
     500             : 
     501             :     /** \brief Try to start the thread runner.
     502             :      *
     503             :      * This function tries to start the thread runner in order to initiate
     504             :      * a connection in the background. If the thread could not be started,
     505             :      * then the function returns false.
     506             :      *
     507             :      * If the thread started, then the function returns true. This does
     508             :      * not mean that the connection was obtained. This is known once
     509             :      * the process_connected() function is called.
     510             :      *
     511             :      * \return true if the thread was successfully started.
     512             :      */
     513           1 :     bool background_connect()
     514             :     {
     515           1 :         if(f_done)
     516             :         {
     517           0 :             SNAP_LOG_ERROR
     518             :                 << "Permanent connection marked done. Cannot attempt to reconnect."
     519             :                 << SNAP_LOG_SEND;
     520           0 :             return false;
     521             :         }
     522             : 
     523           1 :         if(f_thread.is_running())
     524             :         {
     525           0 :             SNAP_LOG_ERROR
     526             :                 << "A background connection attempt is already in progress. Further requests are ignored."
     527             :                 << SNAP_LOG_SEND;
     528           0 :             return false;
     529             :         }
     530             : 
     531             :         // create the f_thread_done only when required
     532             :         //
     533           1 :         if(f_thread_done == nullptr)
     534             :         {
     535           1 :             f_thread_done = std::make_shared<thread_signal_handler>(this);
     536             :         }
     537             : 
     538           1 :         communicator::instance()->add_connection(f_thread_done);
     539             : 
     540           1 :         if(!f_thread.start())
     541             :         {
     542           0 :             SNAP_LOG_ERROR
     543             :                 << "The thread used to run the background connection process did not start."
     544             :                 << SNAP_LOG_SEND;
     545           0 :             return false;
     546             :         }
     547             : 
     548           1 :         return true;
     549             :     }
     550             : 
     551             : 
     552             :     /** \brief Tell the main thread that the background thread is done.
     553             :      *
     554             :      * This function is called by the thread so the thread_done()
     555             :      * function of the thread done object gets called. Only the
     556             :      * thread should call this function.
     557             :      *
     558             :      * As a result the thread_done() function of this class will be
     559             :      * called by the main thread.
     560             :      */
     561           1 :     void trigger_thread_done()
     562             :     {
     563           1 :         f_thread_done->thread_done();
     564           1 :     }
     565             : 
     566             : 
     567             :     /** \brief Signal that the background thread is done.
     568             :      *
     569             :      * This callback is called whenever the background thread sends
     570             :      * a signal to us. This is used to avoid calling end user functions
     571             :      * that would certainly cause a lot of problems if called from the
     572             :      * thread.
     573             :      *
     574             :      * The function calls the process_connection_failed() if the
     575             :      * connection did not happen.
     576             :      *
     577             :      * The function calls the process_connected() if the connection
     578             :      * did happen.
     579             :      *
     580             :      * \note
     581             :      * This is used only if the user requested that the connection
     582             :      * happen in the background (i.e. use_thread was set to true
     583             :      * in the local_stream_client_permanent_message_connection object
     584             :      * constructor.)
     585             :      */
     586           1 :     void thread_done()
     587             :     {
     588             :         // if we used the thread we have to remove the signal used
     589             :         // to know that the thread was done
     590             :         //
     591           1 :         communicator::instance()->remove_connection(f_thread_done);
     592             : 
     593             :         // we will access the f_last_error member of the thread runner
     594             :         // which may not be available to the main thread yet, calling
     595             :         // stop forces a memory barrier so we are all good.
     596             :         //
     597             :         // calling stop() has no effect if we did not use the thread,
     598             :         // however, not calling stop() when we did use the thread
     599             :         // causes all sorts of other problems (especially, the thread
     600             :         // never gets joined)
     601             :         //
     602           1 :         f_thread.stop();
     603             : 
     604           2 :         messenger::pointer_t client(f_thread_runner.release_client());
     605           1 :         if(f_done)
     606             :         {
     607             :             // already marked done, ignore the result and lose the
     608             :             // connection immediately
     609             :             //
     610             :             //f_thread_running.close(); -- not necessary, 'client' is the connection
     611           0 :             return;
     612             :         }
     613             : 
     614           1 :         if(client == nullptr)
     615             :         {
     616             :             // TODO: fix address in error message using a addr::addr so
     617             :             //       as to handle IPv6 seamlessly.
     618             :             //
     619           0 :             SNAP_LOG_ERROR
     620             :                 << "connection to "
     621           0 :                 << f_thread_runner.get_address().to_uri()
     622             :                 << " failed with: "
     623           0 :                 << f_thread_runner.get_last_error()
     624             :                 << SNAP_LOG_SEND;
     625             : 
     626             :             // signal that an error occurred
     627             :             //
     628           0 :             f_parent->process_connection_failed(f_thread_runner.get_last_error());
     629             :         }
     630             :         else
     631             :         {
     632           1 :             f_messenger = client;
     633             : 
     634             :             // add the messenger to the communicator
     635             :             //
     636           1 :             communicator::instance()->add_connection(f_messenger);
     637             : 
     638             :             // if some messages were cached, process them immediately
     639             :             //
     640           3 :             while(!f_message_cache.empty())
     641             :             {
     642           1 :                 f_messenger->send_message(f_message_cache[0]);
     643           1 :                 f_message_cache.erase(f_message_cache.begin());
     644             :             }
     645             : 
     646             :             // let the client know we are now connected
     647             :             //
     648           1 :             f_parent->process_connected();
     649             :         }
     650             :     }
     651             : 
     652             :     /** \brief Send a message to the connection.
     653             :      *
     654             :      * This implementation function actually sends the message to the
     655             :      * connection, assuming that the connection exists. Otherwise, it
     656             :      * may cache the message (if cache is true.)
     657             :      *
     658             :      * Note that the message does not get cached if mark_done() was
     659             :      * called earlier since we are trying to close the whole connection.
     660             :      *
     661             :      * \param[in] msg  The message to send.
     662             :      * \param[in] cache  Whether to cache the message if the connection is
     663             :      *                   currently down.
     664             :      *
     665             :      * \return true if the message was forwarded, false if the message
     666             :      *         was ignored or cached.
     667             :      */
     668           2 :     bool send_message(message const & msg, bool cache)
     669             :     {
     670           2 :         if(f_messenger != nullptr)
     671             :         {
     672           1 :             return f_messenger->send_message(msg);
     673             :         }
     674             : 
     675           1 :         if(cache && !f_done)
     676             :         {
     677           1 :             f_message_cache.push_back(msg);
     678             :         }
     679             : 
     680           1 :         return false;
     681             :     }
     682             : 
     683             : 
     684             :     /** \brief Forget about the messenger connection.
     685             :      *
     686             :      * This function is used to fully disconnect from the messenger.
     687             :      *
     688             :      * If there is a messenger, this means:
     689             :      *
     690             :      * \li Removing the messenger from the communicator instance.
     691             :      * \li Closing the connection in the thread object.
     692             :      *
     693             :      * In most cases, it is called when an error occurs. It also gets
     694             :      * called explicitly through the disconnect() function
     695             :      * of the permanent connection class.
     696             :      *
     697             :      * \note
     698             :      * This is safe, even though it is called from the messenger itself,
     699             :      * because it will not get deleted yet. This is because the run()
     700             :      * loop keeps a copy of the pointer in its own temporary copy of the
     701             :      * vector of connections.
     702             :      */
     703           2 :     void disconnect()
     704             :     {
     705           2 :         if(f_messenger != nullptr)
     706             :         {
     707           1 :             communicator::instance()->remove_connection(f_messenger);
     708           1 :             f_messenger.reset();
     709             : 
     710             :             // resetting just the messenger does not close the connection
     711             :             // because we may have another in the thread runner
     712             :             //
     713           1 :             f_thread_runner.close();
     714             :         }
     715           2 :     }
     716             : 
     717             : 
     718             :     /** \brief Return the Unix address of the messenger connection.
     719             :      *
     720             :      * This function retrieves the socket address.
     721             :      *
     722             :      * \warning
     723             :      * If the socket is not currently connected, the function returns
     724             :      * a default Unix address. This means it returns a valid unnamed
     725             :      * address.
     726             :      *
     727             :      * \return The Unix address used to connect.
     728             :      */
     729           0 :     addr::unix get_address() const
     730             :     {
     731           0 :         if(f_messenger != nullptr)
     732             :         {
     733           0 :             return f_messenger->get_address();
     734             :         }
     735           0 :         return addr::unix();
     736             :     }
     737             : 
     738             : 
     739             :     /** \brief Mark the messenger as done.
     740             :      *
     741             :      * This function is used to mark the messenger as done. This means it
     742             :      * will get removed from the communicator instance as soon as it
     743             :      * is done with its current write buffer if there is one.
     744             :      *
     745             :      * You may also want to call the disconnect() function to actually
     746             :      * reset the pointer along the way.
     747             :      */
     748           1 :     void mark_done()
     749             :     {
     750           1 :         f_done = true;
     751             : 
     752             :         // once done we don't attempt to reconnect so we can as well
     753             :         // get rid of our existing cache immediately to save some
     754             :         // memory
     755             :         //
     756           1 :         f_message_cache.clear();
     757             : 
     758           1 :         if(f_messenger != nullptr)
     759             :         {
     760           1 :             f_messenger->mark_done();
     761             :         }
     762           1 :     }
     763             : 
     764             : 
     765             :     /** \brief Retrieve the parent of the impl.
     766             :      *
     767             :      * This function returns the parent of the impl, which is the main
     768             :      * local_stream_client_permanent_message_connection object. This is
     769             :      * saved in the messenger so we can relay events from our internal
     770             :      * messenger implementation to the main class owned by the user.
     771             :      *
     772             :      * \return The pointer to the parent of the impl.
     773             :      */
     774           1 :     local_stream_client_permanent_message_connection * parent() const
     775             :     {
     776           1 :         return f_parent;
     777             :     }
     778             : 
     779             : 
     780             : private:
     781             :     local_stream_client_permanent_message_connection *
     782             :                                         f_parent = nullptr;
     783             :     thread_signal_handler::pointer_t    f_thread_done = thread_signal_handler::pointer_t();
     784             :     runner                              f_thread_runner;
     785             :     cppthread::thread                   f_thread;
     786             :     messenger::pointer_t                f_messenger = messenger::pointer_t();
     787             :     message::vector_t                   f_message_cache = message::vector_t();
     788             :     bool                                f_done = false;
     789             : };
     790             : 
     791             : 
     792             : 
     793             : }
     794             : // namespace detail
     795             : 
     796             : 
     797             : 
     798             : /** \brief Initializes this local stream (Unix socket) client message connection.
     799             :  *
     800             :  * This implementation creates what we call a permanent connection.
     801             :  * Such a connection may fail once in a while. In such circumstances,
     802             :  * the class automatically requests a reconnection (see the various
     803             :  * parameters in that regard below.) However, this causes one issue:
     804             :  * by default, the connection just never ends. When you are about
     805             :  * ready to close the connection, you must call the mark_done()
     806             :  * function first. This will tell the various error functions to
     807             :  * drop this connection instead of restarting it after a small pause.
     808             :  *
     809             :  * This constructor initializes the timer, hands address, blocking, and
     810             :  * close_on_exec to the implementation, and saves pause and use_thread.
     811             :  *
     812             :  * The timer is first set to trigger immediately. This means the
     813             :  * connection will be attempted as soon as possible (the next time
     814             :  * the run() loop is entered, it will time out immediately.) You
     815             :  * are free to call set_timeout_date() with a date in the future if
     816             :  * you prefer that the connect be attempted a little later.
     817             :  *
     818             :  * The \p pause parameter is used if the connection is lost and this
     819             :  * timer is used again to attempt a new connection. It will be reused
     820             :  * as long as the connection fails (as a delay). It has to be at least
     821             :  * 10 microseconds, although really you should not use less than 1
     822             :  * second (1000000). You may set the pause parameter to 0 in which case
     823             :  * you are responsible to set the delay (by default there will be no
     824             :  * delay and thus the timer will never time out.)
     825             :  *
     826             :  * To start with a delay, instead of trying to connect immediately,
     827             :  * you may pass a negative pause parameter. So for example to get the
     828             :  * first attempt 5 seconds after you created this object, you use
     829             :  * -5000000LL as the pause parameter.
     830             :  *
     831             :  * The \p use_thread parameter determines whether the connection should
     832             :  * be attempted in a thread (asynchronously) or immediately (which means
     833             :  * the timeout callback may block for a while.) If the connection is to
     834             :  * a local server with an IP address specified as numbers (i.e. 127.0.0.1),
     835             :  * the thread is probably not required. For connections to a remote
     836             :  * computer, though, it certainly is important.
     837             :  *
     838             :  * \param[in] address  The Unix address to connect to.
     839             :  * \param[in] pause  The amount of time to wait before attempting a new
     840             :  *                   connection after a failure, in microseconds, or 0.
     841             :  * \param[in] use_thread  Whether a thread is used to connect to the
     842             :  *                        server.
     843             :  * \param[in] blocking  Whether the socket is going to be blocking or not.
     844             :  * \param[in] close_on_exec  Automatically close the connection if the process
     845             :  * executes an exec() call.
     846             :  */
     847           1 : local_stream_client_permanent_message_connection::local_stream_client_permanent_message_connection(
     848             :           addr::unix const & address
     849             :         , std::int64_t const pause
     850             :         , bool const use_thread
     851             :         , bool const blocking
     852           1 :         , bool const close_on_exec)
     853             :     : timer(pause < 0 ? -pause : 0)
     854             :     , f_impl(std::make_shared<detail::local_stream_client_permanent_message_connection_impl>(
     855             :               this
     856             :             , address
     857             :             , blocking
     858             :             , close_on_exec))
     859           1 :     , f_pause(llabs(pause))
     860           2 :     , f_use_thread(use_thread)
     861             : {
     862           1 : }
     863             : 
     864             : 
     865             : /** \brief Destroy instance.
     866             :  *
     867             :  * This function cleans up everything in the permanent message object.
     868             :  */
     869           1 : local_stream_client_permanent_message_connection::~local_stream_client_permanent_message_connection()
     870             : {
     871             :     // Does nothing
     872           1 : }
     873             : 
     874             : 
     875             : /** \brief Attempt to send a message to this connection.
     876             :  *
     877             :  * If the connection is currently enabled, the message is sent immediately.
     878             :  * Otherwise, it may be cached if the \p cache parameter is set to true.
     879             :  * A cached message is forwarded as soon as a new successful connection
     880             :  * happens, which can be a problem if messages need to happen in a very
     881             :  * specific order (For example, after a reconnection to snapcommunicator
     882             :  * you first need to REGISTER or CONNECT...)
     883             :  *
     884             :  * \param[in] msg  The message to send to the connected server.
     885             :  * \param[in] cache  Whether the message should be cached.
     886             :  *
     887             :  * \return true if the message was sent, false if it was not sent, although
     888             :  *         if cache was true, it was cached
     889             :  */
     890           2 : bool local_stream_client_permanent_message_connection::send_message(message const & msg, bool cache)
     891             : {
     892           2 :     return f_impl->send_message(msg, cache);
     893             : }
     894             : 
     895             : 
     896             : /** \brief Check whether the connection is up.
     897             :  *
     898             :  * This function returns true if the connection is considered to be up.
     899             :  * This means sending messages will work quickly instead of being
     900             :  * cached up until an actual connection gets established.
     901             :  *
     902             :  * Note that the connection may have hung up since, and the system
     903             :  * may not have yet detected the fact (i.e. the connection is going
     904             :  * to receive the process_hup() call after the event in which you are
     905             :  * working.)
     906             :  *
     907             :  * \return true if connected
     908             :  */
     909           0 : bool local_stream_client_permanent_message_connection::is_connected() const
     910             : {
     911           0 :     return f_impl->is_connected();
     912             : }
     913             : 
     914             : 
     915             : /** \brief Disconnect the messenger now.
     916             :  *
     917             :  * This function kills the current connection.
     918             :  *
     919             :  * There are a few cases where two daemons communicate with each other
     920             :  * and at some point one of them wants to exit and needs to disconnect. This
     921             :  * function can be used in that one situation assuming that you have an
     922             :  * acknowledgement from the other daemon.
     923             :  *
     924             :  * Say you have daemon A and B. B wants to quit and before doing so sends
     925             :  * a form of "I'm quitting" message to A. In that situation, B is not closing
     926             :  * the messenger connection, A is responsible for that (i.e. A acknowledges
     927             :  * receipt of the "I'm quitting" message from B by closing the connection.)
     928             :  *
     929             :  * B also wants to call the mark_done() function to make sure that it
     930             :  * does not reconnect a split second later and instead the permanent
     931             :  * connection gets removed from the communicator list of connections.
     932             :  */
     933           0 : void local_stream_client_permanent_message_connection::disconnect()
     934             : {
     935           0 :     f_impl->disconnect();
     936           0 : }
     937             : 
     938             : 
     939             : /** \brief Overload so we do not have to use namespace everywhere.
     940             :  *
     941             :  * This function overloads the connection::mark_done() function so
     942             :  * we can call it without the need to use timer::mark_done()
     943             :  * everywhere.
     944             :  */
     945           0 : void local_stream_client_permanent_message_connection::mark_done()
     946             : {
     947           0 :     timer::mark_done();
     948           0 : }
     949             : 
     950             : 
     951             : /** \brief Mark connection as done.
     952             :  *
     953             :  * This function allows you to mark the permanent connection and the
     954             :  * messenger as done.
     955             :  *
     956             :  * Note that calling this function with false is the same as calling the
     957             :  * base class mark_done() function.
     958             :  *
     959             :  * If the \p messenger parameter is set to true, we suggest you also call
     960             :  * the disconnect() function. That way the messenger will truly get
     961             :  * removed from everyone quickly.
     962             :  *
     963             :  * \param[in] messenger  If true, also mark the messenger as done.
     964             :  */
     965           1 : void local_stream_client_permanent_message_connection::mark_done(bool messenger)
     966             : {
     967           1 :     timer::mark_done();
     968           1 :     if(messenger)
     969             :     {
     970           1 :         f_impl->mark_done();
     971             :     }
     972           1 : }
     973             : 
     974             : 
     975             : /** \brief Retrieve a copy of the client's address.
     976             :  *
     977             :  * This function returns a copy of the Unix address of this client
     978             :  * connection as reported by the implementation object.
     979             :  *
     980             :  * \return Return a copy of the Unix address.
     981             :  */
     982           0 : addr::unix local_stream_client_permanent_message_connection::get_address() const
     983             : {
     984           0 :     return f_impl->get_address();
     985             : }
     986             : 
     987             : 
     988             : /** \brief Internal timeout callback implementation.
     989             :  *
     990             :  * This callback implements the guts of this class: it attempts to connect
     991             :  * to the specified address, optionally after creating a thread
     992             :  * so the attempt can happen asynchronously.
     993             :  *
     994             :  * When the connection fails, the timer is used to try again pause
     995             :  * microseconds later (pause as specified in the constructor).
     996             :  *
     997             :  * When a connection succeeds, the timer is disabled until you detect
     998             :  * an error while using the connection and re-enable the timer.
     999             :  *
    1000             :  * \warning
    1001             :  * This function changes the timeout delay to the pause amount
    1002             :  * as defined with the constructor. If you want to change that
    1003             :  * amount, you can do so at any point after this function call
    1004             :  * using the set_timeout_delay() function. If the pause parameter
    1005             :  * was set to 0, then the timeout never gets changed.
    1006             :  * However, you should not use a permanent message timer as your
    1007             :  * own or you will interfere with the internal use of the timer.
    1008             :  */
    1009           1 : void local_stream_client_permanent_message_connection::process_timeout()
    1010             : {
    1011             :     // got a spurious call when already marked done
    1012             :     //
    1013           1 :     if(is_done())
    1014             :     {
    1015           0 :         return;
    1016             :     }
    1017             : 
    1018             :     // change the timeout delay although we will not use it immediately
    1019             :     // if we start the thread or attempt an immediate connection, but
    1020             :     // that way the user can change it by calling set_timeout_delay()
    1021             :     // at any time after the first process_timeout() call
    1022             :     //
    1023           1 :     if(f_pause > 0)
    1024             :     {
    1025           1 :         set_timeout_delay(f_pause);
    1026           1 :         f_pause = 0;
    1027             :     }
    1028             : 
    1029           1 :     if(f_use_thread)
    1030             :     {
    1031             :         // in this case we create a thread, run it and know whether the
    1032             :         // connection succeeded only when the thread tells us it did
    1033             :         //
    1034             :         // TODO: the background_connect() may return false in two situations:
    1035             :         //       1) when the thread is already running and then the behavior
    1036             :         //          we have below is INCORRECT
    1037             :         //       2) when the thread cannot be started (i.e. could not
    1038             :         //          allocate the stack?) in which case the if() below
    1039             :         //          is the correct behavior
    1040             :         //
    1041           1 :         if(f_impl->background_connect())
    1042             :         {
    1043             :             // we started the thread successfully, so block the timer
    1044             :             //
    1045           1 :             set_enable(false);
    1046             :         }
    1047             :     }
    1048             :     else
    1049             :     {
    1050             :         // the success is noted when we receive a call to
    1051             :         // process_connected(); there we do set_enable(false)
    1052             :         // so the timer stops
    1053             :         //
    1054           0 :         f_impl->connect();
    1055             :     }
    1056             : }
    1057             : 
    1058             : 
    1059             : /** \brief Process an error.
    1060             :  *
    1061             :  * When an error occurs, we restart the timer so we can attempt to reconnect
    1062             :  * to that server.
    1063             :  *
    1064             :  * If you overload this function, make sure to either call this
    1065             :  * implementation or enable the timer yourselves.
    1066             :  *
    1067             :  * \warning
    1068             :  * Unless the connection was marked done, this function does not call
    1069             :  * timer::process_error(), which means that this connection is not
    1070             :  * automatically removed from the communicator object on failures.
    1071             :  */
    1072           0 : void local_stream_client_permanent_message_connection::process_error()
    1073             : {
    1074           0 :     if(is_done())
    1075             :     {
    1076           0 :         timer::process_error();
    1077             :     }
    1078             :     else
    1079             :     {
    1080           0 :         f_impl->disconnect();
    1081           0 :         set_enable(true);
    1082             :     }
    1083           0 : }
    1084             : 
    1085             : 
    1086             : /** \brief Process a hang up.
    1087             :  *
    1088             :  * When a hang up occurs, we restart the timer so we can attempt to reconnect
    1089             :  * to that server.
    1090             :  *
    1091             :  * If you overload this function, make sure to either call this
    1092             :  * implementation or enable the timer yourselves.
    1093             :  *
    1094             :  * \warning
    1095             :  * Unless the connection was marked done, this function does not call
    1096             :  * timer::process_hup(), which means that this connection is not
    1097             :  * automatically removed from the communicator object on failures.
    1098             :  */
    1099           0 : void local_stream_client_permanent_message_connection::process_hup()
    1100             : {
    1101           0 :     if(is_done())
    1102             :     {
    1103           0 :         timer::process_hup();
    1104             :     }
    1105             :     else
    1106             :     {
    1107           0 :         f_impl->disconnect();
    1108           0 :         set_enable(true);
    1109             :     }
    1110           0 : }
    1111             : 
    1112             : 
    1113             : /** \brief Process an invalid signal.
    1114             :  *
    1115             :  * When an invalid signal occurs, we restart the timer so we can attempt
    1116             :  * to reconnect to that server.
    1117             :  *
    1118             :  * If you overload this function, make sure to either call this
    1119             :  * implementation or enable the timer yourselves.
    1120             :  *
    1121             :  * \warning
    1122             :  * Unless the connection was marked done, this function does not call
    1123             :  * timer::process_invalid(), which means that this connection is not
    1124             :  * automatically removed from the communicator object on failures.
    1125             :  */
    1126           0 : void local_stream_client_permanent_message_connection::process_invalid()
    1127             : {
    1128           0 :     if(is_done())
    1129             :     {
    1130           0 :         timer::process_invalid();
    1131             :     }
    1132             :     else
    1133             :     {
    1134           0 :         f_impl->disconnect();
    1135           0 :         set_enable(true);
    1136             :     }
    1137           0 : }
    1138             : 
    1139             : 
    1140             : /** \brief Make sure that the messenger connection gets removed.
    1141             :  *
    1142             :  * This function asks the implementation object to disconnect so that the
    1143             :  * messenger sub-connection also gets removed from the communicator.
    1144             :  * Otherwise it would lock the system since connections are saved in the
    1145             :  * communicator object as shared pointers.
    1146             :  */
    1147           1 : void local_stream_client_permanent_message_connection::connection_removed()
    1148             : {
    1149           1 :     f_impl->disconnect(); // the only action taken: no timer change here
    1150           1 : }
    1151             : 
    1152             : 
    1153             : /** \brief Process a connection failed callback.
    1154             :  *
    1155             :  * When a connection attempt fails, we restart the timer so we can
    1156             :  * attempt to reconnect to that server.
    1157             :  *
    1158             :  * If you override this function, make sure to either call this
    1159             :  * implementation or enable the timer yourself.
    1160             :  *
    1161             :  * \param[in] error_message  The error that triggered this callback (ignored here).
    1162             :  */
    1163           0 : void local_stream_client_permanent_message_connection::process_connection_failed(std::string const & error_message)
    1164             : {
    1165           0 :     snapdev::NOT_USED(error_message); // default implementation does not use the message
    1166           0 :     set_enable(true); // re-enable the timer so another attempt can be made
    1167           0 : }
    1168             : 
    1169             : 
    1170             : /** \brief The connection is ready.
    1171             :  *
    1172             :  * This callback gets called whenever the connection succeeded and is
    1173             :  * ready to be used.
    1174             :  *
    1175             :  * You should override this virtual function if you have to initiate
    1176             :  * the communication. For example, the snapserver has to send a
    1177             :  * REGISTER to the snapcommunicator system and thus implements this
    1178             :  * function.
    1179             :  *
    1180             :  * The default implementation makes sure that the timer gets turned off
    1181             :  * so we do not keep trying to reconnect while connected. Make sure you
    1182             :  * call the default function so you get the proper behavior.
    1183             :  */
    1184           1 : void local_stream_client_permanent_message_connection::process_connected()
    1185             : {
    1186           1 :     set_enable(false); // connected: turn the timer off
    1187           1 : }
    1188             : 
    1189             : 
    1190             : 
    1191           6 : } // namespace ed
    1192             : // vim: ts=4 sw=4 et

Generated by: LCOV version 1.13