Line data Source code
1 : // Copyright (c) 2012-2019 Made to Order Software Corp. All Rights Reserved
2 : //
3 : // https://snapwebsites.org/project/eventdispatcher
4 : // contact@m2osw.com
5 : //
6 : // This program is free software; you can redistribute it and/or modify
7 : // it under the terms of the GNU General Public License as published by
8 : // the Free Software Foundation; either version 2 of the License, or
9 : // (at your option) any later version.
10 : //
11 : // This program is distributed in the hope that it will be useful,
12 : // but WITHOUT ANY WARRANTY; without even the implied warranty of
13 : // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 : // GNU General Public License for more details.
15 : //
16 : // You should have received a copy of the GNU General Public License
17 : // along with this program; if not, write to the Free Software
18 : // Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
19 :
20 : /** \file
21 : * \brief Implementation of the Snap Communicator class.
22 : *
23 : * This class wraps the C poll() interface in a C++ object with many types
24 : * of objects:
25 : *
26 : * \li Server Connections; for software that wants to offer a port to
27 : * which clients can connect; the server will call accept()
28 : * once a new client connection is ready; this results in a
29 : * Server/Client connection object
30 : * \li Client Connections; for software that wants to connect to
31 : * a server; these expect the IP address and port to connect to
32 : * \li Server/Client Connections; for the server when it accepts a new
33 : * connection; in this case the server gets a socket from accept()
34 : * and creates one of these objects to handle the connection
35 : *
36 : * Using the poll() function is the easiest and allows us to listen
37 : * on pretty much any number of sockets (on my server it is limited
38 : * at 16,768 and frankly over 1,000 we probably will start to have
39 : * real slowness issues on small VPN servers.)
40 : */
41 :
42 : // to get the POLLRDHUP definition
43 : #ifndef _GNU_SOURCE
44 : #define _GNU_SOURCE
45 : #endif
46 :
47 :
48 : // self
49 : //
50 : #include "eventdispatcher/communicator.h"
51 :
52 : #include "eventdispatcher/exception.h"
53 : #include "eventdispatcher/signal.h"
54 : #include "eventdispatcher/utils.h"
55 :
56 :
57 : // cppthread lib
58 : //
59 : #include <cppthread/guard.h>
60 : #include <cppthread/mutex.h>
61 :
62 :
63 : // snaplogger lib
64 : //
65 : #include <snaplogger/message.h>
66 :
67 :
68 : // C++ lib
69 : //
70 : #include <algorithm>
71 :
72 :
73 : // C lib
74 : //
75 : #include <poll.h>
76 : #include <sys/resource.h>
77 :
78 :
79 : // last include
80 : //
81 : #include <snapdev/poison.h>
82 :
83 :
84 :
85 : namespace ed
86 : {
87 : namespace
88 : {
89 :
90 :
91 : /** \brief The instance of the communicator singleton.
92 : *
93 : * This pointer is the one instance of the communicator
94 : * we create to run an event loop.
95 : */
96 : communicator::pointer_t * g_instance = nullptr;
97 :
98 :
99 : } // no name namespace
100 :
101 :
102 :
103 :
104 :
105 : /** \brief Initialize a snap communicator object.
106 : *
107 : * This function initializes the communicator object.
108 : */
109 0 : communicator::communicator()
110 : //: f_connections() -- auto-init
111 : //, f_force_sort(true) -- auto-init
112 : {
113 0 : }
114 :
115 :
116 : /** \brief Retrieve the instance() of the communicator.
117 : *
118 : * This function returns the instance of the communicator.
119 : * There is really no reason and it could also create all sorts
120 : * of problems to have more than one instance hence we created
121 : * the communicator as a singleton. It also means you cannot
122 : * actually delete the communicator.
123 : *
124 : * The initialization fo the communicator instance is thread
125 : * safe.
126 : */
127 0 : communicator::pointer_t communicator::instance()
128 : {
129 0 : cppthread::guard g(*cppthread::g_system_mutex);
130 :
131 0 : if(g_instance == nullptr)
132 : {
133 0 : g_instance = new communicator::pointer_t();
134 0 : g_instance->reset(new communicator());
135 : }
136 :
137 0 : return *g_instance;
138 : }
139 :
140 :
141 : /** \brief Retrieve a reference to the vector of connections.
142 : *
143 : * This function returns a reference to all the connections that are
144 : * currently attached to the communicator system.
145 : *
146 : * This is useful to search the array.
147 : *
148 : * \return The vector of connections.
149 : */
150 0 : connection::vector_t const & communicator::get_connections() const
151 : {
152 0 : return f_connections;
153 : }
154 :
155 :
156 : /** \brief Attach a connection to the communicator.
157 : *
158 : * This function attaches a connection to the communicator. This allows
159 : * us to execute code for that connection by having the process_signal()
160 : * function called.
161 : *
162 : * Connections are kept in the order in which they are added. This may
163 : * change the order in which connection callbacks are called. However,
164 : * events are received asynchronously so do not expect callbacks to be
165 : * called in any specific order.
166 : *
167 : * You may call this function with a null pointer. It simply returns
168 : * false immediately. This makes it easy to eventually allocate a
169 : * new connection and then use the return value of this function
170 : * to know whether the two step process worked or not.
171 : *
172 : * \note
173 : * A connection can only be added once to a communicator object.
174 : * Also it cannot be shared between multiple communicator objects.
175 : *
176 : * \param[in] connection The connection being added.
177 : *
178 : * \return true if the connection was added, false if the connection
179 : * was already present in the communicator list of connections.
180 : */
181 0 : bool communicator::add_connection(connection::pointer_t connection)
182 : {
183 0 : if(connection == nullptr)
184 : {
185 0 : return false;
186 : }
187 :
188 0 : if(!connection->valid_socket())
189 : {
190 : throw event_dispatcher_invalid_parameter(
191 : "communicator::add_connection(): connection without a socket"
192 0 : " cannot be added to a communicator object.");
193 : }
194 :
195 0 : auto const it(std::find(f_connections.begin(), f_connections.end(), connection));
196 0 : if(it != f_connections.end())
197 : {
198 : // already added, can be added only once but we allow multiple
199 : // calls (however, we do not count those calls, so first call
200 : // to the remove_connection() does remove it!)
201 : //
202 0 : return false;
203 : }
204 :
205 0 : f_connections.push_back(connection);
206 :
207 0 : connection->connection_added();
208 :
209 0 : return true;
210 : }
211 :
212 :
213 : /** \brief Remove a connection from a communicator object.
214 : *
215 : * This function removes a connection from this communicator object.
216 : * Note that any one connection can only be added once.
217 : *
218 : * \param[in] connection The connection to remove from this communicator.
219 : *
220 : * \return true if the connection was removed, false if it was not found.
221 : */
222 0 : bool communicator::remove_connection(connection::pointer_t connection)
223 : {
224 0 : auto it(std::find(f_connections.begin(), f_connections.end(), connection));
225 0 : if(it == f_connections.end())
226 : {
227 0 : return false;
228 : }
229 :
230 : SNAP_LOG_TRACE
231 0 : << "removing 1 connection, \""
232 0 : << connection->get_name()
233 0 : << "\", of "
234 0 : << f_connections.size()
235 0 : << " connections (including this one.)";
236 :
237 0 : f_connections.erase(it);
238 :
239 0 : connection->connection_removed();
240 :
241 : #if 0
242 : #ifdef _DEBUG
243 : std::for_each(
244 : f_connections.begin()
245 : , f_connections.end()
246 : , [](auto const & c)
247 : {
248 : SNAP_LOG_TRACE("communicator::remove_connection(): remaining connection: \"")(c->get_name())("\"");
249 : });
250 : #endif
251 : #endif
252 :
253 0 : return true;
254 : }
255 :
256 :
257 : /** \brief Set the Force Sort flag to \p status.
258 : *
259 : * This function can be called to force the run() function to sort (or not
260 : * sort) the list of connections.
261 : *
262 : * Since the sort function is somewhat expensive, the sort changes the
263 : * vector of connections in place. Then only a change of priority
264 : * triggers a request for the vector to be sorted again.
265 : *
266 : * This function can be used in the event you need to force a trigger.
267 : * It would be unlikely that you would call this function with false.
268 : *
269 : * \param[in] status The new status of the force sort flag.
270 : */
271 0 : void communicator::set_force_sort(bool status)
272 : {
273 0 : f_force_sort = status;
274 0 : }
275 :
276 :
277 : /** \brief Run until all connections are removed.
278 : *
279 : * This function "blocks" until all the events added to this
280 : * communicator instance are removed. Until then, it
281 : * wakes up and run callback functions whenever an event occurs.
282 : *
283 : * In other words, you want to add_connection() before you call
284 : * this function otherwise the function returns immediately.
285 : *
286 : * Note that you can include timeout events so if you need to
287 : * run some code once in a while, you may just use a timeout
288 : * event and process your repetitive events that way.
289 : *
290 : * \return true if the loop exits because the list of connections is empty.
291 : */
292 0 : bool communicator::run()
293 : {
294 : // the loop promises to exit once the even_base object has no
295 : // more connections attached to it
296 : //
297 0 : std::vector<bool> enabled;
298 0 : std::vector<struct pollfd> fds;
299 0 : f_force_sort = true;
300 0 : for(;;)
301 : {
302 : // any connections?
303 0 : if(f_connections.empty())
304 : {
305 0 : return true;
306 : }
307 :
308 0 : if(f_force_sort)
309 : {
310 : // sort the connections by priority
311 : //
312 0 : std::stable_sort(f_connections.begin(), f_connections.end(), connection::compare);
313 0 : f_force_sort = false;
314 : }
315 :
316 : // make a copy because the callbacks may end up making
317 : // changes to the main list and we would have problems
318 : // with that here...
319 : //
320 0 : connection::vector_t connections(f_connections);
321 0 : size_t max_connections(connections.size());
322 :
323 : // timeout is do not time out by default
324 : //
325 0 : std::int64_t next_timeout_timestamp(std::numeric_limits<std::int64_t>::max());
326 :
327 : // clear() is not supposed to delete the buffer of vectors
328 : //
329 0 : enabled.clear();
330 0 : fds.clear();
331 0 : fds.reserve(max_connections); // avoid more than 1 allocation
332 0 : for(size_t idx(0); idx < max_connections; ++idx)
333 : {
334 0 : connection::pointer_t c(connections[idx]);
335 0 : c->f_fds_position = -1;
336 :
337 : // is the connection enabled?
338 : //
339 : // note that we save that value for later use in our loop
340 : // below because otherwise we will miss many events and
341 : // it tends to break things; that means you may get your
342 : // callback called even while disabled
343 : //
344 0 : enabled.push_back(c->is_enabled());
345 0 : if(!enabled[idx])
346 : {
347 : //SNAP_LOG_TRACE("communicator::run(): connection '")(c->get_name())("' has been disabled, so ignored.");
348 0 : continue;
349 : }
350 : //SNAP_LOG_TRACE("communicator::run(): handling connection ")(idx)("/")(max_connections)(". '")(c->get_name())("' since it is enabled...");
351 :
352 : // check whether a timeout is defined in this connection
353 : //
354 0 : int64_t const timestamp(c->save_timeout_timestamp());
355 0 : if(timestamp != -1)
356 : {
357 : // the timeout event gives us a time when to tick
358 : //
359 0 : if(timestamp < next_timeout_timestamp)
360 : {
361 0 : next_timeout_timestamp = timestamp;
362 : }
363 : }
364 :
365 : // is there any events to listen on?
366 0 : int e(0);
367 0 : if(c->is_listener() || c->is_signal())
368 : {
369 0 : e |= POLLIN;
370 : }
371 0 : if(c->is_reader())
372 : {
373 0 : e |= POLLIN | POLLPRI | POLLRDHUP;
374 : }
375 0 : if(c->is_writer())
376 : {
377 0 : e |= POLLOUT | POLLRDHUP;
378 : }
379 0 : if(e == 0)
380 : {
381 : // this should only happend on snap_timer objects
382 : //
383 0 : continue;
384 : }
385 :
386 : // do we have a currently valid socket? (i.e. the connection
387 : // may have been closed or we may be handling a timer or
388 : // signal object)
389 : //
390 0 : if(c->get_socket() < 0)
391 : {
392 0 : continue;
393 : }
394 :
395 : // this is considered valid, add this connection to the list
396 : //
397 : // save the position since we may skip some entries...
398 : // (otherwise we would have to use -1 as the socket to
399 : // allow for such dead entries, but avoiding such entries
400 : // saves time)
401 : //
402 0 : c->f_fds_position = fds.size();
403 :
404 : //SNAP_LOG_ERROR("*** still waiting on \"")(c->get_name())("\".");
405 : struct pollfd fd;
406 0 : fd.fd = c->get_socket();
407 0 : fd.events = e;
408 0 : fd.revents = 0; // probably useless... (kernel should clear those)
409 0 : fds.push_back(fd);
410 : }
411 :
412 : // compute the right timeout
413 0 : std::int64_t timeout(-1);
414 0 : if(next_timeout_timestamp != std::numeric_limits<int64_t>::max())
415 : {
416 0 : std::int64_t const now(get_current_date());
417 0 : timeout = next_timeout_timestamp - now;
418 0 : if(timeout < 0)
419 : {
420 : // timeout is in the past so timeout immediately, but
421 : // still check for events if any
422 0 : timeout = 0;
423 : }
424 : else
425 : {
426 : // convert microseconds to milliseconds for poll()
427 0 : timeout /= 1000;
428 0 : if(timeout == 0)
429 : {
430 : // less than one is a waste of time (CPU intenssive
431 : // until the time is reached, we can be 1 ms off
432 : // instead...)
433 0 : timeout = 1;
434 : }
435 : }
436 : }
437 0 : else if(fds.empty())
438 : {
439 : SNAP_LOG_FATAL
440 0 : << "communicator::run(): nothing to poll() on. All connections are disabled? (Ignoring "
441 0 : << max_connections
442 0 : << " and exiting the run() loop anyway.)";
443 0 : return false;
444 : }
445 :
446 : //SNAP_LOG_TRACE << "communicator::run(): "
447 : // << "count " << fds.size()
448 : // << "timeout " << timeout
449 : // << " (next was: " << next_timeout_timestamp
450 : // << ", current ~ " << get_current_date()
451 : // << ")";
452 :
453 : // TODO: add support for ppoll() so we can support signals cleanly
454 : // with nearly no additional work from us
455 : //
456 0 : errno = 0;
457 0 : int const r(poll(&fds[0], fds.size(), timeout));
458 0 : if(r >= 0)
459 : {
460 : // quick sanity check
461 : //
462 0 : if(static_cast<size_t>(r) > connections.size())
463 : {
464 0 : throw event_dispatcher_runtime_error("communicator::run(): poll() returned a number of events to handle larger than the input allows");
465 : }
466 : //SNAP_LOG_TRACE
467 : // <<"tid="
468 : // << gettid()
469 : // << ", communicator::run(): ------------------- new set of "
470 : // << r
471 : // << " events to handle";
472 :
473 : // check each connection one by one for:
474 : //
475 : // 1) fds events, including signals
476 : // 2) timeouts
477 : //
478 : // and execute the corresponding callbacks
479 : //
480 0 : for(size_t idx(0); idx < connections.size(); ++idx)
481 : {
482 0 : connection::pointer_t c(connections[idx]);
483 :
484 : // is the connection enabled?
485 : //
486 : // note that we check whether that connection was enabled
487 : // before poll() was called; this is very important because
488 : // the last poll() events must be run even if a previous
489 : // callback call just disabled this very connection
490 : // (i.e. at the time we called poll() the connection was
491 : // still enabled and therefore we are expected to call
492 : // their callbacks even if it just got disabled by an
493 : // earlier callback)
494 : //
495 0 : if(!enabled[idx])
496 : {
497 : //SNAP_LOG_TRACE
498 : // << "communicator::run(): in loop, connection '"
499 : // << c->get_name()
500 : // << "' has been disabled, so ignored!";
501 0 : continue;
502 : }
503 :
504 : // if we have a valid fds position then an event other
505 : // than a timeout occurred on that connection
506 : //
507 0 : if(c->f_fds_position >= 0)
508 : {
509 0 : struct pollfd * fd(&fds[c->f_fds_position]);
510 :
511 : // if any events were found by poll(), process them now
512 : //
513 0 : if(fd->revents != 0)
514 : {
515 : // an event happened on this one
516 : //
517 0 : if((fd->revents & (POLLIN | POLLPRI)) != 0)
518 : {
519 : // we consider that Unix signals have the greater priority
520 : // and thus handle them first
521 : //
522 0 : if(c->is_signal())
523 : {
524 0 : signal * ss(dynamic_cast<signal *>(c.get()));
525 0 : if(ss)
526 : {
527 0 : ss->process();
528 : }
529 : }
530 0 : else if(c->is_listener())
531 : {
532 : // a listener is a special case and we want
533 : // to call process_accept() instead
534 : //
535 0 : c->process_accept();
536 : }
537 : else
538 : {
539 0 : c->process_read();
540 : }
541 : }
542 0 : if((fd->revents & POLLOUT) != 0)
543 : {
544 0 : c->process_write();
545 : }
546 0 : if((fd->revents & POLLERR) != 0)
547 : {
548 0 : c->process_error();
549 : }
550 0 : if((fd->revents & (POLLHUP | POLLRDHUP)) != 0)
551 : {
552 0 : c->process_hup();
553 : }
554 0 : if((fd->revents & POLLNVAL) != 0)
555 : {
556 0 : c->process_invalid();
557 : }
558 : }
559 : }
560 :
561 : // now check whether we have a timeout on this connection
562 : //
563 0 : int64_t const timestamp(c->get_saved_timeout_timestamp());
564 0 : if(timestamp != -1)
565 : {
566 0 : int64_t const now(get_current_date());
567 0 : if(now >= timestamp)
568 : {
569 : //SNAP_LOG_TRACE
570 : // << "communicator::run(): timer of connection = '"<< c->get_name()
571 : // << "', timestamp = " << timestamp
572 : // << ", now = " << now
573 : // << ", now >= timestamp --> " << (now >= timestamp ? "TRUE (timed out!)" : "FALSE");
574 :
575 : // move the timeout as required first
576 : // (because the callback may move it again)
577 : //
578 0 : c->calculate_next_tick();
579 :
580 : // the timeout date needs to be reset if the tick
581 : // happened for that date
582 : //
583 0 : if(now >= c->get_timeout_date())
584 : {
585 0 : c->set_timeout_date(-1);
586 : }
587 :
588 : // then run the callback
589 : //
590 0 : c->process_timeout();
591 : }
592 : }
593 : }
594 : }
595 : else
596 : {
597 : // r < 0 means an error occurred
598 : //
599 0 : if(errno == EINTR)
600 : {
601 : // Note: if the user wants to prevent this error, he should
602 : // use the snap_signal with the Unix signals that may
603 : // happen while calling poll().
604 : //
605 0 : throw event_dispatcher_runtime_error("communicator::run(): EINTR occurred while in poll() -- interrupts are not supported yet though");
606 : }
607 0 : if(errno == EFAULT)
608 : {
609 0 : throw event_dispatcher_invalid_parameter("communicator::run(): buffer was moved out of our address space?");
610 : }
611 0 : if(errno == EINVAL)
612 : {
613 : // if this is really because nfds is too large then it may be
614 : // a "soft" error that can be fixed; that being said, my
615 : // current version is 16K files which frankly when we reach
616 : // that level we have a problem...
617 : //
618 : struct rlimit rl;
619 0 : getrlimit(RLIMIT_NOFILE, &rl);
620 : throw event_dispatcher_invalid_parameter(
621 : "communicator::run(): too many file fds for poll, limit is currently "
622 0 : + std::to_string(rl.rlim_cur)
623 0 : + ", your kernel top limit is "
624 0 : + std::to_string(rl.rlim_max));
625 : }
626 0 : if(errno == ENOMEM)
627 : {
628 0 : throw event_dispatcher_runtime_error("communicator::run(): poll() failed because of memory");
629 : }
630 0 : int const e(errno);
631 : throw event_dispatcher_runtime_error(
632 : "communicator::run(): poll() failed with error "
633 0 : + std::to_string(e)
634 0 : + " -- "
635 0 : + strerror(e));
636 : }
637 : }
638 : }
639 :
640 :
641 :
642 6 : } // namespace ed
643 : // vim: ts=4 sw=4 et
|