Line data Source code
1 : // Copyright (c) 2012-2022 Made to Order Software Corp. All Rights Reserved
2 : //
3 : // https://snapwebsites.org/project/eventdispatcher
4 : // contact@m2osw.com
5 : //
6 : // This program is free software; you can redistribute it and/or modify
7 : // it under the terms of the GNU General Public License as published by
8 : // the Free Software Foundation; either version 2 of the License, or
9 : // (at your option) any later version.
10 : //
11 : // This program is distributed in the hope that it will be useful,
12 : // but WITHOUT ANY WARRANTY; without even the implied warranty of
13 : // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 : // GNU General Public License for more details.
15 : //
16 : // You should have received a copy of the GNU General Public License along
17 : // with this program; if not, write to the Free Software Foundation, Inc.,
18 : // 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
19 :
20 : /** \file
21 : * \brief Implementation client permanent message connection.
22 : *
23 : * This class implements a permanent connection to a Unix socket. This
24 : * means if the server is restarted, this class is capable to automatically
25 : * reconnect under the hood. This can be done using a thread so if the
26 : * connect() command is slow (needs to time out), it won't block the
27 : * rest of your event loop.
28 : */
29 :
30 :
31 : // self
32 : //
33 : #include "eventdispatcher/local_stream_client_permanent_message_connection.h"
34 :
35 : #include "eventdispatcher/communicator.h"
36 : #include "eventdispatcher/exception.h"
37 : //#include "eventdispatcher/local_stream_server_client_message_connection.h"
38 : #include "eventdispatcher/local_stream_client_message_connection.h"
39 : #include "eventdispatcher/thread_done_signal.h"
40 :
41 :
42 : // snaplogger lib
43 : //
44 : #include <snaplogger/message.h>
45 :
46 :
47 : // snapdev lib
48 : //
49 : //#include <snapdev/not_used.h>
50 :
51 :
52 : // cppthread lib
53 : //
54 : #include <cppthread/exception.h>
55 : #include <cppthread/guard.h>
56 : #include <cppthread/runner.h>
57 : #include <cppthread/thread.h>
58 :
59 :
60 : // C++ lib
61 : //
62 : //#include <cstring>
63 :
64 :
65 : // C lib
66 : //
67 : //#include <sys/socket.h>
68 :
69 :
70 : // last include
71 : //
72 : #include <snapdev/poison.h>
73 :
74 :
75 :
76 : namespace ed
77 : {
78 :
79 :
80 :
81 : namespace detail
82 : {
83 :
84 :
85 : /** \brief Internal implementation of the local_stream_client_permanent_message_connection class.
86 : *
87 : * This class is used to handle a thread that will process a connection for
88 : * us. This allows us to connect in any amount of time required by the
89 : * Unix system to obtain the connection with the remote server.
90 : *
91 : * \todo
92 : * Having threads at the time we do a fork() is not safe. We may
93 : * want to reconsider offering this functionality here. Because at
94 : * this time we would have no control of when the thread is created
95 : * and thus a way to make sure that no such thread is running when
96 : * we call fork().
97 : */
98 : class local_stream_client_permanent_message_connection_impl
99 : {
100 : public:
101 1 : class messenger
102 : : public local_stream_client_message_connection
103 : {
104 : public:
105 : typedef std::shared_ptr<messenger> pointer_t;
106 :
107 1 : messenger(
108 : local_stream_client_permanent_message_connection * parent
109 : , addr::unix const & address
110 : , bool const blocking
111 : , bool const close_on_exec)
112 1 : : local_stream_client_message_connection(
113 : address
114 : , blocking
115 : , close_on_exec)
116 1 : , f_parent(parent)
117 : {
118 1 : set_name("local_stream_client_permanent_message_connection_impl::messenger");
119 1 : }
120 :
121 : messenger(messenger const & rhs) = delete;
122 : messenger & operator = (messenger const & rhs) = delete;
123 :
124 : // connection implementation
125 2 : virtual void process_empty_buffer()
126 : {
127 2 : local_stream_client_message_connection::process_empty_buffer();
128 2 : f_parent->process_empty_buffer();
129 2 : }
130 :
131 : // connection implementation
132 0 : virtual void process_error()
133 : {
134 0 : local_stream_client_message_connection::process_error();
135 0 : f_parent->process_error();
136 0 : }
137 :
138 : // connection implementation
139 0 : virtual void process_hup()
140 : {
141 0 : local_stream_client_message_connection::process_hup();
142 0 : f_parent->process_hup();
143 0 : }
144 :
145 : // connection implementation
146 0 : virtual void process_invalid()
147 : {
148 0 : local_stream_client_message_connection::process_invalid();
149 0 : f_parent->process_invalid();
150 0 : }
151 :
152 : // local_stream_server_client_message_connection implementation
153 1 : virtual void process_message(message & msg)
154 : {
155 : // We call the dispatcher from our parent since the child
156 : // (this messenger) is not given a dispatcher
157 : //
158 1 : f_parent->dispatch_message(msg);
159 1 : }
160 :
161 : private:
162 : local_stream_client_permanent_message_connection * f_parent = nullptr;
163 : };
164 :
165 1 : class thread_signal_handler
166 : : public thread_done_signal
167 : {
168 : public:
169 : typedef std::shared_ptr<thread_signal_handler> pointer_t;
170 :
171 1 : thread_signal_handler(local_stream_client_permanent_message_connection_impl * parent_impl)
172 1 : : f_parent_impl(parent_impl)
173 : {
174 1 : set_name("local_stream_client_permanent_message_connection_impl::thread_signal_handler");
175 1 : }
176 :
177 : thread_signal_handler(thread_signal_handler const & rhs) = delete;
178 : thread_signal_handler & operator = (thread_signal_handler const & rhs) = delete;
179 :
180 : /** \brief This signal was emitted.
181 : *
182 : * This function gets called whenever the thread is just about to
183 : * quit. Calling f_thread.is_running() may still return true when
184 : * you get in the 'thread_done()' callback. However, an
185 : * f_thread.stop() will return very quickly.
186 : */
187 1 : virtual void process_read()
188 : {
189 1 : thread_done_signal::process_read();
190 :
191 1 : f_parent_impl->thread_done();
192 1 : }
193 :
194 : private:
195 : local_stream_client_permanent_message_connection_impl * f_parent_impl = nullptr;
196 : };
197 :
198 1 : class runner
199 : : public cppthread::runner
200 : {
201 : public:
202 1 : runner(
203 : local_stream_client_permanent_message_connection_impl * parent_impl
204 : , addr::unix const & address
205 : , bool const blocking = false
206 : , bool const close_on_exec = true)
207 1 : : cppthread::runner("background local_stream_client_permanent_message_connection for asynchronous connections")
208 : , f_parent_impl(parent_impl)
209 : , f_address(address)
210 : , f_blocking(blocking)
211 1 : , f_close_on_exec(close_on_exec)
212 : {
213 1 : }
214 :
215 : runner(runner const & rhs) = delete;
216 : runner & operator = (runner const & rhs) = delete;
217 :
218 :
219 : /** \brief This is the actual function run by the thread.
220 : *
221 : * This function calls the connect() function and then
222 : * tells the main thread we are done.
223 : */
224 1 : virtual void run()
225 : {
226 1 : connect();
227 :
228 : // tell the main thread that we are done
229 : //
230 1 : f_parent_impl->trigger_thread_done();
231 1 : }
232 :
233 :
234 : /** \brief This function attempts to connect.
235 : *
236 : * This function attempts a connection to the specified address
237 : * and port with the specified mode (i.e. plain or encrypted.)
238 : *
239 : * The function may take a long time to succeed connecting with
240 : * the server. The main thread will be awaken whenever this
241 : * thread dies.
242 : *
243 : * If an error occurs, then the f_socket variable member will
244 : * be set to -1. Otherwise it represents the socket that we
245 : * just connected with.
246 : */
247 1 : void connect()
248 : {
249 1 : char const * error_name(nullptr);
250 : try
251 : {
252 3 : f_messenger = std::make_shared<messenger>(
253 2 : f_parent_impl->parent()
254 : , f_address
255 : , f_blocking
256 1 : , f_close_on_exec);
257 1 : return;
258 : }
259 0 : catch(initialization_error const & e)
260 : {
261 0 : error_name = "initialization_error";
262 0 : f_last_error = e.what();
263 : }
264 0 : catch(runtime_error const & e)
265 : {
266 0 : error_name = "runtime_error";
267 0 : f_last_error = e.what();
268 : }
269 0 : catch(std::exception const & e)
270 : {
271 0 : error_name = "std::exception";
272 0 : f_last_error = e.what();
273 : }
274 0 : catch(...)
275 : {
276 0 : error_name = "... (any other exception)";
277 0 : f_last_error = "Unknown exception";
278 : }
279 0 : f_messenger.reset();
280 :
281 : // connection failed... we will have to try again later
282 : //
283 0 : SNAP_LOG_ERROR
284 : << "connection to "
285 0 : << f_address.to_string()
286 : << " failed with: "
287 : << f_last_error
288 : << " ("
289 : << error_name
290 : << ")"
291 : << SNAP_LOG_SEND;
292 : }
293 :
294 :
295 : /** \brief Retrieve the address to connect to.
296 : *
297 : * This function returns the address passed in on creation.
298 : *
299 : * \note
300 : * Since the variable is constant, it is likely to never change.
301 : * However, the c_str() function may change the buffer pointer.
302 : * Hence, to be 100% safe, you cannot call this function until
303 : * you make sure that the thread is fully stopped.
304 : *
305 : * \return The destination address.
306 : */
307 0 : addr::unix get_address() const
308 : {
309 0 : return f_address;
310 : }
311 :
312 :
313 : /** \brief Retrieve the client allocated and connected by the thread.
314 : *
315 : * This function returns the TCP connection object resulting from
316 : * connection attempts of the background thread.
317 : *
318 : * If the pointer is null, then you may get the corresponding
319 : * error message using the get_last_error() function.
320 : *
321 : * You can get the client TCP connection pointer once. After that
322 : * you always get a null pointer.
323 : *
324 : * \note
325 : * This function is guarded so the pointer and the object it
326 : * points to will be valid in another thread that retrieves it.
327 : *
328 : * \return The connection pointer.
329 : */
330 1 : messenger::pointer_t release_client()
331 : {
332 2 : cppthread::guard g(f_mutex);
333 1 : messenger::pointer_t release;
334 1 : release.swap(f_messenger);
335 2 : return release;
336 : }
337 :
338 :
339 : /** \brief Retrieve the last error message that happened.
340 : *
341 : * This function returns the last error message that was captured
342 : * when trying to connect to the socket. The message is the
343 : * e.what() message from the exception we captured.
344 : *
345 : * The message does not get cleared so the function can be called
346 : * any number of times. To know whether an error was generated
347 : * on the last attempt, make sure to first get the get_socket()
348 : * and if it returns -1, then this message is significant,
349 : * otherwise it is from a previous error.
350 : *
351 : * \warning
352 : * Remember that if the background thread was used the error will
353 : * NOT be available in the main thread until a full memory barrier
354 : * was executed. For that reason we make sure that the thread
355 : * was stopped when we detect an error.
356 : *
357 : * \return The last error message.
358 : */
359 0 : std::string const & get_last_error() const
360 : {
361 0 : return f_last_error;
362 : }
363 :
364 :
365 : /** \brief Close the connection.
366 : *
367 : * This function closes the connection. Since the f_local_stream_connection
368 : * holds the socket to the remote server, we have get this function
369 : * called in order to completely disconnect.
370 : *
371 : * \note
372 : * This function does not clear the f_last_error parameter so it
373 : * can be read later.
374 : */
375 1 : void close()
376 : {
377 1 : f_messenger.reset();
378 1 : }
379 :
380 :
381 : private:
382 : local_stream_client_permanent_message_connection_impl *
383 : f_parent_impl = nullptr;
384 : addr::unix const f_address;
385 : bool const f_blocking;
386 : bool const f_close_on_exec;
387 : messenger::pointer_t f_messenger = messenger::pointer_t();
388 : std::string f_last_error = std::string();
389 : };
390 :
391 :
392 : /** \brief Initialize a permanent message connection implementation object.
393 : *
394 : * This object manages the thread used to asynchronically connect to
395 : * the specified address and port.
396 : *
397 : * This class and its sub-classes may end up executing callbacks
398 : * of the local_stream_client_permanent_message_connection object.
399 : * However, in all cases these are never run from the thread.
400 : *
401 : * \param[in] parent A pointer to the owner of this
402 : * local_stream_client_permanent_message_connection_impl object.
403 : * \param[in] address The address we are to connect to.
404 : * \param[in] blocking Whether to open in blocking mode or not.
405 : * \param[in] close_on_exec Whether to mark the socket as requiring to
406 : * be closed on an exec() call.
407 : */
408 1 : local_stream_client_permanent_message_connection_impl(
409 : local_stream_client_permanent_message_connection * parent
410 : , addr::unix const & address
411 : , bool const blocking
412 : , bool const close_on_exec)
413 1 : : f_parent(parent)
414 : , f_thread_runner(this, address, blocking, close_on_exec)
415 1 : , f_thread("background connection handler thread", &f_thread_runner)
416 : {
417 1 : }
418 :
419 :
420 : local_stream_client_permanent_message_connection_impl(local_stream_client_permanent_message_connection_impl const & rhs) = delete;
421 : local_stream_client_permanent_message_connection_impl & operator = (local_stream_client_permanent_message_connection_impl const & rhs) = delete;
422 :
423 : /** \brief Destroy the permanent message connection.
424 : *
425 : * This function makes sure that the messenger was lost.
426 : */
427 1 : ~local_stream_client_permanent_message_connection_impl()
428 1 : {
429 : // to make sure we can lose the messenger, first we want to be sure
430 : // that we do not have a thread running
431 : //
432 : try
433 : {
434 1 : f_thread.stop();
435 : }
436 0 : catch(cppthread::cppthread_mutex_failed_error const &)
437 : {
438 : }
439 0 : catch(cppthread::cppthread_invalid_error const &)
440 : {
441 : }
442 :
443 : // in this case we may still have an instance of the f_thread_done
444 : // which linger around, we want it out
445 : //
446 : // Note: the call is safe even if the f_thread_done is null
447 : //
448 1 : communicator::instance()->remove_connection(f_thread_done);
449 :
450 : // although the f_messenger variable gets reset automatically in
451 : // the destructor, it would not get removed from the
452 : // communicator instance if we were not doing it explicitly
453 : //
454 1 : disconnect();
455 1 : }
456 :
457 :
458 : /** \brief Direct connect to the messenger.
459 : *
460 : * In this case we try to connect without the thread. This allows
461 : * us to avoid the thread problems, but we are blocked until the
462 : * OS decides to time out or the connection worked.
463 : */
464 0 : void connect()
465 : {
466 0 : if(f_done)
467 : {
468 0 : SNAP_LOG_ERROR
469 : << "Permanent connection marked done. Cannot attempt to reconnect."
470 : << SNAP_LOG_SEND;
471 0 : return;
472 : }
473 :
474 : // call the thread connect() function from the main thread
475 : //
476 0 : f_thread_runner.connect();
477 :
478 : // simulate receiving the thread_done() signal
479 : //
480 0 : thread_done();
481 : }
482 :
483 :
484 : /** \brief Check whether the permanent connection is currently connected.
485 : *
486 : * This function returns true if the messenger exists, which means that
487 : * the connection is up.
488 : *
489 : * \return true if the connection is up.
490 : */
491 0 : bool is_connected()
492 : {
493 0 : return f_messenger != nullptr;
494 : }
495 :
496 :
497 : /** \brief Try to start the thread runner.
498 : *
499 : * This function tries to start the thread runner in order to initiate
500 : * a connection in the background. If the thread could not be started,
501 : * then the function returns false.
502 : *
503 : * If the thread started, then the function returns true. This does
504 : * not mean that the connection was obtained. This is known once
505 : * the process_connected() function is called.
506 : *
507 : * \return true if the thread was successfully started.
508 : */
509 1 : bool background_connect()
510 : {
511 1 : if(f_done)
512 : {
513 0 : SNAP_LOG_ERROR
514 : << "Permanent connection marked done. Cannot attempt to reconnect."
515 : << SNAP_LOG_SEND;
516 0 : return false;
517 : }
518 :
519 1 : if(f_thread.is_running())
520 : {
521 0 : SNAP_LOG_ERROR
522 : << "A background connection attempt is already in progress. Further requests are ignored."
523 : << SNAP_LOG_SEND;
524 0 : return false;
525 : }
526 :
527 : // create the f_thread_done only when required
528 : //
529 1 : if(f_thread_done == nullptr)
530 : {
531 1 : f_thread_done = std::make_shared<thread_signal_handler>(this);
532 : }
533 :
534 1 : communicator::instance()->add_connection(f_thread_done);
535 :
536 1 : if(!f_thread.start())
537 : {
538 0 : SNAP_LOG_ERROR
539 : << "The thread used to run the background connection process did not start."
540 : << SNAP_LOG_SEND;
541 0 : return false;
542 : }
543 :
544 1 : return true;
545 : }
546 :
547 :
548 : /** \brief Tell the main thread that the background thread is done.
549 : *
550 : * This function is called by the thread so the thread_done()
551 : * function of the thread done object gets called. Only the
552 : * thread should call this function.
553 : *
554 : * As a result the thread_done() function of this class will be
555 : * called by the main thread.
556 : */
557 1 : void trigger_thread_done()
558 : {
559 1 : f_thread_done->thread_done();
560 1 : }
561 :
562 :
563 : /** \brief Signal that the background thread is done.
564 : *
565 : * This callback is called whenever the background thread sends
566 : * a signal to us. This is used to avoid calling end user functions
567 : * that would certainly cause a lot of problem if called from the
568 : * thread.
569 : *
570 : * The function calls the process_connection_failed() if the
571 : * connection did not happen.
572 : *
573 : * The function calls the process_connected() if the connection
574 : * did happen.
575 : *
576 : * \note
577 : * This is used only if the user requested that the connection
578 : * happen in the background (i.e. use_thread was set to true
579 : * in the local_stream_client_permanent_message_connection object
580 : * constructor.)
581 : */
582 1 : void thread_done()
583 : {
584 : // if we used the thread we have to remove the signal used
585 : // to know that the thread was done
586 : //
587 1 : communicator::instance()->remove_connection(f_thread_done);
588 :
589 : // we will access the f_last_error member of the thread runner
590 : // which may not be available to the main thread yet, calling
591 : // stop forces a memory barrier so we are all good.
592 : //
593 : // calling stop() has no effect if we did not use the thread,
594 : // however, not calling stop() when we did use the thread
595 : // causes all sorts of other problems (especially, the thread
596 : // never gets joined)
597 : //
598 1 : f_thread.stop();
599 :
600 2 : messenger::pointer_t client(f_thread_runner.release_client());
601 1 : if(f_done)
602 : {
603 : // already marked done, ignore the result and lose the
604 : // connection immediately
605 : //
606 : //f_thread_running.close(); -- not necessary, 'client' is the connection
607 0 : return;
608 : }
609 :
610 1 : if(client == nullptr)
611 : {
612 : // TODO: fix address in error message using a addr::addr so
613 : // as to handle IPv6 seamlessly.
614 : //
615 0 : SNAP_LOG_ERROR
616 : << "connection to "
617 0 : << f_thread_runner.get_address().to_uri()
618 : << " failed with: "
619 0 : << f_thread_runner.get_last_error()
620 : << SNAP_LOG_SEND;
621 :
622 : // signal that an error occurred
623 : //
624 0 : f_parent->process_connection_failed(f_thread_runner.get_last_error());
625 : }
626 : else
627 : {
628 1 : f_messenger = client;
629 :
630 : // add the messenger to the communicator
631 : //
632 1 : communicator::instance()->add_connection(f_messenger);
633 :
634 : // if some messages were cached, process them immediately
635 : //
636 3 : while(!f_message_cache.empty())
637 : {
638 1 : f_messenger->send_message(f_message_cache[0]);
639 1 : f_message_cache.erase(f_message_cache.begin());
640 : }
641 :
642 : // let the client know we are now connected
643 : //
644 1 : f_parent->process_connected();
645 : }
646 : }
647 :
648 : /** \brief Send a message to the connection.
649 : *
650 : * This implementation function actually sends the message to the
651 : * connection, assuming that the connection exists. Otherwise, it
652 : * may cache the message (if cache is true.)
653 : *
654 : * Note that the message does not get cached if mark_done() was
655 : * called earlier since we are trying to close the whole connection.
656 : *
657 : * \param[in] msg The message to send.
658 : * \param[in] cache Whether to cache the message if the connection is
659 : * currently down.
660 : *
661 : * \return true if the message was forwarded, false if the message
662 : * was ignored or cached.
663 : */
664 2 : bool send_message(message & msg, bool cache)
665 : {
666 2 : if(f_messenger != nullptr)
667 : {
668 1 : return f_messenger->send_message(msg);
669 : }
670 :
671 1 : if(cache && !f_done)
672 : {
673 1 : f_message_cache.push_back(msg);
674 : }
675 :
676 1 : return false;
677 : }
678 :
679 :
680 : /** \brief Forget about the messenger connection.
681 : *
682 : * This function is used to fully disconnect from the messenger.
683 : *
684 : * If there is a messenger, this means:
685 : *
686 : * \li Removing the messenger from the communicator instance.
687 : * \li Closing the connection in the thread object.
688 : *
689 : * In most cases, it is called when an error occur, also it happens
690 : * that we call it explicitly through the disconnect() function
691 : * of the permanent connection class.
692 : *
693 : * \note
694 : * This is safe, even though it is called from the messenger itself
695 : * because it will not get deleted yet. This is because the run()
696 : * loop has a copy in its own temporary copy of the vector of
697 : * connections.
698 : */
699 2 : void disconnect()
700 : {
701 2 : if(f_messenger != nullptr)
702 : {
703 1 : communicator::instance()->remove_connection(f_messenger);
704 1 : f_messenger.reset();
705 :
706 : // just the messenger does not close the TCP connection because
707 : // we may have another in the thread runner
708 : //
709 1 : f_thread_runner.close();
710 : }
711 2 : }
712 :
713 :
714 : /** \brief Return the address and size of the remote computer.
715 : *
716 : * This function retrieve the socket address.
717 : *
718 : * \warning
719 : * If the socket is not currently connected, the function returns
720 : * a default Unix address. This means it returns a valid unnamed
721 : * address.
722 : *
723 : * \return The Unix address used to connect.
724 : */
725 0 : addr::unix get_address() const
726 : {
727 0 : if(f_messenger != nullptr)
728 : {
729 0 : return f_messenger->get_address();
730 : }
731 0 : return addr::unix();
732 : }
733 :
734 :
735 : /** \brief Mark the messenger as done.
736 : *
737 : * This function is used to mark the messenger as done. This means it
738 : * will get removed from the communicator instance as soon as it
739 : * is done with its current write buffer if there is one.
740 : *
741 : * You may also want to call the disconnection() function to actually
742 : * reset the pointer along the way.
743 : */
744 1 : void mark_done()
745 : {
746 1 : f_done = true;
747 :
748 : // once done we don't attempt to reconnect so we can as well
749 : // get rid of our existing cache immediately to save some
750 : // memory
751 : //
752 1 : f_message_cache.clear();
753 :
754 1 : if(f_messenger != nullptr)
755 : {
756 1 : f_messenger->mark_done();
757 : }
758 1 : }
759 :
760 :
761 : /** \brief Retrieve the parent of the impl.
762 : *
763 : * This function returns the parent of the impl, which is the main
764 : * local_stream_client_permanent_message_connection point. This is
765 : * saved in the messenger so we can relay events from our internal
766 : * messenger implementation to the main class owned by the user.
767 : *
768 : * \return The pointer to the parent of the impl.
769 : */
770 1 : local_stream_client_permanent_message_connection * parent() const
771 : {
772 1 : return f_parent;
773 : }
774 :
775 :
776 : private:
777 : local_stream_client_permanent_message_connection *
778 : f_parent = nullptr;
779 : thread_signal_handler::pointer_t f_thread_done = thread_signal_handler::pointer_t();
780 : runner f_thread_runner;
781 : cppthread::thread f_thread;
782 : messenger::pointer_t f_messenger = messenger::pointer_t();
783 : message::vector_t f_message_cache = message::vector_t();
784 : bool f_done = false;
785 : };
786 :
787 :
788 :
789 : }
790 : // namespace detail
791 :
792 :
793 :
794 : /** \brief Initializes this TCP client message connection.
795 : *
796 : * This implementation creates what we call a permanent connection.
797 : * Such a connection may fail once in a while. In such circumstances,
798 : * the class automatically requests for a reconnection (see various
799 : * parameters in the regard below.) However, this causes one issue:
800 : * by default, the connection just never ends. When you are about
801 : * ready to close the connection, you must call the mark_done()
802 : * function first. This will tell the various error functions to
803 : * drop this connection instead of restarting it after a small pause.
804 : *
805 : * This constructor makes sure to initialize the timer and saves
806 : * the address, port, mode, pause, and use_thread parameters.
807 : *
808 : * The timer is first set to trigger immediately. This means the TCP
809 : * connection will be attempted as soon as possible (the next time
810 : * the run() loop is entered, it will time out immediately.) You
811 : * are free to call set_timeout_date() with a date in the future if
812 : * you prefer that the connect be attempted a little later.
813 : *
814 : * The \p pause parameter is used if the connection is lost and this
815 : * timer is used again to attempt a new connection. It will be reused
816 : * as long as the connection fails (as a delay). It has to be at least
817 : * 10 microseconds, although really you should not use less than 1
818 : * second (1000000). You may set the pause parameter to 0 in which case
819 : * you are responsible to set the delay (by default there will be no
820 : * delay and thus the timer will never time out.)
821 : *
822 : * To start with a delay, instead of trying to connect immediately,
823 : * you may pass a negative pause parameter. So for example to get the
824 : * first attempt 5 seconds after you created this object, you use
825 : * -5000000LL as the pause parameter.
826 : *
827 : * The \p use_thread parameter determines whether the connection should
828 : * be attempted in a thread (asynchronously) or immediately (which means
829 : * the timeout callback may block for a while.) If the connection is to
830 : * a local server with an IP address specified as numbers (i.e. 127.0.0.1),
831 : * the thread is probably not required. For connections to a remote
832 : * computer, though, it certainly is important.
833 : *
834 : * \param[in] address The address to listen on. It may be set to "0.0.0.0".
835 : * \param[in] pause The amount of time to wait before attempting a new
836 : * connection after a failure, in microseconds, or 0.
837 : * \param[in] use_thread Whether a thread is used to connect to the
838 : * server.
839 : * \param[in] blocking Whether the socket is going to be blocking or not.
840 : * \param[in] close_on_exec Automatically close the connection if the process
841 : * execute an exec() call.
842 : */
843 1 : local_stream_client_permanent_message_connection::local_stream_client_permanent_message_connection(
844 : addr::unix const & address
845 : , std::int64_t const pause
846 : , bool const use_thread
847 : , bool const blocking
848 1 : , bool const close_on_exec)
849 : : timer(pause < 0 ? -pause : 0)
850 : , f_impl(std::make_shared<detail::local_stream_client_permanent_message_connection_impl>(
851 : this
852 : , address
853 : , blocking
854 : , close_on_exec))
855 1 : , f_pause(llabs(pause))
856 2 : , f_use_thread(use_thread)
857 : {
858 1 : }
859 :
860 :
861 : /** \brief Destroy instance.
862 : *
863 : * This function cleans up everything in the permanent message object.
864 : */
865 1 : local_stream_client_permanent_message_connection::~local_stream_client_permanent_message_connection()
866 : {
867 : // Does nothing
868 1 : }
869 :
870 :
871 : /** \brief Attempt to send a message to this connection.
872 : *
873 : * If the connection is currently enabled, the message is sent immediately.
874 : * Otherwise, it may be cached if the \p cache parameter is set to true.
875 : * A cached message is forwarded as soon as a new successful connection
876 : * happens, which can be a problem if messages need to happen in a very
877 : * specific order (For example, after a reconnection to snapcommunicator
878 : * you first need to REGISTER or CONNECT...)
879 : *
880 : * \param[in] msg The message to send to the connected server.
881 : * \param[in] cache Whether the message should be cached.
882 : *
883 : * \return true if the message was sent, false if it was not sent, although
884 : * if cache was true, it was cached
885 : */
886 2 : bool local_stream_client_permanent_message_connection::send_message(message & msg, bool cache)
887 : {
888 2 : return f_impl->send_message(msg, cache);
889 : }
890 :
891 :
892 : /** \brief Check whether the connection is up.
893 : *
894 : * This function returns true if the connection is considered to be up.
895 : * This means sending messages will work quickly instead of being
896 : * cached up until an actual TCP/IP connection gets established.
897 : *
898 : * Note that the connection may have hanged up since, and the system
899 : * may not have yet detected the fact (i.e. the connection is going
900 : * to receive the process_hup() call after the event in which you are
901 : * working.)
902 : *
903 : * \return true if connected
904 : */
905 0 : bool local_stream_client_permanent_message_connection::is_connected() const
906 : {
907 0 : return f_impl->is_connected();
908 : }
909 :
910 :
911 : /** \brief Disconnect the messenger now.
912 : *
913 : * This function kills the current connection.
914 : *
915 : * There are a few cases where two daemons communicate between each others
916 : * and at some point one of them wants to exit and needs to disconnect. This
917 : * function can be used in that one situation assuming that you have an
918 : * acknowledgement from the other daemon.
919 : *
920 : * Say you have daemon A and B. B wants to quit and before doing so sends
921 : * a form of "I'm quitting" message to A. In that situation, B is not closing
922 : * the messenger connection, A is responsible for that (i.e. A acknowledges
923 : * receipt of the "I'm quitting" message from B by closing the connection.)
924 : *
925 : * B also wants to call the mark_done() function to make sure that it
926 : * does not reconnected a split second later and instead the permanent
927 : * connection gets removed from the communicator list of connections.
928 : */
929 0 : void local_stream_client_permanent_message_connection::disconnect()
930 : {
931 0 : f_impl->disconnect();
932 0 : }
933 :
934 :
935 : /** \brief Overload so we do not have to use namespace everywhere.
936 : *
937 : * This function overloads the connection::mark_done() function so
938 : * we can call it without the need to use timer::mark_done()
939 : * everywhere.
940 : */
941 0 : void local_stream_client_permanent_message_connection::mark_done()
942 : {
943 0 : timer::mark_done();
944 0 : }
945 :
946 :
947 : /** \brief Mark connection as done.
948 : *
949 : * This function allows you to mark the permanent connection and the
950 : * messenger as done.
951 : *
952 : * Note that calling this function with false is the same as calling the
953 : * base class mark_done() function.
954 : *
955 : * If the \p message parameter is set to true, we suggest you also call
956 : * the disconnect() function. That way the messenger will truly get
957 : * removed from everyone quickly.
958 : *
959 : * \param[in] messenger If true, also mark the messenger as done.
960 : */
961 1 : void local_stream_client_permanent_message_connection::mark_done(bool messenger)
962 : {
963 1 : timer::mark_done();
964 1 : if(messenger)
965 : {
966 1 : f_impl->mark_done();
967 : }
968 1 : }
969 :
970 :
971 : /** \brief Retrieve a copy of the client's address.
972 : *
973 : * This function makes a copy of the address of this client connection
974 : * to the \p address parameter and returns the length.
975 : *
976 : * \return Return a copy of the Unix address.
977 : */
978 0 : addr::unix local_stream_client_permanent_message_connection::get_address() const
979 : {
980 0 : return f_impl->get_address();
981 : }
982 :
983 :
984 : /** \brief Internal timeout callback implementation.
985 : *
986 : * This callback implements the guts of this class: it attempts to connect
987 : * to the specified address and port, optionally after creating a thread
988 : * so the attempt can happen asynchronously.
989 : *
990 : * When the connection fails, the timer is used to try again pause
991 : * microseconds later (pause as specified in the constructor).
992 : *
993 : * When a connection succeeds, the timer is disabled until you detect
994 : * an error while using the connection and re-enable the timer.
995 : *
996 : * \warning
997 : * This function changes the timeout delay to the pause amount
998 : * as defined with the constructor. If you want to change that
999 : * amount, you can do so an any point after this function call
1000 : * using the set_timeout_delay() function. If the pause parameter
1001 : * was set to -1, then the timeout never gets changed.
1002 : * However, you should not use a permanent message timer as your
1003 : * own or you will interfere with the internal use of the timer.
1004 : */
1005 1 : void local_stream_client_permanent_message_connection::process_timeout()
1006 : {
1007 : // got a spurious call when already marked done
1008 : //
1009 1 : if(is_done())
1010 : {
1011 0 : return;
1012 : }
1013 :
1014 : // change the timeout delay although we will not use it immediately
1015 : // if we start the thread or attempt an immediate connection, but
1016 : // that way the user can change it by calling set_timeout_delay()
1017 : // at any time after the first process_timeout() call
1018 : //
1019 1 : if(f_pause > 0)
1020 : {
1021 1 : set_timeout_delay(f_pause);
1022 1 : f_pause = 0;
1023 : }
1024 :
1025 1 : if(f_use_thread)
1026 : {
1027 : // in this case we create a thread, run it and know whether the
1028 : // connection succeeded only when the thread tells us it did
1029 : //
1030 : // TODO: the background_connect() may return false in two situations:
1031 : // 1) when the thread is already running and then the behavior
1032 : // we have below is INCORRECT
1033 : // 2) when the thread cannot be started (i.e. could not
1034 : // allocate the stack?) in which case the if() below
1035 : // is the correct behavior
1036 : //
1037 1 : if(f_impl->background_connect())
1038 : {
1039 : // we started the thread successfully, so block the timer
1040 : //
1041 1 : set_enable(false);
1042 : }
1043 : }
1044 : else
1045 : {
1046 : // the success is noted when we receive a call to
1047 : // process_connected(); there we do set_enable(false)
1048 : // so the timer stops
1049 : //
1050 0 : f_impl->connect();
1051 : }
1052 : }
1053 :
1054 :
1055 : /** \brief Process an error.
1056 : *
1057 : * When an error occurs, we restart the timer so we can attempt to reconnect
1058 : * to that server.
1059 : *
1060 : * If you overload this function, make sure to either call this
1061 : * implementation or enable the timer yourselves.
1062 : *
1063 : * \warning
1064 : * This function does not call the timer::process_error() function
1065 : * which means that this connection is not automatically removed from
1066 : * the communicator object on failures.
1067 : */
1068 0 : void local_stream_client_permanent_message_connection::process_error()
1069 : {
1070 0 : if(is_done())
1071 : {
1072 0 : timer::process_error();
1073 : }
1074 : else
1075 : {
1076 0 : f_impl->disconnect();
1077 0 : set_enable(true);
1078 : }
1079 0 : }
1080 :
1081 :
1082 : /** \brief Process a hang up.
1083 : *
1084 : * When a hang up occurs, we restart the timer so we can attempt to reconnect
1085 : * to that server.
1086 : *
1087 : * If you overload this function, make sure to either call this
1088 : * implementation or enable the timer yourselves.
1089 : *
1090 : * \warning
1091 : * This function does not call the timer::process_hup() function
1092 : * which means that this connection is not automatically removed from
1093 : * the communicator object on failures.
1094 : */
1095 0 : void local_stream_client_permanent_message_connection::process_hup()
1096 : {
1097 0 : if(is_done())
1098 : {
1099 0 : timer::process_hup();
1100 : }
1101 : else
1102 : {
1103 0 : f_impl->disconnect();
1104 0 : set_enable(true);
1105 : }
1106 0 : }
1107 :
1108 :
1109 : /** \brief Process an invalid signal.
1110 : *
1111 : * When an invalid signal occurs, we restart the timer so we can attempt
1112 : * to reconnect to that server.
1113 : *
1114 : * If you overload this function, make sure to either call this
1115 : * implementation or enable the timer yourselves.
1116 : *
1117 : * \warning
1118 : * This function does not call the timer::process_invalid() function
1119 : * which means that this connection is not automatically removed from
1120 : * the communicator object on failures.
1121 : */
1122 0 : void local_stream_client_permanent_message_connection::process_invalid()
1123 : {
1124 0 : if(is_done())
1125 : {
1126 0 : timer::process_invalid();
1127 : }
1128 : else
1129 : {
1130 0 : f_impl->disconnect();
1131 0 : set_enable(true);
1132 : }
1133 0 : }
1134 :
1135 :
1136 : /** \brief Make sure that the messenger connection gets removed.
1137 : *
1138 : * This function makes sure that the messenger sub-connection also gets
1139 : * removed from the communicator. Otherwise it would lock the system
1140 : * since connections are saved in the communicator object as shared
1141 : * pointers.
1142 : */
1143 1 : void local_stream_client_permanent_message_connection::connection_removed()
1144 : {
1145 1 : f_impl->disconnect();
1146 1 : }
1147 :
1148 :
1149 : /** \brief Process a connection failed callback.
1150 : *
1151 : * When a connection attempt fails, we restart the timer so we can
1152 : * attempt to reconnect to that server.
1153 : *
1154 : * If you overload this function, make sure to either call this
1155 : * implementation or enable the timer yourselves.
1156 : *
1157 : * \param[in] error_message The error message that triggered this callback.
1158 : */
1159 0 : void local_stream_client_permanent_message_connection::process_connection_failed(std::string const & error_message)
1160 : {
1161 0 : snapdev::NOT_USED(error_message);
1162 0 : set_enable(true);
1163 0 : }
1164 :
1165 :
1166 : /** \brief The connection is ready.
1167 : *
1168 : * This callback gets called whenever the connection succeeded and is
1169 : * ready to be used.
1170 : *
1171 : * You should implement this virtual function if you have to initiate
1172 : * the communication. For example, the snapserver has to send a
1173 : * REGISTER to the snapcommunicator system and thus implements this
1174 : * function.
1175 : *
1176 : * The default implementation makes sure that the timer gets turned off
1177 : * so we do not try to reconnect every minute or so. Make sure you call
1178 : * the default function so you get the proper behavior.
1179 : */
1180 1 : void local_stream_client_permanent_message_connection::process_connected()
1181 : {
1182 1 : set_enable(false);
1183 1 : }
1184 :
1185 :
1186 :
1187 6 : } // namespace ed
1188 : // vim: ts=4 sw=4 et
|