Line data Source code
1 : // Copyright (c) 2012-2021 Made to Order Software Corp. All Rights Reserved
2 : //
3 : // https://snapwebsites.org/project/eventdispatcher
4 : // contact@m2osw.com
5 : //
6 : // This program is free software; you can redistribute it and/or modify
7 : // it under the terms of the GNU General Public License as published by
8 : // the Free Software Foundation; either version 2 of the License, or
9 : // (at your option) any later version.
10 : //
11 : // This program is distributed in the hope that it will be useful,
12 : // but WITHOUT ANY WARRANTY; without even the implied warranty of
13 : // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 : // GNU General Public License for more details.
15 : //
16 : // You should have received a copy of the GNU General Public License along
17 : // with this program; if not, write to the Free Software Foundation, Inc.,
18 : // 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
19 :
20 : /** \file
21 : * \brief Implementation of the Snap Communicator class.
22 : *
23 : * This class wraps the C poll() interface in a C++ object with many types
24 : * of objects:
25 : *
26 : * \li Server Connections; for software that want to offer a port to
27 : * which clients can connect to; the server will call accept()
28 : * once a new client connection is ready; this results in a
29 : * Server/Client connection object
30 : * \li Client Connections; for software that want to connect to
31 : * a server; these expect the IP address and port to connect to
32 : * \li Server/Client Connections; for the server when it accepts a new
33 : * connection; in this case the server gets a socket from accept()
34 : * and creates one of these objects to handle the connection
35 : *
36 : * Using the poll() function is the easiest and allows us to listen
37 : * on pretty much any number of sockets (on my server it is limited
38 : * at 16,768 and frankly over 1,000 we probably will start to have
39 : * real slowness issues on small VPN servers.)
40 : */
41 :
42 : // to get the POLLRDHUP definition
43 : #ifndef _GNU_SOURCE
44 : #define _GNU_SOURCE
45 : #endif
46 :
47 :
48 : // self
49 : //
50 : #include "eventdispatcher/tcp_blocking_client_message_connection.h"
51 :
52 : #include "eventdispatcher/exception.h"
53 :
54 :
55 : // snaplogger lib
56 : //
57 : #include <snaplogger/message.h>
58 :
59 :
60 : // snapdev lib
61 : //
62 : #include <snapdev/not_used.h>
63 :
64 :
65 : // C++ lib
66 : //
67 : #include <cstring>
68 :
69 :
70 : // C lib
71 : //
72 : #include <poll.h>
73 : #include <sys/resource.h>
74 :
75 :
76 : // last include
77 : //
78 : #include <snapdev/poison.h>
79 :
80 :
81 :
82 : namespace ed
83 : {
84 :
85 :
86 :
87 : /** \brief Blocking client message connection.
88 : *
89 : * This object allows you to create a blocking, generally temporary
90 : * one message connection client. This is specifically used with
91 : * the snaplock daemon, but it can be used for other things too as
92 : * required.
93 : *
94 : * The connection is expected to be used as shown in the following
95 : * example which is how it is used to implement the LOCK through
96 : * our snaplock daemons.
97 : *
98 : * \code
99 : * class my_blocking_connection
100 : * : public ed::tcp_blocking_client_message_connection
101 : * {
102 : * public:
103 : * my_blocking_connection(std::string const & addr, int port, mode_t mode)
104 : * : tcp_blocking_client_message_connection(addr, port, mode)
105 : * {
106 : * // need to register with communicator
107 : * message register_message;
108 : * register_message.set_command("REGISTER");
109 : * ...
110 : * blocking_connection.send_message(register_message);
111 : *
112 : * run();
113 : * }
114 : *
115 : * ~my_blocking_connection()
116 : * {
117 : * // done, send UNLOCK and then make sure to unregister
118 : * message unlock_message;
119 : * unlock_message.set_command("UNLOCK");
120 : * ...
121 : * blocking_connection.send_message(unlock_message);
122 : *
123 : * message unregister_message;
124 : * unregister_message.set_command("UNREGISTER");
125 : * ...
126 : * blocking_connection.send_message(unregister_message);
127 : * }
128 : *
129 : * // now that we have a dispatcher, this would probably use
130 : * // that mechanism instead of a list of if()/else if()
131 : * //
132 : * // Please, consider using the dispatcher instead
133 : * //
134 : * virtual void process_message(message const & message)
135 : * {
136 : * QString const command(message.get_command());
137 : * if(command == "LOCKED")
138 : * {
139 : * // the lock worked, release hand back to the user
140 : * done();
141 : * }
142 : * else if(command == "READY")
143 : * {
144 : * // the REGISTER worked
145 : * // send the LOCK now
146 : * message lock_message;
147 : * lock_message.set_command("LOCK");
148 : * ...
149 : * blocking_connection.send_message(lock_message);
150 : * }
151 : * else if(command == "HELP")
152 : * {
153 : * // snapcommunicator wants us to tell it what commands
154 : * // we accept
155 : * message commands_message;
156 : * commands_message.set_command("COMMANDS");
157 : * ...
158 : * blocking_connection.send_message(commands_message);
159 : * }
160 : * }
161 : * };
162 : * my_blocking_connection blocking_connection("127.0.0.1", 4040);
163 : *
164 : * // then we can send a message to the service we are interested in
165 : * my_blocking_connection.send_message(my_message);
166 : *
167 : * // now we call run() waiting for a reply
168 : * my_blocking_connection.run();
169 : * \endcode
170 : *
171 : * \param[in] addr The address to connect to.
172 : * \param[in] port The port to connect at.
173 : * \param[in] mode The mode used to connect.
174 : */
175 0 : tcp_blocking_client_message_connection::tcp_blocking_client_message_connection(
176 : std::string const & addr
177 : , int const port
178 0 : , mode_t const mode)
179 0 : : tcp_client_message_connection(addr, port, mode, true)
180 : {
181 0 : }
182 :
183 :
184 : /** \brief Blocking run on the connection.
185 : *
186 : * This function reads the incoming messages and calls process_message()
187 : * on each one of them, in a blocking manner.
188 : *
189 : * If you called mark_done() before, the done flag is reset back to false.
190 : * You will have to call mark_done() again if you again receive a message
191 : * that is expected to end the loop.
192 : *
193 : * \note
194 : * Internally, the function actually calls process_line() which transforms
195 : * the line in a message and in turn calls process_message().
196 : */
197 0 : void tcp_blocking_client_message_connection::run()
198 : {
199 0 : mark_not_done();
200 :
201 0 : do
202 : {
203 : for(;;)
204 : {
205 : // TBD: can the socket become -1 within the read() loop?
206 : // (i.e. should not that be just outside of the for(;;)?)
207 : //
208 0 : struct pollfd fd;
209 0 : fd.events = POLLIN | POLLPRI | POLLRDHUP;
210 0 : fd.fd = get_socket();
211 0 : if(fd.fd < 0
212 0 : || !is_enabled())
213 : {
214 : // invalid socket
215 0 : process_error();
216 0 : return;
217 : }
218 :
219 : // at this time, this class is used with the lock and
220 : // the lock has a timeout so we need to block at most
221 : // for that amount of time and not forever (presumably
222 : // the snaplock would send us a LOCKFAILED marked with
223 : // a "timeout" parameter, but we cannot rely on the
224 : // snaplock being there and responding as expected.)
225 : //
226 : // calculate the number of microseconds and then convert
227 : // them to milliseconds for poll()
228 : //
229 0 : std::int64_t const next_timeout_timestamp(save_timeout_timestamp());
230 0 : std::int64_t const now(get_current_date());
231 0 : std::int64_t const timeout((next_timeout_timestamp - now) / 1000);
232 0 : if(timeout <= 0)
233 : {
234 : // timed out
235 : //
236 0 : process_timeout();
237 0 : if(is_done())
238 : {
239 0 : return;
240 : }
241 0 : SNAP_LOG_FATAL
242 0 : << "blocking connection timed out."
243 : << SNAP_LOG_SEND;
244 : throw event_dispatcher_runtime_error(
245 : "tcp_blocking_client_message_connection::run(): blocking"
246 0 : " connection timed out.");
247 : }
248 0 : errno = 0;
249 0 : fd.revents = 0; // probably useless... (kernel should clear those)
250 0 : int const r(::poll(&fd, 1, timeout));
251 0 : if(r < 0)
252 : {
253 : // r < 0 means an error occurred
254 : //
255 0 : if(errno == EINTR)
256 : {
257 : // Note: if the user wants to prevent this error, he should
258 : // use the signal with the Unix signals that may
259 : // happen while calling poll().
260 : //
261 : throw event_dispatcher_runtime_error(
262 : "tcp_blocking_client_message_connection::run():"
263 : " EINTR occurred while in poll() -- interrupts"
264 0 : " are not supported yet though.");
265 : }
266 0 : if(errno == EFAULT)
267 : {
268 : throw event_dispatcher_runtime_error(
269 : "tcp_blocking_client_message_connection::run():"
270 0 : " buffer was moved out of our address space?");
271 : }
272 0 : if(errno == EINVAL)
273 : {
274 : // if this is really because nfds is too large then it may be
275 : // a "soft" error that can be fixed; that being said, my
276 : // current version is 16K files which frankly when we reach
277 : // that level we have a problem...
278 : //
279 0 : rlimit rl;
280 0 : getrlimit(RLIMIT_NOFILE, &rl);
281 : throw event_dispatcher_invalid_parameter(
282 : "tcp_blocking_client_message_connection::run():"
283 : " too many file fds for poll, limit is"
284 : " currently "
285 0 : + std::to_string(rl.rlim_cur)
286 0 : + ", your kernel top limit is "
287 0 : + std::to_string(rl.rlim_max)
288 0 : + ".");
289 : }
290 0 : if(errno == ENOMEM)
291 : {
292 : throw event_dispatcher_runtime_error(
293 : "tcp_blocking_client_message_connection::run():"
294 0 : " poll() failed because of memory.");
295 : }
296 0 : int const e(errno);
297 : throw event_dispatcher_invalid_parameter(
298 : "tcp_blocking_client_message_connection::run():"
299 : " poll() failed with error "
300 0 : + std::to_string(e)
301 0 : + " -- "
302 0 : + strerror(e));
303 : }
304 :
305 0 : if((fd.revents & (POLLIN | POLLPRI)) != 0)
306 : {
307 : // read one character at a time otherwise we would be
308 : // blocked forever
309 : //
310 0 : char buf[2];
311 0 : int const size(::read(fd.fd, buf, 1));
312 0 : if(size != 1)
313 : {
314 : // invalid read
315 : //
316 0 : process_error();
317 : throw event_dispatcher_invalid_parameter(
318 : "tcp_blocking_client_message_connection::run():"
319 : " read() failed reading data from socket"
320 : " (return value = "
321 0 : + std::to_string(size)
322 0 : + ").");
323 : }
324 0 : if(buf[0] == '\n')
325 : {
326 : // end of a line, we got a whole message in our buffer
327 : // notice that we do not add the '\n' to line
328 : //
329 0 : break;
330 : }
331 0 : buf[1] = '\0';
332 0 : f_line += buf;
333 : }
334 0 : if((fd.revents & POLLERR) != 0)
335 : {
336 0 : process_error();
337 0 : return;
338 : }
339 0 : if((fd.revents & (POLLHUP | POLLRDHUP)) != 0)
340 : {
341 0 : process_hup();
342 0 : return;
343 : }
344 0 : if((fd.revents & POLLNVAL) != 0)
345 : {
346 0 : process_invalid();
347 0 : return;
348 : }
349 0 : }
350 0 : process_line(f_line);
351 0 : f_line.clear();
352 : }
353 0 : while(!is_done());
354 : }
355 :
356 :
357 : /** \brief Quick peek on the connection.
358 : *
359 : * This function checks for incoming messages and calls process_message()
360 : * on each one of them. If no messages are found on the pipe, then the
361 : * function returns immediately.
362 : *
363 : * \note
364 : * Internally, the function actually calls process_line() which transforms
365 : * the line in a message and in turn calls process_message().
366 : */
367 0 : void tcp_blocking_client_message_connection::peek()
368 : {
369 0 : do
370 : {
371 : for(;;)
372 : {
373 0 : pollfd fd;
374 0 : fd.events = POLLIN | POLLPRI | POLLRDHUP;
375 0 : fd.fd = get_socket();
376 0 : if(fd.fd < 0
377 0 : || !is_enabled())
378 : {
379 : // invalid socket
380 0 : process_error();
381 0 : return;
382 : }
383 :
384 0 : errno = 0;
385 0 : fd.revents = 0; // probably useless... (kernel should clear those)
386 0 : int const r(::poll(&fd, 1, 0));
387 0 : if(r < 0)
388 : {
389 : // r < 0 means an error occurred
390 : //
391 0 : if(errno == EINTR)
392 : {
393 : // Note: if the user wants to prevent this error, he should
394 : // use the signal with the Unix signals that may
395 : // happen while calling poll().
396 : //
397 : throw event_dispatcher_runtime_error(
398 : "tcp_blocking_client_message_connection::run():"
399 : " EINTR occurred while in poll() -- interrupts"
400 0 : " are not supported yet though");
401 : }
402 0 : if(errno == EFAULT)
403 : {
404 : throw event_dispatcher_invalid_parameter(
405 : "tcp_blocking_client_message_connection::run():"
406 0 : " buffer was moved out of our address space?");
407 : }
408 0 : if(errno == EINVAL)
409 : {
410 : // if this is really because nfds is too large then it may be
411 : // a "soft" error that can be fixed; that being said, my
412 : // current version is 16K files which frankly when we reach
413 : // that level we have a problem...
414 : //
415 0 : struct rlimit rl;
416 0 : getrlimit(RLIMIT_NOFILE, &rl);
417 : throw event_dispatcher_invalid_parameter(
418 : "tcp_blocking_client_message_connection::run():"
419 : " too many file fds for poll, limit is currently "
420 0 : + std::to_string(rl.rlim_cur)
421 0 : + ", your kernel top limit is "
422 0 : + std::to_string(rl.rlim_max));
423 : }
424 0 : if(errno == ENOMEM)
425 : {
426 : throw event_dispatcher_runtime_error(
427 : "tcp_blocking_client_message_connection::run():"
428 0 : " poll() failed because of memory");
429 : }
430 0 : int const e(errno);
431 : throw event_dispatcher_runtime_error(
432 : "tcp_blocking_client_message_connection::run():"
433 : " poll() failed with error "
434 0 : + std::to_string(e)
435 0 : + " -- "
436 0 : + strerror(e));
437 : }
438 :
439 0 : if(r == 0)
440 : {
441 0 : return;
442 : }
443 :
444 0 : if((fd.revents & (POLLIN | POLLPRI)) != 0)
445 : {
446 : // read one character at a time otherwise we would be
447 : // blocked forever
448 : //
449 0 : char buf[2];
450 0 : int const size(::read(fd.fd, buf, 1));
451 0 : if(size != 1)
452 : {
453 : // invalid read
454 0 : process_error();
455 : throw event_dispatcher_runtime_error(
456 : "tcp_blocking_client_message_connection::run():"
457 : " read() failed reading data from socket (return"
458 : " value = "
459 0 : + std::to_string(size)
460 0 : + ")");
461 : }
462 0 : if(buf[0] == '\n')
463 : {
464 : // end of a line, we got a whole message in our buffer
465 : // notice that we do not add the '\n' to line
466 0 : break;
467 : }
468 0 : buf[1] = '\0';
469 0 : f_line += buf;
470 : }
471 0 : if((fd.revents & POLLERR) != 0)
472 : {
473 0 : process_error();
474 0 : return;
475 : }
476 0 : if((fd.revents & (POLLHUP | POLLRDHUP)) != 0)
477 : {
478 0 : process_hup();
479 0 : return;
480 : }
481 0 : if((fd.revents & POLLNVAL) != 0)
482 : {
483 0 : process_invalid();
484 0 : return;
485 : }
486 0 : }
487 0 : process_line(f_line);
488 0 : f_line.clear();
489 : }
490 0 : while(!is_done());
491 : }
492 :
493 :
494 : /** \brief Send the specified message to the connection on the other end.
495 : *
496 : * This function sends the specified message to the other side of the
497 : * socket connection. If the write somehow fails, then the function
498 : * returns false.
499 : *
500 : * The function blocks until the entire message was written to the
501 : * socket.
502 : *
503 : * \param[in] message The message to send to the connection.
504 : * \param[in] cache Whether to cache the message if it cannot be sent
505 : * immediately (ignored at the moment.)
506 : *
507 : * \return true if the message was sent successfully, false otherwise.
508 : */
509 0 : bool tcp_blocking_client_message_connection::send_message(message const & msg, bool cache)
510 : {
511 0 : snap::NOT_USED(cache);
512 :
513 0 : int const s(get_socket());
514 0 : if(s >= 0)
515 : {
516 : // transform the message to a string and write to the socket
517 : // the writing is blocking and thus fully synchronous so the
518 : // function blocks until the message gets fully sent
519 : //
520 : // WARNING: we cannot use f_connection.write() because that one
521 : // is asynchronous (at least, it writes to a buffer
522 : // and not directly to the socket!)
523 : //
524 0 : std::string buf(msg.to_message());
525 0 : buf += '\n';
526 0 : return ::write(s, buf.c_str(), buf.length()) == static_cast<ssize_t>(buf.length());
527 : }
528 :
529 0 : return false;
530 : }
531 :
532 :
533 : /** \brief Overridden callback.
534 : *
535 : * This function is overriding the lower level process_error() to make
536 : * (mostly) sure that the remove_from_communicator() function does not
537 : * get called because that would generate the creation of a
538 : * communicator object which we do not want with blocking
539 : * clients.
540 : */
541 0 : void tcp_blocking_client_message_connection::process_error()
542 : {
543 0 : }
544 :
545 :
546 :
547 6 : } // namespace ed
548 : // vim: ts=4 sw=4 et
|