Line data Source code
1 : // Copyright (c) 2012-2022 Made to Order Software Corp. All Rights Reserved
2 : //
3 : // https://snapwebsites.org/project/eventdispatcher
4 : // contact@m2osw.com
5 : //
6 : // This program is free software; you can redistribute it and/or modify
7 : // it under the terms of the GNU General Public License as published by
8 : // the Free Software Foundation; either version 2 of the License, or
9 : // (at your option) any later version.
10 : //
11 : // This program is distributed in the hope that it will be useful,
12 : // but WITHOUT ANY WARRANTY; without even the implied warranty of
13 : // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 : // GNU General Public License for more details.
15 : //
16 : // You should have received a copy of the GNU General Public License along
17 : // with this program; if not, write to the Free Software Foundation, Inc.,
18 : // 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
19 :
20 : /** \file
21 : * \brief Implementation of the client permanent message connection.
22 : *
23 : * This class implements a permanent connection to a Unix socket. This
24 : * means if the server is restarted, this class is capable of automatically
25 : * reconnecting under the hood. This can be done using a thread so if the
26 : * connect() command is slow (needs to time out), it won't block the
27 : * rest of your event loop.
28 : */
29 :
30 :
31 : // self
32 : //
33 : #include "eventdispatcher/local_stream_client_permanent_message_connection.h"
34 :
35 : #include "eventdispatcher/communicator.h"
36 : #include "eventdispatcher/exception.h"
37 : //#include "eventdispatcher/local_stream_server_client_message_connection.h"
38 : #include "eventdispatcher/local_stream_client_message_connection.h"
39 : #include "eventdispatcher/thread_done_signal.h"
40 :
41 :
42 : // snaplogger lib
43 : //
44 : #include <snaplogger/message.h>
45 :
46 :
47 : // snapdev lib
48 : //
49 : //#include <snapdev/not_used.h>
50 :
51 :
52 : // cppthread lib
53 : //
54 : #include <cppthread/exception.h>
55 : #include <cppthread/guard.h>
56 : #include <cppthread/runner.h>
57 : #include <cppthread/thread.h>
58 :
59 :
60 : // C++ lib
61 : //
62 : //#include <cstring>
63 :
64 :
65 : // C lib
66 : //
67 : //#include <sys/socket.h>
68 :
69 :
70 : // last include
71 : //
72 : #include <snapdev/poison.h>
73 :
74 :
75 :
76 : namespace ed
77 : {
78 :
79 :
80 :
81 : namespace detail
82 : {
83 :
84 :
85 : /** \brief Internal implementation of the local_stream_client_permanent_message_connection class.
86 : *
87 : * This class is used to handle a thread that will process a connection for
88 : * us. This allows us to connect in any amount of time required by the
89 : * Unix system to obtain the connection with the remote server.
90 : *
91 : * \todo
92 : * Having threads at the time we do a fork() is not safe. We may
93 : * want to reconsider offering this functionality here. Because at
94 : * this time we would have no control of when the thread is created
95 : * and thus a way to make sure that no such thread is running when
96 : * we call fork().
97 : */
98 : class local_stream_client_permanent_message_connection_impl
99 : {
100 : public:
101 1 : class messenger
102 : : public local_stream_client_message_connection
103 : {
104 : public:
105 : typedef std::shared_ptr<messenger> pointer_t;
106 :
107 1 : messenger(
108 : local_stream_client_permanent_message_connection * parent
109 : , addr::unix const & address
110 : , bool const blocking
111 : , bool const close_on_exec)
112 1 : : local_stream_client_message_connection(
113 : address
114 : , blocking
115 : , close_on_exec)
116 1 : , f_parent(parent)
117 : {
118 1 : set_name("local_stream_client_permanent_message_connection_impl::messenger");
119 1 : }
120 :
121 : messenger(messenger const & rhs) = delete;
122 : messenger & operator = (messenger const & rhs) = delete;
123 :
124 : // connection implementation
125 2 : virtual void process_empty_buffer()
126 : {
127 2 : local_stream_client_message_connection::process_empty_buffer();
128 2 : f_parent->process_empty_buffer();
129 2 : }
130 :
131 : // connection implementation
132 0 : virtual void process_error()
133 : {
134 0 : local_stream_client_message_connection::process_error();
135 0 : f_parent->process_error();
136 0 : }
137 :
138 : // connection implementation
139 0 : virtual void process_hup()
140 : {
141 0 : local_stream_client_message_connection::process_hup();
142 0 : f_parent->process_hup();
143 0 : }
144 :
145 : // connection implementation
146 0 : virtual void process_invalid()
147 : {
148 0 : local_stream_client_message_connection::process_invalid();
149 0 : f_parent->process_invalid();
150 0 : }
151 :
152 : // local_stream_server_client_message_connection implementation
153 1 : virtual void process_message(message const & msg)
154 : {
155 : // We call the dispatcher from our parent since the child
156 : // (this messenger) is not given a dispatcher
157 : //
158 2 : message copy(msg);
159 1 : f_parent->dispatch_message(copy);
160 1 : }
161 :
162 : private:
163 : local_stream_client_permanent_message_connection * f_parent = nullptr;
164 : };
165 :
166 1 : class thread_signal_handler
167 : : public thread_done_signal
168 : {
169 : public:
170 : typedef std::shared_ptr<thread_signal_handler> pointer_t;
171 :
172 1 : thread_signal_handler(local_stream_client_permanent_message_connection_impl * parent_impl)
173 1 : : f_parent_impl(parent_impl)
174 : {
175 1 : set_name("local_stream_client_permanent_message_connection_impl::thread_signal_handler");
176 1 : }
177 :
178 : thread_signal_handler(thread_signal_handler const & rhs) = delete;
179 : thread_signal_handler & operator = (thread_signal_handler const & rhs) = delete;
180 :
181 : /** \brief This signal was emitted.
182 : *
183 : * This function gets called whenever the thread is just about to
184 : * quit. Calling f_thread.is_running() may still return true when
185 : * you get in the 'thread_done()' callback. However, an
186 : * f_thread.stop() will return very quickly.
187 : */
188 1 : virtual void process_read()
189 : {
190 1 : thread_done_signal::process_read();
191 :
192 1 : f_parent_impl->thread_done();
193 1 : }
194 :
195 : private:
196 : local_stream_client_permanent_message_connection_impl * f_parent_impl = nullptr;
197 : };
198 :
199 1 : class runner
200 : : public cppthread::runner
201 : {
202 : public:
203 1 : runner(
204 : local_stream_client_permanent_message_connection_impl * parent_impl
205 : , addr::unix const & address
206 : , bool const blocking = false
207 : , bool const close_on_exec = true)
208 1 : : cppthread::runner("background local_stream_client_permanent_message_connection for asynchronous connections")
209 : , f_parent_impl(parent_impl)
210 : , f_address(address)
211 : , f_blocking(blocking)
212 1 : , f_close_on_exec(close_on_exec)
213 : {
214 1 : }
215 :
216 : runner(runner const & rhs) = delete;
217 : runner & operator = (runner const & rhs) = delete;
218 :
219 :
220 : /** \brief This is the actual function run by the thread.
221 : *
222 : * This function calls the connect() function and then
223 : * tells the main thread we are done.
224 : */
225 1 : virtual void run()
226 : {
227 1 : connect();
228 :
229 : // tell the main thread that we are done
230 : //
231 1 : f_parent_impl->trigger_thread_done();
232 1 : }
233 :
234 :
235 : /** \brief This function attempts to connect.
236 : *
237 : * This function attempts a connection to the specified address
238 : * and port with the specified mode (i.e. plain or encrypted.)
239 : *
240 : * The function may take a long time to succeed connecting with
241 : * the server. The main thread will be awaken whenever this
242 : * thread dies.
243 : *
244 : * If an error occurs, then the f_socket variable member will
245 : * be set to -1. Otherwise it represents the socket that we
246 : * just connected with.
247 : */
248 1 : void connect()
249 : {
250 1 : char const * error_name(nullptr);
251 : try
252 : {
253 3 : f_messenger = std::make_shared<messenger>(
254 2 : f_parent_impl->parent()
255 : , f_address
256 : , f_blocking
257 1 : , f_close_on_exec);
258 1 : return;
259 : }
260 0 : catch(event_dispatcher_initialization_error const & e)
261 : {
262 0 : error_name = "event_dispatcher_initialization_error";
263 0 : f_last_error = e.what();
264 : }
265 0 : catch(event_dispatcher_runtime_error const & e)
266 : {
267 0 : error_name = "event_dispatcher_runtime_error";
268 0 : f_last_error = e.what();
269 : }
270 0 : catch(std::exception const & e)
271 : {
272 0 : error_name = "std::exception";
273 0 : f_last_error = e.what();
274 : }
275 0 : catch(...)
276 : {
277 0 : error_name = "... (any other exception)";
278 0 : f_last_error = "Unknown exception";
279 : }
280 0 : f_messenger.reset();
281 :
282 : // connection failed... we will have to try again later
283 : //
284 : // WARNING: our logger is not multi-thread safe quiet yet
285 : //SNAP_LOG_ERROR
286 : // << "connection to "
287 : // << f_address
288 : // << ":"
289 : // << f_port
290 : // << " failed with: "
291 : // << f_last_error
292 : // << " ("
293 : // << error_name
294 : // << ")"
295 : // << SNAP_LOG_SEND;
296 : }
297 :
298 :
299 : /** \brief Retrieve the address to connect to.
300 : *
301 : * This function returns the address passed in on creation.
302 : *
303 : * \note
304 : * Since the variable is constant, it is likely to never change.
305 : * However, the c_str() function may change the buffer pointer.
306 : * Hence, to be 100% safe, you cannot call this function until
307 : * you make sure that the thread is fully stopped.
308 : *
309 : * \return The destination address.
310 : */
311 0 : addr::unix get_address() const
312 : {
313 0 : return f_address;
314 : }
315 :
316 :
317 : /** \brief Retrieve the client allocated and connected by the thread.
318 : *
319 : * This function returns the TCP connection object resulting from
320 : * connection attempts of the background thread.
321 : *
322 : * If the pointer is null, then you may get the corresponding
323 : * error message using the get_last_error() function.
324 : *
325 : * You can get the client TCP connection pointer once. After that
326 : * you always get a null pointer.
327 : *
328 : * \note
329 : * This function is guarded so the pointer and the object it
330 : * points to will be valid in another thread that retrieves it.
331 : *
332 : * \return The connection pointer.
333 : */
334 1 : messenger::pointer_t release_client()
335 : {
336 2 : cppthread::guard g(f_mutex);
337 1 : messenger::pointer_t release;
338 1 : release.swap(f_messenger);
339 2 : return release;
340 : }
341 :
342 :
343 : /** \brief Retrieve the last error message that happened.
344 : *
345 : * This function returns the last error message that was captured
346 : * when trying to connect to the socket. The message is the
347 : * e.what() message from the exception we captured.
348 : *
349 : * The message does not get cleared so the function can be called
350 : * any number of times. To know whether an error was generated
351 : * on the last attempt, make sure to first get the get_socket()
352 : * and if it returns -1, then this message is significant,
353 : * otherwise it is from a previous error.
354 : *
355 : * \warning
356 : * Remember that if the background thread was used the error will
357 : * NOT be available in the main thread until a full memory barrier
358 : * was executed. For that reason we make sure that the thread
359 : * was stopped when we detect an error.
360 : *
361 : * \return The last error message.
362 : */
363 0 : std::string const & get_last_error() const
364 : {
365 0 : return f_last_error;
366 : }
367 :
368 :
369 : /** \brief Close the connection.
370 : *
371 : * This function closes the connection. Since the f_local_stream_connection
372 : * holds the socket to the remote server, we have get this function
373 : * called in order to completely disconnect.
374 : *
375 : * \note
376 : * This function does not clear the f_last_error parameter so it
377 : * can be read later.
378 : */
379 1 : void close()
380 : {
381 1 : f_messenger.reset();
382 1 : }
383 :
384 :
385 : private:
386 : local_stream_client_permanent_message_connection_impl *
387 : f_parent_impl = nullptr;
388 : addr::unix const f_address;
389 : bool const f_blocking;
390 : bool const f_close_on_exec;
391 : messenger::pointer_t f_messenger = messenger::pointer_t();
392 : std::string f_last_error = std::string();
393 : };
394 :
395 :
396 : /** \brief Initialize a permanent message connection implementation object.
397 : *
398 : * This object manages the thread used to asynchronically connect to
399 : * the specified address and port.
400 : *
401 : * This class and its sub-classes may end up executing callbacks
402 : * of the local_stream_client_permanent_message_connection object.
403 : * However, in all cases these are never run from the thread.
404 : *
405 : * \param[in] parent A pointer to the owner of this
406 : * local_stream_client_permanent_message_connection_impl object.
407 : * \param[in] address The address we are to connect to.
408 : * \param[in] blocking Whether to open in blocking mode or not.
409 : * \param[in] close_on_exec Whether to mark the socket as requiring to
410 : * be closed on an exec() call.
411 : */
412 1 : local_stream_client_permanent_message_connection_impl(
413 : local_stream_client_permanent_message_connection * parent
414 : , addr::unix const & address
415 : , bool const blocking
416 : , bool const close_on_exec)
417 1 : : f_parent(parent)
418 : , f_thread_runner(this, address, blocking, close_on_exec)
419 1 : , f_thread("background connection handler thread", &f_thread_runner)
420 : {
421 1 : }
422 :
423 :
424 : local_stream_client_permanent_message_connection_impl(local_stream_client_permanent_message_connection_impl const & rhs) = delete;
425 : local_stream_client_permanent_message_connection_impl & operator = (local_stream_client_permanent_message_connection_impl const & rhs) = delete;
426 :
427 : /** \brief Destroy the permanent message connection.
428 : *
429 : * This function makes sure that the messenger was lost.
430 : */
431 1 : ~local_stream_client_permanent_message_connection_impl()
432 1 : {
433 : // to make sure we can lose the messenger, first we want to be sure
434 : // that we do not have a thread running
435 : //
436 : try
437 : {
438 1 : f_thread.stop();
439 : }
440 0 : catch(cppthread::cppthread_mutex_failed_error const &)
441 : {
442 : }
443 0 : catch(cppthread::cppthread_invalid_error const &)
444 : {
445 : }
446 :
447 : // in this case we may still have an instance of the f_thread_done
448 : // which linger around, we want it out
449 : //
450 : // Note: the call is safe even if the f_thread_done is null
451 : //
452 1 : communicator::instance()->remove_connection(f_thread_done);
453 :
454 : // although the f_messenger variable gets reset automatically in
455 : // the destructor, it would not get removed from the
456 : // communicator instance if we were not doing it explicitly
457 : //
458 1 : disconnect();
459 1 : }
460 :
461 :
462 : /** \brief Direct connect to the messenger.
463 : *
464 : * In this case we try to connect without the thread. This allows
465 : * us to avoid the thread problems, but we are blocked until the
466 : * OS decides to time out or the connection worked.
467 : */
468 0 : void connect()
469 : {
470 0 : if(f_done)
471 : {
472 0 : SNAP_LOG_ERROR
473 : << "Permanent connection marked done. Cannot attempt to reconnect."
474 : << SNAP_LOG_SEND;
475 0 : return;
476 : }
477 :
478 : // call the thread connect() function from the main thread
479 : //
480 0 : f_thread_runner.connect();
481 :
482 : // simulate receiving the thread_done() signal
483 : //
484 0 : thread_done();
485 : }
486 :
487 :
488 : /** \brief Check whether the permanent connection is currently connected.
489 : *
490 : * This function returns true if the messenger exists, which means that
491 : * the connection is up.
492 : *
493 : * \return true if the connection is up.
494 : */
495 0 : bool is_connected()
496 : {
497 0 : return f_messenger != nullptr;
498 : }
499 :
500 :
501 : /** \brief Try to start the thread runner.
502 : *
503 : * This function tries to start the thread runner in order to initiate
504 : * a connection in the background. If the thread could not be started,
505 : * then the function returns false.
506 : *
507 : * If the thread started, then the function returns true. This does
508 : * not mean that the connection was obtained. This is known once
509 : * the process_connected() function is called.
510 : *
511 : * \return true if the thread was successfully started.
512 : */
513 1 : bool background_connect()
514 : {
515 1 : if(f_done)
516 : {
517 0 : SNAP_LOG_ERROR
518 : << "Permanent connection marked done. Cannot attempt to reconnect."
519 : << SNAP_LOG_SEND;
520 0 : return false;
521 : }
522 :
523 1 : if(f_thread.is_running())
524 : {
525 0 : SNAP_LOG_ERROR
526 : << "A background connection attempt is already in progress. Further requests are ignored."
527 : << SNAP_LOG_SEND;
528 0 : return false;
529 : }
530 :
531 : // create the f_thread_done only when required
532 : //
533 1 : if(f_thread_done == nullptr)
534 : {
535 1 : f_thread_done = std::make_shared<thread_signal_handler>(this);
536 : }
537 :
538 1 : communicator::instance()->add_connection(f_thread_done);
539 :
540 1 : if(!f_thread.start())
541 : {
542 0 : SNAP_LOG_ERROR
543 : << "The thread used to run the background connection process did not start."
544 : << SNAP_LOG_SEND;
545 0 : return false;
546 : }
547 :
548 1 : return true;
549 : }
550 :
551 :
552 : /** \brief Tell the main thread that the background thread is done.
553 : *
554 : * This function is called by the thread so the thread_done()
555 : * function of the thread done object gets called. Only the
556 : * thread should call this function.
557 : *
558 : * As a result the thread_done() function of this class will be
559 : * called by the main thread.
560 : */
561 1 : void trigger_thread_done()
562 : {
563 1 : f_thread_done->thread_done();
564 1 : }
565 :
566 :
567 : /** \brief Signal that the background thread is done.
568 : *
569 : * This callback is called whenever the background thread sends
570 : * a signal to us. This is used to avoid calling end user functions
571 : * that would certainly cause a lot of problem if called from the
572 : * thread.
573 : *
574 : * The function calls the process_connection_failed() if the
575 : * connection did not happen.
576 : *
577 : * The function calls the process_connected() if the connection
578 : * did happen.
579 : *
580 : * \note
581 : * This is used only if the user requested that the connection
582 : * happen in the background (i.e. use_thread was set to true
583 : * in the local_stream_client_permanent_message_connection object
584 : * constructor.)
585 : */
586 1 : void thread_done()
587 : {
588 : // if we used the thread we have to remove the signal used
589 : // to know that the thread was done
590 : //
591 1 : communicator::instance()->remove_connection(f_thread_done);
592 :
593 : // we will access the f_last_error member of the thread runner
594 : // which may not be available to the main thread yet, calling
595 : // stop forces a memory barrier so we are all good.
596 : //
597 : // calling stop() has no effect if we did not use the thread,
598 : // however, not calling stop() when we did use the thread
599 : // causes all sorts of other problems (especially, the thread
600 : // never gets joined)
601 : //
602 1 : f_thread.stop();
603 :
604 2 : messenger::pointer_t client(f_thread_runner.release_client());
605 1 : if(f_done)
606 : {
607 : // already marked done, ignore the result and lose the
608 : // connection immediately
609 : //
610 : //f_thread_running.close(); -- not necessary, 'client' is the connection
611 0 : return;
612 : }
613 :
614 1 : if(client == nullptr)
615 : {
616 : // TODO: fix address in error message using a addr::addr so
617 : // as to handle IPv6 seamlessly.
618 : //
619 0 : SNAP_LOG_ERROR
620 : << "connection to "
621 0 : << f_thread_runner.get_address().to_uri()
622 : << " failed with: "
623 0 : << f_thread_runner.get_last_error()
624 : << SNAP_LOG_SEND;
625 :
626 : // signal that an error occurred
627 : //
628 0 : f_parent->process_connection_failed(f_thread_runner.get_last_error());
629 : }
630 : else
631 : {
632 1 : f_messenger = client;
633 :
634 : // add the messenger to the communicator
635 : //
636 1 : communicator::instance()->add_connection(f_messenger);
637 :
638 : // if some messages were cached, process them immediately
639 : //
640 3 : while(!f_message_cache.empty())
641 : {
642 1 : f_messenger->send_message(f_message_cache[0]);
643 1 : f_message_cache.erase(f_message_cache.begin());
644 : }
645 :
646 : // let the client know we are now connected
647 : //
648 1 : f_parent->process_connected();
649 : }
650 : }
651 :
652 : /** \brief Send a message to the connection.
653 : *
654 : * This implementation function actually sends the message to the
655 : * connection, assuming that the connection exists. Otherwise, it
656 : * may cache the message (if cache is true.)
657 : *
658 : * Note that the message does not get cached if mark_done() was
659 : * called earlier since we are trying to close the whole connection.
660 : *
661 : * \param[in] msg The message to send.
662 : * \param[in] cache Whether to cache the message if the connection is
663 : * currently down.
664 : *
665 : * \return true if the message was forwarded, false if the message
666 : * was ignored or cached.
667 : */
668 2 : bool send_message(message const & msg, bool cache)
669 : {
670 2 : if(f_messenger != nullptr)
671 : {
672 1 : return f_messenger->send_message(msg);
673 : }
674 :
675 1 : if(cache && !f_done)
676 : {
677 1 : f_message_cache.push_back(msg);
678 : }
679 :
680 1 : return false;
681 : }
682 :
683 :
684 : /** \brief Forget about the messenger connection.
685 : *
686 : * This function is used to fully disconnect from the messenger.
687 : *
688 : * If there is a messenger, this means:
689 : *
690 : * \li Removing the messenger from the communicator instance.
691 : * \li Closing the connection in the thread object.
692 : *
693 : * In most cases, it is called when an error occur, also it happens
694 : * that we call it explicitly through the disconnect() function
695 : * of the permanent connection class.
696 : *
697 : * \note
698 : * This is safe, even though it is called from the messenger itself
699 : * because it will not get deleted yet. This is because the run()
700 : * loop has a copy in its own temporary copy of the vector of
701 : * connections.
702 : */
703 2 : void disconnect()
704 : {
705 2 : if(f_messenger != nullptr)
706 : {
707 1 : communicator::instance()->remove_connection(f_messenger);
708 1 : f_messenger.reset();
709 :
710 : // just the messenger does not close the TCP connection because
711 : // we may have another in the thread runner
712 : //
713 1 : f_thread_runner.close();
714 : }
715 2 : }
716 :
717 :
718 : /** \brief Return the address and size of the remote computer.
719 : *
720 : * This function retrieve the socket address.
721 : *
722 : * \warning
723 : * If the socket is not currently connected, the function returns
724 : * a default Unix address. This means it returns a valid unnamed
725 : * address.
726 : *
727 : * \return The Unix address used to connect.
728 : */
729 0 : addr::unix get_address() const
730 : {
731 0 : if(f_messenger != nullptr)
732 : {
733 0 : return f_messenger->get_address();
734 : }
735 0 : return addr::unix();
736 : }
737 :
738 :
739 : /** \brief Mark the messenger as done.
740 : *
741 : * This function is used to mark the messenger as done. This means it
742 : * will get removed from the communicator instance as soon as it
743 : * is done with its current write buffer if there is one.
744 : *
745 : * You may also want to call the disconnection() function to actually
746 : * reset the pointer along the way.
747 : */
748 1 : void mark_done()
749 : {
750 1 : f_done = true;
751 :
752 : // once done we don't attempt to reconnect so we can as well
753 : // get rid of our existing cache immediately to save some
754 : // memory
755 : //
756 1 : f_message_cache.clear();
757 :
758 1 : if(f_messenger != nullptr)
759 : {
760 1 : f_messenger->mark_done();
761 : }
762 1 : }
763 :
764 :
765 : /** \brief Retrieve the parent of the impl.
766 : *
767 : * This function returns the parent of the impl, which is the main
768 : * local_stream_client_permanent_message_connection point. This is
769 : * saved in the messenger so we can relay events from our internal
770 : * messenger implementation to the main class owned by the user.
771 : *
772 : * \return The pointer to the parent of the impl.
773 : */
774 1 : local_stream_client_permanent_message_connection * parent() const
775 : {
776 1 : return f_parent;
777 : }
778 :
779 :
780 : private:
781 : local_stream_client_permanent_message_connection *
782 : f_parent = nullptr;
783 : thread_signal_handler::pointer_t f_thread_done = thread_signal_handler::pointer_t();
784 : runner f_thread_runner;
785 : cppthread::thread f_thread;
786 : messenger::pointer_t f_messenger = messenger::pointer_t();
787 : message::vector_t f_message_cache = message::vector_t();
788 : bool f_done = false;
789 : };
790 :
791 :
792 :
793 : }
794 : // namespace detail
795 :
796 :
797 :
798 : /** \brief Initializes this Unix stream client message connection.
799 : *
800 : * This implementation creates what we call a permanent connection.
801 : * Such a connection may fail once in a while. In such circumstances,
802 : * the class automatically requests for a reconnection (see various
803 : * parameters in the regard below.) However, this causes one issue:
804 : * by default, the connection just never ends. When you are about
805 : * ready to close the connection, you must call the mark_done()
806 : * function first. This will tell the various error functions to
807 : * drop this connection instead of restarting it after a small pause.
808 : *
809 : * This constructor makes sure to initialize the timer and saves
810 : * the address, blocking, close_on_exec, pause, and use_thread parameters.
811 : *
812 : * The timer is first set to trigger immediately. This means the
813 : * connection will be attempted as soon as possible (the next time
814 : * the run() loop is entered, it will time out immediately.) You
815 : * are free to call set_timeout_date() with a date in the future if
816 : * you prefer that the connect be attempted a little later.
817 : *
818 : * The \p pause parameter is used if the connection is lost and this
819 : * timer is used again to attempt a new connection. It will be reused
820 : * as long as the connection fails (as a delay). It has to be at least
821 : * 10 microseconds, although really you should not use less than 1
822 : * second (1000000). You may set the pause parameter to 0 in which case
823 : * you are responsible to set the delay (by default there will be no
824 : * delay and thus the timer will never time out.)
825 : *
826 : * To start with a delay, instead of trying to connect immediately,
827 : * you may pass a negative pause parameter. So for example to get the
828 : * first attempt 5 seconds after you created this object, you use
829 : * -5000000LL as the pause parameter.
830 : *
831 : * The \p use_thread parameter determines whether the connection should
832 : * be attempted in a thread (asynchronously) or immediately (which means
833 : * the timeout callback may block for a while.) If the connection is to
834 : * a local server with an IP address specified as numbers (i.e. 127.0.0.1),
835 : * the thread is probably not required. For connections to a remote
836 : * computer, though, it certainly is important.
837 : *
838 : * \param[in] address The Unix address of the server to connect to.
839 : * \param[in] pause The amount of time to wait before attempting a new
840 : * connection after a failure, in microseconds, or 0.
841 : * \param[in] use_thread Whether a thread is used to connect to the
842 : * server.
843 : * \param[in] blocking Whether the socket is going to be blocking or not.
844 : * \param[in] close_on_exec Automatically close the connection if the process
845 : * execute an exec() call.
846 : */
847 1 : local_stream_client_permanent_message_connection::local_stream_client_permanent_message_connection(
848 : addr::unix const & address
849 : , std::int64_t const pause
850 : , bool const use_thread
851 : , bool const blocking
852 1 : , bool const close_on_exec)
853 : : timer(pause < 0 ? -pause : 0)
854 : , f_impl(std::make_shared<detail::local_stream_client_permanent_message_connection_impl>(
855 : this
856 : , address
857 : , blocking
858 : , close_on_exec))
859 1 : , f_pause(llabs(pause))
860 2 : , f_use_thread(use_thread)
861 : {
862 1 : }
863 :
864 :
865 : /** \brief Destroy instance.
866 : *
867 : * This function cleans up everything in the permanent message object.
868 : */
869 1 : local_stream_client_permanent_message_connection::~local_stream_client_permanent_message_connection()
870 : {
871 : // Does nothing
872 1 : }
873 :
874 :
875 : /** \brief Attempt to send a message to this connection.
876 : *
877 : * If the connection is currently enabled, the message is sent immediately.
878 : * Otherwise, it may be cached if the \p cache parameter is set to true.
879 : * A cached message is forwarded as soon as a new successful connection
880 : * happens, which can be a problem if messages need to happen in a very
881 : * specific order (For example, after a reconnection to snapcommunicator
882 : * you first need to REGISTER or CONNECT...)
883 : *
884 : * \param[in] msg The message to send to the connected server.
885 : * \param[in] cache Whether the message should be cached.
886 : *
887 : * \return true if the message was sent, false if it was not sent, although
888 : * if cache was true, it was cached
889 : */
890 2 : bool local_stream_client_permanent_message_connection::send_message(message const & msg, bool cache)
891 : {
892 2 : return f_impl->send_message(msg, cache);
893 : }
894 :
895 :
896 : /** \brief Check whether the connection is up.
897 : *
898 : * This function returns true if the connection is considered to be up.
899 : * This means sending messages will work quickly instead of being
900 : * cached up until an actual TCP/IP connection gets established.
901 : *
902 : * Note that the connection may have hanged up since, and the system
903 : * may not have yet detected the fact (i.e. the connection is going
904 : * to receive the process_hup() call after the event in which you are
905 : * working.)
906 : *
907 : * \return true if connected
908 : */
909 0 : bool local_stream_client_permanent_message_connection::is_connected() const
910 : {
911 0 : return f_impl->is_connected();
912 : }
913 :
914 :
915 : /** \brief Disconnect the messenger now.
916 : *
917 : * This function kills the current connection.
918 : *
919 : * There are a few cases where two daemons communicate between each others
920 : * and at some point one of them wants to exit and needs to disconnect. This
921 : * function can be used in that one situation assuming that you have an
922 : * acknowledgement from the other daemon.
923 : *
924 : * Say you have daemon A and B. B wants to quit and before doing so sends
925 : * a form of "I'm quitting" message to A. In that situation, B is not closing
926 : * the messenger connection, A is responsible for that (i.e. A acknowledges
927 : * receipt of the "I'm quitting" message from B by closing the connection.)
928 : *
929 : * B also wants to call the mark_done() function to make sure that it
930 : * does not reconnected a split second later and instead the permanent
931 : * connection gets removed from the communicator list of connections.
932 : */
933 0 : void local_stream_client_permanent_message_connection::disconnect()
934 : {
935 0 : f_impl->disconnect();
936 0 : }
937 :
938 :
939 : /** \brief Overload so we do not have to use the timer:: qualifier everywhere.
940 : *
941 : * This function overloads the connection::mark_done() function so
942 : * we can call it without the need to explicitly write timer::mark_done()
943 : * everywhere.
944 : */
945 0 : void local_stream_client_permanent_message_connection::mark_done()
946 : {
947 0 : timer::mark_done();
948 0 : }
949 :
950 :
951 : /** \brief Mark connection as done.
952 : *
953 : * This function allows you to mark the permanent connection and the
954 : * messenger as done.
955 : *
956 : * Note that calling this function with false is the same as calling the
957 : * base class mark_done() function.
958 : *
959 : * If the \p messenger parameter is set to true, we suggest you also call
960 : * the disconnect() function. That way the messenger will truly get
961 : * removed from everyone quickly.
962 : *
963 : * \param[in] messenger If true, also mark the messenger as done.
964 : */
965 1 : void local_stream_client_permanent_message_connection::mark_done(bool messenger)
966 : {
967 1 : timer::mark_done();
968 1 : if(messenger)
969 : {
970 1 : f_impl->mark_done();
971 : }
972 1 : }
973 :
974 :
975 : /** \brief Retrieve a copy of the client's address.
976 : *
977 : * This function returns a copy of the Unix address of this client
978 : * connection as reported by the implementation object.
979 : *
980 : * \return A copy of the Unix address.
981 : */
982 0 : addr::unix local_stream_client_permanent_message_connection::get_address() const
983 : {
984 0 : return f_impl->get_address();
985 : }
986 :
987 :
988 : /** \brief Internal timeout callback implementation.
989 : *
990 : * This callback implements the guts of this class: it attempts to connect
991 : * to the specified Unix address, optionally after creating a thread
992 : * so the attempt can happen asynchronously.
993 : *
994 : * When the connection fails, the timer is used to try again pause
995 : * microseconds later (pause as specified in the constructor).
996 : *
997 : * When a connection succeeds, the timer is disabled until you detect
998 : * an error while using the connection and re-enable the timer.
999 : *
1000 : * \warning
1001 : * This function changes the timeout delay to the pause amount
1002 : * as defined with the constructor. If you want to change that
1003 : * amount, you can do so at any point after this function call
1004 : * using the set_timeout_delay() function. If the pause parameter
1005 : * was set to -1, then the timeout never gets changed.
1006 : * However, you should not use a permanent message timer as your
1007 : * own or you will interfere with the internal use of the timer.
1008 : */
1009 1 : void local_stream_client_permanent_message_connection::process_timeout()
1010 : {
1011 : // got a spurious call when already marked done
1012 : //
1013 1 : if(is_done())
1014 : {
1015 0 : return;
1016 : }
1017 :
1018 : // change the timeout delay although we will not use it immediately
1019 : // if we start the thread or attempt an immediate connection, but
1020 : // that way the user can change it by calling set_timeout_delay()
1021 : // at any time after the first process_timeout() call
1022 : //
1023 1 : if(f_pause > 0)
1024 : {
1025 1 : set_timeout_delay(f_pause);
1026 1 : f_pause = 0; // reset so this function does not override the delay again
1027 : }
1028 :
1029 1 : if(f_use_thread)
1030 : {
1031 : // in this case we create a thread, run it and know whether the
1032 : // connection succeeded only when the thread tells us it did
1033 : //
1034 : // TODO: the background_connect() may return false in two situations:
1035 : // 1) when the thread is already running and then the behavior
1036 : // we have below is INCORRECT
1037 : // 2) when the thread cannot be started (i.e. could not
1038 : // allocate the stack?) in which case the if() below
1039 : // is the correct behavior
1040 : //
1041 1 : if(f_impl->background_connect())
1042 : {
1043 : // we started the thread successfully, so block the timer
1044 : //
1045 1 : set_enable(false);
1046 : }
1047 : }
1048 : else
1049 : {
1050 : // the success is noted when we receive a call to
1051 : // process_connected(); there we do set_enable(false)
1052 : // so the timer stops
1053 : //
1054 0 : f_impl->connect();
1055 : }
1056 : }
1057 :
1058 :
1059 : /** \brief Process an error.
1060 : *
1061 : * When an error occurs, we restart the timer so we can attempt to reconnect
1062 : * to that server.
1063 : *
1064 : * If you overload this function, make sure to either call this
1065 : * implementation or enable the timer yourself.
1066 : *
1067 : * \warning
1068 : * Unless the connection was marked as done, this function does not call
1069 : * timer::process_error() which means that this connection is not
1070 : * automatically removed from the communicator object on failures.
1071 : */
1072 0 : void local_stream_client_permanent_message_connection::process_error()
1073 : {
1074 0 : if(is_done())
1075 : {
1076 0 : timer::process_error();
1077 : }
1078 : else
1079 : {
1080 0 : f_impl->disconnect();
1081 0 : set_enable(true);
1082 : }
1083 0 : }
1084 :
1085 :
1086 : /** \brief Process a hang up.
1087 : *
1088 : * When a hang up occurs, we restart the timer so we can attempt to reconnect
1089 : * to that server.
1090 : *
1091 : * If you overload this function, make sure to either call this
1092 : * implementation or enable the timer yourself.
1093 : *
1094 : * \warning
1095 : * Unless the connection was marked as done, this function does not call
1096 : * timer::process_hup() which means that this connection is not
1097 : * automatically removed from the communicator object on failures.
1098 : */
1099 0 : void local_stream_client_permanent_message_connection::process_hup()
1100 : {
1101 0 : if(is_done())
1102 : {
1103 0 : timer::process_hup();
1104 : }
1105 : else
1106 : {
1107 0 : f_impl->disconnect();
1108 0 : set_enable(true);
1109 : }
1110 0 : }
1111 :
1112 :
1113 : /** \brief Process an invalid signal.
1114 : *
1115 : * When an invalid signal occurs, we restart the timer so we can attempt
1116 : * to reconnect to that server.
1117 : *
1118 : * If you overload this function, make sure to either call this
1119 : * implementation or enable the timer yourself.
1120 : *
1121 : * \warning
1122 : * Unless the connection was marked as done, this function does not call
1123 : * timer::process_invalid() which means that this connection is not
1124 : * automatically removed from the communicator object on failures.
1125 : */
1126 0 : void local_stream_client_permanent_message_connection::process_invalid()
1127 : {
1128 0 : if(is_done())
1129 : {
1130 0 : timer::process_invalid();
1131 : }
1132 : else
1133 : {
1134 0 : f_impl->disconnect();
1135 0 : set_enable(true);
1136 : }
1137 0 : }
1138 :
1139 :
1140 : /** \brief Make sure that the messenger connection gets removed.
1141 : *
1142 : * This function makes sure that the messenger sub-connection also gets
1143 : * removed from the communicator. Otherwise it would lock the system
1144 : * since connections are saved in the communicator object as shared
1145 : * pointers.
1146 : */
1147 1 : void local_stream_client_permanent_message_connection::connection_removed()
1148 : {
1149 1 : f_impl->disconnect(); // drop the messenger along with this connection
1150 1 : }
1151 :
1152 :
1153 : /** \brief Process a connection failed callback.
1154 : *
1155 : * When a connection attempt fails, we restart the timer so we can
1156 : * attempt to reconnect to that server.
1157 : *
1158 : * If you overload this function, make sure to either call this
1159 : * implementation or enable the timer yourself.
1160 : *
1161 : * \param[in] error_message The error message that triggered this callback.
1162 : */
1163 0 : void local_stream_client_permanent_message_connection::process_connection_failed(std::string const & error_message)
1164 : {
1165 0 : snapdev::NOT_USED(error_message); // the default implementation ignores the message
1166 0 : set_enable(true);
1167 0 : }
1168 :
1169 :
1170 : /** \brief The connection is ready.
1171 : *
1172 : * This callback gets called whenever the connection succeeds and is
1173 : * ready to be used.
1174 : *
1175 : * You should implement this virtual function if you have to initiate
1176 : * the communication. For example, the snapserver has to send a
1177 : * REGISTER to the snapcommunicator system and thus implements this
1178 : * function.
1179 : *
1180 : * The default implementation makes sure that the timer gets turned off
1181 : * so we do not try to reconnect every minute or so. Make sure you call
1182 : * the default function so you get the proper behavior.
1183 : */
1184 1 : void local_stream_client_permanent_message_connection::process_connected()
1185 : {
1186 1 : set_enable(false);
1187 1 : }
1188 :
1189 :
1190 :
1191 6 : } // namespace ed
1192 : // vim: ts=4 sw=4 et
|