Line data Source code
1 : // Copyright (c) 2012-2019 Made to Order Software Corp. All Rights Reserved
2 : //
3 : // This program is free software; you can redistribute it and/or modify
4 : // it under the terms of the GNU General Public License as published by
5 : // the Free Software Foundation; either version 2 of the License, or
6 : // (at your option) any later version.
7 : //
8 : // This program is distributed in the hope that it will be useful,
9 : // but WITHOUT ANY WARRANTY; without even the implied warranty of
10 : // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 : // GNU General Public License for more details.
12 : //
13 : // You should have received a copy of the GNU General Public License
14 : // along with this program; if not, write to the Free Software
15 : // Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
16 :
17 : /** \file
18 : * \brief Implementation of the Snap Communicator class.
19 : *
20 : * This class wraps the C poll() interface in a C++ object with many types
21 : * of objects:
22 : *
23 : * \li Server Connections; for software that want to offer a port to
24 : * which clients can connect to; the server will call accept()
25 : * once a new client connection is ready; this results in a
26 : * Server/Client connection object
27 : * \li Client Connections; for software that want to connect to
28 : * a server; these expect the IP address and port to connect to
29 : * \li Server/Client Connections; for the server when it accepts a new
30 : * connection; in this case the server gets a socket from accept()
31 : * and creates one of these objects to handle the connection
32 : *
33 : * Using the poll() function is the easiest and allows us to listen
34 : * on pretty much any number of sockets (on my server it is limited
35 : * at 16,768 and frankly over 1,000 we probably will start to have
36 : * real slowness issues on small VPN servers.)
37 : */
38 :
39 : // to get the POLLRDHUP definition
40 : #ifndef _GNU_SOURCE
41 : #define _GNU_SOURCE
42 : #endif
43 :
44 :
45 : // self
46 : //
47 : #include "eventdispatcher/tcp_blocking_client_message_connection.h"
48 :
49 : #include "eventdispatcher/exception.h"
50 :
51 :
52 : // snaplogger lib
53 : //
54 : #include "snaplogger/message.h"
55 :
56 :
57 : // snapdev lib
58 : //
59 : //#include "snapdev/not_reached.h"
60 : #include "snapdev/not_used.h"
61 : //#include "snapdev/string_replace_many.h"
62 :
63 :
64 : //// libaddr lib
65 : ////
66 : //#include "libaddr/addr_parser.h"
67 : //
68 : //
69 : //// C++ lib
70 : ////
71 : //#include <sstream>
72 : //#include <limits>
73 : //#include <atomic>
74 :
75 :
76 : // C lib
77 : //
78 : //#include <fcntl.h>
79 : #include <poll.h>
80 : //#include <unistd.h>
81 : //#include <sys/eventfd.h>
82 : //#include <sys/inotify.h>
83 : //#include <sys/ioctl.h>
84 : #include <sys/resource.h>
85 : //#include <sys/syscall.h>
86 : //#include <sys/time.h>
87 :
88 :
89 : // last include
90 : //
91 : #include <snapdev/poison.h>
92 :
93 :
94 :
95 : namespace ed
96 : {
97 :
98 :
99 :
100 : /** \brief Blocking client message connection.
101 : *
102 : * This object allows you to create a blocking, generally temporary
103 : * one message connection client. This is specifically used with
104 : * the snaplock daemon, but it can be used for other things too as
105 : * required.
106 : *
107 : * The connection is expected to be used as shown in the following
108 : * example which is how it is used to implement the LOCK through
109 : * our snaplock daemons.
110 : *
111 : * \code
112 : * class my_blocking_connection
113 : * : public ed::tcp_blocking_client_message_connection
114 : * {
115 : * public:
116 : * my_blocking_connection(std::string const & addr, int port, mode_t mode)
117 : * : tcp_blocking_client_message_connection(addr, port, mode)
118 : * {
119 : * // need to register with snap communicator
120 : * snap_communicator_message register_message;
121 : * register_message.set_command("REGISTER");
122 : * ...
123 : * blocking_connection.send_message(register_message);
124 : *
125 : * run();
126 : * }
127 : *
128 : * ~my_blocking_connection()
129 : * {
130 : * // done, send UNLOCK and then make sure to unregister
131 : * snap_communicator_message unlock_message;
132 : * unlock_message.set_command("UNLOCK");
133 : * ...
134 : * blocking_connection.send_message(unlock_message);
135 : *
136 : * snap_communicator_message unregister_message;
137 : * unregister_message.set_command("UNREGISTER");
138 : * ...
139 : * blocking_connection.send_message(unregister_message);
140 : * }
141 : *
142 : * // now that we have a dispatcher, this would probably use
143 : * // that mechanism instead of a list of if()/else if()
144 : * //
145 : * // Please, consider using the dispatcher instead
146 : * //
147 : * virtual void process_message(snap_communicator_message const & message)
148 : * {
149 : * QString const command(message.get_command());
150 : * if(command == "LOCKED")
151 : * {
152 : * // the lock worked, release hand back to the user
153 : * done();
154 : * }
155 : * else if(command == "READY")
156 : * {
157 : * // the REGISTER worked
158 : * // send the LOCK now
159 : * snap_communicator_message lock_message;
160 : * lock_message.set_command("LOCK");
161 : * ...
162 : * blocking_connection.send_message(lock_message);
163 : * }
164 : * else if(command == "HELP")
165 : * {
166 : * // snapcommunicator wants us to tell it what commands
167 : * // we accept
168 : * snap_communicator_message commands_message;
169 : * commands_message.set_command("COMMANDS");
170 : * ...
171 : * blocking_connection.send_message(commands_message);
172 : * }
173 : * }
174 : * };
175 : * my_blocking_connection blocking_connection("127.0.0.1", 4040);
176 : *
177 : * // then we can send a message to the service we are interested in
178 : * my_blocking_connection.send_message(my_message);
179 : *
180 : * // now we call run() waiting for a reply
181 : * my_blocking_connection.run();
182 : * \endcode
183 : *
184 : * \param[in] addr The address to connect to.
185 : * \param[in] port The port to connect at.
186 : * \param[in] mode The mode used to connect.
187 : */
188 0 : tcp_blocking_client_message_connection::tcp_blocking_client_message_connection(
189 : std::string const & addr
190 : , int const port
191 : , mode_t const mode)
192 0 : : tcp_client_message_connection(addr, port, mode, true)
193 : {
194 0 : }
195 :
196 :
197 : /** \brief Blocking run on the connection.
198 : *
199 : * This function reads the incoming messages and calls process_message()
200 : * on each one of them, in a blocking manner.
201 : *
202 : * If you called mark_done() before, the done flag is reset back to false.
203 : * You will have to call mark_done() again if you again receive a message
204 : * that is expected to end the loop.
205 : *
206 : * \note
207 : * Internally, the function actually calls process_line() which transforms
208 : * the line in a message and in turn calls process_message().
209 : */
210 0 : void tcp_blocking_client_message_connection::run()
211 : {
212 0 : mark_not_done();
213 :
214 0 : do
215 : {
216 0 : for(;;)
217 : {
218 : // TBD: can the socket become -1 within the read() loop?
219 : // (i.e. should not that be just outside of the for(;;)?)
220 : //
221 : struct pollfd fd;
222 0 : fd.events = POLLIN | POLLPRI | POLLRDHUP;
223 0 : fd.fd = get_socket();
224 0 : if(fd.fd < 0
225 0 : || !is_enabled())
226 : {
227 : // invalid socket
228 0 : process_error();
229 0 : return;
230 : }
231 :
232 : // at this time, this class is used with the lock and
233 : // the lock has a timeout so we need to block at most
234 : // for that amount of time and not forever (presumably
235 : // the snaplock would send us a LOCKFAILED marked with
236 : // a "timeout" parameter, but we cannot rely on the
237 : // snaplock being there and responding as expected.)
238 : //
239 : // calculate the number of microseconds and then convert
240 : // them to milliseconds for poll()
241 : //
242 0 : std::int64_t const next_timeout_timestamp(save_timeout_timestamp());
243 0 : std::int64_t const now(get_current_date());
244 0 : std::int64_t const timeout((next_timeout_timestamp - now) / 1000);
245 0 : if(timeout <= 0)
246 : {
247 : // timed out
248 : //
249 0 : process_timeout();
250 0 : if(is_done())
251 : {
252 0 : return;
253 : }
254 : SNAP_LOG_FATAL
255 0 : << "blocking connection timed out.";
256 : throw event_dispatcher_runtime_error(
257 : "tcp_blocking_client_message_connection::run(): blocking"
258 0 : " connection timed out.");
259 : }
260 0 : errno = 0;
261 0 : fd.revents = 0; // probably useless... (kernel should clear those)
262 0 : int const r(::poll(&fd, 1, timeout));
263 0 : if(r < 0)
264 : {
265 : // r < 0 means an error occurred
266 : //
267 0 : if(errno == EINTR)
268 : {
269 : // Note: if the user wants to prevent this error, he should
270 : // use the snap_signal with the Unix signals that may
271 : // happen while calling poll().
272 : //
273 : throw event_dispatcher_runtime_error(
274 : "tcp_blocking_client_message_connection::run():"
275 : " EINTR occurred while in poll() -- interrupts"
276 0 : " are not supported yet though.");
277 : }
278 0 : if(errno == EFAULT)
279 : {
280 : throw event_dispatcher_runtime_error(
281 : "tcp_blocking_client_message_connection::run():"
282 0 : " buffer was moved out of our address space?");
283 : }
284 0 : if(errno == EINVAL)
285 : {
286 : // if this is really because nfds is too large then it may be
287 : // a "soft" error that can be fixed; that being said, my
288 : // current version is 16K files which frankly when we reach
289 : // that level we have a problem...
290 : //
291 : rlimit rl;
292 0 : getrlimit(RLIMIT_NOFILE, &rl);
293 : throw event_dispatcher_invalid_parameter(
294 : "tcp_blocking_client_message_connection::run():"
295 : " too many file fds for poll, limit is"
296 : " currently "
297 0 : + std::to_string(rl.rlim_cur)
298 0 : + ", your kernel top limit is "
299 0 : + std::to_string(rl.rlim_max)
300 0 : + ".");
301 : }
302 0 : if(errno == ENOMEM)
303 : {
304 : throw event_dispatcher_runtime_error(
305 : "tcp_blocking_client_message_connection::run():"
306 0 : " poll() failed because of memory.");
307 : }
308 0 : int const e(errno);
309 : throw event_dispatcher_invalid_parameter(
310 : "tcp_blocking_client_message_connection::run():"
311 : " poll() failed with error "
312 0 : + std::to_string(e)
313 0 : + " -- "
314 0 : + strerror(e));
315 : }
316 :
317 0 : if((fd.revents & (POLLIN | POLLPRI)) != 0)
318 : {
319 : // read one character at a time otherwise we would be
320 : // blocked forever
321 : //
322 : char buf[2];
323 0 : int const size(::read(fd.fd, buf, 1));
324 0 : if(size != 1)
325 : {
326 : // invalid read
327 : //
328 0 : process_error();
329 : throw event_dispatcher_invalid_parameter(
330 : "tcp_blocking_client_message_connection::run():"
331 : " read() failed reading data from socket"
332 : " (return value = "
333 0 : + std::to_string(size)
334 0 : + ").");
335 : }
336 0 : if(buf[0] == '\n')
337 : {
338 : // end of a line, we got a whole message in our buffer
339 : // notice that we do not add the '\n' to line
340 : //
341 0 : break;
342 : }
343 0 : buf[1] = '\0';
344 0 : f_line += buf;
345 : }
346 0 : if((fd.revents & POLLERR) != 0)
347 : {
348 0 : process_error();
349 0 : return;
350 : }
351 0 : if((fd.revents & (POLLHUP | POLLRDHUP)) != 0)
352 : {
353 0 : process_hup();
354 0 : return;
355 : }
356 0 : if((fd.revents & POLLNVAL) != 0)
357 : {
358 0 : process_invalid();
359 0 : return;
360 : }
361 : }
362 0 : process_line(f_line);
363 0 : f_line.clear();
364 : }
365 0 : while(!is_done());
366 : }
367 :
368 :
369 : /** \brief Quick peek on the connection.
370 : *
371 : * This function checks for incoming messages and calls process_message()
372 : * on each one of them. If no messages are found on the pipe, then the
373 : * function returns immediately.
374 : *
375 : * \note
376 : * Internally, the function actually calls process_line() which transforms
377 : * the line in a message and in turn calls process_message().
378 : */
379 0 : void tcp_blocking_client_message_connection::peek()
380 : {
381 0 : do
382 : {
383 0 : for(;;)
384 : {
385 : pollfd fd;
386 0 : fd.events = POLLIN | POLLPRI | POLLRDHUP;
387 0 : fd.fd = get_socket();
388 0 : if(fd.fd < 0
389 0 : || !is_enabled())
390 : {
391 : // invalid socket
392 0 : process_error();
393 0 : return;
394 : }
395 :
396 0 : errno = 0;
397 0 : fd.revents = 0; // probably useless... (kernel should clear those)
398 0 : int const r(::poll(&fd, 1, 0));
399 0 : if(r < 0)
400 : {
401 : // r < 0 means an error occurred
402 : //
403 0 : if(errno == EINTR)
404 : {
405 : // Note: if the user wants to prevent this error, he should
406 : // use the snap_signal with the Unix signals that may
407 : // happen while calling poll().
408 : //
409 : throw event_dispatcher_runtime_error(
410 : "tcp_blocking_client_message_connection::run():"
411 : " EINTR occurred while in poll() -- interrupts"
412 0 : " are not supported yet though");
413 : }
414 0 : if(errno == EFAULT)
415 : {
416 : throw event_dispatcher_invalid_parameter(
417 : "tcp_blocking_client_message_connection::run():"
418 0 : " buffer was moved out of our address space?");
419 : }
420 0 : if(errno == EINVAL)
421 : {
422 : // if this is really because nfds is too large then it may be
423 : // a "soft" error that can be fixed; that being said, my
424 : // current version is 16K files which frankly when we reach
425 : // that level we have a problem...
426 : //
427 : struct rlimit rl;
428 0 : getrlimit(RLIMIT_NOFILE, &rl);
429 : throw event_dispatcher_invalid_parameter(
430 : "tcp_blocking_client_message_connection::run():"
431 : " too many file fds for poll, limit is currently "
432 0 : + std::to_string(rl.rlim_cur)
433 0 : + ", your kernel top limit is "
434 0 : + std::to_string(rl.rlim_max));
435 : }
436 0 : if(errno == ENOMEM)
437 : {
438 : throw event_dispatcher_runtime_error(
439 : "tcp_blocking_client_message_connection::run():"
440 0 : " poll() failed because of memory");
441 : }
442 0 : int const e(errno);
443 : throw event_dispatcher_runtime_error(
444 : "tcp_blocking_client_message_connection::run():"
445 : " poll() failed with error "
446 0 : + std::to_string(e)
447 0 : + " -- "
448 0 : + strerror(e));
449 : }
450 :
451 0 : if(r == 0)
452 : {
453 0 : return;
454 : }
455 :
456 0 : if((fd.revents & (POLLIN | POLLPRI)) != 0)
457 : {
458 : // read one character at a time otherwise we would be
459 : // blocked forever
460 : //
461 : char buf[2];
462 0 : int const size(::read(fd.fd, buf, 1));
463 0 : if(size != 1)
464 : {
465 : // invalid read
466 0 : process_error();
467 : throw event_dispatcher_runtime_error(
468 : "tcp_blocking_client_message_connection::run():"
469 : " read() failed reading data from socket (return"
470 : " value = "
471 0 : + std::to_string(size)
472 0 : + ")");
473 : }
474 0 : if(buf[0] == '\n')
475 : {
476 : // end of a line, we got a whole message in our buffer
477 : // notice that we do not add the '\n' to line
478 0 : break;
479 : }
480 0 : buf[1] = '\0';
481 0 : f_line += buf;
482 : }
483 0 : if((fd.revents & POLLERR) != 0)
484 : {
485 0 : process_error();
486 0 : return;
487 : }
488 0 : if((fd.revents & (POLLHUP | POLLRDHUP)) != 0)
489 : {
490 0 : process_hup();
491 0 : return;
492 : }
493 0 : if((fd.revents & POLLNVAL) != 0)
494 : {
495 0 : process_invalid();
496 0 : return;
497 : }
498 : }
499 0 : process_line(f_line);
500 0 : f_line.clear();
501 : }
502 0 : while(!is_done());
503 : }
504 :
505 :
506 : /** \brief Send the specified message to the connection on the other end.
507 : *
508 : * This function sends the specified message to the other side of the
509 : * socket connection. If the write somehow fails, then the function
510 : * returns false.
511 : *
512 : * The function blocks until the entire message was written to the
513 : * socket.
514 : *
515 : * \param[in] message The message to send to the connection.
516 : * \param[in] cache Whether to cache the message if it cannot be sent
517 : * immediately (ignored at the moment.)
518 : *
519 : * \return true if the message was sent succesfully, false otherwise.
520 : */
521 0 : bool tcp_blocking_client_message_connection::send_message(message const & msg, bool cache)
522 : {
523 0 : snap::NOTUSED(cache);
524 :
525 0 : int const s(get_socket());
526 0 : if(s >= 0)
527 : {
528 : // transform the message to a string and write to the socket
529 : // the writing is blocking and thus fully synchronous so the
530 : // function blocks until the message gets fully sent
531 : //
532 : // WARNING: we cannot use f_connection.write() because that one
533 : // is asynchronous (at least, it writes to a buffer
534 : // and not directly to the socket!)
535 : //
536 0 : std::string buf(msg.to_message());
537 0 : buf += '\n';
538 0 : return ::write(s, buf.c_str(), buf.length()) == static_cast<ssize_t>(buf.length());
539 : }
540 :
541 0 : return false;
542 : }
543 :
544 :
545 : /** \brief Overridden callback.
546 : *
547 : * This function is overriding the lower level process_error() to make
548 : * (mostly) sure that the remove_from_communicator() function does not
549 : * get called because that would generate the creation of a
550 : * snap_communicator object which we do not want with blocking
551 : * clients.
552 : */
553 0 : void tcp_blocking_client_message_connection::process_error()
554 : {
555 0 : }
556 :
557 :
558 :
559 6 : } // namespace ed
560 : // vim: ts=4 sw=4 et
|