Line data Source code
1 : // Copyright (c) 2012-2021 Made to Order Software Corp. All Rights Reserved
2 : //
3 : // https://snapwebsites.org/project/eventdispatcher
4 : // contact@m2osw.com
5 : //
6 : // This program is free software; you can redistribute it and/or modify
7 : // it under the terms of the GNU General Public License as published by
8 : // the Free Software Foundation; either version 2 of the License, or
9 : // (at your option) any later version.
10 : //
11 : // This program is distributed in the hope that it will be useful,
12 : // but WITHOUT ANY WARRANTY; without even the implied warranty of
13 : // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 : // GNU General Public License for more details.
15 : //
16 : // You should have received a copy of the GNU General Public License
17 : // along with this program; if not, write to the Free Software
18 : // Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
19 :
20 : /** \file
21 : * \brief Implementation of the Snap Communicator class.
22 : *
23 : * This class wraps the C poll() interface in a C++ object with many types
24 : * of objects:
25 : *
26 : * \li Server Connections; for software that want to offer a port to
27 : * which clients can connect to; the server will call accept()
28 : * once a new client connection is ready; this results in a
29 : * Server/Client connection object
30 : * \li Client Connections; for software that want to connect to
31 : * a server; these expect the IP address and port to connect to
32 : * \li Server/Client Connections; for the server when it accepts a new
33 : * connection; in this case the server gets a socket from accept()
34 : * and creates one of these objects to handle the connection
35 : *
36 : * Using the poll() function is the easiest and allows us to listen
37 : * on pretty much any number of sockets (on my server it is limited
38 : * at 16,768 and frankly over 1,000 we probably will start to have
39 : * real slowness issues on small VPN servers.)
40 : */
41 :
42 :
43 : // self
44 : //
45 : #include "eventdispatcher/tcp_client_permanent_message_connection.h"
46 :
47 : #include "eventdispatcher/communicator.h"
48 : #include "eventdispatcher/exception.h"
49 : #include "eventdispatcher/tcp_server_client_message_connection.h"
50 : #include "eventdispatcher/thread_done_signal.h"
51 :
52 :
53 : // snaplogger lib
54 : //
55 : #include <snaplogger/message.h>
56 :
57 :
58 : // snapdev lib
59 : //
60 : #include <snapdev/not_used.h>
61 :
62 :
63 : // cppthread lib
64 : //
65 : #include <cppthread/exception.h>
66 : #include <cppthread/guard.h>
67 : #include <cppthread/runner.h>
68 : #include <cppthread/thread.h>
69 :
70 :
71 : // C++ lib
72 : //
73 : #include <cstring>
74 :
75 :
76 : // C lib
77 : //
78 : #include <sys/socket.h>
79 :
80 :
81 : // last include
82 : //
83 : #include <snapdev/poison.h>
84 :
85 :
86 :
87 : namespace ed
88 : {
89 :
90 :
91 :
92 : namespace detail
93 : {
94 :
95 :
96 : /** \brief Internal implementation of the tcp_client_permanent_message_connection class.
97 : *
98 : * This class is used to handle a thread that will process a connection for
99 : * us. This allows us to connect in any amount of time required by the
100 : * Unix system to obtain the connection with the remote server.
101 : *
102 : * \todo
103 : * Having threads at the time we do a fork() is not safe. We may
104 : * want to reconsider offering this functionality here. Because at
105 : * this time we would have no control of when the thread is created
106 : * and thus a way to make sure that no such thread is running when
107 : * we call fork().
108 : */
109 : class tcp_client_permanent_message_connection_impl
110 : {
111 : public:
112 0 : class messenger
113 : : public tcp_server_client_message_connection
114 : {
115 : public:
116 : typedef std::shared_ptr<messenger> pointer_t;
117 :
118 0 : messenger(tcp_client_permanent_message_connection * parent, tcp_bio_client::pointer_t client)
119 0 : : tcp_server_client_message_connection(client)
120 0 : , f_parent(parent)
121 : {
122 0 : set_name("tcp_client_permanent_message_connection_impl::messenger");
123 0 : }
124 :
125 : messenger(messenger const & rhs) = delete;
126 : messenger & operator = (messenger const & rhs) = delete;
127 :
128 : // connection implementation
129 0 : virtual void process_empty_buffer()
130 : {
131 0 : tcp_server_client_message_connection::process_empty_buffer();
132 0 : f_parent->process_empty_buffer();
133 0 : }
134 :
135 : // connection implementation
136 0 : virtual void process_error()
137 : {
138 0 : tcp_server_client_message_connection::process_error();
139 0 : f_parent->process_error();
140 0 : }
141 :
142 : // connection implementation
143 0 : virtual void process_hup()
144 : {
145 0 : tcp_server_client_message_connection::process_hup();
146 0 : f_parent->process_hup();
147 0 : }
148 :
149 : // connection implementation
150 0 : virtual void process_invalid()
151 : {
152 0 : tcp_server_client_message_connection::process_invalid();
153 0 : f_parent->process_invalid();
154 0 : }
155 :
156 : // tcp_server_client_message_connection implementation
157 0 : virtual void process_message(message const & msg)
158 : {
159 : // We call the dispatcher from our parent since the child
160 : // (this messenger) is not given a dispatcher
161 : //
162 0 : message copy(msg);
163 0 : f_parent->dispatch_message(copy);
164 0 : }
165 :
166 : private:
167 : tcp_client_permanent_message_connection * f_parent = nullptr;
168 : };
169 :
170 0 : class thread_signal_handler
171 : : public thread_done_signal
172 : {
173 : public:
174 : typedef std::shared_ptr<thread_signal_handler> pointer_t;
175 :
176 0 : thread_signal_handler(tcp_client_permanent_message_connection_impl * parent_impl)
177 0 : : f_parent_impl(parent_impl)
178 : {
179 0 : set_name("tcp_client_permanent_message_connection_impl::thread_signal_handler");
180 0 : }
181 :
182 : thread_signal_handler(thread_signal_handler const & rhs) = delete;
183 : thread_signal_handler & operator = (thread_signal_handler const & rhs) = delete;
184 :
185 : /** \brief This signal was emitted.
186 : *
187 : * This function gets called whenever the thread is just about to
188 : * quit. Calling f_thread.is_running() may still return true when
189 : * you get in the 'thread_done()' callback. However, an
190 : * f_thread.stop() will return very quickly.
191 : */
192 0 : virtual void process_read()
193 : {
194 0 : thread_done_signal::process_read();
195 :
196 0 : f_parent_impl->thread_done();
197 0 : }
198 :
199 : private:
200 : tcp_client_permanent_message_connection_impl * f_parent_impl = nullptr;
201 : };
202 :
203 0 : class runner
204 : : public cppthread::runner
205 : {
206 : public:
207 0 : runner(
208 : tcp_client_permanent_message_connection_impl * parent_impl
209 : , std::string const & address
210 : , int port
211 : , tcp_bio_client::mode_t mode)
212 0 : : cppthread::runner("background tcp_client_permanent_message_connection for asynchronous connections")
213 : , f_parent_impl(parent_impl)
214 : , f_address(address)
215 : , f_port(port)
216 0 : , f_mode(mode)
217 : {
218 0 : }
219 :
220 : runner(runner const & rhs) = delete;
221 : runner & operator = (runner const & rhs) = delete;
222 :
223 :
224 : /** \brief This is the actual function run by the thread.
225 : *
226 : * This function calls the connect() function and then
227 : * tells the main thread we are done.
228 : */
229 0 : virtual void run()
230 : {
231 0 : connect();
232 :
233 : // tell the main thread that we are done
234 : //
235 0 : f_parent_impl->trigger_thread_done();
236 0 : }
237 :
238 :
239 : /** \brief This function attempts to connect.
240 : *
241 : * This function attempts a connection to the specified address
242 : * and port with the specified mode (i.e. plain or encrypted.)
243 : *
244 : * The function may take a long time to succeed connecting with
245 : * the server. The main thread will be awaken whenever this
246 : * thread dies.
247 : *
248 : * If an error occurs, then the f_socket variable member will
249 : * be set to -1. Otherwise it represents the socket that we
250 : * just connected with.
251 : */
252 0 : void connect()
253 : {
254 0 : char const * error_name(nullptr);
255 : try
256 : {
257 : // create a socket using the bio_client class,
258 : // but then just create a duplicate that we will
259 : // use in a server-client TCP object (because
260 : // we cannot directly create the right type of
261 : // object otherwise...)
262 : //
263 0 : f_tcp_connection = std::make_shared<tcp_bio_client>(f_address, f_port, f_mode);
264 0 : return;
265 : }
266 0 : catch(event_dispatcher_initialization_error const & e)
267 : {
268 0 : error_name = "event_dispatcher_initialization_error";
269 0 : f_last_error = e.what();
270 : }
271 0 : catch(event_dispatcher_runtime_error const & e)
272 : {
273 0 : error_name = "event_dispatcher_runtime_error";
274 0 : f_last_error = e.what();
275 : }
276 0 : catch(std::exception const & e)
277 : {
278 0 : error_name = "std::exception";
279 0 : f_last_error = e.what();
280 : }
281 0 : catch(...)
282 : {
283 0 : error_name = "... (any other exception)";
284 0 : f_last_error = "Unknown exception";
285 : }
286 0 : f_tcp_connection.reset();
287 :
288 : // connection failed... we will have to try again later
289 : //
290 : // WARNING: our logger is not multi-thread safe quiet yet
291 : //SNAP_LOG_ERROR
292 : // << "connection to "
293 : // << f_address
294 : // << ":"
295 : // << f_port
296 : // << " failed with: "
297 : // << f_last_error
298 : // << " ("
299 : // << error_name
300 : // << ")"
301 : // << SNAP_LOG_SEND;
302 : }
303 :
304 :
305 : /** \brief Retrieve the address to connect to.
306 : *
307 : * This function returns the address passed in on creation.
308 : *
309 : * \note
310 : * Since the variable is constant, it is likely to never change.
311 : * However, the c_str() function may change the buffer pointer.
312 : * Hence, to be 100% safe, you cannot call this function until
313 : * you make sure that the thread is fully stopped.
314 : */
315 0 : std::string const & get_address() const
316 : {
317 0 : return f_address;
318 : }
319 :
320 :
321 : /** \brief Retrieve the port to connect to.
322 : *
323 : * This function returns the port passed in on creation.
324 : *
325 : * \note
326 : * Since the variable is constant, it never gets changed
327 : * which means it is always safe to use it between
328 : * both threads.
329 : */
330 0 : int get_port() const
331 : {
332 0 : return f_port;
333 : }
334 :
335 :
336 : /** \brief Retrieve the client allocated and connected by the thread.
337 : *
338 : * This function returns the TCP connection object resulting from
339 : * connection attempts of the background thread.
340 : *
341 : * If the pointer is null, then you may get the corresponding
342 : * error message using the get_last_error() function.
343 : *
344 : * You can get the client TCP connection pointer once. After that
345 : * you always get a null pointer.
346 : *
347 : * \note
348 : * This function is guarded so the pointer and the object it
349 : * points to will be valid in another thread that retrieves it.
350 : *
351 : * \return The connection pointer.
352 : */
353 0 : tcp_bio_client::pointer_t release_client()
354 : {
355 0 : cppthread::guard g(f_mutex);
356 0 : tcp_bio_client::pointer_t tcp_connection;
357 0 : tcp_connection.swap(f_tcp_connection);
358 0 : return tcp_connection;
359 : }
360 :
361 :
362 : /** \brief Retrieve the last error message that happened.
363 : *
364 : * This function returns the last error message that was captured
365 : * when trying to connect to the socket. The message is the
366 : * e.what() message from the exception we captured.
367 : *
368 : * The message does not get cleared so the function can be called
369 : * any number of times. To know whether an error was generated
370 : * on the last attempt, make sure to first get the get_socket()
371 : * and if it returns -1, then this message is significant,
372 : * otherwise it is from a previous error.
373 : *
374 : * \warning
375 : * Remember that if the background thread was used the error will
376 : * NOT be available in the main thread until a full memory barrier
377 : * was executed. For that reason we make sure that the thread
378 : * was stopped when we detect an error.
379 : *
380 : * \return The last error message.
381 : */
382 0 : std::string const & get_last_error() const
383 : {
384 0 : return f_last_error;
385 : }
386 :
387 :
388 : /** \brief Close the connection.
389 : *
390 : * This function closes the connection. Since the f_tcp_connection
391 : * holds the socket to the remote server, we have get this function
392 : * called in order to completely disconnect.
393 : *
394 : * \note
395 : * This function does not clear the f_last_error parameter so it
396 : * can be read later.
397 : */
398 0 : void close()
399 : {
400 0 : f_tcp_connection.reset();
401 0 : }
402 :
403 :
404 : private:
405 : tcp_client_permanent_message_connection_impl * f_parent_impl = nullptr;
406 : std::string const f_address;
407 : int const f_port;
408 : tcp_bio_client::mode_t const f_mode;
409 : tcp_bio_client::pointer_t f_tcp_connection = tcp_bio_client::pointer_t();
410 : std::string f_last_error = std::string();
411 : };
412 :
413 :
414 : /** \brief Initialize a permanent message connection implementation object.
415 : *
416 : * This object manages the thread used to asynchronically connect to
417 : * the specified address and port.
418 : *
419 : * This class and its sub-classes may end up executing callbacks
420 : * of the tcp_client_permanent_message_connection object.
421 : * However, in all cases these are never run from the thread.
422 : *
423 : * \param[in] client A pointer to the owner of this
424 : * tcp_client_permanent_message_connection_impl object.
425 : * \param[in] address The address we are to connect to.
426 : * \param[in] port The port we are to connect to.
427 : * \param[in] mode The mode used to connect.
428 : */
429 0 : tcp_client_permanent_message_connection_impl(
430 : tcp_client_permanent_message_connection * parent
431 : , std::string const & address
432 : , int port
433 : , tcp_bio_client::mode_t mode)
434 0 : : f_parent(parent)
435 : , f_thread_runner(this, address, port, mode)
436 0 : , f_thread("background connection handler thread", &f_thread_runner)
437 : {
438 0 : }
439 :
440 :
441 : tcp_client_permanent_message_connection_impl(tcp_client_permanent_message_connection_impl const & rhs) = delete;
442 : tcp_client_permanent_message_connection_impl & operator = (tcp_client_permanent_message_connection_impl const & rhs) = delete;
443 :
444 : /** \brief Destroy the permanent message connection.
445 : *
446 : * This function makes sure that the messenger was lost.
447 : */
448 0 : ~tcp_client_permanent_message_connection_impl()
449 0 : {
450 : // to make sure we can lose the messenger, first we want to be sure
451 : // that we do not have a thread running
452 : //
453 : try
454 : {
455 0 : f_thread.stop();
456 : }
457 0 : catch(cppthread::cppthread_mutex_failed_error const &)
458 : {
459 : }
460 0 : catch(cppthread::cppthread_invalid_error const &)
461 : {
462 : }
463 :
464 : // in this case we may still have an instance of the f_thread_done
465 : // which linger around, we want it out
466 : //
467 : // Note: the call is safe even if the f_thread_done is null
468 : //
469 0 : communicator::instance()->remove_connection(f_thread_done);
470 :
471 : // although the f_messenger variable gets reset automatically in
472 : // the destructor, it would not get removed from the
473 : // communicator instance if we were not doing it explicitly
474 : //
475 0 : disconnect();
476 0 : }
477 :
478 :
479 : /** \brief Direct connect to the messenger.
480 : *
481 : * In this case we try to connect without the thread. This allows
482 : * us to avoid the thread problems, but we are blocked until the
483 : * OS decides to time out or the connection worked.
484 : */
485 0 : void connect()
486 : {
487 0 : if(f_done)
488 : {
489 0 : SNAP_LOG_ERROR
490 0 : << "Permanent connection marked done. Cannot attempt to reconnect."
491 : << SNAP_LOG_SEND;
492 0 : return;
493 : }
494 :
495 : // call the thread connect() function from the main thread
496 : //
497 0 : f_thread_runner.connect();
498 :
499 : // simulate receiving the thread_done() signal
500 : //
501 0 : thread_done();
502 : }
503 :
504 :
505 : /** \brief Check whether the permanent connection is currently connected.
506 : *
507 : * This function returns true if the messenger exists, which means that
508 : * the connection is up.
509 : *
510 : * \return true if the connection is up.
511 : */
512 0 : bool is_connected()
513 : {
514 0 : return f_messenger != nullptr;
515 : }
516 :
517 :
518 : /** \brief Try to start the thread runner.
519 : *
520 : * This function tries to start the thread runner in order to initiate
521 : * a connection in the background. If the thread could not be started,
522 : * then the function returns false.
523 : *
524 : * If the thread started, then the function returns true. This does
525 : * not mean that the connection was obtained. This is known once
526 : * the process_connected() function is called.
527 : *
528 : * \return true if the thread was successfully started.
529 : */
530 0 : bool background_connect()
531 : {
532 0 : if(f_done)
533 : {
534 0 : SNAP_LOG_ERROR
535 0 : << "Permanent connection marked done. Cannot attempt to reconnect."
536 : << SNAP_LOG_SEND;
537 0 : return false;
538 : }
539 :
540 0 : if(f_thread.is_running())
541 : {
542 0 : SNAP_LOG_ERROR
543 0 : << "A background connection attempt is already in progress. Further requests are ignored."
544 : << SNAP_LOG_SEND;
545 0 : return false;
546 : }
547 :
548 : // create the f_thread_done only when required
549 : //
550 0 : if(f_thread_done == nullptr)
551 : {
552 0 : f_thread_done = std::make_shared<thread_signal_handler>(this);
553 : }
554 :
555 0 : communicator::instance()->add_connection(f_thread_done);
556 :
557 0 : if(!f_thread.start())
558 : {
559 0 : SNAP_LOG_ERROR
560 0 : << "The thread used to run the background connection process did not start."
561 : << SNAP_LOG_SEND;
562 0 : return false;
563 : }
564 :
565 0 : return true;
566 : }
567 :
568 :
569 : /** \brief Tell the main thread that the background thread is done.
570 : *
571 : * This function is called by the thread so the thread_done()
572 : * function of the thread done object gets called. Only the
573 : * thread should call this function.
574 : *
575 : * As a result the thread_done() function of this class will be
576 : * called by the main thread.
577 : */
578 0 : void trigger_thread_done()
579 : {
580 0 : f_thread_done->thread_done();
581 0 : }
582 :
583 :
584 : /** \brief Signal that the background thread is done.
585 : *
586 : * This callback is called whenever the background thread sends
587 : * a signal to us. This is used to avoid calling end user functions
588 : * that would certainly cause a lot of problem if called from the
589 : * thread.
590 : *
591 : * The function calls the process_connection_failed() if the
592 : * connection did not happen.
593 : *
594 : * The function calls the process_connected() if the connection
595 : * did happen.
596 : *
597 : * \note
598 : * This is used only if the user requested that the connection
599 : * happen in the background (i.e. use_thread was set to true
600 : * in the tcp_client_permanent_message_connection object
601 : * constructor.)
602 : */
603 0 : void thread_done()
604 : {
605 : // if we used the thread we have to remove the signal used
606 : // to know that the thread was done
607 : //
608 0 : communicator::instance()->remove_connection(f_thread_done);
609 :
610 : // we will access the f_last_error member of the thread runner
611 : // which may not be available to the main thread yet, calling
612 : // stop forces a memory barrier so we are all good.
613 : //
614 : // calling stop() has no effect if we did not use the thread,
615 : // however, not calling stop() when we did use the thread
616 : // causes all sorts of other problems (especially, the thread
617 : // never gets joined)
618 : //
619 0 : f_thread.stop();
620 :
621 0 : tcp_bio_client::pointer_t client(f_thread_runner.release_client());
622 0 : if(f_done)
623 : {
624 : // already marked done, ignore the result and lose the
625 : // connection immediately
626 : //
627 : //f_thread_running.close(); -- not necessary, 'client' is the connection
628 0 : return;
629 : }
630 :
631 0 : if(client == nullptr)
632 : {
633 : // TODO: fix address in error message using a snap::addr so
634 : // as to handle IPv6 seamlessly.
635 : //
636 0 : SNAP_LOG_ERROR
637 0 : << "connection to "
638 0 : << f_thread_runner.get_address()
639 0 : << ":"
640 : << f_thread_runner.get_port()
641 : << " failed with: "
642 0 : << f_thread_runner.get_last_error()
643 : << SNAP_LOG_SEND;
644 :
645 : // signal that an error occurred
646 : //
647 0 : f_parent->process_connection_failed(f_thread_runner.get_last_error());
648 : }
649 : else
650 : {
651 0 : f_messenger = std::make_shared<messenger>(f_parent, client);
652 :
653 : // add the messenger to the communicator
654 : //
655 0 : communicator::instance()->add_connection(f_messenger);
656 :
657 : // if some messages were cached, process them immediately
658 : //
659 0 : while(!f_message_cache.empty())
660 : {
661 0 : f_messenger->send_message(f_message_cache[0]);
662 0 : f_message_cache.erase(f_message_cache.begin());
663 : }
664 :
665 : // let the client know we are now connected
666 : //
667 0 : f_parent->process_connected();
668 : }
669 : }
670 :
671 : /** \brief Send a message to the connection.
672 : *
673 : * This implementation function actually sends the message to the
674 : * connection, assuming that the connection exists. Otherwise, it
675 : * may cache the message (if cache is true.)
676 : *
677 : * Note that the message does not get cached if mark_done() was
678 : * called earlier since we are trying to close the whole connection.
679 : *
680 : * \param[in] message The message to send.
681 : * \param[in] cache Whether to cache the message if the connection is
682 : * currently down.
683 : *
684 : * \return true if the message was forwarded, false if the message
685 : * was ignored or cached.
686 : */
687 0 : bool send_message(message const & msg, bool cache)
688 : {
689 0 : if(f_messenger != nullptr)
690 : {
691 0 : return f_messenger->send_message(msg);
692 : }
693 :
694 0 : if(cache && !f_done)
695 : {
696 0 : f_message_cache.push_back(msg);
697 : }
698 :
699 0 : return false;
700 : }
701 :
702 :
703 : /** \brief Forget about the messenger connection.
704 : *
705 : * This function is used to fully disconnect from the messenger.
706 : *
707 : * If there is a messenger, this means:
708 : *
709 : * \li Removing the messenger from the communicator instance.
710 : * \li Closing the connection in the thread object.
711 : *
712 : * In most cases, it is called when an error occur, also it happens
713 : * that we call it explicitly through the disconnect() function
714 : * of the permanent connection class.
715 : *
716 : * \note
717 : * This is safe, even though it is called from the messenger itself
718 : * because it will not get deleted yet. This is because the run()
719 : * loop has a copy in its own temporary copy of the vector of
720 : * connections.
721 : */
722 0 : void disconnect()
723 : {
724 0 : if(f_messenger != nullptr)
725 : {
726 0 : communicator::instance()->remove_connection(f_messenger);
727 0 : f_messenger.reset();
728 :
729 : // just the messenger does not close the TCP connection because
730 : // we may have another in the thread runner
731 : //
732 0 : f_thread_runner.close();
733 : }
734 0 : }
735 :
736 :
737 : /** \brief Return the address and size of the remote computer.
738 : *
739 : * This function retrieve the socket address.
740 : *
741 : * \param[out] address The binary address of the remote computer.
742 : *
743 : * \return The size of the sockaddr structure, 0 if no address is available.
744 : */
745 0 : size_t get_client_address(sockaddr_storage & address) const
746 : {
747 0 : if(f_messenger != nullptr)
748 : {
749 0 : return f_messenger->get_client_address(address);
750 : }
751 0 : memset(&address, 0, sizeof(address));
752 0 : return 0;
753 : }
754 :
755 :
756 : /** \brief Return the address of the f_message object.
757 : *
758 : * This function returns the address of the message object.
759 : *
760 : * \return The address of the remote computer.
761 : */
762 0 : std::string get_client_addr() const
763 : {
764 0 : if(f_messenger != nullptr)
765 : {
766 0 : return f_messenger->get_client_addr();
767 : }
768 0 : return std::string();
769 : }
770 :
771 :
772 : /** \brief Mark the messenger as done.
773 : *
774 : * This function is used to mark the messenger as done. This means it
775 : * will get removed from the communicator instance as soon as it
776 : * is done with its current write buffer if there is one.
777 : *
778 : * You may also want to call the disconnection() function to actually
779 : * reset the pointer along the way.
780 : */
781 0 : void mark_done()
782 : {
783 0 : f_done = true;
784 :
785 : // once done we don't attempt to reconnect so we can as well
786 : // get rid of our existing cache immediately to save some
787 : // memory
788 : //
789 0 : f_message_cache.clear();
790 :
791 0 : if(f_messenger != nullptr)
792 : {
793 0 : f_messenger->mark_done();
794 : }
795 0 : }
796 :
797 :
798 : private:
799 : tcp_client_permanent_message_connection * f_parent = nullptr;
800 : thread_signal_handler::pointer_t f_thread_done = thread_signal_handler::pointer_t();
801 : runner f_thread_runner;
802 : cppthread::thread f_thread;
803 : messenger::pointer_t f_messenger = messenger::pointer_t();
804 : message::vector_t f_message_cache = message::vector_t();
805 : bool f_done = false;
806 : };
807 :
808 :
809 :
810 : }
811 : // namespace detail
812 :
813 :
814 :
815 : /** \brief Initializes this TCP client message connection.
816 : *
817 : * This implementation creates what we call a permanent connection.
818 : * Such a connection may fail once in a while. In such circumstances,
819 : * the class automatically requests for a reconnection (see various
820 : * parameters in the regard below.) However, this causes one issue:
821 : * by default, the connection just never ends. When you are about
822 : * ready to close the connection, you must call the mark_done()
823 : * function first. This will tell the various error functions to
824 : * drop this connection instead of restarting it after a small pause.
825 : *
826 : * This constructor makes sure to initialize the timer and saves
827 : * the address, port, mode, pause, and use_thread parameters.
828 : *
829 : * The timer is first set to trigger immediately. This means the TCP
830 : * connection will be attempted as soon as possible (the next time
831 : * the run() loop is entered, it will time out immediately.) You
832 : * are free to call set_timeout_date() with a date in the future if
833 : * you prefer that the connect be attempted a little later.
834 : *
835 : * The \p pause parameter is used if the connection is lost and this
836 : * timer is used again to attempt a new connection. It will be reused
837 : * as long as the connection fails (as a delay). It has to be at least
838 : * 10 microseconds, although really you should not use less than 1
839 : * second (1000000). You may set the pause parameter to 0 in which case
840 : * you are responsible to set the delay (by default there will be no
841 : * delay and thus the timer will never time out.)
842 : *
843 : * To start with a delay, instead of trying to connect immediately,
844 : * you may pass a negative pause parameter. So for example to get the
845 : * first attempt 5 seconds after you created this object, you use
846 : * -5000000LL as the pause parameter.
847 : *
848 : * The \p use_thread parameter determines whether the connection should
849 : * be attempted in a thread (asynchronously) or immediately (which means
850 : * the timeout callback may block for a while.) If the connection is to
851 : * a local server with an IP address specified as numbers (i.e. 127.0.0.1),
852 : * the thread is probably not required. For connections to a remote
853 : * computer, though, it certainly is important.
854 : *
855 : * \param[in] address The address to listen on. It may be set to "0.0.0.0".
856 : * \param[in] port The port to listen on.
857 : * \param[in] mode The mode to use to open the connection.
858 : * \param[in] pause The amount of time to wait before attempting a new
859 : * connection after a failure, in microseconds, or 0.
860 : * \param[in] use_thread Whether a thread is used to connect to the
861 : * server.
862 : */
863 0 : tcp_client_permanent_message_connection::tcp_client_permanent_message_connection(
864 : std::string const & address
865 : , int port
866 : , tcp_bio_client::mode_t mode
867 : , std::int64_t const pause
868 0 : , bool const use_thread)
869 : : timer(pause < 0 ? -pause : 0)
870 : , f_impl(std::make_shared<detail::tcp_client_permanent_message_connection_impl>(this, address, port, mode))
871 0 : , f_pause(llabs(pause))
872 0 : , f_use_thread(use_thread)
873 : {
874 0 : }
875 :
876 :
877 : /** \brief Destroy instance.
878 : *
879 : * This function cleans up everything in the permanent message object.
880 : */
881 0 : tcp_client_permanent_message_connection::~tcp_client_permanent_message_connection()
882 : {
883 : // Does nothing
884 0 : }
885 :
886 :
887 : /** \brief Attempt to send a message to this connection.
888 : *
889 : * If the connection is currently enabled, the message is sent immediately.
890 : * Otherwise, it may be cached if the \p cache parameter is set to true.
891 : * A cached message is forwarded as soon as a new successful connection
892 : * happens, which can be a problem if messages need to happen in a very
893 : * specific order (For example, after a reconnection to snapcommunicator
894 : * you first need to REGISTER or CONNECT...)
895 : *
896 : * \param[in] message The message to send to the connected server.
897 : * \param[in] cache Whether the message should be cached.
898 : *
899 : * \return true if the message was sent, false if it was not sent, although
900 : * if cache was true, it was cached
901 : */
902 0 : bool tcp_client_permanent_message_connection::send_message(message const & msg, bool cache)
903 : {
904 0 : return f_impl->send_message(msg, cache);
905 : }
906 :
907 :
908 : /** \brief Check whether the connection is up.
909 : *
910 : * This function returns true if the connection is considered to be up.
911 : * This means sending messages will work quickly instead of being
912 : * cached up until an actual TCP/IP connection gets established.
913 : *
914 : * Note that the connection may have hanged up since, and the system
915 : * may not have yet detected the fact (i.e. the connection is going
916 : * to receive the process_hup() call after the event in which you are
917 : * working.)
918 : *
919 : * \return true if connected
920 : */
921 0 : bool tcp_client_permanent_message_connection::is_connected() const
922 : {
923 0 : return f_impl->is_connected();
924 : }
925 :
926 :
927 : /** \brief Disconnect the messenger now.
928 : *
929 : * This function kills the current connection.
930 : *
931 : * There are a few cases where two daemons communicate with each other
932 : * and at some point one of them wants to exit and needs to disconnect. This
933 : * function can be used in that one situation assuming that you have an
934 : * acknowledgement from the other daemon.
935 : *
936 : * Say you have daemon A and B. B wants to quit and before doing so sends
937 : * a form of "I'm quitting" message to A. In that situation, B is not closing
938 : * the messenger connection, A is responsible for that (i.e. A acknowledges
939 : * receipt of the "I'm quitting" message from B by closing the connection.)
940 : *
941 : * B also wants to call the mark_done() function to make sure that it
942 : * does not reconnect a split second later and instead the permanent
943 : * connection gets removed from the communicator list of connections.
944 : */
945 0 : void tcp_client_permanent_message_connection::disconnect()
946 : {
947 0 : f_impl->disconnect();
948 0 : }
949 :
950 :
951 : /** \brief Overload so we do not have to qualify the call everywhere.
952 : *
953 : * This function overloads the connection::mark_done() function so
954 : * we can call it without having to spell out timer::mark_done()
955 : * everywhere; it only forwards to timer::mark_done().
956 : */
957 0 : void tcp_client_permanent_message_connection::mark_done()
958 : {
959 0 : timer::mark_done();
960 0 : }
961 :
962 :
963 : /** \brief Mark connection as done.
964 : *
965 : * This function allows you to mark the permanent connection and the
966 : * messenger as done.
967 : *
968 : * Note that calling this function with false is the same as calling the
969 : * mark_done() overload without parameters (only timer::mark_done() runs).
970 : *
971 : * If the \p messenger parameter is set to true, we suggest you also call
972 : * the disconnect() function. That way the messenger will truly get
973 : * removed from everyone quickly.
974 : *
975 : * \param[in] messenger If true, also mark the messenger as done.
976 : */
977 0 : void tcp_client_permanent_message_connection::mark_done(bool messenger)
978 : {
979 0 : timer::mark_done();
980 0 : if(messenger)
981 : {
982 0 : f_impl->mark_done();
983 : }
984 0 : }
985 :
986 :
987 : /** \brief Retrieve a copy of the client's address.
988 : *
989 : * This function makes a copy of the address of this client connection
990 : * to the \p address parameter and returns the length.
991 : *
992 : * \param[out] address The reference to an address variable where the
993 : * address gets copied.
994 : *
995 : * \return Return the length of the address which may be smaller than
996 : * sizeof(struct sockaddr). If zero, then no address is defined.
997 : *
998 : * \sa get_addr()
999 : */
1000 0 : size_t tcp_client_permanent_message_connection::get_client_address(sockaddr_storage & address) const
1001 : {
1002 0 : return f_impl->get_client_address(address);
1003 : }
1004 :
1005 :
1006 : /** \brief Retrieve the remote computer address as a string.
1007 : *
1008 : * This function returns the address of the remote computer as a string.
1009 : * It will be a canonicalized IP address.
1010 : *
1011 : * \return The canonicalized IP address, as returned by the implementation.
1012 : */
1013 0 : std::string tcp_client_permanent_message_connection::get_client_addr() const
1014 : {
1015 0 : return f_impl->get_client_addr();
1016 : }
1017 :
1018 :
1019 : /** \brief Internal timeout callback implementation.
1020 : *
1021 : * This callback implements the guts of this class: it attempts to connect
1022 : * to the specified address and port, optionally after creating a thread
1023 : * so the attempt can happen asynchronously.
1024 : *
1025 : * When the connection fails, the timer is used to try again pause
1026 : * microseconds later (pause as specified in the constructor).
1027 : *
1028 : * When a connection succeeds, the timer is disabled until you detect
1029 : * an error while using the connection and re-enable the timer.
1030 : *
1031 : * \warning
1032 : * When the stored pause is positive, this function sets the timeout
1033 : * delay to that pause and then resets the stored pause to zero.
1034 : * If you want a different amount, you can change it at any point
1035 : * after this function call using the set_timeout_delay() function.
1036 : * If the pause is zero or negative, the timeout never gets changed.
1037 : * However, you should not use a permanent message timer as your
1038 : * own or you will interfere with the internal use of the timer.
1039 : */
1040 0 : void tcp_client_permanent_message_connection::process_timeout()
1041 : {
1042 : // got a spurious call when already marked done
1043 : //
1044 0 : if(is_done())
1045 : {
1046 0 : return;
1047 : }
1048 :
1049 : // change the timeout delay although we will not use it immediately
1050 : // if we start the thread or attempt an immediate connection, but
1051 : // that way the user can change it by calling set_timeout_delay()
1052 : // at any time after the first process_timeout() call
1053 : //
1054 0 : if(f_pause > 0)
1055 : {
1056 0 : set_timeout_delay(f_pause);
1057 0 : f_pause = 0;
1058 : }
1059 :
1060 0 : if(f_use_thread)
1061 : {
1062 : // in this case we create a thread, run it and know whether the
1063 : // connection succeeded only when the thread tells us it did
1064 : //
1065 : // TODO: the background_connect() may return false in two situations:
1066 : // 1) when the thread is already running and then the behavior
1067 : // we have below is INCORRECT
1068 : // 2) when the thread cannot be started (i.e. could not
1069 : // allocate the stack?) in which case the if() below
1070 : // is the correct behavior
1071 : //
1072 0 : if(f_impl->background_connect())
1073 : {
1074 : // we started the thread successfully, so block the timer
1075 : //
1076 0 : set_enable(false);
1077 : }
1078 : }
1079 : else
1080 : {
1081 : // the success is noted when we receive a call to
1082 : // process_connected(); there we do set_enable(false)
1083 : // so the timer stops
1084 : //
1085 0 : f_impl->connect();
1086 : }
1087 : }
1088 :
1089 :
1090 : /** \brief Process an error.
1091 : *
1092 : * Unless the connection is done, an error disconnects the messenger and
1093 : * restarts the timer so we can attempt to reconnect to that server.
1094 : *
1095 : * If you overload this function, make sure to either call this
1096 : * implementation or enable the timer yourselves.
1097 : *
1098 : * \warning
1099 : * Unless the connection is done, timer::process_error() is not called,
1100 : * which means that this connection is not automatically removed from
1101 : * the communicator object on failures.
1102 : */
1103 0 : void tcp_client_permanent_message_connection::process_error()
1104 : {
1105 0 : if(is_done())
1106 : {
1107 0 : timer::process_error();
1108 : }
1109 : else
1110 : {
1111 0 : f_impl->disconnect();
1112 0 : set_enable(true);
1113 : }
1114 0 : }
1115 :
1116 :
1117 : /** \brief Process a hang up.
1118 : *
1119 : * Unless the connection is done, a hang up disconnects the messenger and
1120 : * restarts the timer so we can attempt to reconnect to that server.
1121 : *
1122 : * If you overload this function, make sure to either call this
1123 : * implementation or enable the timer yourselves.
1124 : *
1125 : * \warning
1126 : * Unless the connection is done, timer::process_hup() is not called,
1127 : * which means that this connection is not automatically removed from
1128 : * the communicator object on failures.
1129 : */
1130 0 : void tcp_client_permanent_message_connection::process_hup()
1131 : {
1132 0 : if(is_done())
1133 : {
1134 0 : timer::process_hup();
1135 : }
1136 : else
1137 : {
1138 0 : f_impl->disconnect();
1139 0 : set_enable(true);
1140 : }
1141 0 : }
1142 :
1143 :
1144 : /** \brief Process an invalid signal.
1145 : *
1146 : * Unless the connection is done, an invalid signal disconnects the messenger
1147 : * and restarts the timer so we can attempt to reconnect to that server.
1148 : *
1149 : * If you overload this function, make sure to either call this
1150 : * implementation or enable the timer yourselves.
1151 : *
1152 : * \warning
1153 : * Unless the connection is done, timer::process_invalid() is not called,
1154 : * which means that this connection is not automatically removed from
1155 : * the communicator object on failures.
1156 : */
1157 0 : void tcp_client_permanent_message_connection::process_invalid()
1158 : {
1159 0 : if(is_done())
1160 : {
1161 0 : timer::process_invalid();
1162 : }
1163 : else
1164 : {
1165 0 : f_impl->disconnect();
1166 0 : set_enable(true);
1167 : }
1168 0 : }
1169 :
1170 :
1171 : /** \brief Make sure that the messenger connection gets removed.
1172 : *
1173 : * This function makes sure that the messenger sub-connection also gets
1174 : * removed from the communicator, by disconnecting it. Otherwise it would
1175 : * lock the system since connections are saved in the communicator object
1176 : * as shared pointers.
1177 : */
1178 0 : void tcp_client_permanent_message_connection::connection_removed()
1179 : {
1180 0 : f_impl->disconnect();
1181 0 : }
1182 :
1183 :
1184 : /** \brief Process a connection failed callback.
1185 : *
1186 : * When a connection attempt fails, we restart the timer so we can
1187 : * attempt to reconnect to that server.
1188 : *
1189 : * If you overload this function, make sure to either call this
1190 : * implementation or enable the timer yourselves.
1191 : *
1192 : * \param[in] error_message The error message (unused by this implementation).
1193 : */
1194 0 : void tcp_client_permanent_message_connection::process_connection_failed(std::string const & error_message)
1195 : {
1196 0 : snap::NOT_USED(error_message);
1197 0 : set_enable(true);
1198 0 : }
1199 :
1200 :
1201 : /** \brief The connection is ready.
1202 : *
1203 : * This callback gets called whenever the connection succeeded and is
1204 : * ready to be used.
1205 : *
1206 : * You should implement this virtual function if you have to initiate
1207 : * the communication. For example, the snapserver has to send a
1208 : * REGISTER to the snapcommunicator system and thus implements this
1209 : * function.
1210 : *
1211 : * The default implementation makes sure that the timer gets turned off
1212 : * so we do not keep trying to reconnect while connected.
1213 : */
1214 0 : void tcp_client_permanent_message_connection::process_connected()
1215 : {
1216 0 : set_enable(false);
1217 0 : }
1218 :
1219 :
1220 :
1221 6 : } // namespace ed
1222 : // vim: ts=4 sw=4 et
|