Line data Source code
1 : // Copyright (c) 2012-2019 Made to Order Software Corp. All Rights Reserved
2 : //
3 : // This program is free software; you can redistribute it and/or modify
4 : // it under the terms of the GNU General Public License as published by
5 : // the Free Software Foundation; either version 2 of the License, or
6 : // (at your option) any later version.
7 : //
8 : // This program is distributed in the hope that it will be useful,
9 : // but WITHOUT ANY WARRANTY; without even the implied warranty of
10 : // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 : // GNU General Public License for more details.
12 : //
13 : // You should have received a copy of the GNU General Public License
14 : // along with this program; if not, write to the Free Software
15 : // Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
16 :
17 : /** \file
18 : * \brief Implementation of the Snap Communicator class.
19 : *
20 : * This class wraps the C poll() interface in a C++ object with many types
21 : * of objects:
22 : *
 * \li Server Connections; for software that wants to offer a port to
 *     which clients can connect; the server will call accept()
 *     once a new client connection is ready; this results in a
 *     Server/Client connection object
 * \li Client Connections; for software that wants to connect to
 *     a server; these expect the IP address and port to connect to
29 : * \li Server/Client Connections; for the server when it accepts a new
30 : * connection; in this case the server gets a socket from accept()
31 : * and creates one of these objects to handle the connection
32 : *
33 : * Using the poll() function is the easiest and allows us to listen
34 : * on pretty much any number of sockets (on my server it is limited
35 : * at 16,768 and frankly over 1,000 we probably will start to have
36 : * real slowness issues on small VPN servers.)
37 : */
38 :
39 : // to get the POLLRDHUP definition
40 : #ifndef _GNU_SOURCE
41 : #define _GNU_SOURCE
42 : #endif
43 :
44 :
45 : // self
46 : //
47 : #include "eventdispatcher/communicator.h"
48 :
49 : #include "eventdispatcher/exception.h"
50 : #include "eventdispatcher/signal.h"
51 : #include "eventdispatcher/utils.h"
52 :
53 :
54 : // cppthread lib
55 : //
56 : #include <cppthread/guard.h>
57 : #include <cppthread/mutex.h>
58 :
59 :
60 : // snaplogger lib
61 : //
62 : #include <snaplogger/message.h>
63 :
64 :
65 : //// snapdev lib
66 : ////
67 : //#include "snapdev/not_reached.h"
68 : //#include "snapdev/not_used.h"
69 : //#include "snapdev/string_replace_many.h"
70 : //
71 : //
72 : //// libaddr lib
73 : ////
74 : //#include "libaddr/addr_parser.h"
75 :
76 :
// C++ lib
//
#include <algorithm>
#include <cerrno>
#include <cstdint>
#include <cstring>
#include <limits>
#include <string>
#include <vector>
//#include <atomic>

// C lib
//
//#include <fcntl.h>
#include <poll.h>
//#include <unistd.h>
//#include <sys/eventfd.h>
//#include <sys/inotify.h>
//#include <sys/ioctl.h>
#include <sys/resource.h>
//#include <sys/syscall.h>
//#include <sys/time.h>
94 :
95 :
96 : // last include
97 : //
98 : #include <snapdev/poison.h>
99 :
100 :
101 :
102 : namespace ed
103 : {
104 : namespace
105 : {
106 :
107 :
108 : /** \brief The instance of the communicator singleton.
109 : *
110 : * This pointer is the one instance of the communicator
111 : * we create to run an event loop.
112 : */
113 : communicator::pointer_t * g_instance = nullptr;
114 :
115 :
116 : } // no name namespace
117 :
118 :
119 :
120 :
121 :
122 : /** \brief Initialize a snap communicator object.
123 : *
124 : * This function initializes the communicator object.
125 : */
126 0 : communicator::communicator()
127 : //: f_connections() -- auto-init
128 : //, f_force_sort(true) -- auto-init
129 : {
130 0 : }
131 :
132 :
133 : /** \brief Retrieve the instance() of the communicator.
134 : *
135 : * This function returns the instance of the communicator.
136 : * There is really no reason and it could also create all sorts
137 : * of problems to have more than one instance hence we created
138 : * the communicator as a singleton. It also means you cannot
139 : * actually delete the communicator.
140 : *
141 : * The initialization fo the communicator instance is thread
142 : * safe.
143 : */
144 0 : communicator::pointer_t communicator::instance()
145 : {
146 0 : cppthread::guard g(*cppthread::g_system_mutex);
147 :
148 0 : if(g_instance == nullptr)
149 : {
150 0 : g_instance = new communicator::pointer_t();
151 0 : g_instance->reset(new communicator());
152 : }
153 :
154 0 : return *g_instance;
155 : }
156 :
157 :
158 : /** \brief Retrieve a reference to the vector of connections.
159 : *
160 : * This function returns a reference to all the connections that are
161 : * currently attached to the communicator system.
162 : *
163 : * This is useful to search the array.
164 : *
165 : * \return The vector of connections.
166 : */
167 0 : connection::vector_t const & communicator::get_connections() const
168 : {
169 0 : return f_connections;
170 : }
171 :
172 :
173 : /** \brief Attach a connection to the communicator.
174 : *
175 : * This function attaches a connection to the communicator. This allows
176 : * us to execute code for that connection by having the process_signal()
177 : * function called.
178 : *
179 : * Connections are kept in the order in which they are added. This may
180 : * change the order in which connection callbacks are called. However,
181 : * events are received asynchronously so do not expect callbacks to be
182 : * called in any specific order.
183 : *
184 : * You may call this function with a null pointer. It simply returns
185 : * false immediately. This makes it easy to eventually allocate a
186 : * new connection and then use the return value of this function
187 : * to know whether the two step process worked or not.
188 : *
189 : * \note
190 : * A connection can only be added once to a communicator object.
191 : * Also it cannot be shared between multiple communicator objects.
192 : *
193 : * \param[in] connection The connection being added.
194 : *
195 : * \return true if the connection was added, false if the connection
196 : * was already present in the communicator list of connections.
197 : */
198 0 : bool communicator::add_connection(connection::pointer_t connection)
199 : {
200 0 : if(connection == nullptr)
201 : {
202 0 : return false;
203 : }
204 :
205 0 : if(!connection->valid_socket())
206 : {
207 : throw event_dispatcher_invalid_parameter(
208 : "communicator::add_connection(): connection without a socket"
209 0 : " cannot be added to a communicator object.");
210 : }
211 :
212 0 : auto const it(std::find(f_connections.begin(), f_connections.end(), connection));
213 0 : if(it != f_connections.end())
214 : {
215 : // already added, can be added only once but we allow multiple
216 : // calls (however, we do not count those calls, so first call
217 : // to the remove_connection() does remove it!)
218 : //
219 0 : return false;
220 : }
221 :
222 0 : f_connections.push_back(connection);
223 :
224 0 : connection->connection_added();
225 :
226 0 : return true;
227 : }
228 :
229 :
230 : /** \brief Remove a connection from a communicator object.
231 : *
232 : * This function removes a connection from this communicator object.
233 : * Note that any one connection can only be added once.
234 : *
235 : * \param[in] connection The connection to remove from this communicator.
236 : *
237 : * \return true if the connection was removed, false if it was not found.
238 : */
239 0 : bool communicator::remove_connection(connection::pointer_t connection)
240 : {
241 0 : auto it(std::find(f_connections.begin(), f_connections.end(), connection));
242 0 : if(it == f_connections.end())
243 : {
244 0 : return false;
245 : }
246 :
247 : SNAP_LOG_TRACE
248 0 : << "removing 1 connection, \""
249 0 : << connection->get_name()
250 0 : << "\", of "
251 0 : << f_connections.size()
252 0 : << " connections (including this one.)";
253 :
254 0 : f_connections.erase(it);
255 :
256 0 : connection->connection_removed();
257 :
258 : #if 0
259 : #ifdef _DEBUG
260 : std::for_each(
261 : f_connections.begin()
262 : , f_connections.end()
263 : , [](auto const & c)
264 : {
265 : SNAP_LOG_TRACE("communicator::remove_connection(): remaining connection: \"")(c->get_name())("\"");
266 : });
267 : #endif
268 : #endif
269 :
270 0 : return true;
271 : }
272 :
273 :
274 : /** \brief Set the Force Sort flag to \p status.
275 : *
276 : * This function can be called to force the run() function to sort (or not
277 : * sort) the list of connections.
278 : *
279 : * Since the sort function is somewhat expensive, the sort changes the
280 : * vector of connections in place. Then only a change of priority
281 : * triggers a request for the vector to be sorted again.
282 : *
283 : * This function can be used in the event you need to force a trigger.
284 : * It would be unlikely that you would call this function with false.
285 : *
286 : * \param[in] status The new status of the force sort flag.
287 : */
288 0 : void communicator::set_force_sort(bool status)
289 : {
290 0 : f_force_sort = status;
291 0 : }
292 :
293 :
294 : /** \brief Run until all connections are removed.
295 : *
296 : * This function "blocks" until all the events added to this
297 : * communicator instance are removed. Until then, it
298 : * wakes up and run callback functions whenever an event occurs.
299 : *
300 : * In other words, you want to add_connection() before you call
301 : * this function otherwise the function returns immediately.
302 : *
303 : * Note that you can include timeout events so if you need to
304 : * run some code once in a while, you may just use a timeout
305 : * event and process your repetitive events that way.
306 : *
307 : * \return true if the loop exits because the list of connections is empty.
308 : */
309 0 : bool communicator::run()
310 : {
311 : // the loop promises to exit once the even_base object has no
312 : // more connections attached to it
313 : //
314 0 : std::vector<bool> enabled;
315 0 : std::vector<struct pollfd> fds;
316 0 : f_force_sort = true;
317 0 : for(;;)
318 : {
319 : // any connections?
320 0 : if(f_connections.empty())
321 : {
322 0 : return true;
323 : }
324 :
325 0 : if(f_force_sort)
326 : {
327 : // sort the connections by priority
328 : //
329 0 : std::stable_sort(f_connections.begin(), f_connections.end(), connection::compare);
330 0 : f_force_sort = false;
331 : }
332 :
333 : // make a copy because the callbacks may end up making
334 : // changes to the main list and we would have problems
335 : // with that here...
336 : //
337 0 : connection::vector_t connections(f_connections);
338 0 : size_t max_connections(connections.size());
339 :
340 : // timeout is do not time out by default
341 : //
342 0 : std::int64_t next_timeout_timestamp(std::numeric_limits<std::int64_t>::max());
343 :
344 : // clear() is not supposed to delete the buffer of vectors
345 : //
346 0 : enabled.clear();
347 0 : fds.clear();
348 0 : fds.reserve(max_connections); // avoid more than 1 allocation
349 0 : for(size_t idx(0); idx < max_connections; ++idx)
350 : {
351 0 : connection::pointer_t c(connections[idx]);
352 0 : c->f_fds_position = -1;
353 :
354 : // is the connection enabled?
355 : //
356 : // note that we save that value for later use in our loop
357 : // below because otherwise we will miss many events and
358 : // it tends to break things; that means you may get your
359 : // callback called even while disabled
360 : //
361 0 : enabled.push_back(c->is_enabled());
362 0 : if(!enabled[idx])
363 : {
364 : //SNAP_LOG_TRACE("communicator::run(): connection '")(c->get_name())("' has been disabled, so ignored.");
365 0 : continue;
366 : }
367 : //SNAP_LOG_TRACE("communicator::run(): handling connection ")(idx)("/")(max_connections)(". '")(c->get_name())("' since it is enabled...");
368 :
369 : // check whether a timeout is defined in this connection
370 : //
371 0 : int64_t const timestamp(c->save_timeout_timestamp());
372 0 : if(timestamp != -1)
373 : {
374 : // the timeout event gives us a time when to tick
375 : //
376 0 : if(timestamp < next_timeout_timestamp)
377 : {
378 0 : next_timeout_timestamp = timestamp;
379 : }
380 : }
381 :
382 : // is there any events to listen on?
383 0 : int e(0);
384 0 : if(c->is_listener() || c->is_signal())
385 : {
386 0 : e |= POLLIN;
387 : }
388 0 : if(c->is_reader())
389 : {
390 0 : e |= POLLIN | POLLPRI | POLLRDHUP;
391 : }
392 0 : if(c->is_writer())
393 : {
394 0 : e |= POLLOUT | POLLRDHUP;
395 : }
396 0 : if(e == 0)
397 : {
398 : // this should only happend on snap_timer objects
399 : //
400 0 : continue;
401 : }
402 :
403 : // do we have a currently valid socket? (i.e. the connection
404 : // may have been closed or we may be handling a timer or
405 : // signal object)
406 : //
407 0 : if(c->get_socket() < 0)
408 : {
409 0 : continue;
410 : }
411 :
412 : // this is considered valid, add this connection to the list
413 : //
414 : // save the position since we may skip some entries...
415 : // (otherwise we would have to use -1 as the socket to
416 : // allow for such dead entries, but avoiding such entries
417 : // saves time)
418 : //
419 0 : c->f_fds_position = fds.size();
420 :
421 : //SNAP_LOG_ERROR("*** still waiting on \"")(c->get_name())("\".");
422 : struct pollfd fd;
423 0 : fd.fd = c->get_socket();
424 0 : fd.events = e;
425 0 : fd.revents = 0; // probably useless... (kernel should clear those)
426 0 : fds.push_back(fd);
427 : }
428 :
429 : // compute the right timeout
430 0 : std::int64_t timeout(-1);
431 0 : if(next_timeout_timestamp != std::numeric_limits<int64_t>::max())
432 : {
433 0 : std::int64_t const now(get_current_date());
434 0 : timeout = next_timeout_timestamp - now;
435 0 : if(timeout < 0)
436 : {
437 : // timeout is in the past so timeout immediately, but
438 : // still check for events if any
439 0 : timeout = 0;
440 : }
441 : else
442 : {
443 : // convert microseconds to milliseconds for poll()
444 0 : timeout /= 1000;
445 0 : if(timeout == 0)
446 : {
447 : // less than one is a waste of time (CPU intenssive
448 : // until the time is reached, we can be 1 ms off
449 : // instead...)
450 0 : timeout = 1;
451 : }
452 : }
453 : }
454 0 : else if(fds.empty())
455 : {
456 : SNAP_LOG_FATAL
457 0 : << "communicator::run(): nothing to poll() on. All connections are disabled? (Ignoring "
458 0 : << max_connections
459 0 : << " and exiting the run() loop anyway.)";
460 0 : return false;
461 : }
462 :
463 : //SNAP_LOG_TRACE << "communicator::run(): "
464 : // << "count " << fds.size()
465 : // << "timeout " << timeout
466 : // << " (next was: " << next_timeout_timestamp
467 : // << ", current ~ " << get_current_date()
468 : // << ")";
469 :
470 : // TODO: add support for ppoll() so we can support signals cleanly
471 : // with nearly no additional work from us
472 : //
473 0 : errno = 0;
474 0 : int const r(poll(&fds[0], fds.size(), timeout));
475 0 : if(r >= 0)
476 : {
477 : // quick sanity check
478 : //
479 0 : if(static_cast<size_t>(r) > connections.size())
480 : {
481 0 : throw event_dispatcher_runtime_error("communicator::run(): poll() returned a number of events to handle larger than the input allows");
482 : }
483 : //SNAP_LOG_TRACE
484 : // <<"tid="
485 : // << gettid()
486 : // << ", communicator::run(): ------------------- new set of "
487 : // << r
488 : // << " events to handle";
489 :
490 : // check each connection one by one for:
491 : //
492 : // 1) fds events, including signals
493 : // 2) timeouts
494 : //
495 : // and execute the corresponding callbacks
496 : //
497 0 : for(size_t idx(0); idx < connections.size(); ++idx)
498 : {
499 0 : connection::pointer_t c(connections[idx]);
500 :
501 : // is the connection enabled?
502 : //
503 : // note that we check whether that connection was enabled
504 : // before poll() was called; this is very important because
505 : // the last poll() events must be run even if a previous
506 : // callback call just disabled this very connection
507 : // (i.e. at the time we called poll() the connection was
508 : // still enabled and therefore we are expected to call
509 : // their callbacks even if it just got disabled by an
510 : // earlier callback)
511 : //
512 0 : if(!enabled[idx])
513 : {
514 : //SNAP_LOG_TRACE
515 : // << "communicator::run(): in loop, connection '"
516 : // << c->get_name()
517 : // << "' has been disabled, so ignored!";
518 0 : continue;
519 : }
520 :
521 : // if we have a valid fds position then an event other
522 : // than a timeout occurred on that connection
523 : //
524 0 : if(c->f_fds_position >= 0)
525 : {
526 0 : struct pollfd * fd(&fds[c->f_fds_position]);
527 :
528 : // if any events were found by poll(), process them now
529 : //
530 0 : if(fd->revents != 0)
531 : {
532 : // an event happened on this one
533 : //
534 0 : if((fd->revents & (POLLIN | POLLPRI)) != 0)
535 : {
536 : // we consider that Unix signals have the greater priority
537 : // and thus handle them first
538 : //
539 0 : if(c->is_signal())
540 : {
541 0 : signal * ss(dynamic_cast<signal *>(c.get()));
542 0 : if(ss)
543 : {
544 0 : ss->process();
545 : }
546 : }
547 0 : else if(c->is_listener())
548 : {
549 : // a listener is a special case and we want
550 : // to call process_accept() instead
551 : //
552 0 : c->process_accept();
553 : }
554 : else
555 : {
556 0 : c->process_read();
557 : }
558 : }
559 0 : if((fd->revents & POLLOUT) != 0)
560 : {
561 0 : c->process_write();
562 : }
563 0 : if((fd->revents & POLLERR) != 0)
564 : {
565 0 : c->process_error();
566 : }
567 0 : if((fd->revents & (POLLHUP | POLLRDHUP)) != 0)
568 : {
569 0 : c->process_hup();
570 : }
571 0 : if((fd->revents & POLLNVAL) != 0)
572 : {
573 0 : c->process_invalid();
574 : }
575 : }
576 : }
577 :
578 : // now check whether we have a timeout on this connection
579 : //
580 0 : int64_t const timestamp(c->get_saved_timeout_timestamp());
581 0 : if(timestamp != -1)
582 : {
583 0 : int64_t const now(get_current_date());
584 0 : if(now >= timestamp)
585 : {
586 : //SNAP_LOG_TRACE
587 : // << "communicator::run(): timer of connection = '"<< c->get_name()
588 : // << "', timestamp = " << timestamp
589 : // << ", now = " << now
590 : // << ", now >= timestamp --> " << (now >= timestamp ? "TRUE (timed out!)" : "FALSE");
591 :
592 : // move the timeout as required first
593 : // (because the callback may move it again)
594 : //
595 0 : c->calculate_next_tick();
596 :
597 : // the timeout date needs to be reset if the tick
598 : // happened for that date
599 : //
600 0 : if(now >= c->get_timeout_date())
601 : {
602 0 : c->set_timeout_date(-1);
603 : }
604 :
605 : // then run the callback
606 : //
607 0 : c->process_timeout();
608 : }
609 : }
610 : }
611 : }
612 : else
613 : {
614 : // r < 0 means an error occurred
615 : //
616 0 : if(errno == EINTR)
617 : {
618 : // Note: if the user wants to prevent this error, he should
619 : // use the snap_signal with the Unix signals that may
620 : // happen while calling poll().
621 : //
622 0 : throw event_dispatcher_runtime_error("communicator::run(): EINTR occurred while in poll() -- interrupts are not supported yet though");
623 : }
624 0 : if(errno == EFAULT)
625 : {
626 0 : throw event_dispatcher_invalid_parameter("communicator::run(): buffer was moved out of our address space?");
627 : }
628 0 : if(errno == EINVAL)
629 : {
630 : // if this is really because nfds is too large then it may be
631 : // a "soft" error that can be fixed; that being said, my
632 : // current version is 16K files which frankly when we reach
633 : // that level we have a problem...
634 : //
635 : struct rlimit rl;
636 0 : getrlimit(RLIMIT_NOFILE, &rl);
637 : throw event_dispatcher_invalid_parameter(
638 : "communicator::run(): too many file fds for poll, limit is currently "
639 0 : + std::to_string(rl.rlim_cur)
640 0 : + ", your kernel top limit is "
641 0 : + std::to_string(rl.rlim_max));
642 : }
643 0 : if(errno == ENOMEM)
644 : {
645 0 : throw event_dispatcher_runtime_error("communicator::run(): poll() failed because of memory");
646 : }
647 0 : int const e(errno);
648 : throw event_dispatcher_runtime_error(
649 : "communicator::run(): poll() failed with error "
650 0 : + std::to_string(e)
651 0 : + " -- "
652 0 : + strerror(e));
653 : }
654 : }
655 : }
656 :
657 :
658 :
659 6 : } // namespace ed
660 : // vim: ts=4 sw=4 et
|