Line data Source code
1 : // Copyright (c) 2012-2021 Made to Order Software Corp. All Rights Reserved
2 : //
3 : // https://snapwebsites.org/project/eventdispatcher
4 : // contact@m2osw.com
5 : //
6 : // This program is free software; you can redistribute it and/or modify
7 : // it under the terms of the GNU General Public License as published by
8 : // the Free Software Foundation; either version 2 of the License, or
9 : // (at your option) any later version.
10 : //
11 : // This program is distributed in the hope that it will be useful,
12 : // but WITHOUT ANY WARRANTY; without even the implied warranty of
13 : // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 : // GNU General Public License for more details.
15 : //
16 : // You should have received a copy of the GNU General Public License
17 : // along with this program; if not, write to the Free Software
18 : // Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
19 :
/** \file
 * \brief Implementation of the Snap Communicator class.
 *
 * This class wraps the C poll() interface in a C++ object with many types
 * of objects:
 *
 * \li Server Connections; for software that wants to offer a port to
 *     which clients can connect; the server calls accept()
 *     once a new client connection is ready; this results in a
 *     Server/Client connection object
 * \li Client Connections; for software that wants to connect to
 *     a server; these expect the IP address and port to connect to
 * \li Server/Client Connections; for the server when it accepts a new
 *     connection; in this case the server gets a socket from accept()
 *     and creates one of these objects to handle the connection
 *
 * Using the poll() function is the easiest and allows us to listen
 * on pretty much any number of sockets (on my server it is limited
 * to 16,768 and frankly over 1,000 we probably will start to have
 * real slowness issues on small VPN servers.)
 */
41 :
42 : // to get the POLLRDHUP definition
43 : #ifndef _GNU_SOURCE
44 : #define _GNU_SOURCE
45 : #endif
46 :
47 :
48 : // self
49 : //
50 : #include "eventdispatcher/communicator.h"
51 :
52 : #include "eventdispatcher/exception.h"
53 : #include "eventdispatcher/signal.h"
54 : #include "eventdispatcher/utils.h"
55 :
56 :
57 : // cppthread lib
58 : //
59 : #include <cppthread/guard.h>
60 : #include <cppthread/mutex.h>
61 : #include <cppthread/thread.h>
62 :
63 :
64 : // snaplogger lib
65 : //
66 : #include <snaplogger/message.h>
67 :
68 :
69 : // C++ lib
70 : //
71 : #include <algorithm>
72 : #include <cstring>
73 :
74 :
75 : // C lib
76 : //
77 : #include <poll.h>
78 : #include <sys/resource.h>
79 :
80 :
81 : // last include
82 : //
83 : #include <snapdev/poison.h>
84 :
85 :
86 :
87 : namespace ed
88 : {
89 : namespace
90 : {
91 :
92 :
93 : /** \brief The instance of the communicator singleton.
94 : *
95 : * This pointer is the one instance of the communicator
96 : * we create to run an event loop.
97 : */
98 : communicator::pointer_t * g_instance = nullptr;
99 :
100 :
101 : } // no name namespace
102 :
103 :
104 :
105 :
106 :
107 : /** \brief Initialize a communicator object.
108 : *
109 : * This function initializes the communicator object.
110 : */
111 1 : communicator::communicator()
112 : {
113 1 : }
114 :
115 :
116 : /** \brief Retrieve the instance() of the communicator.
117 : *
118 : * This function returns the instance of the communicator.
119 : * There is really no reason and it could also create all sorts
120 : * of problems to have more than one instance hence we created
121 : * the communicator as a singleton. It also means you cannot
122 : * actually delete the communicator.
123 : *
124 : * The initialization of the communicator instance is thread
125 : * safe.
126 : */
127 14 : communicator::pointer_t communicator::instance()
128 : {
129 28 : cppthread::guard g(*cppthread::g_system_mutex);
130 :
131 14 : if(g_instance == nullptr)
132 : {
133 : // `communicator` constructor is private so we can't use
134 : // the std::make_shared<>
135 : //
136 1 : g_instance = new communicator::pointer_t();
137 1 : g_instance->reset(new communicator());
138 : }
139 :
140 28 : return *g_instance;
141 : }
142 :
143 :
144 : /** \brief Retrieve a reference to the vector of connections.
145 : *
146 : * This function returns a reference to all the connections that are
147 : * currently attached to the communicator system.
148 : *
149 : * This is useful to search the array.
150 : *
151 : * \return The vector of connections.
152 : */
153 0 : connection::vector_t const & communicator::get_connections() const
154 : {
155 0 : return f_connections;
156 : }
157 :
158 :
159 : /** \brief Attach a connection to the communicator.
160 : *
161 : * This function attaches a connection to the communicator. This allows
162 : * us to execute code for that connection by having the process_signal()
163 : * function called.
164 : *
165 : * Connections are kept in the order in which they are added. This may
166 : * change the order in which connection callbacks are called. However,
167 : * events are received asynchronously so do not expect callbacks to be
168 : * called in any specific order.
169 : *
170 : * You may call this function with a null pointer. It simply returns
171 : * false immediately. This makes it easy to eventually allocate a
172 : * new connection and then use the return value of this function
173 : * to know whether the two step process worked or not.
174 : *
175 : * \note
176 : * A connection can only be added once to a communicator object.
177 : * Also it cannot be shared between multiple communicator objects.
178 : *
179 : * \param[in] connection The connection being added.
180 : *
181 : * \return true if the connection was added, false if the connection
182 : * was already present in the communicator list of connections.
183 : */
184 7 : bool communicator::add_connection(connection::pointer_t connection)
185 : {
186 7 : if(connection == nullptr)
187 : {
188 0 : return false;
189 : }
190 :
191 7 : if(!connection->valid_socket())
192 : {
193 : throw event_dispatcher_invalid_parameter(
194 : "communicator::add_connection(): connection without a socket"
195 0 : " cannot be added to a communicator object.");
196 : }
197 :
198 7 : auto const it(std::find(f_connections.begin(), f_connections.end(), connection));
199 7 : if(it != f_connections.end())
200 : {
201 : // already added, can be added only once but we allow multiple
202 : // calls (however, we do not count those calls, so first call
203 : // to the remove_connection() does remove it!)
204 : //
205 0 : return false;
206 : }
207 :
208 7 : f_connections.push_back(connection);
209 :
210 7 : connection->connection_added();
211 :
212 7 : return true;
213 : }
214 :
215 :
216 : /** \brief Remove a connection from a communicator object.
217 : *
218 : * This function removes a connection from this communicator object.
219 : * Note that any one connection can only be added once.
220 : *
221 : * \param[in] connection The connection to remove from this communicator.
222 : *
223 : * \return true if the connection was removed, false if it was not found.
224 : */
225 9 : bool communicator::remove_connection(connection::pointer_t connection)
226 : {
227 9 : auto it(std::find(f_connections.begin(), f_connections.end(), connection));
228 9 : if(it == f_connections.end())
229 : {
230 2 : return false;
231 : }
232 :
233 14 : SNAP_LOG_TRACE
234 7 : << "removing 1 connection, \""
235 7 : << connection->get_name()
236 7 : << "\", of "
237 7 : << f_connections.size()
238 : << " connections (including this one.)"
239 : << SNAP_LOG_SEND;
240 :
241 7 : f_connections.erase(it);
242 :
243 7 : connection->connection_removed();
244 :
245 : #if 0
246 : #ifdef _DEBUG
247 : std::for_each(
248 : f_connections.begin()
249 : , f_connections.end()
250 : , [](auto const & c)
251 : {
252 : SNAP_LOG_TRACE
253 : << "communicator::remove_connection(): remaining connection: \""
254 : << c->get_name()
255 : << "\""
256 : << SNAP_LOG_SEND;
257 : });
258 : #endif
259 : #endif
260 :
261 7 : return true;
262 : }
263 :
264 :
265 : /** \brief Set the Force Sort flag to \p status.
266 : *
267 : * This function can be called to force the run() function to sort (or not
268 : * sort) the list of connections.
269 : *
270 : * Since the sort function is somewhat expensive, the sort changes the
271 : * vector of connections in place. Then only a change of priority
272 : * triggers a request for the vector to be sorted again.
273 : *
274 : * This function can be used in the event you need to force a trigger.
275 : * It would be unlikely that you would call this function with false.
276 : *
277 : * \param[in] status The new status of the force sort flag.
278 : */
279 0 : void communicator::set_force_sort(bool status)
280 : {
281 0 : f_force_sort = status;
282 0 : }
283 :
284 :
285 : /** \brief Run until all connections are removed.
286 : *
287 : * This function "blocks" until all the events added to this
288 : * communicator instance are removed. Until then, it
289 : * wakes up and run callback functions whenever an event occurs.
290 : *
291 : * In other words, you want to add_connection() before you call
292 : * this function otherwise the function returns immediately.
293 : *
294 : * Note that you can include timeout events so if you need to
295 : * run some code once in a while, you may just use a timeout
296 : * event and process your repetitive events that way.
297 : *
298 : * \return true if the loop exits because the list of connections is empty.
299 : */
300 2 : bool communicator::run()
301 : {
302 : // the loop promises to exit once the even_base object has no
303 : // more connections attached to it
304 : //
305 4 : std::vector<bool> enabled;
306 4 : std::vector<struct pollfd> fds;
307 2 : f_force_sort = true;
308 : for(;;)
309 : {
310 : // any connections?
311 14 : if(f_connections.empty())
312 : {
313 2 : return true;
314 : }
315 :
316 12 : if(f_force_sort)
317 : {
318 : // sort the connections by priority
319 : //
320 2 : std::stable_sort(f_connections.begin(), f_connections.end(), connection::compare);
321 2 : f_force_sort = false;
322 : }
323 :
324 : // make a copy because the callbacks may end up making
325 : // changes to the main list and we would have problems
326 : // with that here...
327 : //
328 24 : connection::vector_t connections(f_connections);
329 12 : size_t max_connections(connections.size());
330 :
331 : // timeout is do not time out by default
332 : //
333 12 : std::int64_t next_timeout_timestamp(std::numeric_limits<std::int64_t>::max());
334 :
335 : // clear() is not supposed to delete the buffer of vectors
336 : //
337 12 : enabled.clear();
338 12 : fds.clear();
339 12 : fds.reserve(max_connections); // avoid more than 1 allocation
340 48 : for(size_t idx(0); idx < max_connections; ++idx)
341 : {
342 64 : connection::pointer_t c(connections[idx]);
343 36 : c->f_fds_position = -1;
344 :
345 : // is the connection enabled?
346 : //
347 : // note that we save that value for later use in our loop
348 : // below because otherwise we will miss many events and
349 : // it tends to break things; that means you may get your
350 : // callback called even while disabled
351 : //
352 36 : enabled.push_back(c->is_enabled());
353 36 : if(!enabled[idx])
354 : {
355 : //SNAP_LOG_TRACE
356 : // << "communicator::run(): connection '"
357 : // << c->get_name()
358 : // << "' has been disabled, so ignored."
359 : // << SNAP_LOG_SEND;
360 7 : continue;
361 : }
362 : //SNAP_LOG_TRACE
363 : // << "communicator::run(): handling connection "
364 : // << idx
365 : // << "/"
366 : // << max_connections
367 : // << ". '"
368 : // << c->get_name()
369 : // << "' since it is enabled..."
370 : // << SNAP_LOG_SEND;
371 :
372 : // check whether a timeout is defined in this connection
373 : //
374 29 : int64_t const timestamp(c->save_timeout_timestamp());
375 29 : if(timestamp != -1)
376 : {
377 : // the timeout event gives us a time when to tick
378 : //
379 1 : if(timestamp < next_timeout_timestamp)
380 : {
381 1 : next_timeout_timestamp = timestamp;
382 : }
383 : }
384 :
385 : // is there any events to listen on?
386 29 : int e(0);
387 29 : if(c->is_listener() || c->is_signal())
388 : {
389 9 : e |= POLLIN;
390 : }
391 29 : if(c->is_reader())
392 : {
393 19 : e |= POLLIN | POLLPRI | POLLRDHUP;
394 : }
395 29 : if(c->is_writer())
396 : {
397 3 : e |= POLLOUT | POLLRDHUP;
398 : }
399 29 : if(e == 0)
400 : {
401 : // this should only happen on timer objects
402 : //
403 1 : continue;
404 : }
405 :
406 : // do we have a currently valid socket? (i.e. the connection
407 : // may have been closed or we may be handling a timer or
408 : // signal object)
409 : //
410 28 : if(c->get_socket() < 0)
411 : {
412 0 : continue;
413 : }
414 :
415 : // this is considered valid, add this connection to the list
416 : //
417 : // save the position since we may skip some entries...
418 : // (otherwise we would have to use -1 as the socket to
419 : // allow for such dead entries, but avoiding such entries
420 : // saves time)
421 : //
422 28 : c->f_fds_position = fds.size();
423 :
424 : //SNAP_LOG_ERROR
425 : // << "*** still waiting on \""
426 : // << c->get_name()
427 : // << "\"."
428 : // << SNAP_LOG_SEND;
429 :
430 28 : struct pollfd fd;
431 28 : fd.fd = c->get_socket();
432 28 : fd.events = e;
433 28 : fd.revents = 0; // probably useless... (kernel should clear those)
434 28 : fds.push_back(fd);
435 : }
436 :
437 : // compute the right timeout
438 12 : std::int64_t timeout(-1);
439 12 : if(next_timeout_timestamp != std::numeric_limits<int64_t>::max())
440 : {
441 1 : std::int64_t const now(get_current_date());
442 1 : timeout = next_timeout_timestamp - now;
443 1 : if(timeout < 0)
444 : {
445 : // timeout is in the past so timeout immediately, but
446 : // still check for events if any
447 1 : timeout = 0;
448 : }
449 : else
450 : {
451 : // convert microseconds to milliseconds for poll()
452 0 : timeout /= 1000;
453 0 : if(timeout == 0)
454 : {
455 : // less than one is a waste of time (CPU intensive
456 : // until the time is reached, we can be 1 ms off
457 : // instead...)
458 0 : timeout = 1;
459 : }
460 : }
461 : }
462 11 : else if(fds.empty())
463 : {
464 0 : SNAP_LOG_FATAL
465 0 : << "communicator::run(): nothing to poll() on. All connections are disabled? (Ignoring "
466 0 : << max_connections
467 : << " and exiting the run() loop anyway.)"
468 : << SNAP_LOG_SEND;
469 0 : return false;
470 : }
471 :
472 : //SNAP_LOG_TRACE << "communicator::run(): ready to poll(); "
473 : // << "count " << fds.size()
474 : // << " timeout " << timeout
475 : // << " (next was: " << next_timeout_timestamp
476 : // << ", current ~ " << get_current_date()
477 : // << ")"
478 : // << SNAP_LOG_SEND;
479 :
480 : // TODO: add support for ppoll() so we can support signals cleanly
481 : // with nearly no additional work from us
482 : //
483 12 : errno = 0;
484 12 : int const r(poll(fds.empty() ? nullptr : &fds[0], fds.size(), timeout));
485 12 : if(r >= 0)
486 : {
487 : // quick sanity check
488 : //
489 12 : if(static_cast<size_t>(r) > connections.size())
490 : {
491 0 : throw event_dispatcher_runtime_error("communicator::run(): poll() returned a number of events to handle larger than the input allows");
492 : }
493 : //SNAP_LOG_TRACE
494 : // <<"tid="
495 : // << cppthread::gettid()
496 : // << ", communicator::run(): ------------------- new set of "
497 : // << r
498 : // << " events to handle"
499 : // << SNAP_LOG_SEND;
500 :
501 : // check each connection one by one for:
502 : //
503 : // 1) fds events, including signals
504 : // 2) timeouts
505 : //
506 : // and execute the corresponding callbacks
507 : //
508 48 : for(size_t idx(0); idx < connections.size(); ++idx)
509 : {
510 65 : connection::pointer_t c(connections[idx]);
511 :
512 : // is the connection enabled?
513 : //
514 : // note that we check whether that connection was enabled
515 : // before poll() was called; this is very important because
516 : // the last poll() events must be run even if a previous
517 : // callback call just disabled this very connection
518 : // (i.e. at the time we called poll() the connection was
519 : // still enabled and therefore we are expected to call
520 : // their callbacks even if it just got disabled by an
521 : // earlier callback)
522 : //
523 36 : if(!enabled[idx])
524 : {
525 : //SNAP_LOG_TRACE
526 : // << "communicator::run(): in loop, connection '"
527 : // << c->get_name()
528 : // << "' has been disabled, so ignored!"
529 : // << SNAP_LOG_SEND;
530 7 : continue;
531 : }
532 :
533 : // if we have a valid fds position then an event other
534 : // than a timeout occurred on that connection
535 : //
536 29 : if(c->f_fds_position >= 0)
537 : {
538 28 : struct pollfd * fd(&fds[c->f_fds_position]);
539 :
540 : // if any events were found by poll(), process them now
541 : //
542 : //SNAP_LOG_TRACE
543 : // <<"tid="
544 : // << cppthread::gettid()
545 : // << ", communicator::run(): events for "
546 : // << c->get_name()
547 : // << " = "
548 : // << fd->revents
549 : // << SNAP_LOG_SEND;
550 28 : if(fd->revents != 0)
551 : {
552 : // an event happened on this one
553 : //
554 11 : if((fd->revents & (POLLIN | POLLPRI)) != 0)
555 : {
556 : // we consider that Unix signals have the greater priority
557 : // and thus handle them first
558 : //
559 8 : if(c->is_signal())
560 : {
561 0 : signal * ss(dynamic_cast<signal *>(c.get()));
562 0 : if(ss != nullptr)
563 : {
564 0 : ss->process();
565 : }
566 : }
567 8 : else if(c->is_listener())
568 : {
569 : // a listener is a special case and we want
570 : // to call process_accept() instead
571 : //
572 1 : c->process_accept();
573 : }
574 : else
575 : {
576 7 : c->process_read();
577 : }
578 : }
579 11 : if((fd->revents & POLLOUT) != 0)
580 : {
581 3 : c->process_write();
582 : }
583 11 : if((fd->revents & POLLERR) != 0)
584 : {
585 0 : c->process_error();
586 : }
587 11 : if((fd->revents & (POLLHUP | POLLRDHUP)) != 0)
588 : {
589 1 : c->process_hup();
590 : }
591 11 : if((fd->revents & POLLNVAL) != 0)
592 : {
593 0 : c->process_invalid();
594 : }
595 : }
596 : }
597 :
598 : // now check whether we have a timeout on this connection
599 : //
600 29 : int64_t const timestamp(c->get_saved_timeout_timestamp());
601 29 : if(timestamp != -1)
602 : {
603 1 : int64_t const now(get_current_date());
604 1 : if(now >= timestamp)
605 : {
606 : //SNAP_LOG_TRACE
607 : // << "communicator::run(): timer of connection = '"<< c->get_name()
608 : // << "', timestamp = " << timestamp
609 : // << ", now = " << now
610 : // << ", now >= timestamp --> " << (now >= timestamp ? "TRUE (timed out!)" : "FALSE")
611 : // << SNAP_LOG_SEND;
612 :
613 : // move the timeout as required first
614 : // (because the callback may move it again)
615 : //
616 1 : c->calculate_next_tick();
617 :
618 : // the timeout date needs to be reset if the tick
619 : // happened for that date
620 : //
621 1 : if(now >= c->get_timeout_date())
622 : {
623 1 : c->set_timeout_date(-1);
624 : }
625 :
626 : // then run the callback
627 : //
628 1 : c->process_timeout();
629 : }
630 : }
631 : }
632 : }
633 : else
634 : {
635 : // r < 0 means an error occurred
636 : //
637 0 : if(errno == EINTR)
638 : {
639 : // Note: if the user wants to prevent this error, he should
640 : // use the signal with the Unix signals that may
641 : // happen while calling poll().
642 : //
643 0 : throw event_dispatcher_runtime_error("communicator::run(): EINTR occurred while in poll() -- interrupts are not supported yet though");
644 : }
645 0 : if(errno == EFAULT)
646 : {
647 0 : throw event_dispatcher_invalid_parameter("communicator::run(): buffer was moved out of our address space?");
648 : }
649 0 : if(errno == EINVAL)
650 : {
651 : // if this is really because nfds is too large then it may be
652 : // a "soft" error that can be fixed; that being said, my
653 : // current version is 16K files which frankly when we reach
654 : // that level we have a problem...
655 : //
656 0 : struct rlimit rl;
657 0 : getrlimit(RLIMIT_NOFILE, &rl);
658 : throw event_dispatcher_invalid_parameter(
659 : "communicator::run(): too many file fds for poll, limit is currently "
660 0 : + std::to_string(rl.rlim_cur)
661 0 : + ", your kernel top limit is "
662 0 : + std::to_string(rl.rlim_max));
663 : }
664 0 : if(errno == ENOMEM)
665 : {
666 0 : throw event_dispatcher_runtime_error("communicator::run(): poll() failed because of memory");
667 : }
668 0 : int const e(errno);
669 : throw event_dispatcher_runtime_error(
670 : "communicator::run(): poll() failed with error "
671 0 : + std::to_string(e)
672 0 : + " -- "
673 0 : + strerror(e));
674 : }
675 12 : }
676 : }
677 :
678 :
679 :
680 6 : } // namespace ed
681 : // vim: ts=4 sw=4 et
|