Line data Source code
1 : // Copyright (c) 2012-2022 Made to Order Software Corp. All Rights Reserved
2 : //
3 : // https://snapwebsites.org/project/eventdispatcher
4 : // contact@m2osw.com
5 : //
6 : // This program is free software; you can redistribute it and/or modify
7 : // it under the terms of the GNU General Public License as published by
8 : // the Free Software Foundation; either version 2 of the License, or
9 : // (at your option) any later version.
10 : //
11 : // This program is distributed in the hope that it will be useful,
12 : // but WITHOUT ANY WARRANTY; without even the implied warranty of
13 : // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 : // GNU General Public License for more details.
15 : //
16 : // You should have received a copy of the GNU General Public License along
17 : // with this program; if not, write to the Free Software Foundation, Inc.,
18 : // 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
19 :
20 : /** \file
21 : * \brief Implementation of the permanent TCP connection.
22 : *
23 : * This class is an extension of the TCP client message connection used
24 : * to handle a TCP connection which supports messages and will automatically
25 : * try to reconnect if the connection drops.
26 : *
27 : * The class will also try to connect with the next address if more than one
28 : * is available.
29 : */
30 :
31 :
32 : // self
33 : //
34 : #include "eventdispatcher/tcp_client_permanent_message_connection.h"
35 :
36 : #include "eventdispatcher/communicator.h"
37 : #include "eventdispatcher/exception.h"
38 : #include "eventdispatcher/tcp_server_client_message_connection.h"
39 : #include "eventdispatcher/thread_done_signal.h"
40 :
41 :
42 : // snaplogger
43 : //
44 : #include <snaplogger/message.h>
45 :
46 :
47 : // snapdev
48 : //
49 : #include <snapdev/not_used.h>
50 :
51 :
52 : // cppthread
53 : //
54 : #include <cppthread/exception.h>
55 : #include <cppthread/guard.h>
56 : #include <cppthread/runner.h>
57 : #include <cppthread/thread.h>
58 :
59 :
60 : // C++
61 : //
62 : #include <cstring>
63 :
64 :
65 : // C
66 : //
67 : #include <sys/socket.h>
68 :
69 :
70 : // last include
71 : //
72 : #include <snapdev/poison.h>
73 :
74 :
75 :
76 : namespace ed
77 : {
78 :
79 :
80 :
81 : namespace detail
82 : {
83 :
84 :
85 : /** \brief Internal implementation of the tcp_client_permanent_message_connection class.
86 : *
87 : * This class is used to handle a thread that will process a connection for
88 : * us. This allows us to connect in any amount of time required by the
89 : * Unix system to obtain the connection with the remote server.
90 : *
91 : * \todo
92 : * Having threads at the time we do a fork() is not safe. We may
93 : * want to reconsider offering this functionality here. Because at
94 : * this time we would have no control of when the thread is created
95 : * and thus a way to make sure that no such thread is running when
96 : * we call fork().
97 : */
98 : class tcp_client_permanent_message_connection_impl
99 : {
100 : public:
101 0 : class messenger
102 : : public tcp_server_client_message_connection
103 : {
104 : public:
105 : typedef std::shared_ptr<messenger> pointer_t;
106 :
107 0 : messenger(tcp_client_permanent_message_connection * parent, tcp_bio_client::pointer_t client)
108 0 : : tcp_server_client_message_connection(client)
109 0 : , f_parent(parent)
110 : {
111 0 : set_name("tcp_client_permanent_message_connection_impl::messenger");
112 0 : }
113 :
114 : messenger(messenger const & rhs) = delete;
115 : messenger & operator = (messenger const & rhs) = delete;
116 :
117 : // connection implementation
118 0 : virtual void process_empty_buffer()
119 : {
120 0 : tcp_server_client_message_connection::process_empty_buffer();
121 0 : f_parent->process_empty_buffer();
122 0 : }
123 :
124 : // connection implementation
125 0 : virtual void process_error()
126 : {
127 0 : tcp_server_client_message_connection::process_error();
128 0 : f_parent->process_error();
129 0 : }
130 :
131 : // connection implementation
132 0 : virtual void process_hup()
133 : {
134 0 : tcp_server_client_message_connection::process_hup();
135 0 : f_parent->process_hup();
136 0 : }
137 :
138 : // connection implementation
139 0 : virtual void process_invalid()
140 : {
141 0 : tcp_server_client_message_connection::process_invalid();
142 0 : f_parent->process_invalid();
143 0 : }
144 :
145 : // tcp_server_client_message_connection implementation
146 0 : virtual void process_message(message & msg)
147 : {
148 : // We call the dispatcher from our parent since the child
149 : // (this messenger) is not given a dispatcher
150 : //
151 0 : f_parent->dispatch_message(msg);
152 0 : }
153 :
154 : private:
155 : tcp_client_permanent_message_connection * f_parent = nullptr;
156 : };
157 :
158 0 : class thread_signal_handler
159 : : public thread_done_signal
160 : {
161 : public:
162 : typedef std::shared_ptr<thread_signal_handler> pointer_t;
163 :
164 0 : thread_signal_handler(tcp_client_permanent_message_connection_impl * parent_impl)
165 0 : : f_parent_impl(parent_impl)
166 : {
167 0 : set_name("tcp_client_permanent_message_connection_impl::thread_signal_handler");
168 0 : }
169 :
170 : thread_signal_handler(thread_signal_handler const &) = delete;
171 : thread_signal_handler & operator = (thread_signal_handler const &) = delete;
172 :
173 : /** \brief This signal was emitted.
174 : *
175 : * This function gets called whenever the thread is just about to
176 : * quit. Calling f_thread.is_running() may still return true when
177 : * you get in the 'thread_done()' callback. However, an
178 : * f_thread.stop() will return very quickly.
179 : */
180 0 : virtual void process_read()
181 : {
182 0 : thread_done_signal::process_read();
183 :
184 0 : f_parent_impl->thread_done();
185 0 : }
186 :
187 : private:
188 : tcp_client_permanent_message_connection_impl * f_parent_impl = nullptr;
189 : };
190 :
191 0 : class runner
192 : : public cppthread::runner
193 : {
194 : public:
195 0 : runner(
196 : tcp_client_permanent_message_connection_impl * parent_impl
197 : , addr::addr::vector_t const & addresses
198 : , mode_t mode)
199 0 : : cppthread::runner("background tcp_client_permanent_message_connection for asynchronous connections")
200 : , f_parent_impl(parent_impl)
201 : , f_addresses(addresses)
202 0 : , f_mode(mode)
203 : {
204 0 : }
205 :
206 : runner(runner const &) = delete;
207 : runner & operator = (runner const &) = delete;
208 :
209 :
210 : /** \brief This is the actual function run by the thread.
211 : *
212 : * This function calls the connect() function and then
213 : * tells the main thread we are done.
214 : */
215 0 : virtual void run()
216 : {
217 0 : connect();
218 :
219 : // tell the main thread that we are done
220 : //
221 0 : f_parent_impl->trigger_thread_done();
222 0 : }
223 :
224 :
225 : /** \brief This function attempts to connect.
226 : *
227 : * This function attempts a connection to the specified address
228 : * and port with the specified mode (i.e. plain or encrypted.)
229 : *
230 : * The function may take a long time to succeed connecting with
231 : * the server. The main thread will be awaken whenever this
232 : * thread dies.
233 : *
234 : * If an error occurs, then the f_socket variable member will
235 : * be set to -1. Otherwise it represents the socket that we
236 : * just connected with.
237 : */
238 0 : void connect()
239 : {
240 0 : char const * error_name(nullptr);
241 : try
242 : {
243 : // create a socket using the bio_client class,
244 : // but then just create a duplicate that we will
245 : // use in a server-client TCP object (because
246 : // we cannot directly create the right type of
247 : // object otherwise...)
248 : //
249 0 : f_tcp_connection = std::make_shared<tcp_bio_client>(f_addresses[f_index], f_mode);
250 0 : return;
251 : }
252 0 : catch(failed_connecting const & e)
253 : {
254 0 : error_name = "failed_connecting";
255 0 : f_last_error = e.what();
256 : }
257 0 : catch(initialization_error const & e)
258 : {
259 0 : error_name = "initialization_error";
260 0 : f_last_error = e.what();
261 : }
262 0 : catch(runtime_error const & e)
263 : {
264 0 : error_name = "runtime_error";
265 0 : f_last_error = e.what();
266 : }
267 0 : catch(std::exception const & e)
268 : {
269 0 : error_name = "std::exception";
270 0 : f_last_error = e.what();
271 : }
272 0 : catch(...)
273 : {
274 0 : error_name = "a non-standard exception";
275 0 : f_last_error = "Unknown exception";
276 : }
277 0 : f_tcp_connection.reset();
278 :
279 : // on an error, we want to try the next address
280 : //
281 0 : ++f_index;
282 0 : if(f_index >= f_addresses.size())
283 : {
284 0 : f_index = 0;
285 : }
286 :
287 : // connection failed... we will have to try again later
288 : //
289 0 : SNAP_LOG_ERROR
290 : << "connection to "
291 : << addr::setaddrmode(addr::addr::string_ip_t::STRING_IP_PORT)
292 0 : << f_addresses
293 : << " failed with: "
294 : << f_last_error
295 : << " ("
296 : << error_name
297 : << ")"
298 : << SNAP_LOG_SEND;
299 : }
300 :
301 :
302 : /** \brief Retrieve the address to connect to.
303 : *
304 : * This function returns the address we connected to or, if not
305 : * connected, the one that we will attempt to connect to next.
306 : *
307 : * \note
308 : * Since the variable is constant, it is likely to never change.
309 : * However, the c_str() function may change the buffer pointer.
310 : * Hence, to be 100% safe, you cannot call this function until
311 : * you make sure that the thread is fully stopped.
312 : *
313 : * \return The destination address.
314 : */
315 0 : addr::addr const & get_address() const
316 : {
317 0 : return f_addresses[f_index];
318 : }
319 :
320 :
321 : /** \brief Retrieve the vector of addresses to connect to.
322 : *
323 : * This function returns the addresses passed in on creation.
324 : *
325 : * \note
326 : * Since the variable is constant, it is likely to never change.
327 : * However, the c_str() function may change the buffer pointer.
328 : * Hence, to be 100% safe, you cannot call this function until
329 : * you make sure that the thread is fully stopped.
330 : *
331 : * \return The destination addresses.
332 : */
333 : addr::addr::vector_t const & get_addresses() const
334 : {
335 : return f_addresses;
336 : }
337 :
338 :
339 : /** \brief Retrieve the client allocated and connected by the thread.
340 : *
341 : * This function returns the TCP connection object resulting from
342 : * connection attempts of the background thread.
343 : *
344 : * If the pointer is null, then you may get the corresponding
345 : * error message using the get_last_error() function.
346 : *
347 : * You can get the client TCP connection pointer once. After that
348 : * you always get a null pointer.
349 : *
350 : * \note
351 : * This function is guarded so the pointer and the object it
352 : * points to will be valid in another thread that retrieves it.
353 : *
354 : * \return The connection pointer.
355 : */
356 0 : tcp_bio_client::pointer_t release_client()
357 : {
358 0 : cppthread::guard g(f_mutex);
359 0 : tcp_bio_client::pointer_t tcp_connection;
360 0 : tcp_connection.swap(f_tcp_connection);
361 0 : return tcp_connection;
362 : }
363 :
364 :
365 : /** \brief Retrieve the last error message that happened.
366 : *
367 : * This function returns the last error message that was captured
368 : * when trying to connect to the socket. The message is the
369 : * e.what() message from the exception we captured.
370 : *
371 : * The message does not get cleared so the function can be called
372 : * any number of times. To know whether an error was generated
373 : * on the last attempt, make sure to first get the get_socket()
374 : * and if it returns -1, then this message is significant,
375 : * otherwise it is from a previous error.
376 : *
377 : * \warning
378 : * Remember that if the background thread was used the error will
379 : * NOT be available in the main thread until a full memory barrier
380 : * was executed. For that reason we make sure that the thread
381 : * was stopped when we detect an error.
382 : *
383 : * \return The last error message.
384 : */
385 0 : std::string const & get_last_error() const
386 : {
387 0 : return f_last_error;
388 : }
389 :
390 :
391 : /** \brief Close the connection.
392 : *
393 : * This function closes the connection. Since the f_tcp_connection
394 : * holds the socket to the remote server, we have get this function
395 : * called in order to completely disconnect.
396 : *
397 : * \note
398 : * This function does not clear the f_last_error parameter so it
399 : * can be read later.
400 : */
401 0 : void close()
402 : {
403 0 : f_tcp_connection.reset();
404 0 : }
405 :
406 :
407 : private:
408 : tcp_client_permanent_message_connection_impl * f_parent_impl = nullptr;
409 : std::size_t f_index = 0;
410 : addr::addr::vector_t const f_addresses;
411 : mode_t const f_mode;
412 : tcp_bio_client::pointer_t f_tcp_connection = tcp_bio_client::pointer_t();
413 : std::string f_last_error = std::string();
414 : };
415 :
416 :
417 : /** \brief Initialize a permanent message connection implementation object.
418 : *
419 : * This object manages the thread used to asynchronically connect to
420 : * the specified address and port.
421 : *
422 : * This class and its sub-classes may end up executing callbacks
423 : * of the tcp_client_permanent_message_connection object.
424 : * However, in all cases these are never run from the thread.
425 : *
426 : * \param[in] parent A pointer to the owner of this
427 : * tcp_client_permanent_message_connection_impl object.
428 : * \param[in] addresses The addresses we are to connect to.
429 : * \param[in] mode The mode used to connect.
430 : */
431 0 : tcp_client_permanent_message_connection_impl(
432 : tcp_client_permanent_message_connection * parent
433 : , addr::addr::vector_t const & addresses
434 : , mode_t mode)
435 0 : : f_parent(parent)
436 : , f_thread_runner(this, addresses, mode)
437 0 : , f_thread("background connection handler thread", &f_thread_runner)
438 : {
439 0 : }
440 :
441 :
442 : tcp_client_permanent_message_connection_impl(tcp_client_permanent_message_connection_impl const & rhs) = delete;
443 : tcp_client_permanent_message_connection_impl & operator = (tcp_client_permanent_message_connection_impl const & rhs) = delete;
444 :
445 : /** \brief Destroy the permanent message connection.
446 : *
447 : * This function makes sure that the messenger was lost.
448 : */
449 0 : ~tcp_client_permanent_message_connection_impl()
450 0 : {
451 : // to make sure we can lose the messenger, first we want to be sure
452 : // that we do not have a thread running
453 : //
454 : try
455 : {
456 0 : f_thread.stop();
457 : }
458 0 : catch(cppthread::cppthread_mutex_failed_error const &)
459 : {
460 : }
461 0 : catch(cppthread::cppthread_invalid_error const &)
462 : {
463 : }
464 :
465 : // in this case we may still have an instance of the f_thread_done
466 : // which linger around, we want it out
467 : //
468 : // Note: the call is safe even if the f_thread_done is null
469 : //
470 0 : communicator::instance()->remove_connection(f_thread_done);
471 :
472 : // although the f_messenger variable gets reset automatically in
473 : // the destructor, it would not get removed from the
474 : // communicator instance if we were not doing it explicitly
475 : //
476 0 : disconnect();
477 0 : }
478 :
479 :
480 : /** \brief Direct connect to the messenger.
481 : *
482 : * In this case we try to connect without the thread. This allows
483 : * us to avoid the thread problems, but we are blocked until the
484 : * OS decides to time out or the connection worked.
485 : */
486 0 : void connect()
487 : {
488 0 : if(f_done)
489 : {
490 0 : SNAP_LOG_ERROR
491 : << "Permanent connection marked done. Cannot attempt to reconnect."
492 : << SNAP_LOG_SEND;
493 0 : return;
494 : }
495 :
496 : // call the thread connect() function from the main thread
497 : //
498 0 : f_thread_runner.connect();
499 :
500 : // simulate receiving the thread_done() signal
501 : //
502 0 : thread_done();
503 : }
504 :
505 :
506 : /** \brief Check whether the permanent connection is currently connected.
507 : *
508 : * This function returns true if the messenger exists, which means that
509 : * the connection is up.
510 : *
511 : * \return true if the connection is up.
512 : */
513 0 : bool is_connected()
514 : {
515 0 : return f_messenger != nullptr;
516 : }
517 :
518 :
519 : /** \brief Try to start the thread runner.
520 : *
521 : * This function tries to start the thread runner in order to initiate
522 : * a connection in the background. If the thread could not be started,
523 : * then the function returns false.
524 : *
525 : * If the thread started, then the function returns true. This does
526 : * not mean that the connection was obtained. This is known once
527 : * the process_connected() function is called.
528 : *
529 : * \return true if the thread was successfully started.
530 : */
531 0 : bool background_connect()
532 : {
533 0 : if(f_done)
534 : {
535 0 : SNAP_LOG_ERROR
536 : << "Permanent connection marked done. Cannot attempt to reconnect."
537 : << SNAP_LOG_SEND;
538 0 : return false;
539 : }
540 :
541 0 : if(f_thread.is_running())
542 : {
543 0 : SNAP_LOG_ERROR
544 : << "A background connection attempt is already in progress. Further requests are ignored."
545 : << SNAP_LOG_SEND;
546 0 : return false;
547 : }
548 :
549 : // create the f_thread_done only when required
550 : //
551 0 : if(f_thread_done == nullptr)
552 : {
553 0 : f_thread_done = std::make_shared<thread_signal_handler>(this);
554 : }
555 :
556 0 : communicator::instance()->add_connection(f_thread_done);
557 :
558 0 : if(!f_thread.start())
559 : {
560 0 : SNAP_LOG_ERROR
561 : << "The thread used to run the background connection process did not start."
562 : << SNAP_LOG_SEND;
563 0 : return false;
564 : }
565 :
566 0 : return true;
567 : }
568 :
569 :
570 : /** \brief Tell the main thread that the background thread is done.
571 : *
572 : * This function is called by the thread so the thread_done()
573 : * function of the thread done object gets called. Only the
574 : * thread should call this function.
575 : *
576 : * As a result the thread_done() function of this class will be
577 : * called by the main thread.
578 : */
579 0 : void trigger_thread_done()
580 : {
581 0 : f_thread_done->thread_done();
582 0 : }
583 :
584 :
585 : /** \brief Signal that the background thread is done.
586 : *
587 : * This callback is called whenever the background thread sends
588 : * a signal to us. This is used to avoid calling end user functions
589 : * that would certainly cause a lot of problem if called from the
590 : * thread.
591 : *
592 : * The function calls the process_connection_failed() if the
593 : * connection did not happen.
594 : *
595 : * The function calls the process_connected() if the connection
596 : * did happen.
597 : *
598 : * \note
599 : * This is used only if the user requested that the connection
600 : * happen in the background (i.e. use_thread was set to true
601 : * in the tcp_client_permanent_message_connection object
602 : * constructor.)
603 : */
604 0 : void thread_done()
605 : {
606 : // if we used the thread we have to remove the signal used
607 : // to know that the thread was done
608 : //
609 0 : communicator::instance()->remove_connection(f_thread_done);
610 :
611 : // we will access the f_last_error member of the thread runner
612 : // which may not be available to the main thread yet, calling
613 : // stop forces a memory barrier so we are all good.
614 : //
615 : // calling stop() has no effect if we did not use the thread,
616 : // however, not calling stop() when we did use the thread
617 : // causes all sorts of other problems (especially, the thread
618 : // never gets joined)
619 : //
620 0 : f_thread.stop();
621 :
622 0 : tcp_bio_client::pointer_t client(f_thread_runner.release_client());
623 0 : if(f_done)
624 : {
625 : // already marked done, ignore the result and lose the
626 : // connection immediately
627 : //
628 : //f_thread_running.close(); -- not necessary, 'client' is the connection
629 0 : return;
630 : }
631 :
632 0 : if(client == nullptr)
633 : {
634 0 : SNAP_LOG_ERROR
635 : << "connection to "
636 0 : << f_thread_runner.get_address().to_ipv4or6_string(addr::addr::string_ip_t::STRING_IP_PORT)
637 : << " failed with: "
638 0 : << f_thread_runner.get_last_error()
639 : << SNAP_LOG_SEND;
640 :
641 : // signal that an error occurred
642 : //
643 0 : f_parent->process_connection_failed(f_thread_runner.get_last_error());
644 : }
645 : else
646 : {
647 0 : f_messenger = std::make_shared<messenger>(f_parent, client);
648 :
649 : // add the messenger to the communicator
650 : //
651 0 : communicator::instance()->add_connection(f_messenger);
652 :
653 : // if some messages were cached, process them immediately
654 : //
655 0 : while(!f_message_cache.empty())
656 : {
657 0 : f_messenger->send_message(f_message_cache[0]);
658 0 : f_message_cache.erase(f_message_cache.begin());
659 : }
660 :
661 : // let the client know we are now connected
662 : //
663 0 : f_parent->process_connected();
664 : }
665 : }
666 :
667 : /** \brief Send a message to the connection.
668 : *
669 : * This implementation function actually sends the message to the
670 : * connection, assuming that the connection exists. Otherwise, it
671 : * may cache the message (if cache is true.)
672 : *
673 : * Note that the message does not get cached if mark_done() was
674 : * called earlier since we are trying to close the whole connection.
675 : *
676 : * \param[in] msg The message to send.
677 : * \param[in] cache Whether to cache the message if the connection is
678 : * currently down.
679 : *
680 : * \return true if the message was forwarded, false if the message
681 : * was ignored or cached.
682 : */
683 0 : bool send_message(message & msg, bool cache)
684 : {
685 0 : if(f_messenger != nullptr)
686 : {
687 0 : return f_messenger->send_message(msg);
688 : }
689 :
690 0 : if(cache && !f_done)
691 : {
692 0 : f_message_cache.push_back(msg);
693 : }
694 :
695 0 : return false;
696 : }
697 :
698 :
699 : /** \brief Forget about the messenger connection.
700 : *
701 : * This function is used to fully disconnect from the messenger.
702 : *
703 : * If there is a messenger, this means:
704 : *
705 : * \li Removing the messenger from the communicator instance.
706 : * \li Closing the connection in the thread object.
707 : *
708 : * In most cases, it is called when an error occur, also it happens
709 : * that we call it explicitly through the disconnect() function
710 : * of the permanent connection class.
711 : *
712 : * \note
713 : * This is safe, even though it is called from the messenger itself
714 : * because it will not get deleted yet. This is because the run()
715 : * loop has a copy in its own temporary copy of the vector of
716 : * connections.
717 : */
718 0 : void disconnect()
719 : {
720 0 : if(f_messenger != nullptr)
721 : {
722 0 : communicator::instance()->remove_connection(f_messenger);
723 0 : f_messenger.reset();
724 :
725 : // just the messenger does not close the TCP connection because
726 : // we may have another in the thread runner
727 : //
728 0 : f_thread_runner.close();
729 : }
730 0 : }
731 :
732 :
733 : /** \brief Return the address of the remote computer.
734 : *
735 : * This function retrieve a copy of the socket address of the remote
736 : * computer (also called the peer).
737 : *
738 : * If the connection is not currently valid, then the returned address
739 : * is the default address (all zeroes). You can determine such by
740 : * calling the addr::is_default() function.
741 : *
742 : * \return The address of the remote computer.
743 : */
744 0 : addr::addr get_client_address() const
745 : {
746 0 : if(f_messenger != nullptr)
747 : {
748 0 : return f_messenger->get_client_address();
749 : }
750 0 : return addr::addr();
751 : }
752 :
753 :
754 : /** \brief Mark the messenger as done.
755 : *
756 : * This function is used to mark the messenger as done. This means it
757 : * will get removed from the communicator instance as soon as it
758 : * is done with its current write buffer if there is one.
759 : *
760 : * You may also want to call the disconnection() function to actually
761 : * reset the pointer along the way.
762 : */
763 0 : void mark_done()
764 : {
765 0 : f_done = true;
766 :
767 : // once done we don't attempt to reconnect so we can as well
768 : // get rid of our existing cache immediately to save some
769 : // memory
770 : //
771 0 : f_message_cache.clear();
772 :
773 0 : if(f_messenger != nullptr)
774 : {
775 0 : f_messenger->mark_done();
776 : }
777 0 : }
778 :
779 :
780 : private:
781 : tcp_client_permanent_message_connection * f_parent = nullptr;
782 : thread_signal_handler::pointer_t f_thread_done = thread_signal_handler::pointer_t();
783 : runner f_thread_runner;
784 : cppthread::thread f_thread;
785 : messenger::pointer_t f_messenger = messenger::pointer_t();
786 : message::vector_t f_message_cache = message::vector_t();
787 : bool f_done = false;
788 : };
789 :
790 :
791 :
792 : }
793 : // namespace detail
794 :
795 :
796 :
797 : /** \brief Initializes this TCP client message connection.
798 : *
799 : * This implementation creates what we call a permanent connection.
800 : * Such a connection may fail once in a while. In such circumstances,
801 : * the class automatically requests for a reconnection (see various
802 : * parameters below in that regard). However, this causes one issue:
803 : * by default, the connection just never ends. When you are about
804 : * ready to close the connection, you must call the mark_done()
805 : * function first. This will tell the various error functions to
806 : * drop this connection instead of restarting it after a small pause.
807 : *
808 : * This constructor makes sure to initialize the timer and saves
809 : * the address, port, mode, pause, and use_thread parameters. This
810 : * constructor makes use of a single address.
811 : *
812 : * The timer is first set to trigger immediately. This means the TCP
813 : * connection will be attempted as soon as possible (the next time
814 : * the run() loop is entered, it will time out immediately.) You
815 : * are free to call set_timeout_date() with a date in the future if
816 : * you prefer that the connect be attempted a little later.
817 : *
818 : * The \p pause parameter is used if the connection is lost and this
819 : * timer is used again to attempt a new connection. It will be reused
820 : * as long as the connection fails (as a delay). It has to be at least
821 : * 10 microseconds, although really you should not use less than 1
822 : * second (1000000). You may set the pause parameter to 0 in which case
823 : * you are responsible to set the delay (by default there will be no
824 : * delay and thus the timer will never time out.)
825 : *
826 : * To start with a delay, instead of trying to connect immediately,
827 : * you may pass a negative pause parameter. So for example to get the
828 : * first attempt 5 seconds after you created this object, you use
829 : * -5000000LL as the pause parameter.
830 : *
831 : * The \p use_thread parameter determines whether the connection should
832 : * be attempted in a thread (asynchronously) or immediately (which means
833 : * the timeout callback may block for a while.) If the connection is to
834 : * a local server with an IP address specified as numbers (i.e. 127.0.0.1),
835 : * the thread is probably not required. For connections to a remote
836 : * computer, though, it certainly is important.
837 : *
838 : * \param[in] address The address and port to connect to.
839 : * \param[in] mode The mode to use to open the connection.
840 : * \param[in] pause The amount of time to wait before attempting a new
841 : * connection after a failure, in microseconds, or 0.
842 : * \param[in] use_thread Whether a thread is used to connect to the
843 : * server.
844 : * \param[in] service_name The name of your daemon service. Only use once
845 : * on your permanent connection to snapcommunicator.
846 : */
847 0 : tcp_client_permanent_message_connection::tcp_client_permanent_message_connection(
848 : addr::addr const & address
849 : , mode_t mode
850 : , std::int64_t const pause
851 : , bool const use_thread
852 0 : , std::string const & service_name)
853 : : timer(pause < 0 ? -pause : 0)
854 : , connection_with_send_message(service_name)
855 0 : , f_impl(std::make_shared<detail::tcp_client_permanent_message_connection_impl>(this, addr::addr::vector_t{address}, mode))
856 0 : , f_pause(llabs(pause))
857 0 : , f_use_thread(use_thread)
858 : {
859 0 : }
860 :
861 :
862 : /** \brief Initializes this TCP client message connection.
863 : *
864 : * The difference between this constructor and the other one is the
865 : * array of addresses instead of using just one address, this constructor
866 : * supports multiple. Each address is used to try to connect to the
867 : * server on the other side. The use of another address happens
868 : * if the connection fails the connection, the SNI, the handshakes, etc.
869 : * If the connection comes up and is severed later, then the same address
870 : * is used to attempt the first reconnect.
871 : *
872 : * You may pass a vector with a single address in which case the system
873 : * behaves the same way as the other constructor.
874 : *
875 : * \param[in] addresses The addresses and ports to connect to.
876 : * \param[in] mode The mode to use to open the connection.
877 : * \param[in] pause The amount of time to wait before attempting a new
878 : * connection after a failure, in microseconds, or 0.
879 : * \param[in] use_thread Whether a thread is used to connect to the
880 : * server.
881 : * \param[in] service_name The name of your daemon service. Only use once
882 : * on your permanent connection to snapcommunicator.
883 : */
884 0 : tcp_client_permanent_message_connection::tcp_client_permanent_message_connection(
885 : addr::addr::vector_t const & addresses
886 : , mode_t mode
887 : , std::int64_t const pause
888 : , bool const use_thread
889 0 : , std::string const & service_name)
890 : : timer(pause < 0 ? -pause : 0)
891 : , connection_with_send_message(service_name)
892 : , f_impl(std::make_shared<detail::tcp_client_permanent_message_connection_impl>(this, addresses, mode))
893 0 : , f_pause(llabs(pause))
894 0 : , f_use_thread(use_thread)
895 : {
896 0 : }
897 :
898 :
899 : /** \brief Initializes this TCP client message connection.
900 : *
901 : * The difference between this constructor and the others is the array of
902 : * address ranges instead of using just one address or a vector of addresses.
903 : *
904 : * This constructor simply transforms the address ranges in a vector of
905 : * addresses and passes that down to our implementation which handles the
906 : * connection attempts.
907 : *
908 : * \param[in] address_ranges The address ranges and ports to connect to.
909 : * \param[in] mode The mode to use to open the connection.
910 : * \param[in] pause The amount of time to wait before attempting a new
911 : * connection after a failure, in microseconds, or 0.
912 : * \param[in] use_thread Whether a thread is used to connect to the
913 : * server.
914 : * \param[in] service_name The name of your daemon service. Only use once
915 : * on your permanent connection to snapcommunicator.
916 : */
917 0 : tcp_client_permanent_message_connection::tcp_client_permanent_message_connection(
918 : addr::addr_range::vector_t const & address_ranges
919 : , mode_t mode
920 : , std::int64_t const pause
921 : , bool const use_thread
922 0 : , std::string const & service_name)
923 : : timer(pause < 0 ? -pause : 0)
924 : , connection_with_send_message(service_name)
925 : , f_impl(std::make_shared<detail::tcp_client_permanent_message_connection_impl>(
926 : this
927 0 : , addr::addr_range::to_addresses(address_ranges)
928 : , mode))
929 0 : , f_pause(llabs(pause))
930 0 : , f_use_thread(use_thread)
931 : {
932 0 : }
933 :
934 :
935 : /** \brief Destroy instance.
936 : *
937 : * This function cleans up everything in the permanent message object.
938 : */
939 0 : tcp_client_permanent_message_connection::~tcp_client_permanent_message_connection()
940 : {
941 : // Does nothing
942 0 : }
943 :
944 :
945 : /** \brief Attempt to send a message to this connection.
946 : *
947 : * If the connection is currently enabled, the message is sent immediately.
948 : * Otherwise, it may be cached if the \p cache parameter is set to true.
949 : * A cached message is forwarded as soon as a new successful connection
950 : * happens, which can be a problem if messages need to happen in a very
951 : * specific order (For example, after a reconnection to snapcommunicator
952 : * you first need to REGISTER or CONNECT...)
953 : *
954 : * \param[in] msg The message to send to the connected server.
955 : * \param[in] cache Whether the message should be cached.
956 : *
957 : * \return true if the message was sent, false if it was not sent, although
958 : * if cache was true, it was cached
959 : */
960 0 : bool tcp_client_permanent_message_connection::send_message(
961 : message & msg
962 : , bool cache)
963 : {
964 0 : return f_impl->send_message(msg, cache);
965 : }
966 :
967 :
968 : /** \brief Check whether the connection is up.
969 : *
970 : * This function returns true if the connection is considered to be up.
971 : * This means sending messages will work quickly instead of being
972 : * cached up until an actual TCP/IP connection gets established.
973 : *
974 : * Note that the connection may have hanged up since, and the system
975 : * may not have yet detected the fact (i.e. the connection is going
976 : * to receive the process_hup() call after the event in which you are
977 : * working.)
978 : *
979 : * \return true if connected
980 : */
981 0 : bool tcp_client_permanent_message_connection::is_connected() const
982 : {
983 0 : return f_impl->is_connected();
984 : }
985 :
986 :
987 : /** \brief Disconnect the messenger now.
988 : *
989 : * This function kills the current connection.
990 : *
991 : * There are a few cases where two daemons communicate between each others
992 : * and at some point one of them wants to exit and needs to disconnect. This
993 : * function can be used in that one situation assuming that you have an
994 : * acknowledgement from the other daemon.
995 : *
996 : * Say you have daemon A and B. B wants to quit and before doing so sends
997 : * a form of "I'm quitting" message to A. In that situation, B is not closing
998 : * the messenger connection, A is responsible for that (i.e. A acknowledges
999 : * receipt of the "I'm quitting" message from B by closing the connection.)
1000 : *
1001 : * B also wants to call the mark_done() function to make sure that it
1002 : * does not reconnected a split second later and instead the permanent
1003 : * connection gets removed from the communicator list of connections.
1004 : */
1005 0 : void tcp_client_permanent_message_connection::disconnect()
1006 : {
1007 0 : f_impl->disconnect();
1008 0 : }
1009 :
1010 :
1011 : /** \brief Overload so we do not have to use namespace everywhere.
1012 : *
1013 : * This function overloads the connection::mark_done() function so
1014 : * we can call it without the need to use timer::mark_done()
1015 : * everywhere.
1016 : */
1017 0 : void tcp_client_permanent_message_connection::mark_done()
1018 : {
1019 0 : timer::mark_done();
1020 0 : }
1021 :
1022 :
1023 : /** \brief Mark connection as done.
1024 : *
1025 : * This function allows you to mark the permanent connection and the
1026 : * messenger as done.
1027 : *
1028 : * Note that calling this function with false is the same as calling the
1029 : * base class mark_done() function.
1030 : *
1031 : * If the \p message parameter is set to true, we suggest you also call
1032 : * the disconnect() function. That way the messenger will truly get
1033 : * removed from everywhere quickly.
1034 : *
1035 : * \param[in] messenger If true, also mark the messenger as done.
1036 : */
1037 0 : void tcp_client_permanent_message_connection::mark_done(bool messenger)
1038 : {
1039 0 : timer::mark_done();
1040 0 : if(messenger)
1041 : {
1042 0 : f_impl->mark_done();
1043 : }
1044 0 : }
1045 :
1046 :
1047 : /** \brief Retrieve a copy of the client's address.
1048 : *
1049 : * This function makes a copy of the address of this client connection.
1050 : *
1051 : * \return Return the client's address.
1052 : *
1053 : * \sa get_addr()
1054 : */
1055 0 : addr::addr tcp_client_permanent_message_connection::get_client_address() const
1056 : {
1057 0 : return f_impl->get_client_address();
1058 : }
1059 :
1060 :
1061 : /** \brief Internal timeout callback implementation.
1062 : *
1063 : * This callback implements the guts of this class: it attempts to connect
1064 : * to the specified address and port, optionally after creating a thread
1065 : * so the attempt can happen asynchronously.
1066 : *
1067 : * When the connection fails, the timer is used to try again pause
1068 : * microseconds later (pause as specified in the constructor).
1069 : *
1070 : * When a connection succeeds, the timer is disabled until you detect
1071 : * an error while using the connection and re-enable the timer.
1072 : *
1073 : * \warning
1074 : * This function changes the timeout delay to the pause amount
1075 : * as defined with the constructor. If you want to change that
1076 : * amount, you can do so an any point after this function call
1077 : * using the set_timeout_delay() function. If the pause parameter
1078 : * was set to -1, then the timeout never gets changed.
1079 : * However, you should not use a permanent message timer as your
1080 : * own or you will interfere with the internal use of the timer.
1081 : */
1082 0 : void tcp_client_permanent_message_connection::process_timeout()
1083 : {
1084 : // got a spurious call when already marked done
1085 : //
1086 0 : if(is_done())
1087 : {
1088 0 : return;
1089 : }
1090 :
1091 : // change the timeout delay although we will not use it immediately
1092 : // if we start the thread or attempt an immediate connection, but
1093 : // that way the user can change it by calling set_timeout_delay()
1094 : // at any time after the first process_timeout() call
1095 : //
1096 0 : if(f_pause > 0)
1097 : {
1098 0 : set_timeout_delay(f_pause);
1099 0 : f_pause = 0;
1100 : }
1101 :
1102 0 : if(f_use_thread)
1103 : {
1104 : // in this case we create a thread, run it and know whether the
1105 : // connection succeeded only when the thread tells us it did
1106 : //
1107 : // TODO: the background_connect() may return false in two situations:
1108 : // 1) when the thread is already running and then the behavior
1109 : // we have below is INCORRECT
1110 : // 2) when the thread cannot be started (i.e. could not
1111 : // allocate the stack?) in which case the if() below
1112 : // is the correct behavior
1113 : //
1114 0 : if(f_impl->background_connect())
1115 : {
1116 : // we started the thread successfully, so block the timer
1117 : //
1118 0 : set_enable(false);
1119 : }
1120 : }
1121 : else
1122 : {
1123 : // the success is noted when we receive a call to
1124 : // process_connected(); there we do set_enable(false)
1125 : // so the timer stops
1126 : //
1127 0 : f_impl->connect();
1128 : }
1129 : }
1130 :
1131 :
1132 : /** \brief Process an error.
1133 : *
1134 : * When an error occurs, we restart the timer so we can attempt to reconnect
1135 : * to that server.
1136 : *
1137 : * If you overload this function, make sure to either call this
1138 : * implementation or enable the timer yourselves.
1139 : *
1140 : * \warning
1141 : * This function does not call the timer::process_error() function
1142 : * which means that this connection is not automatically removed from
1143 : * the communicator object on failures.
1144 : */
1145 0 : void tcp_client_permanent_message_connection::process_error()
1146 : {
1147 0 : if(is_done())
1148 : {
1149 0 : timer::process_error();
1150 : }
1151 : else
1152 : {
1153 0 : f_impl->disconnect();
1154 0 : set_enable(true);
1155 : }
1156 0 : }
1157 :
1158 :
1159 : /** \brief Process a hang up.
1160 : *
1161 : * When a hang up occurs, we restart the timer so we can attempt to reconnect
1162 : * to that server.
1163 : *
1164 : * If you overload this function, make sure to either call this
1165 : * implementation or enable the timer yourselves.
1166 : *
1167 : * \warning
1168 : * This function does not call the timer::process_hup() function
1169 : * which means that this connection is not automatically removed from
1170 : * the communicator object on failures.
1171 : */
1172 0 : void tcp_client_permanent_message_connection::process_hup()
1173 : {
1174 0 : if(is_done())
1175 : {
1176 0 : timer::process_hup();
1177 : }
1178 : else
1179 : {
1180 0 : f_impl->disconnect();
1181 0 : set_enable(true);
1182 : }
1183 0 : }
1184 :
1185 :
1186 : /** \brief Process an invalid signal.
1187 : *
1188 : * When an invalid signal occurs, we restart the timer so we can attempt
1189 : * to reconnect to that server.
1190 : *
1191 : * If you overload this function, make sure to either call this
1192 : * implementation or enable the timer yourselves.
1193 : *
1194 : * \warning
1195 : * This function does not call the timer::process_invalid() function
1196 : * which means that this connection is not automatically removed from
1197 : * the communicator object on failures.
1198 : */
1199 0 : void tcp_client_permanent_message_connection::process_invalid()
1200 : {
1201 0 : if(is_done())
1202 : {
1203 0 : timer::process_invalid();
1204 : }
1205 : else
1206 : {
1207 0 : f_impl->disconnect();
1208 0 : set_enable(true);
1209 : }
1210 0 : }
1211 :
1212 :
1213 : /** \brief Make sure that the messenger connection gets removed.
1214 : *
1215 : * This function makes sure that the messenger sub-connection also gets
1216 : * removed from the communicator. Otherwise it would lock the system
1217 : * since connections are saved in the communicator object as shared
1218 : * pointers.
1219 : */
1220 0 : void tcp_client_permanent_message_connection::connection_removed()
1221 : {
1222 0 : f_impl->disconnect();
1223 0 : }
1224 :
1225 :
1226 : /** \brief Process a connection failed callback.
1227 : *
1228 : * When a connection attempt fails, we restart the timer so we can
1229 : * attempt to reconnect to that server.
1230 : *
1231 : * If you overload this function, make sure to either call this
1232 : * implementation or enable the timer yourselves.
1233 : *
1234 : * \param[in] error_message The error message that triggered this callback.
1235 : */
1236 0 : void tcp_client_permanent_message_connection::process_connection_failed(std::string const & error_message)
1237 : {
1238 0 : snapdev::NOT_USED(error_message);
1239 0 : set_enable(true);
1240 0 : }
1241 :
1242 :
1243 : /** \brief The connection is ready.
1244 : *
1245 : * This callback gets called whenever the connection succeeded and is
1246 : * ready to be used.
1247 : *
1248 : * You should implement this virtual function if you have to initiate
1249 : * the communication. For example, the snapserver has to send a
1250 : * REGISTER to the snapcommunicator system and thus implements this
1251 : * function.
1252 : *
1253 : * The default implementation makes sure that the timer gets turned off
1254 : * so we do not try to reconnect every minute or so.
1255 : */
1256 0 : void tcp_client_permanent_message_connection::process_connected()
1257 : {
1258 0 : set_enable(false);
1259 0 : }
1260 :
1261 :
1262 :
1263 6 : } // namespace ed
1264 : // vim: ts=4 sw=4 et
|