Line data Source code
1 : // Copyright (c) 2012-2021 Made to Order Software Corp. All Rights Reserved
2 : //
3 : // https://snapwebsites.org/project/eventdispatcher
4 : // contact@m2osw.com
5 : //
6 : // This program is free software; you can redistribute it and/or modify
7 : // it under the terms of the GNU General Public License as published by
8 : // the Free Software Foundation; either version 2 of the License, or
9 : // (at your option) any later version.
10 : //
11 : // This program is distributed in the hope that it will be useful,
12 : // but WITHOUT ANY WARRANTY; without even the implied warranty of
13 : // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 : // GNU General Public License for more details.
15 : //
16 : // You should have received a copy of the GNU General Public License
17 : // along with this program; if not, write to the Free Software
18 : // Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
19 :
20 : /** \file
21 : * \brief Implementation of the Snap Communicator class.
22 : *
23 : * This class wraps the C poll() interface in a C++ object with many types
24 : * of objects:
25 : *
 * \li Server Connections; for software that wants to offer a port to
 *     which clients can connect; the server calls accept()
 *     once a new client connection is ready; this results in a
 *     Server/Client connection object
 * \li Client Connections; for software that wants to connect to
 *     a server; these expect the IP address and port to connect to
 * \li Server/Client Connections; for the server when it accepts a new
 *     connection; in this case the server gets a socket from accept()
 *     and creates one of these objects to handle the connection
 *
 * Using the poll() function is the easiest approach and allows us to
 * listen on pretty much any number of sockets (on my server it is
 * limited to 16,768 and, frankly, beyond 1,000 we will probably start
 * to have real slowness issues on small VPN servers.)
40 : */
41 :
42 : // to get the POLLRDHUP definition
43 : #ifndef _GNU_SOURCE
44 : #define _GNU_SOURCE
45 : #endif
46 :
47 :
48 : // self
49 : //
50 : #include "eventdispatcher/communicator.h"
51 :
52 : #include "eventdispatcher/exception.h"
53 : #include "eventdispatcher/signal.h"
54 : #include "eventdispatcher/utils.h"
55 :
56 :
57 : // cppthread lib
58 : //
59 : #include <cppthread/guard.h>
60 : #include <cppthread/mutex.h>
61 : #include <cppthread/thread.h>
62 :
63 :
64 : // snapdev lib
65 : //
66 : #include <snapdev/safe_variable.h>
67 :
68 :
69 : // snaplogger lib
70 : //
71 : #include <snaplogger/message.h>
72 :
73 :
// C++ lib
//
#include <algorithm>
#include <cerrno>
#include <cstdint>
#include <cstring>
#include <limits>
#include <string>
#include <vector>


// C lib
//
#include <poll.h>
#include <sys/resource.h>
84 :
85 :
86 : // last include
87 : //
88 : #include <snapdev/poison.h>
89 :
90 :
91 :
92 : namespace ed
93 : {
94 : namespace
95 : {
96 :
97 :
98 : /** \brief The instance of the communicator singleton.
99 : *
100 : * This pointer is the one instance of the communicator
101 : * we create to run an event loop.
102 : */
103 : communicator::pointer_t * g_instance = nullptr;
104 :
105 :
106 : } // no name namespace
107 :
108 :
109 :
110 :
111 :
112 : /** \brief Initialize a communicator object.
113 : *
114 : * This function initializes the communicator object.
115 : */
116 1 : communicator::communicator()
117 : {
118 1 : }
119 :
120 :
121 : /** \brief Retrieve the instance() of the communicator.
122 : *
123 : * This function returns the instance of the communicator.
124 : * There is really no reason and it could also create all sorts
125 : * of problems to have more than one instance hence we created
126 : * the communicator as a singleton. It also means you cannot
127 : * actually delete the communicator.
128 : *
129 : * The initialization of the communicator instance is thread
130 : * safe.
131 : */
132 34 : communicator::pointer_t communicator::instance()
133 : {
134 68 : cppthread::guard g(*cppthread::g_system_mutex);
135 :
136 34 : if(g_instance == nullptr)
137 : {
138 : // `communicator` constructor is private so we can't use
139 : // the std::make_shared<>
140 : //
141 1 : g_instance = new communicator::pointer_t();
142 1 : g_instance->reset(new communicator());
143 : }
144 :
145 68 : return *g_instance;
146 : }
147 :
148 :
149 : /** \brief Retrieve a reference to the vector of connections.
150 : *
151 : * This function returns a reference to all the connections that are
152 : * currently attached to the communicator system.
153 : *
154 : * This is useful to search the array.
155 : *
156 : * \return The vector of connections.
157 : */
158 0 : connection::vector_t const & communicator::get_connections() const
159 : {
160 0 : return f_connections;
161 : }
162 :
163 :
164 : /** \brief Attach a connection to the communicator.
165 : *
166 : * This function attaches a connection to the communicator. This allows
167 : * us to execute code for that connection by having the process_signal()
168 : * function called.
169 : *
170 : * Connections are kept in the order in which they are added. This may
171 : * change the order in which connection callbacks are called. However,
172 : * events are received asynchronously so do not expect callbacks to be
173 : * called in any specific order.
174 : *
175 : * You may call this function with a null pointer. It simply returns
176 : * false immediately. This makes it easy to eventually allocate a
177 : * new connection and then use the return value of this function
178 : * to know whether the two step process worked or not.
179 : *
180 : * \note
181 : * A connection can only be added once to a communicator object.
182 : * Also it cannot be shared between multiple communicator objects.
183 : *
184 : * \param[in] connection The connection being added.
185 : *
186 : * \return true if the connection was added, false if the connection
187 : * was already present in the communicator list of connections.
188 : */
189 20 : bool communicator::add_connection(connection::pointer_t connection)
190 : {
191 20 : if(connection == nullptr)
192 : {
193 0 : return false;
194 : }
195 :
196 20 : if(!connection->valid_socket())
197 : {
198 : throw event_dispatcher_invalid_parameter(
199 : "communicator::add_connection(): connection without a socket"
200 0 : " cannot be added to a communicator object.");
201 : }
202 :
203 20 : auto const it(std::find(f_connections.begin(), f_connections.end(), connection));
204 20 : if(it != f_connections.end())
205 : {
206 : // already added, can be added only once but we allow multiple
207 : // calls (however, we do not count those calls, so first call
208 : // to the remove_connection() does remove it!)
209 : //
210 0 : return false;
211 : }
212 :
213 20 : f_connections.push_back(connection);
214 :
215 20 : connection->connection_added();
216 :
217 20 : return true;
218 : }
219 :
220 :
221 : /** \brief Remove a connection from a communicator object.
222 : *
223 : * This function removes a connection from this communicator object.
224 : * Note that any one connection can only be added once.
225 : *
226 : * \param[in] connection The connection to remove from this communicator.
227 : *
228 : * \return true if the connection was removed, false if it was not found.
229 : */
230 22 : bool communicator::remove_connection(connection::pointer_t connection)
231 : {
232 22 : auto it(std::find(f_connections.begin(), f_connections.end(), connection));
233 22 : if(it == f_connections.end())
234 : {
235 2 : return false;
236 : }
237 :
238 40 : SNAP_LOG_TRACE
239 20 : << "removing 1 connection, \""
240 20 : << connection->get_name()
241 20 : << "\", of "
242 20 : << f_connections.size()
243 : << " connections (including this one.)"
244 : << SNAP_LOG_SEND;
245 :
246 20 : f_connections.erase(it);
247 :
248 20 : connection->connection_removed();
249 :
250 : // TODO: make this a flag so we can turn it on without having to recompile
251 : #if 0
252 : #ifdef _DEBUG
253 : std::for_each(
254 : f_connections.begin()
255 : , f_connections.end()
256 : , [](auto const & c)
257 : {
258 : SNAP_LOG_TRACE
259 : << "communicator::remove_connection(): remaining connection: \""
260 : << c->get_name()
261 : << "\""
262 : << SNAP_LOG_SEND;
263 : });
264 : #endif
265 : #endif
266 :
267 20 : return true;
268 : }
269 :
270 :
271 : /** \brief Set the Force Sort flag to \p status.
272 : *
273 : * This function can be called to force the run() function to sort (or not
274 : * sort) the list of connections.
275 : *
276 : * Since the sort function is somewhat expensive, the sort changes the
277 : * vector of connections in place. Then only a change of priority
278 : * triggers a request for the vector to be sorted again.
279 : *
280 : * This function can be used in the event you need to force a trigger.
281 : * It would be unlikely that you would call this function with false.
282 : *
283 : * \param[in] status The new status of the force sort flag.
284 : */
285 0 : void communicator::set_force_sort(bool status)
286 : {
287 0 : f_force_sort = status;
288 0 : }
289 :
290 :
291 : /** \brief Check whether the run() function is still going.
292 : *
293 : * The f_running internal flag is set to true while within the run()
294 : * function. This function tells you whether you already called the
295 : * run() function and are running within a callback or you are before
296 : * or after the call.
297 : *
298 : * \return true if the run() function is still running.
299 : */
300 6 : bool communicator::is_running() const
301 : {
302 6 : return f_running;
303 : }
304 :
305 :
306 : /** \brief Run until all connections are removed.
307 : *
308 : * This function "blocks" until all the connections added to this
309 : * communicator instance are removed. Until then, it wakes
310 : * up and run callback functions whenever an event occurs.
311 : *
312 : * In other words, you want to add_connection() before you call
313 : * this function otherwise the function returns immediately.
314 : *
315 : * Note that you can include timeout events so if you need to
316 : * run some code once in a while, you may just use a timeout
317 : * event and process your repetitive events that way.
318 : *
319 : * \note
320 : * Calling exit() or a similar function from within a callback
321 : * is not adviced, although it may work in most cases, it is
322 : * much better/cleaner to go through your list of connections
323 : * and remove them all once you are ready to quit. This also
324 : * allows for a 100% valid shutdown procedure.
325 : *
326 : * \return true if the loop exits because the list of connections is empty.
327 : */
328 8 : bool communicator::run()
329 : {
330 8 : if(f_running)
331 : {
332 0 : SNAP_LOG_FATAL
333 0 : << "communicator::run(): recursively called from within a callback."
334 : << SNAP_LOG_SEND;
335 0 : throw event_dispatcher_recursive_call("communicator::run(): recursively called from within a callback.");
336 : }
337 :
338 16 : snap::safe_variable running(f_running, true);
339 :
340 16 : std::vector<bool> enabled;
341 16 : std::vector<struct pollfd> fds;
342 8 : f_force_sort = true;
343 : for(;;)
344 : {
345 : // any connections?
346 37 : if(f_connections.empty())
347 : {
348 8 : return true;
349 : }
350 :
351 29 : if(f_force_sort)
352 : {
353 : // sort the connections by priority
354 : //
355 8 : std::stable_sort(f_connections.begin(), f_connections.end(), connection::compare);
356 8 : f_force_sort = false;
357 : }
358 :
359 : // make a copy because the callbacks may end up making
360 : // changes to the main list and we would have problems
361 : // with that here...
362 : //
363 58 : connection::vector_t connections(f_connections);
364 29 : size_t max_connections(connections.size());
365 :
366 : // timeout is do not time out by default
367 : //
368 29 : std::int64_t next_timeout_timestamp(std::numeric_limits<std::int64_t>::max());
369 :
370 : // clear() is not supposed to delete the buffer of vectors
371 : //
372 29 : enabled.clear();
373 29 : fds.clear();
374 29 : fds.reserve(max_connections); // avoid more than 1 allocation
375 95 : for(size_t idx(0); idx < max_connections; ++idx)
376 : {
377 124 : connection::pointer_t c(connections[idx]);
378 66 : c->f_fds_position = -1;
379 :
380 : // is the connection enabled?
381 : //
382 : // note that we save that value for later use in our loop
383 : // below because otherwise we will miss many events and
384 : // it tends to break things; that means you may get your
385 : // callback called even while disabled
386 : //
387 66 : enabled.push_back(c->is_enabled());
388 66 : if(!enabled[idx])
389 : {
390 : //SNAP_LOG_TRACE
391 : // << "communicator::run(): connection '"
392 : // << c->get_name()
393 : // << "' has been disabled, so ignored."
394 : // << SNAP_LOG_SEND;
395 7 : continue;
396 : }
397 : //SNAP_LOG_TRACE
398 : // << "communicator::run(): handling connection "
399 : // << idx
400 : // << "/"
401 : // << max_connections
402 : // << ". '"
403 : // << c->get_name()
404 : // << "' since it is enabled..."
405 : // << SNAP_LOG_SEND;
406 :
407 : // check whether a timeout is defined in this connection
408 : //
409 59 : int64_t const timestamp(c->save_timeout_timestamp());
410 59 : if(timestamp != -1)
411 : {
412 : // the timeout event gives us a time when to tick
413 : //
414 1 : if(timestamp < next_timeout_timestamp)
415 : {
416 1 : next_timeout_timestamp = timestamp;
417 : }
418 : }
419 :
420 : // is there any events to listen on?
421 59 : int e(0);
422 59 : if(c->is_listener() || c->is_signal())
423 : {
424 26 : e |= POLLIN;
425 : }
426 59 : if(c->is_reader())
427 : {
428 29 : e |= POLLIN | POLLPRI | POLLRDHUP;
429 : }
430 59 : if(c->is_writer())
431 : {
432 6 : e |= POLLOUT | POLLRDHUP;
433 : }
434 59 : if(e == 0)
435 : {
436 : // this should only happen on timer objects
437 : //
438 1 : continue;
439 : }
440 :
441 : // do we have a currently valid socket? (i.e. the connection
442 : // may have been closed or we may be handling a timer or
443 : // signal object)
444 : //
445 58 : if(c->get_socket() < 0)
446 : {
447 0 : continue;
448 : }
449 :
450 : // this is considered valid, add this connection to the list
451 : //
452 : // save the position since we may skip some entries...
453 : // (otherwise we would have to use -1 as the socket to
454 : // allow for such dead entries, but avoiding such entries
455 : // saves time)
456 : //
457 58 : c->f_fds_position = fds.size();
458 :
459 : //SNAP_LOG_ERROR
460 : // << "*** still waiting on \""
461 : // << c->get_name()
462 : // << "\"."
463 : // << SNAP_LOG_SEND;
464 :
465 58 : struct pollfd fd;
466 58 : fd.fd = c->get_socket();
467 58 : fd.events = e;
468 58 : fd.revents = 0; // probably useless... (kernel should clear those)
469 58 : fds.push_back(fd);
470 : }
471 :
472 : // compute the right timeout
473 29 : std::int64_t timeout(-1);
474 29 : if(next_timeout_timestamp != std::numeric_limits<int64_t>::max())
475 : {
476 1 : std::int64_t const now(get_current_date());
477 1 : timeout = next_timeout_timestamp - now;
478 1 : if(timeout < 0)
479 : {
480 : // timeout is in the past so timeout immediately, but
481 : // still check for events if any
482 1 : timeout = 0;
483 : }
484 : else
485 : {
486 : // convert microseconds to milliseconds for poll()
487 0 : timeout /= 1000;
488 0 : if(timeout == 0)
489 : {
490 : // less than one is a waste of time (CPU intensive
491 : // until the time is reached, we can be 1 ms off
492 : // instead...)
493 0 : timeout = 1;
494 : }
495 : }
496 : }
497 28 : else if(fds.empty())
498 : {
499 0 : SNAP_LOG_FATAL
500 0 : << "communicator::run(): nothing to poll() on. All connections are disabled? (Ignoring "
501 0 : << max_connections
502 : << " and exiting the run() loop anyway.)"
503 : << SNAP_LOG_SEND;
504 0 : return false;
505 : }
506 :
507 : //SNAP_LOG_TRACE << "communicator::run(): ready to poll(); "
508 : // << "count " << fds.size()
509 : // << " timeout " << timeout
510 : // << " (next was: " << next_timeout_timestamp
511 : // << ", current ~ " << get_current_date()
512 : // << ")"
513 : // << SNAP_LOG_SEND;
514 :
515 : // TODO: add support for ppoll() so we can support signals cleanly
516 : // with nearly no additional work from us
517 : //
518 29 : errno = 0;
519 29 : int const r(poll(fds.empty() ? nullptr : &fds[0], fds.size(), timeout));
520 29 : if(r >= 0)
521 : {
522 : // quick sanity check
523 : //
524 29 : if(static_cast<size_t>(r) > connections.size())
525 : {
526 0 : throw event_dispatcher_runtime_error("communicator::run(): poll() returned a number of events to handle larger than the input allows");
527 : }
528 : //SNAP_LOG_TRACE
529 : // <<"tid="
530 : // << cppthread::gettid()
531 : // << ", communicator::run(): ------------------- new set of "
532 : // << r
533 : // << " events to handle"
534 : // << SNAP_LOG_SEND;
535 :
536 : // check each connection one by one for:
537 : //
538 : // 1) fds events, including signals
539 : // 2) timeouts
540 : //
541 : // and execute the corresponding callbacks
542 : //
543 95 : for(size_t idx(0); idx < connections.size(); ++idx)
544 : {
545 125 : connection::pointer_t c(connections[idx]);
546 :
547 : // is the connection enabled?
548 : //
549 : // note that we check whether that connection was enabled
550 : // before poll() was called; this is very important because
551 : // the last poll() events must be run even if a previous
552 : // callback call just disabled this very connection
553 : // (i.e. at the time we called poll() the connection was
554 : // still enabled and therefore we are expected to call
555 : // their callbacks even if it just got disabled by an
556 : // earlier callback)
557 : //
558 66 : if(!enabled[idx])
559 : {
560 : //SNAP_LOG_TRACE
561 : // << "communicator::run(): in loop, connection '"
562 : // << c->get_name()
563 : // << "' has been disabled, so ignored!"
564 : // << SNAP_LOG_SEND;
565 7 : continue;
566 : }
567 :
568 : // if we have a valid fds position then an event other
569 : // than a timeout occurred on that connection
570 : //
571 59 : if(c->f_fds_position >= 0)
572 : {
573 58 : struct pollfd * fd(&fds[c->f_fds_position]);
574 :
575 : // if any events were found by poll(), process them now
576 : //
577 : //SNAP_LOG_TRACE
578 : // <<"tid="
579 : // << cppthread::gettid()
580 : // << ", communicator::run(): events for "
581 : // << c->get_name()
582 : // << " = "
583 : // << fd->revents
584 : // << SNAP_LOG_SEND;
585 58 : if(fd->revents != 0)
586 : {
587 : // an event happened on this one
588 : //
589 28 : if((fd->revents & (POLLIN | POLLPRI)) != 0)
590 : {
591 : // we consider that Unix signals have the greater priority
592 : // and thus handle them first
593 : //
594 20 : if(c->is_signal())
595 : {
596 7 : signal * ss(dynamic_cast<signal *>(c.get()));
597 7 : if(ss != nullptr)
598 : {
599 7 : ss->process();
600 : }
601 : }
602 13 : else if(c->is_listener())
603 : {
604 : // a listener is a special case and we want
605 : // to call process_accept() instead
606 : //
607 1 : c->process_accept();
608 : }
609 : else
610 : {
611 12 : c->process_read();
612 : }
613 : }
614 28 : if((fd->revents & POLLOUT) != 0)
615 : {
616 6 : c->process_write();
617 : }
618 28 : if((fd->revents & POLLERR) != 0)
619 : {
620 0 : c->process_error();
621 : }
622 28 : if((fd->revents & (POLLHUP | POLLRDHUP)) != 0)
623 : {
624 5 : c->process_hup();
625 : }
626 28 : if((fd->revents & POLLNVAL) != 0)
627 : {
628 0 : c->process_invalid();
629 : }
630 : }
631 : }
632 :
633 : // now check whether we have a timeout on this connection
634 : //
635 59 : int64_t const timestamp(c->get_saved_timeout_timestamp());
636 59 : if(timestamp != -1)
637 : {
638 1 : int64_t const now(get_current_date());
639 1 : if(now >= timestamp)
640 : {
641 : //SNAP_LOG_TRACE
642 : // << "communicator::run(): timer of connection = '"<< c->get_name()
643 : // << "', timestamp = " << timestamp
644 : // << ", now = " << now
645 : // << ", now >= timestamp --> " << (now >= timestamp ? "TRUE (timed out!)" : "FALSE")
646 : // << SNAP_LOG_SEND;
647 :
648 : // move the timeout as required first
649 : // (because the callback may move it again)
650 : //
651 1 : c->calculate_next_tick();
652 :
653 : // the timeout date needs to be reset if the tick
654 : // happened for that date
655 : //
656 1 : if(now >= c->get_timeout_date())
657 : {
658 1 : c->set_timeout_date(-1);
659 : }
660 :
661 : // then run the callback
662 : //
663 1 : c->process_timeout();
664 : }
665 : }
666 : }
667 : }
668 : else
669 : {
670 : // r < 0 means an error occurred
671 : //
672 0 : if(errno == EINTR)
673 : {
674 : // Note: if the user wants to prevent this error, he should
675 : // use the signal with the Unix signals that may
676 : // happen while calling poll().
677 : //
678 0 : throw event_dispatcher_runtime_error("communicator::run(): EINTR occurred while in poll() -- interrupts are not supported yet though");
679 : }
680 0 : if(errno == EFAULT)
681 : {
682 0 : throw event_dispatcher_invalid_parameter("communicator::run(): buffer was moved out of our address space?");
683 : }
684 0 : if(errno == EINVAL)
685 : {
686 : // if this is really because nfds is too large then it may be
687 : // a "soft" error that can be fixed; that being said, my
688 : // current version is 16K files which frankly when we reach
689 : // that level we have a problem...
690 : //
691 0 : struct rlimit rl;
692 0 : getrlimit(RLIMIT_NOFILE, &rl);
693 : throw event_dispatcher_invalid_parameter(
694 : "communicator::run(): too many file fds for poll, limit is currently "
695 0 : + std::to_string(rl.rlim_cur)
696 0 : + ", your kernel top limit is "
697 0 : + std::to_string(rl.rlim_max));
698 : }
699 0 : if(errno == ENOMEM)
700 : {
701 0 : throw event_dispatcher_runtime_error("communicator::run(): poll() failed because of memory");
702 : }
703 0 : int const e(errno);
704 : throw event_dispatcher_runtime_error(
705 : "communicator::run(): poll() failed with error "
706 0 : + std::to_string(e)
707 0 : + " -- "
708 0 : + strerror(e));
709 : }
710 29 : }
711 : }
712 :
713 :
714 :
715 6 : } // namespace ed
716 : // vim: ts=4 sw=4 et
|