Line data Source code
1 : // Copyright (c) 2012-2022 Made to Order Software Corp. All Rights Reserved
2 : //
3 : // https://snapwebsites.org/project/eventdispatcher
4 : // contact@m2osw.com
5 : //
6 : // This program is free software; you can redistribute it and/or modify
7 : // it under the terms of the GNU General Public License as published by
8 : // the Free Software Foundation; either version 2 of the License, or
9 : // (at your option) any later version.
10 : //
11 : // This program is distributed in the hope that it will be useful,
12 : // but WITHOUT ANY WARRANTY; without even the implied warranty of
13 : // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 : // GNU General Public License for more details.
15 : //
16 : // You should have received a copy of the GNU General Public License along
17 : // with this program; if not, write to the Free Software Foundation, Inc.,
18 : // 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
19 :
20 : /** \file
21 : * \brief Implementation of the permanent TCP connection.
22 : *
23 : * This class is an extension of the TCP client message connection used
24 : * to handle a TCP connection which supports messages and will automatically
25 : * try to reconnect if the connection drops.
26 : *
27 : * The class will also try to connect with the next address if more than one
28 : * is available.
29 : */
30 :
31 :
32 : // self
33 : //
34 : #include "eventdispatcher/tcp_client_permanent_message_connection.h"
35 :
36 : #include "eventdispatcher/communicator.h"
37 : #include "eventdispatcher/exception.h"
38 : #include "eventdispatcher/tcp_server_client_message_connection.h"
39 : #include "eventdispatcher/thread_done_signal.h"
40 :
41 :
42 : // snaplogger lib
43 : //
44 : #include <snaplogger/message.h>
45 :
46 :
47 : // snapdev lib
48 : //
49 : #include <snapdev/not_used.h>
50 :
51 :
52 : // cppthread lib
53 : //
54 : #include <cppthread/exception.h>
55 : #include <cppthread/guard.h>
56 : #include <cppthread/runner.h>
57 : #include <cppthread/thread.h>
58 :
59 :
60 : // C++ lib
61 : //
62 : #include <cstring>
63 :
64 :
65 : // C lib
66 : //
67 : #include <sys/socket.h>
68 :
69 :
70 : // last include
71 : //
72 : #include <snapdev/poison.h>
73 :
74 :
75 :
76 : namespace ed
77 : {
78 :
79 :
80 :
81 : namespace detail
82 : {
83 :
84 :
85 : /** \brief Internal implementation of the tcp_client_permanent_message_connection class.
86 : *
87 : * This class is used to handle a thread that will process a connection for
88 : * us. This allows us to connect in any amount of time required by the
89 : * Unix system to obtain the connection with the remote server.
90 : *
91 : * \todo
92 : * Having threads at the time we do a fork() is not safe. We may
93 : * want to reconsider offering this functionality here. Because at
94 : * this time we would have no control of when the thread is created
95 : * and thus a way to make sure that no such thread is running when
96 : * we call fork().
97 : */
98 : class tcp_client_permanent_message_connection_impl
99 : {
100 : public:
101 0 : class messenger
102 : : public tcp_server_client_message_connection
103 : {
104 : public:
105 : typedef std::shared_ptr<messenger> pointer_t;
106 :
107 0 : messenger(tcp_client_permanent_message_connection * parent, tcp_bio_client::pointer_t client)
108 0 : : tcp_server_client_message_connection(client)
109 0 : , f_parent(parent)
110 : {
111 0 : set_name("tcp_client_permanent_message_connection_impl::messenger");
112 0 : }
113 :
114 : messenger(messenger const & rhs) = delete;
115 : messenger & operator = (messenger const & rhs) = delete;
116 :
117 : // connection implementation
118 0 : virtual void process_empty_buffer()
119 : {
120 0 : tcp_server_client_message_connection::process_empty_buffer();
121 0 : f_parent->process_empty_buffer();
122 0 : }
123 :
124 : // connection implementation
125 0 : virtual void process_error()
126 : {
127 0 : tcp_server_client_message_connection::process_error();
128 0 : f_parent->process_error();
129 0 : }
130 :
131 : // connection implementation
132 0 : virtual void process_hup()
133 : {
134 0 : tcp_server_client_message_connection::process_hup();
135 0 : f_parent->process_hup();
136 0 : }
137 :
138 : // connection implementation
139 0 : virtual void process_invalid()
140 : {
141 0 : tcp_server_client_message_connection::process_invalid();
142 0 : f_parent->process_invalid();
143 0 : }
144 :
145 : // tcp_server_client_message_connection implementation
146 0 : virtual void process_message(message const & msg)
147 : {
148 : // We call the dispatcher from our parent since the child
149 : // (this messenger) is not given a dispatcher
150 : //
151 0 : message copy(msg);
152 0 : f_parent->dispatch_message(copy);
153 0 : }
154 :
155 : private:
156 : tcp_client_permanent_message_connection * f_parent = nullptr;
157 : };
158 :
159 0 : class thread_signal_handler
160 : : public thread_done_signal
161 : {
162 : public:
163 : typedef std::shared_ptr<thread_signal_handler> pointer_t;
164 :
165 0 : thread_signal_handler(tcp_client_permanent_message_connection_impl * parent_impl)
166 0 : : f_parent_impl(parent_impl)
167 : {
168 0 : set_name("tcp_client_permanent_message_connection_impl::thread_signal_handler");
169 0 : }
170 :
171 : thread_signal_handler(thread_signal_handler const &) = delete;
172 : thread_signal_handler & operator = (thread_signal_handler const &) = delete;
173 :
174 : /** \brief This signal was emitted.
175 : *
176 : * This function gets called whenever the thread is just about to
177 : * quit. Calling f_thread.is_running() may still return true when
178 : * you get in the 'thread_done()' callback. However, an
179 : * f_thread.stop() will return very quickly.
180 : */
181 0 : virtual void process_read()
182 : {
183 0 : thread_done_signal::process_read();
184 :
185 0 : f_parent_impl->thread_done();
186 0 : }
187 :
188 : private:
189 : tcp_client_permanent_message_connection_impl * f_parent_impl = nullptr;
190 : };
191 :
192 0 : class runner
193 : : public cppthread::runner
194 : {
195 : public:
196 0 : runner(
197 : tcp_client_permanent_message_connection_impl * parent_impl
198 : , std::string const & address
199 : , int port
200 : , tcp_bio_client::mode_t mode)
201 0 : : cppthread::runner("background tcp_client_permanent_message_connection for asynchronous connections")
202 : , f_parent_impl(parent_impl)
203 : , f_address(address)
204 : , f_port(port)
205 0 : , f_mode(mode)
206 : {
207 0 : }
208 :
209 : runner(runner const &) = delete;
210 : runner & operator = (runner const &) = delete;
211 :
212 :
213 : /** \brief This is the actual function run by the thread.
214 : *
215 : * This function calls the connect() function and then
216 : * tells the main thread we are done.
217 : */
218 0 : virtual void run()
219 : {
220 0 : connect();
221 :
222 : // tell the main thread that we are done
223 : //
224 0 : f_parent_impl->trigger_thread_done();
225 0 : }
226 :
227 :
228 : /** \brief This function attempts to connect.
229 : *
230 : * This function attempts a connection to the specified address
231 : * and port with the specified mode (i.e. plain or encrypted.)
232 : *
233 : * The function may take a long time to succeed connecting with
234 : * the server. The main thread will be awaken whenever this
235 : * thread dies.
236 : *
237 : * If an error occurs, then the f_socket variable member will
238 : * be set to -1. Otherwise it represents the socket that we
239 : * just connected with.
240 : */
241 0 : void connect()
242 : {
243 0 : char const * error_name(nullptr);
244 : try
245 : {
246 : // create a socket using the bio_client class,
247 : // but then just create a duplicate that we will
248 : // use in a server-client TCP object (because
249 : // we cannot directly create the right type of
250 : // object otherwise...)
251 : //
252 0 : f_tcp_connection = std::make_shared<tcp_bio_client>(f_address, f_port, f_mode);
253 0 : return;
254 : }
255 0 : catch(event_dispatcher_initialization_error const & e)
256 : {
257 0 : error_name = "event_dispatcher_initialization_error";
258 0 : f_last_error = e.what();
259 : }
260 0 : catch(event_dispatcher_runtime_error const & e)
261 : {
262 0 : error_name = "event_dispatcher_runtime_error";
263 0 : f_last_error = e.what();
264 : }
265 0 : catch(std::exception const & e)
266 : {
267 0 : error_name = "std::exception";
268 0 : f_last_error = e.what();
269 : }
270 0 : catch(...)
271 : {
272 0 : error_name = "... (any other exception)";
273 0 : f_last_error = "Unknown exception";
274 : }
275 0 : f_tcp_connection.reset();
276 :
277 : // connection failed... we will have to try again later
278 : //
279 : // WARNING: our logger is not multi-thread safe quiet yet
280 : //SNAP_LOG_ERROR
281 : // << "connection to "
282 : // << f_address
283 : // << ":"
284 : // << f_port
285 : // << " failed with: "
286 : // << f_last_error
287 : // << " ("
288 : // << error_name
289 : // << ")"
290 : // << SNAP_LOG_SEND;
291 : }
292 :
293 :
294 : /** \brief Retrieve the address to connect to.
295 : *
296 : * This function returns the address passed in on creation.
297 : *
298 : * \note
299 : * Since the variable is constant, it is likely to never change.
300 : * However, the c_str() function may change the buffer pointer.
301 : * Hence, to be 100% safe, you cannot call this function until
302 : * you make sure that the thread is fully stopped.
303 : *
304 : * \return The destination address.
305 : */
306 0 : std::string const & get_address() const
307 : {
308 0 : return f_address;
309 : }
310 :
311 :
312 : /** \brief Retrieve the port to connect to.
313 : *
314 : * This function returns the port passed in on creation.
315 : *
316 : * \note
317 : * Since the variable is constant, it never gets changed
318 : * which means it is always safe to use it between
319 : * both threads.
320 : *
321 : * \return The port to connect to.
322 : */
323 0 : int get_port() const
324 : {
325 0 : return f_port;
326 : }
327 :
328 :
329 : /** \brief Retrieve the client allocated and connected by the thread.
330 : *
331 : * This function returns the TCP connection object resulting from
332 : * connection attempts of the background thread.
333 : *
334 : * If the pointer is null, then you may get the corresponding
335 : * error message using the get_last_error() function.
336 : *
337 : * You can get the client TCP connection pointer once. After that
338 : * you always get a null pointer.
339 : *
340 : * \note
341 : * This function is guarded so the pointer and the object it
342 : * points to will be valid in another thread that retrieves it.
343 : *
344 : * \return The connection pointer.
345 : */
346 0 : tcp_bio_client::pointer_t release_client()
347 : {
348 0 : cppthread::guard g(f_mutex);
349 0 : tcp_bio_client::pointer_t tcp_connection;
350 0 : tcp_connection.swap(f_tcp_connection);
351 0 : return tcp_connection;
352 : }
353 :
354 :
355 : /** \brief Retrieve the last error message that happened.
356 : *
357 : * This function returns the last error message that was captured
358 : * when trying to connect to the socket. The message is the
359 : * e.what() message from the exception we captured.
360 : *
361 : * The message does not get cleared so the function can be called
362 : * any number of times. To know whether an error was generated
363 : * on the last attempt, make sure to first get the get_socket()
364 : * and if it returns -1, then this message is significant,
365 : * otherwise it is from a previous error.
366 : *
367 : * \warning
368 : * Remember that if the background thread was used the error will
369 : * NOT be available in the main thread until a full memory barrier
370 : * was executed. For that reason we make sure that the thread
371 : * was stopped when we detect an error.
372 : *
373 : * \return The last error message.
374 : */
375 0 : std::string const & get_last_error() const
376 : {
377 0 : return f_last_error;
378 : }
379 :
380 :
381 : /** \brief Close the connection.
382 : *
383 : * This function closes the connection. Since the f_tcp_connection
384 : * holds the socket to the remote server, we have get this function
385 : * called in order to completely disconnect.
386 : *
387 : * \note
388 : * This function does not clear the f_last_error parameter so it
389 : * can be read later.
390 : */
391 0 : void close()
392 : {
393 0 : f_tcp_connection.reset();
394 0 : }
395 :
396 :
397 : private:
398 : tcp_client_permanent_message_connection_impl * f_parent_impl = nullptr;
399 : std::string const f_address;
400 : int const f_port;
401 : tcp_bio_client::mode_t const f_mode;
402 : tcp_bio_client::pointer_t f_tcp_connection = tcp_bio_client::pointer_t();
403 : std::string f_last_error = std::string();
404 : };
405 :
406 :
407 : /** \brief Initialize a permanent message connection implementation object.
408 : *
409 : * This object manages the thread used to asynchronically connect to
410 : * the specified address and port.
411 : *
412 : * This class and its sub-classes may end up executing callbacks
413 : * of the tcp_client_permanent_message_connection object.
414 : * However, in all cases these are never run from the thread.
415 : *
416 : * \param[in] parent A pointer to the owner of this
417 : * tcp_client_permanent_message_connection_impl object.
418 : * \param[in] address The address we are to connect to.
419 : * \param[in] port The port we are to connect to.
420 : * \param[in] mode The mode used to connect.
421 : */
422 0 : tcp_client_permanent_message_connection_impl(
423 : tcp_client_permanent_message_connection * parent
424 : , std::string const & address
425 : , int port
426 : , tcp_bio_client::mode_t mode)
427 0 : : f_parent(parent)
428 : , f_thread_runner(this, address, port, mode)
429 0 : , f_thread("background connection handler thread", &f_thread_runner)
430 : {
431 0 : }
432 :
433 :
434 : tcp_client_permanent_message_connection_impl(tcp_client_permanent_message_connection_impl const & rhs) = delete;
435 : tcp_client_permanent_message_connection_impl & operator = (tcp_client_permanent_message_connection_impl const & rhs) = delete;
436 :
437 : /** \brief Destroy the permanent message connection.
438 : *
439 : * This function makes sure that the messenger was lost.
440 : */
441 0 : ~tcp_client_permanent_message_connection_impl()
442 0 : {
443 : // to make sure we can lose the messenger, first we want to be sure
444 : // that we do not have a thread running
445 : //
446 : try
447 : {
448 0 : f_thread.stop();
449 : }
450 0 : catch(cppthread::cppthread_mutex_failed_error const &)
451 : {
452 : }
453 0 : catch(cppthread::cppthread_invalid_error const &)
454 : {
455 : }
456 :
457 : // in this case we may still have an instance of the f_thread_done
458 : // which linger around, we want it out
459 : //
460 : // Note: the call is safe even if the f_thread_done is null
461 : //
462 0 : communicator::instance()->remove_connection(f_thread_done);
463 :
464 : // although the f_messenger variable gets reset automatically in
465 : // the destructor, it would not get removed from the
466 : // communicator instance if we were not doing it explicitly
467 : //
468 0 : disconnect();
469 0 : }
470 :
471 :
472 : /** \brief Direct connect to the messenger.
473 : *
474 : * In this case we try to connect without the thread. This allows
475 : * us to avoid the thread problems, but we are blocked until the
476 : * OS decides to time out or the connection worked.
477 : */
478 0 : void connect()
479 : {
480 0 : if(f_done)
481 : {
482 0 : SNAP_LOG_ERROR
483 : << "Permanent connection marked done. Cannot attempt to reconnect."
484 : << SNAP_LOG_SEND;
485 0 : return;
486 : }
487 :
488 : // call the thread connect() function from the main thread
489 : //
490 0 : f_thread_runner.connect();
491 :
492 : // simulate receiving the thread_done() signal
493 : //
494 0 : thread_done();
495 : }
496 :
497 :
498 : /** \brief Check whether the permanent connection is currently connected.
499 : *
500 : * This function returns true if the messenger exists, which means that
501 : * the connection is up.
502 : *
503 : * \return true if the connection is up.
504 : */
505 0 : bool is_connected()
506 : {
507 0 : return f_messenger != nullptr;
508 : }
509 :
510 :
511 : /** \brief Try to start the thread runner.
512 : *
513 : * This function tries to start the thread runner in order to initiate
514 : * a connection in the background. If the thread could not be started,
515 : * then the function returns false.
516 : *
517 : * If the thread started, then the function returns true. This does
518 : * not mean that the connection was obtained. This is known once
519 : * the process_connected() function is called.
520 : *
521 : * \return true if the thread was successfully started.
522 : */
523 0 : bool background_connect()
524 : {
525 0 : if(f_done)
526 : {
527 0 : SNAP_LOG_ERROR
528 : << "Permanent connection marked done. Cannot attempt to reconnect."
529 : << SNAP_LOG_SEND;
530 0 : return false;
531 : }
532 :
533 0 : if(f_thread.is_running())
534 : {
535 0 : SNAP_LOG_ERROR
536 : << "A background connection attempt is already in progress. Further requests are ignored."
537 : << SNAP_LOG_SEND;
538 0 : return false;
539 : }
540 :
541 : // create the f_thread_done only when required
542 : //
543 0 : if(f_thread_done == nullptr)
544 : {
545 0 : f_thread_done = std::make_shared<thread_signal_handler>(this);
546 : }
547 :
548 0 : communicator::instance()->add_connection(f_thread_done);
549 :
550 0 : if(!f_thread.start())
551 : {
552 0 : SNAP_LOG_ERROR
553 : << "The thread used to run the background connection process did not start."
554 : << SNAP_LOG_SEND;
555 0 : return false;
556 : }
557 :
558 0 : return true;
559 : }
560 :
561 :
562 : /** \brief Tell the main thread that the background thread is done.
563 : *
564 : * This function is called by the thread so the thread_done()
565 : * function of the thread done object gets called. Only the
566 : * thread should call this function.
567 : *
568 : * As a result the thread_done() function of this class will be
569 : * called by the main thread.
570 : */
571 0 : void trigger_thread_done()
572 : {
573 0 : f_thread_done->thread_done();
574 0 : }
575 :
576 :
577 : /** \brief Signal that the background thread is done.
578 : *
579 : * This callback is called whenever the background thread sends
580 : * a signal to us. This is used to avoid calling end user functions
581 : * that would certainly cause a lot of problem if called from the
582 : * thread.
583 : *
584 : * The function calls the process_connection_failed() if the
585 : * connection did not happen.
586 : *
587 : * The function calls the process_connected() if the connection
588 : * did happen.
589 : *
590 : * \note
591 : * This is used only if the user requested that the connection
592 : * happen in the background (i.e. use_thread was set to true
593 : * in the tcp_client_permanent_message_connection object
594 : * constructor.)
595 : */
596 0 : void thread_done()
597 : {
598 : // if we used the thread we have to remove the signal used
599 : // to know that the thread was done
600 : //
601 0 : communicator::instance()->remove_connection(f_thread_done);
602 :
603 : // we will access the f_last_error member of the thread runner
604 : // which may not be available to the main thread yet, calling
605 : // stop forces a memory barrier so we are all good.
606 : //
607 : // calling stop() has no effect if we did not use the thread,
608 : // however, not calling stop() when we did use the thread
609 : // causes all sorts of other problems (especially, the thread
610 : // never gets joined)
611 : //
612 0 : f_thread.stop();
613 :
614 0 : tcp_bio_client::pointer_t client(f_thread_runner.release_client());
615 0 : if(f_done)
616 : {
617 : // already marked done, ignore the result and lose the
618 : // connection immediately
619 : //
620 : //f_thread_running.close(); -- not necessary, 'client' is the connection
621 0 : return;
622 : }
623 :
624 0 : if(client == nullptr)
625 : {
626 : // TODO: fix address in error message using a addr::addr so
627 : // as to handle IPv6 seamlessly.
628 : //
629 0 : SNAP_LOG_ERROR
630 : << "connection to "
631 0 : << f_thread_runner.get_address()
632 0 : << ":"
633 : << f_thread_runner.get_port()
634 : << " failed with: "
635 0 : << f_thread_runner.get_last_error()
636 : << SNAP_LOG_SEND;
637 :
638 : // signal that an error occurred
639 : //
640 0 : f_parent->process_connection_failed(f_thread_runner.get_last_error());
641 : }
642 : else
643 : {
644 0 : f_messenger = std::make_shared<messenger>(f_parent, client);
645 :
646 : // add the messenger to the communicator
647 : //
648 0 : communicator::instance()->add_connection(f_messenger);
649 :
650 : // if some messages were cached, process them immediately
651 : //
652 0 : while(!f_message_cache.empty())
653 : {
654 0 : f_messenger->send_message(f_message_cache[0]);
655 0 : f_message_cache.erase(f_message_cache.begin());
656 : }
657 :
658 : // let the client know we are now connected
659 : //
660 0 : f_parent->process_connected();
661 : }
662 : }
663 :
664 : /** \brief Send a message to the connection.
665 : *
666 : * This implementation function actually sends the message to the
667 : * connection, assuming that the connection exists. Otherwise, it
668 : * may cache the message (if cache is true.)
669 : *
670 : * Note that the message does not get cached if mark_done() was
671 : * called earlier since we are trying to close the whole connection.
672 : *
673 : * \param[in] msg The message to send.
674 : * \param[in] cache Whether to cache the message if the connection is
675 : * currently down.
676 : *
677 : * \return true if the message was forwarded, false if the message
678 : * was ignored or cached.
679 : */
680 0 : bool send_message(message const & msg, bool cache)
681 : {
682 0 : if(f_messenger != nullptr)
683 : {
684 0 : return f_messenger->send_message(msg);
685 : }
686 :
687 0 : if(cache && !f_done)
688 : {
689 0 : f_message_cache.push_back(msg);
690 : }
691 :
692 0 : return false;
693 : }
694 :
695 :
696 : /** \brief Forget about the messenger connection.
697 : *
698 : * This function is used to fully disconnect from the messenger.
699 : *
700 : * If there is a messenger, this means:
701 : *
702 : * \li Removing the messenger from the communicator instance.
703 : * \li Closing the connection in the thread object.
704 : *
705 : * In most cases, it is called when an error occur, also it happens
706 : * that we call it explicitly through the disconnect() function
707 : * of the permanent connection class.
708 : *
709 : * \note
710 : * This is safe, even though it is called from the messenger itself
711 : * because it will not get deleted yet. This is because the run()
712 : * loop has a copy in its own temporary copy of the vector of
713 : * connections.
714 : */
715 0 : void disconnect()
716 : {
717 0 : if(f_messenger != nullptr)
718 : {
719 0 : communicator::instance()->remove_connection(f_messenger);
720 0 : f_messenger.reset();
721 :
722 : // just the messenger does not close the TCP connection because
723 : // we may have another in the thread runner
724 : //
725 0 : f_thread_runner.close();
726 : }
727 0 : }
728 :
729 :
730 : /** \brief Return the address and size of the remote computer.
731 : *
732 : * This function retrieve the socket address.
733 : *
734 : * \param[out] address The binary address of the remote computer.
735 : *
736 : * \return The size of the sockaddr structure, 0 if no address is available.
737 : */
738 0 : size_t get_client_address(sockaddr_storage & address) const
739 : {
740 0 : if(f_messenger != nullptr)
741 : {
742 0 : return f_messenger->get_client_address(address);
743 : }
744 0 : memset(&address, 0, sizeof(address));
745 0 : return 0;
746 : }
747 :
748 :
749 : /** \brief Return the address of the f_message object.
750 : *
751 : * This function returns the address of the message object.
752 : *
753 : * \return The address of the remote computer.
754 : */
755 0 : std::string get_client_addr() const
756 : {
757 0 : if(f_messenger != nullptr)
758 : {
759 0 : return f_messenger->get_client_addr();
760 : }
761 0 : return std::string();
762 : }
763 :
764 :
765 : /** \brief Mark the messenger as done.
766 : *
767 : * This function is used to mark the messenger as done. This means it
768 : * will get removed from the communicator instance as soon as it
769 : * is done with its current write buffer if there is one.
770 : *
771 : * You may also want to call the disconnection() function to actually
772 : * reset the pointer along the way.
773 : */
774 0 : void mark_done()
775 : {
776 0 : f_done = true;
777 :
778 : // once done we don't attempt to reconnect so we can as well
779 : // get rid of our existing cache immediately to save some
780 : // memory
781 : //
782 0 : f_message_cache.clear();
783 :
784 0 : if(f_messenger != nullptr)
785 : {
786 0 : f_messenger->mark_done();
787 : }
788 0 : }
789 :
790 :
791 : private:
792 : tcp_client_permanent_message_connection * f_parent = nullptr;
793 : thread_signal_handler::pointer_t f_thread_done = thread_signal_handler::pointer_t();
794 : runner f_thread_runner;
795 : cppthread::thread f_thread;
796 : messenger::pointer_t f_messenger = messenger::pointer_t();
797 : message::vector_t f_message_cache = message::vector_t();
798 : bool f_done = false;
799 : };
800 :
801 :
802 :
803 : }
804 : // namespace detail
805 :
806 :
807 :
808 : /** \brief Initializes this TCP client message connection.
809 : *
810 : * This implementation creates what we call a permanent connection.
811 : * Such a connection may fail once in a while. In such circumstances,
812 : * the class automatically requests for a reconnection (see various
813 : * parameters in the regard below.) However, this causes one issue:
814 : * by default, the connection just never ends. When you are about
815 : * ready to close the connection, you must call the mark_done()
816 : * function first. This will tell the various error functions to
817 : * drop this connection instead of restarting it after a small pause.
818 : *
819 : * This constructor makes sure to initialize the timer and saves
820 : * the address, port, mode, pause, and use_thread parameters.
821 : *
822 : * The timer is first set to trigger immediately. This means the TCP
823 : * connection will be attempted as soon as possible (the next time
824 : * the run() loop is entered, it will time out immediately.) You
825 : * are free to call set_timeout_date() with a date in the future if
826 : * you prefer that the connect be attempted a little later.
827 : *
828 : * The \p pause parameter is used if the connection is lost and this
829 : * timer is used again to attempt a new connection. It will be reused
830 : * as long as the connection fails (as a delay). It has to be at least
831 : * 10 microseconds, although really you should not use less than 1
832 : * second (1000000). You may set the pause parameter to 0 in which case
833 : * you are responsible to set the delay (by default there will be no
834 : * delay and thus the timer will never time out.)
835 : *
836 : * To start with a delay, instead of trying to connect immediately,
837 : * you may pass a negative pause parameter. So for example to get the
838 : * first attempt 5 seconds after you created this object, you use
839 : * -5000000LL as the pause parameter.
840 : *
841 : * The \p use_thread parameter determines whether the connection should
842 : * be attempted in a thread (asynchronously) or immediately (which means
843 : * the timeout callback may block for a while.) If the connection is to
844 : * a local server with an IP address specified as numbers (i.e. 127.0.0.1),
845 : * the thread is probably not required. For connections to a remote
846 : * computer, though, it certainly is important.
847 : *
848 : * \param[in] address The address to connect to. It may be set to "0.0.0.0".
849 : * \param[in] port The port to connect to.
850 : * \param[in] mode The mode to use to open the connection.
851 : * \param[in] pause The amount of time to wait before attempting a new
852 : * connection after a failure, in microseconds, or 0.
853 : * \param[in] use_thread Whether a thread is used to connect to the
854 : * server.
855 : */
856 0 : tcp_client_permanent_message_connection::tcp_client_permanent_message_connection(
857 : std::string const & address
858 : , int port
859 : , tcp_bio_client::mode_t mode
860 : , std::int64_t const pause
861 0 : , bool const use_thread)
862 : : timer(pause < 0 ? -pause : 0)
863 : , f_impl(std::make_shared<detail::tcp_client_permanent_message_connection_impl>(this, address, port, mode))
864 0 : , f_pause(llabs(pause))
865 0 : , f_use_thread(use_thread)
866 : {
867 0 : }
868 :
869 :
870 : /** \brief Destroy instance.
871 : *
872 : * This function cleans up everything in the permanent message object.
873 : */
874 0 : tcp_client_permanent_message_connection::~tcp_client_permanent_message_connection()
875 : {
876 : // Does nothing
877 0 : }
878 :
879 :
880 : /** \brief Attempt to send a message to this connection.
881 : *
882 : * If the connection is currently enabled, the message is sent immediately.
883 : * Otherwise, it may be cached if the \p cache parameter is set to true.
884 : * A cached message is forwarded as soon as a new successful connection
885 : * happens, which can be a problem if messages need to happen in a very
886 : * specific order (For example, after a reconnection to snapcommunicator
887 : * you first need to REGISTER or CONNECT...)
888 : *
889 : * \param[in] msg The message to send to the connected server.
890 : * \param[in] cache Whether the message should be cached.
891 : *
892 : * \return true if the message was sent, false if it was not sent, although
893 : * if cache was true, it was cached
894 : */
895 0 : bool tcp_client_permanent_message_connection::send_message(
896 : message const & msg
897 : , bool cache)
898 : {
899 0 : return f_impl->send_message(msg, cache);
900 : }
901 :
902 :
903 : /** \brief Check whether the connection is up.
904 : *
905 : * This function returns true if the connection is considered to be up.
906 : * This means sending messages will work quickly instead of being
907 : * cached up until an actual TCP/IP connection gets established.
908 : *
909 : * Note that the connection may have hung up since, and the system
910 : * may not have yet detected the fact (i.e. the connection is going
911 : * to receive the process_hup() call after the event in which you are
912 : * working.)
913 : *
914 : * \return true if connected
915 : */
916 0 : bool tcp_client_permanent_message_connection::is_connected() const
917 : {
918 0 : return f_impl->is_connected();
919 : }
920 :
921 :
922 : /** \brief Disconnect the messenger now.
923 : *
924 : * This function kills the current connection.
925 : *
926 : * There are a few cases where two daemons communicate between each others
927 : * and at some point one of them wants to exit and needs to disconnect. This
928 : * function can be used in that one situation assuming that you have an
929 : * acknowledgement from the other daemon.
930 : *
931 : * Say you have daemon A and B. B wants to quit and before doing so sends
932 : * a form of "I'm quitting" message to A. In that situation, B is not closing
933 : * the messenger connection, A is responsible for that (i.e. A acknowledges
934 : * receipt of the "I'm quitting" message from B by closing the connection.)
935 : *
936 : * B also wants to call the mark_done() function to make sure that it
937 : * does not reconnect a split second later and instead the permanent
938 : * connection gets removed from the communicator list of connections.
939 : */
940 0 : void tcp_client_permanent_message_connection::disconnect()
941 : {
942 0 : f_impl->disconnect();
943 0 : }
944 :
945 :
946 : /** \brief Overload so we do not have to use namespace everywhere.
947 : *
948 : * This function overloads the connection::mark_done() function so
949 : * we can call it without the need to use timer::mark_done()
950 : * everywhere.
951 : */
952 0 : void tcp_client_permanent_message_connection::mark_done()
953 : {
954 0 : timer::mark_done();
955 0 : }
956 :
957 :
958 : /** \brief Mark connection as done.
959 : *
960 : * This function allows you to mark the permanent connection and the
961 : * messenger as done.
962 : *
963 : * Note that calling this function with false is the same as calling the
964 : * base class mark_done() function.
965 : *
966 : * If the \p message parameter is set to true, we suggest you also call
967 : * the disconnect() function. That way the messenger will truly get
968 : * removed from everyone quickly.
969 : *
970 : * \param[in] messenger If true, also mark the messenger as done.
971 : */
972 0 : void tcp_client_permanent_message_connection::mark_done(bool messenger)
973 : {
974 0 : timer::mark_done();
975 0 : if(messenger)
976 : {
977 0 : f_impl->mark_done();
978 : }
979 0 : }
980 :
981 :
982 : /** \brief Retrieve a copy of the client's address.
983 : *
984 : * This function makes a copy of the address of this client connection
985 : * to the \p address parameter and returns the length.
986 : *
987 : * \param[in] address The reference to an address variable where the
988 : * address gets copied.
989 : *
990 : * \return Return the length of the address which may be smaller than
991 : * sizeof(struct sockaddr). If zero, then no address is defined.
992 : *
993 : * \sa get_addr()
994 : */
995 0 : size_t tcp_client_permanent_message_connection::get_client_address(sockaddr_storage & address) const
996 : {
997 0 : return f_impl->get_client_address(address);
998 : }
999 :
1000 :
1001 : /** \brief Retrieve the remote computer address as a string.
1002 : *
1003 : * This function returns the address of the remote computer as a string.
1004 : * It will be a canonicalized IP address.
1005 : *
1006 : * \return The canonicalized IP address.
1007 : */
1008 0 : std::string tcp_client_permanent_message_connection::get_client_addr() const
1009 : {
1010 0 : return f_impl->get_client_addr();
1011 : }
1012 :
1013 :
1014 : /** \brief Internal timeout callback implementation.
1015 : *
1016 : * This callback implements the guts of this class: it attempts to connect
1017 : * to the specified address and port, optionally after creating a thread
1018 : * so the attempt can happen asynchronously.
1019 : *
1020 : * When the connection fails, the timer is used to try again pause
1021 : * microseconds later (pause as specified in the constructor).
1022 : *
1023 : * When a connection succeeds, the timer is disabled until you detect
1024 : * an error while using the connection and re-enable the timer.
1025 : *
1026 : * \warning
1027 : * This function changes the timeout delay to the pause amount
1028 : * as defined with the constructor. If you want to change that
1029 : * amount, you can do so an any point after this function call
1030 : * using the set_timeout_delay() function. If the pause parameter
1031 : * was set to -1, then the timeout never gets changed.
1032 : * However, you should not use a permanent message timer as your
1033 : * own or you will interfere with the internal use of the timer.
1034 : */
1035 0 : void tcp_client_permanent_message_connection::process_timeout()
1036 : {
1037 : // got a spurious call when already marked done
1038 : //
1039 0 : if(is_done())
1040 : {
1041 0 : return;
1042 : }
1043 :
1044 : // change the timeout delay although we will not use it immediately
1045 : // if we start the thread or attempt an immediate connection, but
1046 : // that way the user can change it by calling set_timeout_delay()
1047 : // at any time after the first process_timeout() call
1048 : //
1049 0 : if(f_pause > 0)
1050 : {
1051 0 : set_timeout_delay(f_pause);
1052 0 : f_pause = 0;
1053 : }
1054 :
1055 0 : if(f_use_thread)
1056 : {
1057 : // in this case we create a thread, run it and know whether the
1058 : // connection succeeded only when the thread tells us it did
1059 : //
1060 : // TODO: the background_connect() may return false in two situations:
1061 : // 1) when the thread is already running and then the behavior
1062 : // we have below is INCORRECT
1063 : // 2) when the thread cannot be started (i.e. could not
1064 : // allocate the stack?) in which case the if() below
1065 : // is the correct behavior
1066 : //
1067 0 : if(f_impl->background_connect())
1068 : {
1069 : // we started the thread successfully, so block the timer
1070 : //
1071 0 : set_enable(false);
1072 : }
1073 : }
1074 : else
1075 : {
1076 : // the success is noted when we receive a call to
1077 : // process_connected(); there we do set_enable(false)
1078 : // so the timer stops
1079 : //
1080 0 : f_impl->connect();
1081 : }
1082 : }
1083 :
1084 :
1085 : /** \brief Process an error.
1086 : *
1087 : * When an error occurs, we restart the timer so we can attempt to reconnect
1088 : * to that server.
1089 : *
1090 : * If you overload this function, make sure to either call this
1091 : * implementation or enable the timer yourselves.
1092 : *
1093 : * \warning
1094 : * This function does not call the timer::process_error() function
1095 : * which means that this connection is not automatically removed from
1096 : * the communicator object on failures.
1097 : */
1098 0 : void tcp_client_permanent_message_connection::process_error()
1099 : {
1100 0 : if(is_done())
1101 : {
1102 0 : timer::process_error();
1103 : }
1104 : else
1105 : {
1106 0 : f_impl->disconnect();
1107 0 : set_enable(true);
1108 : }
1109 0 : }
1110 :
1111 :
1112 : /** \brief Process a hang up.
1113 : *
1114 : * When a hang up occurs, we restart the timer so we can attempt to reconnect
1115 : * to that server.
1116 : *
1117 : * If you overload this function, make sure to either call this
1118 : * implementation or enable the timer yourselves.
1119 : *
1120 : * \warning
1121 : * This function does not call the timer::process_hup() function
1122 : * which means that this connection is not automatically removed from
1123 : * the communicator object on failures.
1124 : */
1125 0 : void tcp_client_permanent_message_connection::process_hup()
1126 : {
1127 0 : if(is_done())
1128 : {
1129 0 : timer::process_hup();
1130 : }
1131 : else
1132 : {
1133 0 : f_impl->disconnect();
1134 0 : set_enable(true);
1135 : }
1136 0 : }
1137 :
1138 :
1139 : /** \brief Process an invalid signal.
1140 : *
1141 : * When an invalid signal occurs, we restart the timer so we can attempt
1142 : * to reconnect to that server.
1143 : *
1144 : * If you overload this function, make sure to either call this
1145 : * implementation or enable the timer yourselves.
1146 : *
1147 : * \warning
1148 : * This function does not call the timer::process_invalid() function
1149 : * which means that this connection is not automatically removed from
1150 : * the communicator object on failures.
1151 : */
1152 0 : void tcp_client_permanent_message_connection::process_invalid()
1153 : {
1154 0 : if(is_done())
1155 : {
1156 0 : timer::process_invalid();
1157 : }
1158 : else
1159 : {
1160 0 : f_impl->disconnect();
1161 0 : set_enable(true);
1162 : }
1163 0 : }
1164 :
1165 :
1166 : /** \brief Make sure that the messenger connection gets removed.
1167 : *
1168 : * This function makes sure that the messenger sub-connection also gets
1169 : * removed from the communicator. Otherwise it would lock the system
1170 : * since connections are saved in the communicator object as shared
1171 : * pointers.
1172 : */
1173 0 : void tcp_client_permanent_message_connection::connection_removed()
1174 : {
1175 0 : f_impl->disconnect();
1176 0 : }
1177 :
1178 :
1179 : /** \brief Process a connection failed callback.
1180 : *
1181 : * When a connection attempt fails, we restart the timer so we can
1182 : * attempt to reconnect to that server.
1183 : *
1184 : * If you overload this function, make sure to either call this
1185 : * implementation or enable the timer yourselves.
1186 : *
1187 : * \param[in] error_message The error message that triggered this callback.
1188 : */
1189 0 : void tcp_client_permanent_message_connection::process_connection_failed(std::string const & error_message)
1190 : {
1191 0 : snapdev::NOT_USED(error_message);
1192 0 : set_enable(true);
1193 0 : }
1194 :
1195 :
1196 : /** \brief The connection is ready.
1197 : *
1198 : * This callback gets called whenever the connection succeeded and is
1199 : * ready to be used.
1200 : *
1201 : * You should implement this virtual function if you have to initiate
1202 : * the communication. For example, the snapserver has to send a
1203 : * REGISTER to the snapcommunicator system and thus implements this
1204 : * function.
1205 : *
1206 : * The default implementation makes sure that the timer gets turned off
1207 : * so we do not try to reconnect every minute or so.
1208 : */
1209 0 : void tcp_client_permanent_message_connection::process_connected()
1210 : {
1211 0 : set_enable(false);
1212 0 : }
1213 :
1214 :
1215 :
1216 6 : } // namespace ed
1217 : // vim: ts=4 sw=4 et
|