Line data Source code
1 : // Copyright (c) 2012-2019 Made to Order Software Corp. All Rights Reserved
2 : //
3 : // https://snapwebsites.org/project/eventdispatcher
4 : // contact@m2osw.com
5 : //
6 : // This program is free software; you can redistribute it and/or modify
7 : // it under the terms of the GNU General Public License as published by
8 : // the Free Software Foundation; either version 2 of the License, or
9 : // (at your option) any later version.
10 : //
11 : // This program is distributed in the hope that it will be useful,
12 : // but WITHOUT ANY WARRANTY; without even the implied warranty of
13 : // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 : // GNU General Public License for more details.
15 : //
16 : // You should have received a copy of the GNU General Public License
17 : // along with this program; if not, write to the Free Software
18 : // Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
19 :
20 : /** \file
21 : * \brief Implementation of the Snap Communicator class.
22 : *
23 : * This class wraps the C poll() interface in a C++ object with many types
24 : * of objects:
25 : *
26 : * \li Server Connections; for software that want to offer a port to
27 : * which clients can connect to; the server will call accept()
28 : * once a new client connection is ready; this results in a
29 : * Server/Client connection object
30 : * \li Client Connections; for software that want to connect to
31 : * a server; these expect the IP address and port to connect to
32 : * \li Server/Client Connections; for the server when it accepts a new
33 : * connection; in this case the server gets a socket from accept()
34 : * and creates one of these objects to handle the connection
35 : *
36 : * Using the poll() function is the easiest and allows us to listen
37 : * on pretty much any number of sockets (on my server it is limited
38 : * at 16,768 and frankly over 1,000 we probably will start to have
39 : * real slowness issues on small VPN servers.)
40 : */
41 :
42 :
43 : // self
44 : //
45 : #include "eventdispatcher/tcp_client_permanent_message_connection.h"
46 :
47 : #include "eventdispatcher/communicator.h"
48 : #include "eventdispatcher/exception.h"
49 : #include "eventdispatcher/tcp_server_client_message_connection.h"
50 : #include "eventdispatcher/thread_done_signal.h"
51 :
52 :
53 : // snaplogger lib
54 : //
55 : #include <snaplogger/message.h>
56 :
57 :
58 : // snapdev lib
59 : //
60 : #include <snapdev/not_used.h>
61 :
62 :
63 : // cppthread lib
64 : //
65 : #include <cppthread/exception.h>
66 : #include <cppthread/guard.h>
67 : #include <cppthread/runner.h>
68 : #include <cppthread/thread.h>
69 :
70 :
71 : // C lib
72 : //
73 : #include <sys/socket.h>
74 :
75 :
76 : // last include
77 : //
78 : #include <snapdev/poison.h>
79 :
80 :
81 :
82 : namespace ed
83 : {
84 :
85 :
86 :
87 : namespace detail
88 : {
89 :
90 :
91 : /** \brief Internal implementation of the snap_tcp_client_permanent_message_connection class.
92 : *
93 : * This class is used to handle a thread that will process a connection for
94 : * us. This allows us to connect in any amount of time required by the
95 : * Unix system to obtain the connection with the remote server.
96 : *
97 : * \todo
98 : * Having threads at the time we do a fork() is not safe. We may
99 : * want to reconsider offering this functionality here. Because at
100 : * this time we would have no control of when the thread is created
101 : * and thus a way to make sure that no such thread is running when
102 : * we call fork().
103 : */
104 : class tcp_client_permanent_message_connection_impl
105 : {
106 : public:
107 0 : class messenger
108 : : public tcp_server_client_message_connection
109 : {
110 : public:
111 : typedef std::shared_ptr<messenger> pointer_t;
112 :
113 0 : messenger(tcp_client_permanent_message_connection * parent, tcp_bio_client::pointer_t client)
114 : : tcp_server_client_message_connection(client)
115 0 : , f_parent(parent)
116 : {
117 0 : set_name("tcp_client_permanent_message_connection_impl::messenger");
118 0 : }
119 :
120 : messenger(messenger const & rhs) = delete;
121 : messenger & operator = (messenger const & rhs) = delete;
122 :
123 : // snap_connection implementation
124 0 : virtual void process_empty_buffer()
125 : {
126 0 : tcp_server_client_message_connection::process_empty_buffer();
127 0 : f_parent->process_empty_buffer();
128 0 : }
129 :
130 : // snap_connection implementation
131 0 : virtual void process_error()
132 : {
133 0 : tcp_server_client_message_connection::process_error();
134 0 : f_parent->process_error();
135 0 : }
136 :
137 : // snap_connection implementation
138 0 : virtual void process_hup()
139 : {
140 0 : tcp_server_client_message_connection::process_hup();
141 0 : f_parent->process_hup();
142 0 : }
143 :
144 : // snap_connection implementation
145 0 : virtual void process_invalid()
146 : {
147 0 : tcp_server_client_message_connection::process_invalid();
148 0 : f_parent->process_invalid();
149 0 : }
150 :
151 : // snap_tcp_server_client_message_connection implementation
152 0 : virtual void process_message(message const & msg)
153 : {
154 : // We call the dispatcher from our parent since the child
155 : // (this messenger) is not given a dispatcher
156 : //
157 0 : message copy(msg);
158 0 : f_parent->dispatch_message(copy);
159 0 : }
160 :
161 : private:
162 : tcp_client_permanent_message_connection * f_parent = nullptr;
163 : };
164 :
165 0 : class thread_signal_handler
166 : : public thread_done_signal
167 : {
168 : public:
169 : typedef std::shared_ptr<thread_signal_handler> pointer_t;
170 :
171 0 : thread_signal_handler(tcp_client_permanent_message_connection_impl * parent_impl)
172 0 : : f_parent_impl(parent_impl)
173 : {
174 0 : set_name("tcp_client_permanent_message_connection_impl::thread_signal_handler");
175 0 : }
176 :
177 : thread_signal_handler(thread_signal_handler const & rhs) = delete;
178 : thread_signal_handler & operator = (thread_signal_handler const & rhs) = delete;
179 :
180 : /** \brief This signal was emitted.
181 : *
182 : * This function gets called whenever the thread is just about to
183 : * quit. Calling f_thread.is_running() may still return true when
184 : * you get in the 'thread_done()' callback. However, an
185 : * f_thread.stop() will return very quickly.
186 : */
187 0 : virtual void process_read()
188 : {
189 0 : thread_signal_handler::process_read();
190 :
191 0 : f_parent_impl->thread_done();
192 0 : }
193 :
194 : private:
195 : tcp_client_permanent_message_connection_impl * f_parent_impl = nullptr;
196 : };
197 :
198 0 : class runner
199 : : public cppthread::runner
200 : {
201 : public:
202 0 : runner(
203 : tcp_client_permanent_message_connection_impl * parent_impl
204 : , std::string const & address
205 : , int port
206 : , tcp_bio_client::mode_t mode)
207 : : cppthread::runner("background tcp_client_permanent_message_connection for asynchroneous connections")
208 : , f_parent_impl(parent_impl)
209 : , f_address(address)
210 : , f_port(port)
211 0 : , f_mode(mode)
212 : {
213 0 : }
214 :
215 : runner(runner const & rhs) = delete;
216 : runner & operator = (runner const & rhs) = delete;
217 :
218 :
219 : /** \brief This is the actual function run by the thread.
220 : *
221 : * This function calls the connect() function and then
222 : * tells the main thread we are done.
223 : */
224 0 : virtual void run()
225 : {
226 0 : connect();
227 :
228 : // tell the main thread that we are done
229 : //
230 0 : f_parent_impl->trigger_thread_done();
231 0 : }
232 :
233 :
234 : /** \brief This function attempts to connect.
235 : *
236 : * This function attempts a connection to the specified address
237 : * and port with the specified mode (i.e. plain or encrypted.)
238 : *
239 : * The function may take a long time to succeed connecting with
240 : * the server. The main thread will be awaken whenever this
241 : * thread dies.
242 : *
243 : * If an error occurs, then the f_socket variable member will
244 : * be set to -1. Otherwise it represents the socket that we
245 : * just connected with.
246 : */
247 0 : void connect()
248 : {
249 : try
250 : {
251 : // create a socket using the bio_client class,
252 : // but then just create a duplicate that we will
253 : // use in a server-client TCP object (because
254 : // we cannot directly create the right type of
255 : // object otherwise...)
256 : //
257 0 : f_tcp_connection.reset(new tcp_bio_client(f_address, f_port, f_mode));
258 : }
259 0 : catch(event_dispatcher_runtime_error const & e)
260 : {
261 : // connection failed... we will have to try again later
262 : //
263 : // WARNING: our logger is not multi-thread safe
264 : //SNAP_LOG_ERROR
265 : // << "connection to "
266 : // << f_address
267 : // << ":"
268 : // << f_port
269 : // << " failed with: "
270 : // << e.what();
271 0 : f_last_error = e.what();
272 0 : f_tcp_connection.reset();
273 : }
274 0 : }
275 :
276 :
277 : /** \brief Retrieve the address to connect to.
278 : *
279 : * This function returns the address passed in on creation.
280 : *
281 : * \note
282 : * Since the variable is constant, it is likely to never change.
283 : * However, the c_str() function may change the buffer pointer.
284 : * Hence, to be 100% safe, you cannot call this function until
285 : * you make sure that the thread is fully stopped.
286 : */
287 0 : std::string const & get_address() const
288 : {
289 0 : return f_address;
290 : }
291 :
292 :
293 : /** \brief Retrieve the port to connect to.
294 : *
295 : * This function returns the port passed in on creation.
296 : *
297 : * \note
298 : * Since the variable is constant, it never gets changed
299 : * which means it is always safe to use it between
300 : * both threads.
301 : */
302 0 : int get_port() const
303 : {
304 0 : return f_port;
305 : }
306 :
307 :
308 : /** \brief Retrieve the client allocated and connected by the thread.
309 : *
310 : * This function returns the TCP connection object resulting from
311 : * connection attempts of the background thread.
312 : *
313 : * If the pointer is null, then you may get the corresponding
314 : * error message using the get_last_error() function.
315 : *
316 : * You can get the client TCP connection pointer once. After that
317 : * you always get a null pointer.
318 : *
319 : * \note
320 : * This function is guarded so the pointer and the object it
321 : * points to will be valid in another thread that retrieves it.
322 : *
323 : * \return The connection pointer.
324 : */
325 0 : tcp_bio_client::pointer_t release_client()
326 : {
327 0 : cppthread::guard g(f_mutex);
328 0 : tcp_bio_client::pointer_t tcp_connection;
329 0 : tcp_connection.swap(f_tcp_connection);
330 0 : return tcp_connection;
331 : }
332 :
333 :
334 : /** \brief Retrieve the last error message that happened.
335 : *
336 : * This function returns the last error message that was captured
337 : * when trying to connect to the socket. The message is the
338 : * e.what() message from the exception we captured.
339 : *
340 : * The message does not get cleared so the function can be called
341 : * any number of times. To know whether an error was generated
342 : * on the last attempt, make sure to first get the get_socket()
343 : * and if it returns -1, then this message is significant,
344 : * otherwise it is from a previous error.
345 : *
346 : * \warning
347 : * Remember that if the background thread was used the error will
348 : * NOT be available in the main thread until a full memory barrier
349 : * was executed. For that reason we make sure that the thread
350 : * was stopped when we detect an error.
351 : *
352 : * \return The last error message.
353 : */
354 0 : std::string const & get_last_error() const
355 : {
356 0 : return f_last_error;
357 : }
358 :
359 :
360 : /** \brief Close the connection.
361 : *
362 : * This function closes the connection. Since the f_tcp_connection
363 : * holds the socket to the remote server, we have get this function
364 : * called in order to completely disconnect.
365 : *
366 : * \note
367 : * This function does not clear the f_last_error parameter so it
368 : * can be read later.
369 : */
370 0 : void close()
371 : {
372 0 : f_tcp_connection.reset();
373 0 : }
374 :
375 :
376 : private:
377 : tcp_client_permanent_message_connection_impl * f_parent_impl = nullptr;
378 : std::string const f_address;
379 : int const f_port;
380 : tcp_bio_client::mode_t const f_mode;
381 : tcp_bio_client::pointer_t f_tcp_connection = tcp_bio_client::pointer_t();
382 : std::string f_last_error = std::string();
383 : };
384 :
385 :
386 : /** \brief Initialize a permanent message connection implementation object.
387 : *
388 : * This object manages the thread used to asynchronically connect to
389 : * the specified address and port.
390 : *
391 : * This class and its sub-classes may end up executing callbacks
392 : * of the snap_tcp_client_permanent_message_connection object.
393 : * However, in all cases these are never run from the thread.
394 : *
395 : * \param[in] client A pointer to the owner of this
396 : * snap_tcp_client_permanent_message_connection_impl object.
397 : * \param[in] address The address we are to connect to.
398 : * \param[in] port The port we are to connect to.
399 : * \param[in] mode The mode used to connect.
400 : */
401 0 : tcp_client_permanent_message_connection_impl(
402 : tcp_client_permanent_message_connection * parent
403 : , std::string const & address
404 : , int port
405 : , tcp_bio_client::mode_t mode)
406 : : f_parent(parent)
407 : //, f_thread_done() -- auto-init
408 : , f_thread_runner(this, address, port, mode)
409 0 : , f_thread("background connection handler thread", &f_thread_runner)
410 : //, f_messenger(nullptr) -- auto-init
411 : //, f_message_cache() -- auto-init
412 : {
413 0 : }
414 :
415 :
416 : tcp_client_permanent_message_connection_impl(tcp_client_permanent_message_connection_impl const & rhs) = delete;
417 : tcp_client_permanent_message_connection_impl & operator = (tcp_client_permanent_message_connection_impl const & rhs) = delete;
418 :
419 : /** \brief Destroy the permanent message connection.
420 : *
421 : * This function makes sure that the messenger was lost.
422 : */
423 0 : ~tcp_client_permanent_message_connection_impl()
424 0 : {
425 : // to make sure we can lose the messenger, first we want to be sure
426 : // that we do not have a thread running
427 : //
428 : try
429 : {
430 0 : f_thread.stop();
431 : }
432 0 : catch(cppthread::cppthread_exception_mutex_failed_error const &)
433 : {
434 : }
435 0 : catch(cppthread::cppthread_exception_invalid_error const &)
436 : {
437 : }
438 :
439 : // in this case we may still have an instance of the f_thread_done
440 : // which linger around, we want it out
441 : //
442 : // Note: the call is safe even if the f_thread_done is null
443 : //
444 0 : communicator::instance()->remove_connection(f_thread_done);
445 :
446 : // although the f_messenger variable gets reset automatically in
447 : // the destructor, it would not get removed from the snap
448 : // communicator instance if we were not doing it explicitly
449 : //
450 0 : disconnect();
451 0 : }
452 :
453 :
454 : /** \brief Direct connect to the messenger.
455 : *
456 : * In this case we try to connect without the thread. This allows
457 : * us to avoid the thread problems, but we are blocked until the
458 : * OS decides to time out or the connection worked.
459 : */
460 0 : void connect()
461 : {
462 0 : if(f_done)
463 : {
464 : SNAP_LOG_ERROR
465 0 : << "Permanent connection marked done. Cannot attempt to reconnect.";
466 0 : return;
467 : }
468 :
469 : // call the thread connect() function from the main thread
470 : //
471 0 : f_thread_runner.connect();
472 :
473 : // simulate receiving the thread_done() signal
474 : //
475 0 : thread_done();
476 : }
477 :
478 :
479 : /** \brief Check whether the permanent connection is currently connected.
480 : *
481 : * This function returns true if the messenger exists, which means that
482 : * the connection is up.
483 : *
484 : * \return true if the connection is up.
485 : */
486 0 : bool is_connected()
487 : {
488 0 : return f_messenger != nullptr;
489 : }
490 :
491 :
492 : /** \brief Try to start the thread runner.
493 : *
494 : * This function tries to start the thread runner in order to initiate
495 : * a connection in the background. If the thread could not be started,
496 : * then the function returns false.
497 : *
498 : * If the thread started, then the function returns true. This does
499 : * not mean that the connection was obtained. This is known once
500 : * the process_connected() function is called.
501 : *
502 : * \return true if the thread was successfully started.
503 : */
504 0 : bool background_connect()
505 : {
506 0 : if(f_done)
507 : {
508 : SNAP_LOG_ERROR
509 0 : << "Permanent connection marked done. Cannot attempt to reconnect.";
510 0 : return false;
511 : }
512 :
513 0 : if(f_thread.is_running())
514 : {
515 : SNAP_LOG_ERROR
516 0 : << "A background connection attempt is already in progress. Further requests are ignored.";
517 0 : return false;
518 : }
519 :
520 : // create the f_thread_done only when required
521 : //
522 0 : if(f_thread_done == nullptr)
523 : {
524 0 : f_thread_done.reset(new thread_signal_handler(this));
525 : }
526 :
527 0 : communicator::instance()->add_connection(f_thread_done);
528 :
529 0 : if(!f_thread.start())
530 : {
531 : SNAP_LOG_ERROR
532 0 : << "The thread used to run the background connection process did not start.";
533 0 : return false;
534 : }
535 :
536 0 : return true;
537 : }
538 :
539 :
540 : /** \brief Tell the main thread that the background thread is done.
541 : *
542 : * This function is called by the thread so the thread_done()
543 : * function of the thread done object gets called. Only the
544 : * thread should call this function.
545 : *
546 : * As a result the thread_done() function of this class will be
547 : * called by the main thread.
548 : */
549 0 : void trigger_thread_done()
550 : {
551 0 : f_thread_done->thread_done();
552 0 : }
553 :
554 :
555 : /** \brief Signal that the background thread is done.
556 : *
557 : * This callback is called whenever the background thread sends
558 : * a signal to us. This is used to avoid calling end user functions
559 : * that would certainly cause a lot of problem if called from the
560 : * thread.
561 : *
562 : * The function calls the process_connection_failed() if the
563 : * connection did not happen.
564 : *
565 : * The function calls the process_connected() if the connection
566 : * did happen.
567 : *
568 : * \note
569 : * This is used only if the user requested that the connection
570 : * happen in the background (i.e. use_thread was set to true
571 : * in the snap_tcp_client_permanent_message_connection object
572 : * constructor.)
573 : */
574 0 : void thread_done()
575 : {
576 : // if we used the thread we have to remove the signal used
577 : // to know that the thread was done
578 : //
579 0 : communicator::instance()->remove_connection(f_thread_done);
580 :
581 : // we will access the f_last_error member of the thread runner
582 : // which may not be available to the main thread yet, calling
583 : // stop forces a memory barrier so we are all good.
584 : //
585 : // calling stop() has no effect if we did not use the thread,
586 : // however, not calling stop() when we did use the thread
587 : // causes all sorts of other problems (especially, the thread
588 : // never gets joined)
589 : //
590 0 : f_thread.stop();
591 :
592 0 : tcp_bio_client::pointer_t client(f_thread_runner.release_client());
593 0 : if(f_done)
594 : {
595 : // already marked done, ignore the result and lose the
596 : // connection immediately
597 : //
598 : //f_thread_running.close(); -- not necessary, 'client' is the connection
599 0 : return;
600 : }
601 :
602 0 : if(client == nullptr)
603 : {
604 : // TODO: fix address in error message using a snap::addr so
605 : // as to handle IPv6 seemlessly.
606 : //
607 : SNAP_LOG_ERROR
608 0 : << "connection to "
609 0 : << f_thread_runner.get_address()
610 0 : << ":"
611 0 : << f_thread_runner.get_port()
612 0 : << " failed with: "
613 0 : << f_thread_runner.get_last_error();
614 :
615 : // signal that an error occurred
616 : //
617 0 : f_parent->process_connection_failed(f_thread_runner.get_last_error());
618 : }
619 : else
620 : {
621 0 : f_messenger.reset(new messenger(f_parent, client));
622 :
623 : // add the messenger to the communicator
624 : //
625 0 : communicator::instance()->add_connection(f_messenger);
626 :
627 : // if some messages were cached, process them immediately
628 : //
629 0 : while(!f_message_cache.empty())
630 : {
631 0 : f_messenger->send_message(f_message_cache[0]);
632 0 : f_message_cache.erase(f_message_cache.begin());
633 : }
634 :
635 : // let the client know we are now connected
636 : //
637 0 : f_parent->process_connected();
638 : }
639 : }
640 :
641 : /** \brief Send a message to the connection.
642 : *
643 : * This implementation function actually sends the message to the
644 : * connection, assuming that the connection exists. Otherwise, it
645 : * may cache the message (if cache is true.)
646 : *
647 : * Note that the message does not get cached if mark_done() was
648 : * called earlier since we are trying to close the whole connection.
649 : *
650 : * \param[in] message The message to send.
651 : * \param[in] cache Whether to cache the message if the connection is
652 : * currently down.
653 : *
654 : * \return true if the message was forwarded, false if the message
655 : * was ignored or cached.
656 : */
657 0 : bool send_message(message const & msg, bool cache)
658 : {
659 0 : if(f_messenger != nullptr)
660 : {
661 0 : return f_messenger->send_message(msg);
662 : }
663 :
664 0 : if(cache && !f_done)
665 : {
666 0 : f_message_cache.push_back(msg);
667 : }
668 :
669 0 : return false;
670 : }
671 :
672 :
673 : /** \brief Forget about the messenger connection.
674 : *
675 : * This function is used to fully disconnect from the messenger.
676 : *
677 : * If there is a messenger, this means:
678 : *
679 : * \li Removing the messenger from the snap_communicator instance.
680 : * \li Closing the connection in the thread object.
681 : *
682 : * In most cases, it is called when an error occur, also it happens
683 : * that we call it explicitly through the disconnect() function
684 : * of the permanent connection class.
685 : *
686 : * \note
687 : * This is safe, even though it is called from the messenger itself
688 : * because it will not get deleted yet. This is because the run()
689 : * loop has a copy in its own temporary copy of the vector of
690 : * connections.
691 : */
692 0 : void disconnect()
693 : {
694 0 : if(f_messenger != nullptr)
695 : {
696 0 : communicator::instance()->remove_connection(f_messenger);
697 0 : f_messenger.reset();
698 :
699 : // just the messenger does not close the TCP connection because
700 : // we may have another in the thread runner
701 : //
702 0 : f_thread_runner.close();
703 : }
704 0 : }
705 :
706 :
707 : /** \brief Return the address and size of the remote computer.
708 : *
709 : * This function retrieve the socket address.
710 : *
711 : * \param[out] address The binary address of the remote computer.
712 : *
713 : * \return The size of the sockaddr structure, 0 if no address is available.
714 : */
715 0 : size_t get_client_address(sockaddr_storage & address) const
716 : {
717 0 : if(f_messenger != nullptr)
718 : {
719 0 : return f_messenger->get_client_address(address);
720 : }
721 0 : memset(&address, 0, sizeof(address));
722 0 : return 0;
723 : }
724 :
725 :
726 : /** \brief Return the address of the f_message object.
727 : *
728 : * This function returns the address of the message object.
729 : *
730 : * \return The address of the remote computer.
731 : */
732 0 : std::string get_client_addr() const
733 : {
734 0 : if(f_messenger != nullptr)
735 : {
736 0 : return f_messenger->get_client_addr();
737 : }
738 0 : return std::string();
739 : }
740 :
741 :
742 : /** \brief Mark the messenger as done.
743 : *
744 : * This function is used to mark the messenger as done. This means it
745 : * will get removed from the snap_communicator instance as soon as it
746 : * is done with its current write buffer if there is one.
747 : *
748 : * You may also want to call the disconnection() function to actually
749 : * reset the pointer along the way.
750 : */
751 0 : void mark_done()
752 : {
753 0 : f_done = true;
754 :
755 : // once done we don't attempt to reconnect so we can as well
756 : // get rid of our existing cache immediately to save some
757 : // memory
758 : //
759 0 : f_message_cache.clear();
760 :
761 0 : if(f_messenger != nullptr)
762 : {
763 0 : f_messenger->mark_done();
764 : }
765 0 : }
766 :
767 :
768 : private:
769 : tcp_client_permanent_message_connection * f_parent = nullptr;
770 : thread_signal_handler::pointer_t f_thread_done = thread_signal_handler::pointer_t();
771 : runner f_thread_runner;
772 : cppthread::thread f_thread;
773 : messenger::pointer_t f_messenger = messenger::pointer_t();
774 : message::vector_t f_message_cache = message::vector_t();
775 : bool f_done = false;
776 : };
777 :
778 :
779 :
780 : }
781 : // namespace detail
782 :
783 :
784 :
785 : /** \brief Initializes this TCP client message connection.
786 : *
787 : * This implementation creates what we call a permanent connection.
788 : * Such a connection may fail once in a while. In such circumstances,
789 : * the class automatically requests for a reconnection (see various
790 : * parameters in the regard below.) However, this causes one issue:
791 : * by default, the connection just never ends. When you are about
792 : * ready to close the connection, you must call the mark_done()
793 : * function first. This will tell the various error functions to
794 : * drop this connection instead of restarting it after a small pause.
795 : *
796 : * This constructor makes sure to initialize the timer and saves
797 : * the address, port, mode, pause, and use_thread parameters.
798 : *
799 : * The timer is first set to trigger immediately. This means the TCP
800 : * connection will be attempted as soon as possible (the next time
801 : * the run() loop is entered, it will time out immediately.) You
802 : * are free to call set_timeout_date() with a date in the future if
803 : * you prefer that the connect be attempted a little later.
804 : *
805 : * The \p pause parameter is used if the connection is lost and this
806 : * timer is used again to attempt a new connection. It will be reused
807 : * as long as the connection fails (as a delay). It has to be at least
808 : * 10 microseconds, although really you should not use less than 1
809 : * second (1000000). You may set the pause parameter to 0 in which case
810 : * you are responsible to set the delay (by default there will be no
811 : * delay and thus the timer will never time out.)
812 : *
813 : * To start with a delay, instead of trying to connect immediately,
814 : * you may pass a negative pause parameter. So for example to get the
815 : * first attempt 5 seconds after you created this object, you use
816 : * -5000000LL as the pause parameter.
817 : *
818 : * The \p use_thread parameter determines whether the connection should
819 : * be attempted in a thread (asynchroneously) or immediately (which means
820 : * the timeout callback may block for a while.) If the connection is to
821 : * a local server with an IP address specified as numbers (i.e. 127.0.0.1),
822 : * the thread is probably not required. For connections to a remote
823 : * computer, though, it certainly is important.
824 : *
825 : * \param[in] address The address to listen on. It may be set to "0.0.0.0".
826 : * \param[in] port The port to listen on.
827 : * \param[in] mode The mode to use to open the connection.
828 : * \param[in] pause The amount of time to wait before attempting a new
829 : * connection after a failure, in microseconds, or 0.
830 : * \param[in] use_thread Whether a thread is used to connect to the
831 : * server.
832 : */
833 0 : tcp_client_permanent_message_connection::tcp_client_permanent_message_connection(
834 : std::string const & address
835 : , int port
836 : , tcp_bio_client::mode_t mode
837 : , std::int64_t const pause
838 : , bool const use_thread)
839 : : timer(pause < 0 ? -pause : 0)
840 0 : , f_impl(new detail::tcp_client_permanent_message_connection_impl(this, address, port, mode))
841 0 : , f_pause(llabs(pause))
842 0 : , f_use_thread(use_thread)
843 : {
844 0 : }
845 :
846 :
847 : /** \brief Destroy instance.
848 : *
849 : * This function cleans up everything in the permanent message object.
850 : */
851 0 : tcp_client_permanent_message_connection::~tcp_client_permanent_message_connection()
852 : {
853 : // Does nothing
854 0 : }
855 :
856 :
857 : /** \brief Attempt to send a message to this connection.
858 : *
859 : * If the connection is currently enabled, the message is sent immediately.
860 : * Otherwise, it may be cached if the \p cache parameter is set to true.
861 : * A cached message is forwarded as soon as a new successful connection
862 : * happens, which can be a problem if messages need to happen in a very
863 : * specific order (For example, after a reconnection to snapcommunicator
864 : * you first need to REGISTER or CONNECT...)
865 : *
866 : * \param[in] message The message to send to the connected server.
867 : * \param[in] cache Whether the message should be cached.
868 : *
869 : * \return true if the message was sent, false if it was not sent, although
870 : * if cache was true, it was cached
871 : */
872 0 : bool tcp_client_permanent_message_connection::send_message(message const & msg, bool cache)
873 : {
874 0 : return f_impl->send_message(msg, cache);
875 : }
876 :
877 :
878 : /** \brief Check whether the connection is up.
879 : *
880 : * This function returns true if the connection is considered to be up.
881 : * This means sending messages will work quickly instead of being
882 : * cached up until an actual TCP/IP connection gets established.
883 : *
884 : * Note that the connection may have hanged up since, and the system
885 : * may not have yet detected the fact (i.e. the connection is going
886 : * to receive the process_hup() call after the event in which you are
887 : * working.)
888 : *
889 : * \return true if connected
890 : */
891 0 : bool tcp_client_permanent_message_connection::is_connected() const
892 : {
893 0 : return f_impl->is_connected();
894 : }
895 :
896 :
897 : /** \brief Disconnect the messenger now.
898 : *
899 : * This function kills the current connection.
900 : *
901 : * There are a few cases where two daemons communicate between each others
902 : * and at some point one of them wants to exit and needs to disconnect. This
903 : * function can be used in that one situation assuming that you have an
904 : * acknowledgement from the other daemon.
905 : *
906 : * Say you have daemon A and B. B wants to quit and before doing so sends
907 : * a form of "I'm quitting" message to A. In that situation, B is not closing
908 : * the messenger connection, A is responsible for that (i.e. A acknowledges
909 : * receipt of the "I'm quitting" message from B by closing the connection.)
910 : *
911 : * B also wants to call the mark_done() function to make sure that it
912 : * does not reconnected a split second later and instead the permanent
913 : * connection gets removed from the snap_communicator list of connections.
914 : */
915 0 : void tcp_client_permanent_message_connection::disconnect()
916 : {
917 0 : f_impl->disconnect();
918 0 : }
919 :
920 :
921 : /** \brief Overload so we do not have to use namespace everywhere.
922 : *
923 : * This function overloads the snap_connection::mark_done() function so
924 : * we can call it without the need to use snap_timer::mark_done()
925 : * everywhere.
926 : */
927 0 : void tcp_client_permanent_message_connection::mark_done()
928 : {
929 0 : timer::mark_done();
930 0 : }
931 :
932 :
933 : /** \brief Mark connection as done.
934 : *
935 : * This function allows you to mark the permanent connection and the
936 : * messenger as done.
937 : *
938 : * Note that calling this function with false is the same as calling the
939 : * base class mark_done() function.
940 : *
941 : * If the \p message parameter is set to true, we suggest you also call
942 : * the disconnect() function. That way the messenger will truly get
943 : * removed from everyone quickly.
944 : *
945 : * \param[in] messenger If true, also mark the messenger as done.
946 : */
947 0 : void tcp_client_permanent_message_connection::mark_done(bool messenger)
948 : {
949 0 : timer::mark_done();
950 0 : if(messenger)
951 : {
952 0 : f_impl->mark_done();
953 : }
954 0 : }
955 :
956 :
957 : /** \brief Retrieve a copy of the client's address.
958 : *
959 : * This function makes a copy of the address of this client connection
960 : * to the \p address parameter and returns the length.
961 : *
962 : * \param[in] address The reference to an address variable where the
963 : * address gets copied.
964 : *
965 : * \return Return the length of the address which may be smaller than
966 : * sizeof(struct sockaddr). If zero, then no address is defined.
967 : *
968 : * \sa get_addr()
969 : */
970 0 : size_t tcp_client_permanent_message_connection::get_client_address(sockaddr_storage & address) const
971 : {
972 0 : return f_impl->get_client_address(address);
973 : }
974 :
975 :
976 : /** \brief Retrieve the remote computer address as a string.
977 : *
978 : * This function returns the address of the remote computer as a string.
979 : * It will be a canonicalized IP address.
980 : *
981 : * \return The cacnonicalized IP address.
982 : */
983 0 : std::string tcp_client_permanent_message_connection::get_client_addr() const
984 : {
985 0 : return f_impl->get_client_addr();
986 : }
987 :
988 :
989 : /** \brief Internal timeout callback implementation.
990 : *
991 : * This callback implements the guts of this class: it attempts to connect
992 : * to the specified address and port, optionally after creating a thread
993 : * so the attempt can happen asynchroneously.
994 : *
995 : * When the connection fails, the timer is used to try again pause
996 : * microseconds later (pause as specified in the constructor).
997 : *
998 : * When a connection succeeds, the timer is disabled until you detect
999 : * an error while using the connection and re-enable the timer.
1000 : *
1001 : * \warning
1002 : * This function changes the timeout delay to the pause amount
1003 : * as defined with the constructor. If you want to change that
1004 : * amount, you can do so an any point after this function call
1005 : * using the set_timeout_delay() function. If the pause parameter
1006 : * was set to -1, then the timeout never gets changed.
1007 : * However, you should not use a permanent message timer as your
1008 : * own or you will interfer with the internal use of the timer.
1009 : */
1010 0 : void tcp_client_permanent_message_connection::process_timeout()
1011 : {
1012 : // got a spurious call when already marked done
1013 : //
1014 0 : if(is_done())
1015 : {
1016 0 : return;
1017 : }
1018 :
1019 : // change the timeout delay although we will not use it immediately
1020 : // if we start the thread or attempt an immediate connection, but
1021 : // that way the user can change it by calling set_timeout_delay()
1022 : // at any time after the first process_timeout() call
1023 : //
1024 0 : if(f_pause > 0)
1025 : {
1026 0 : set_timeout_delay(f_pause);
1027 0 : f_pause = 0;
1028 : }
1029 :
1030 0 : if(f_use_thread)
1031 : {
1032 : // in this case we create a thread, run it and know whether the
1033 : // connection succeeded only when the thread tells us it did
1034 : //
1035 : // TODO: the background_connect() may return false in two situations:
1036 : // 1) when the thread is already running and then the behavior
1037 : // we have below is INCORRECT
1038 : // 2) when the thread cannot be started (i.e. could not
1039 : // allocate the stack?) in which case the if() below
1040 : // is the correct behavior
1041 : //
1042 0 : if(f_impl->background_connect())
1043 : {
1044 : // we started the thread successfully, so block the timer
1045 : //
1046 0 : set_enable(false);
1047 : }
1048 : }
1049 : else
1050 : {
1051 : // the success is noted when we receive a call to
1052 : // process_connected(); there we do set_enable(false)
1053 : // so the timer stops
1054 : //
1055 0 : f_impl->connect();
1056 : }
1057 : }
1058 :
1059 :
1060 : /** \brief Process an error.
1061 : *
1062 : * When an error occurs, we restart the timer so we can attempt to reconnect
1063 : * to that server.
1064 : *
1065 : * If you overload this function, make sure to either call this
1066 : * implementation or enable the timer yourselves.
1067 : *
1068 : * \warning
1069 : * This function does not call the snap_timer::process_error() function
1070 : * which means that this connection is not automatically removed from
1071 : * the snapcommunicator object on failures.
1072 : */
1073 0 : void tcp_client_permanent_message_connection::process_error()
1074 : {
1075 0 : if(is_done())
1076 : {
1077 0 : timer::process_error();
1078 : }
1079 : else
1080 : {
1081 0 : f_impl->disconnect();
1082 0 : set_enable(true);
1083 : }
1084 0 : }
1085 :
1086 :
1087 : /** \brief Process a hang up.
1088 : *
1089 : * When a hang up occurs, we restart the timer so we can attempt to reconnect
1090 : * to that server.
1091 : *
1092 : * If you overload this function, make sure to either call this
1093 : * implementation or enable the timer yourselves.
1094 : *
1095 : * \warning
1096 : * This function does not call the snap_timer::process_hup() function
1097 : * which means that this connection is not automatically removed from
1098 : * the snapcommunicator object on failures.
1099 : */
1100 0 : void tcp_client_permanent_message_connection::process_hup()
1101 : {
1102 0 : if(is_done())
1103 : {
1104 0 : timer::process_hup();
1105 : }
1106 : else
1107 : {
1108 0 : f_impl->disconnect();
1109 0 : set_enable(true);
1110 : }
1111 0 : }
1112 :
1113 :
1114 : /** \brief Process an invalid signal.
1115 : *
1116 : * When an invalid signal occurs, we restart the timer so we can attempt
1117 : * to reconnect to that server.
1118 : *
1119 : * If you overload this function, make sure to either call this
1120 : * implementation or enable the timer yourselves.
1121 : *
1122 : * \warning
1123 : * This function does not call the snap_timer::process_invalid() function
1124 : * which means that this connection is not automatically removed from
1125 : * the snapcommunicator object on failures.
1126 : */
1127 0 : void tcp_client_permanent_message_connection::process_invalid()
1128 : {
1129 0 : if(is_done())
1130 : {
1131 0 : timer::process_invalid();
1132 : }
1133 : else
1134 : {
1135 0 : f_impl->disconnect();
1136 0 : set_enable(true);
1137 : }
1138 0 : }
1139 :
1140 :
1141 : /** \brief Make sure that the messenger connection gets removed.
1142 : *
1143 : * This function makes sure that the messenger sub-connection also gets
1144 : * removed from the snap communicator. Otherwise it would lock the system
1145 : * since connections are saved in the snap communicator object as shared
1146 : * pointers.
1147 : */
1148 0 : void tcp_client_permanent_message_connection::connection_removed()
1149 : {
1150 0 : f_impl->disconnect();
1151 0 : }
1152 :
1153 :
1154 : /** \brief Process a connection failed callback.
1155 : *
1156 : * When a connection attempt fails, we restart the timer so we can
1157 : * attempt to reconnect to that server.
1158 : *
1159 : * If you overload this function, make sure to either call this
1160 : * implementation or enable the timer yourselves.
1161 : *
1162 : * \param[in] error_message The error message that trigged this callback.
1163 : */
1164 0 : void tcp_client_permanent_message_connection::process_connection_failed(std::string const & error_message)
1165 : {
1166 0 : snap::NOTUSED(error_message);
1167 0 : set_enable(true);
1168 0 : }
1169 :
1170 :
1171 : /** \brief The connection is ready.
1172 : *
1173 : * This callback gets called whenever the connection succeeded and is
1174 : * ready to be used.
1175 : *
1176 : * You should implement this virtual function if you have to initiate
1177 : * the communication. For example, the snapserver has to send a
1178 : * REGISTER to the snapcommunicator system and thus implements this
1179 : * function.
1180 : *
1181 : * The default implementation makes sure that the timer gets turned off
1182 : * so we do not try to reconnect every minute or so.
1183 : */
1184 0 : void tcp_client_permanent_message_connection::process_connected()
1185 : {
1186 0 : set_enable(false);
1187 0 : }
1188 :
1189 :
1190 :
1191 6 : } // namespace ed
1192 : // vim: ts=4 sw=4 et
|