Line data Source code
1 : // Copyright (c) 2012-2019 Made to Order Software Corp. All Rights Reserved
2 : //
3 : // https://snapwebsites.org/project/eventdispatcher
4 : // contact@m2osw.com
5 : //
6 : // This program is free software; you can redistribute it and/or modify
7 : // it under the terms of the GNU General Public License as published by
8 : // the Free Software Foundation; either version 2 of the License, or
9 : // (at your option) any later version.
10 : //
11 : // This program is distributed in the hope that it will be useful,
12 : // but WITHOUT ANY WARRANTY; without even the implied warranty of
13 : // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 : // GNU General Public License for more details.
15 : //
16 : // You should have received a copy of the GNU General Public License
17 : // along with this program; if not, write to the Free Software
18 : // Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
19 :
20 : /** \file
21 : * \brief Implementation of the Snap Communicator class.
22 : *
23 : * This class wraps the C poll() interface in a C++ object with many types
24 : * of objects:
25 : *
26 : * \li Server Connections; for software that wants to offer a port to
27 : * which clients can connect; the server will call accept()
28 : * once a new client connection is ready; this results in a
29 : * Server/Client connection object
30 : * \li Client Connections; for software that wants to connect to
31 : * a server; these expect the IP address and port to connect to
32 : * \li Server/Client Connections; for the server when it accepts a new
33 : * connection; in this case the server gets a socket from accept()
34 : * and creates one of these objects to handle the connection
35 : *
36 : * Using the poll() function is the easiest and allows us to listen
37 : * on pretty much any number of sockets (on my server it is limited
38 : * at 16,768 and frankly over 1,000 we probably will start to have
39 : * real slowness issues on small VPN servers.)
40 : */
41 :
42 : // to get the POLLRDHUP definition
43 : #ifndef _GNU_SOURCE
44 : #define _GNU_SOURCE
45 : #endif
46 :
47 :
48 : // self
49 : //
50 : #include "eventdispatcher/tcp_blocking_client_message_connection.h"
51 :
52 : #include "eventdispatcher/exception.h"
53 :
54 :
55 : // snaplogger lib
56 : //
57 : #include <snaplogger/message.h>
58 :
59 :
60 : // snapdev lib
61 : //
62 : #include <snapdev/not_used.h>
63 :
64 :
65 : // C lib
66 : //
67 : #include <poll.h>
68 : #include <sys/resource.h>
69 :
70 :
71 : // last include
72 : //
73 : #include <snapdev/poison.h>
74 :
75 :
76 :
77 : namespace ed
78 : {
79 :
80 :
81 :
82 : /** \brief Blocking client message connection.
83 : *
84 : * This object allows you to create a blocking, generally temporary
85 : * one message connection client. This is specifically used with
86 : * the snaplock daemon, but it can be used for other things too as
87 : * required.
88 : *
89 : * The connection is expected to be used as shown in the following
90 : * example which is how it is used to implement the LOCK through
91 : * our snaplock daemons.
92 : *
93 : * \code
94 : * class my_blocking_connection
95 : * : public ed::tcp_blocking_client_message_connection
96 : * {
97 : * public:
98 : * my_blocking_connection(std::string const & addr, int port, mode_t mode)
99 : * : tcp_blocking_client_message_connection(addr, port, mode)
100 : * {
101 : * // need to register with snap communicator
102 : * snap_communicator_message register_message;
103 : * register_message.set_command("REGISTER");
104 : * ...
105 : * send_message(register_message);
106 : *
107 : * run();
108 : * }
109 : *
110 : * ~my_blocking_connection()
111 : * {
112 : * // done, send UNLOCK and then make sure to unregister
113 : * snap_communicator_message unlock_message;
114 : * unlock_message.set_command("UNLOCK");
115 : * ...
116 : * send_message(unlock_message);
117 : *
118 : * snap_communicator_message unregister_message;
119 : * unregister_message.set_command("UNREGISTER");
120 : * ...
121 : * send_message(unregister_message);
122 : * }
123 : *
124 : * // now that we have a dispatcher, this would probably use
125 : * // that mechanism instead of a list of if()/else if()
126 : * //
127 : * // Please, consider using the dispatcher instead
128 : * //
129 : * virtual void process_message(snap_communicator_message const & message)
130 : * {
131 : * QString const command(message.get_command());
132 : * if(command == "LOCKED")
133 : * {
134 : * // the lock worked, release hand back to the user
135 : * mark_done();
136 : * }
137 : * else if(command == "READY")
138 : * {
139 : * // the REGISTER worked
140 : * // send the LOCK now
141 : * snap_communicator_message lock_message;
142 : * lock_message.set_command("LOCK");
143 : * ...
144 : * send_message(lock_message);
145 : * }
146 : * else if(command == "HELP")
147 : * {
148 : * // snapcommunicator wants us to tell it what commands
149 : * // we accept
150 : * snap_communicator_message commands_message;
151 : * commands_message.set_command("COMMANDS");
152 : * ...
153 : * send_message(commands_message);
154 : * }
155 : * }
156 : * };
157 : * my_blocking_connection blocking_connection("127.0.0.1", 4040);
158 : *
159 : * // then we can send a message to the service we are interested in
160 : * blocking_connection.send_message(my_message);
161 : *
162 : * // now we call run() waiting for a reply
163 : * blocking_connection.run();
164 : * \endcode
165 : *
166 : * \param[in] addr The address to connect to.
167 : * \param[in] port The port to connect at.
168 : * \param[in] mode The mode used to connect.
169 : */
170 0 : tcp_blocking_client_message_connection::tcp_blocking_client_message_connection(
171 : std::string const & addr
172 : , int const port
173 : , mode_t const mode)
174 0 : : tcp_client_message_connection(addr, port, mode, true)
175 : {
176 0 : }
177 :
178 :
179 : /** \brief Blocking run on the connection.
180 : *
181 : * This function reads the incoming messages and calls process_message()
182 : * on each one of them, in a blocking manner.
183 : *
184 : * If you called mark_done() before, the done flag is reset back to false.
185 : * You will have to call mark_done() again if you again receive a message
186 : * that is expected to end the loop.
187 : *
188 : * \note
189 : * Internally, the function actually calls process_line() which transforms
190 : * the line in a message and in turn calls process_message().
191 : */
192 0 : void tcp_blocking_client_message_connection::run()
193 : {
194 0 : mark_not_done();
195 :
196 0 : do
197 : {
198 0 : for(;;)
199 : {
200 : // TBD: can the socket become -1 within the read() loop?
201 : // (i.e. should not that be just outside of the for(;;)?)
202 : //
203 : struct pollfd fd;
204 0 : fd.events = POLLIN | POLLPRI | POLLRDHUP;
205 0 : fd.fd = get_socket();
206 0 : if(fd.fd < 0
207 0 : || !is_enabled())
208 : {
209 : // invalid socket
210 0 : process_error();
211 0 : return;
212 : }
213 :
214 : // at this time, this class is used with the lock and
215 : // the lock has a timeout so we need to block at most
216 : // for that amount of time and not forever (presumably
217 : // the snaplock would send us a LOCKFAILED marked with
218 : // a "timeout" parameter, but we cannot rely on the
219 : // snaplock being there and responding as expected.)
220 : //
221 : // calculate the number of microseconds and then convert
222 : // them to milliseconds for poll()
223 : //
224 0 : std::int64_t const next_timeout_timestamp(save_timeout_timestamp());
225 0 : std::int64_t const now(get_current_date());
226 0 : std::int64_t const timeout((next_timeout_timestamp - now) / 1000);
227 0 : if(timeout <= 0)
228 : {
229 : // timed out
230 : //
231 0 : process_timeout();
232 0 : if(is_done())
233 : {
234 0 : return;
235 : }
236 : SNAP_LOG_FATAL
237 0 : << "blocking connection timed out.";
238 : throw event_dispatcher_runtime_error(
239 : "tcp_blocking_client_message_connection::run(): blocking"
240 0 : " connection timed out.");
241 : }
242 0 : errno = 0;
243 0 : fd.revents = 0; // probably useless... (kernel should clear those)
244 0 : int const r(::poll(&fd, 1, timeout));
245 0 : if(r < 0)
246 : {
247 : // r < 0 means an error occurred
248 : //
249 0 : if(errno == EINTR)
250 : {
251 : // Note: if the user wants to prevent this error, he should
252 : // use the snap_signal with the Unix signals that may
253 : // happen while calling poll().
254 : //
255 : throw event_dispatcher_runtime_error(
256 : "tcp_blocking_client_message_connection::run():"
257 : " EINTR occurred while in poll() -- interrupts"
258 0 : " are not supported yet though.");
259 : }
260 0 : if(errno == EFAULT)
261 : {
262 : throw event_dispatcher_runtime_error(
263 : "tcp_blocking_client_message_connection::run():"
264 0 : " buffer was moved out of our address space?");
265 : }
266 0 : if(errno == EINVAL)
267 : {
268 : // if this is really because nfds is too large then it may be
269 : // a "soft" error that can be fixed; that being said, my
270 : // current version is 16K files which frankly when we reach
271 : // that level we have a problem...
272 : //
273 : rlimit rl;
274 0 : getrlimit(RLIMIT_NOFILE, &rl);
275 : throw event_dispatcher_invalid_parameter(
276 : "tcp_blocking_client_message_connection::run():"
277 : " too many file fds for poll, limit is"
278 : " currently "
279 0 : + std::to_string(rl.rlim_cur)
280 0 : + ", your kernel top limit is "
281 0 : + std::to_string(rl.rlim_max)
282 0 : + ".");
283 : }
284 0 : if(errno == ENOMEM)
285 : {
286 : throw event_dispatcher_runtime_error(
287 : "tcp_blocking_client_message_connection::run():"
288 0 : " poll() failed because of memory.");
289 : }
290 0 : int const e(errno);
291 : throw event_dispatcher_invalid_parameter(
292 : "tcp_blocking_client_message_connection::run():"
293 : " poll() failed with error "
294 0 : + std::to_string(e)
295 0 : + " -- "
296 0 : + strerror(e));
297 : }
298 :
299 0 : if((fd.revents & (POLLIN | POLLPRI)) != 0)
300 : {
301 : // read one character at a time otherwise we would be
302 : // blocked forever
303 : //
304 : char buf[2];
305 0 : int const size(::read(fd.fd, buf, 1));
306 0 : if(size != 1)
307 : {
308 : // invalid read
309 : //
310 0 : process_error();
311 : throw event_dispatcher_invalid_parameter(
312 : "tcp_blocking_client_message_connection::run():"
313 : " read() failed reading data from socket"
314 : " (return value = "
315 0 : + std::to_string(size)
316 0 : + ").");
317 : }
318 0 : if(buf[0] == '\n')
319 : {
320 : // end of a line, we got a whole message in our buffer
321 : // notice that we do not add the '\n' to line
322 : //
323 0 : break;
324 : }
325 0 : buf[1] = '\0';
326 0 : f_line += buf;
327 : }
328 0 : if((fd.revents & POLLERR) != 0)
329 : {
330 0 : process_error();
331 0 : return;
332 : }
333 0 : if((fd.revents & (POLLHUP | POLLRDHUP)) != 0)
334 : {
335 0 : process_hup();
336 0 : return;
337 : }
338 0 : if((fd.revents & POLLNVAL) != 0)
339 : {
340 0 : process_invalid();
341 0 : return;
342 : }
343 : }
344 0 : process_line(f_line);
345 0 : f_line.clear();
346 : }
347 0 : while(!is_done());
348 : }
349 :
350 :
351 : /** \brief Quick peek on the connection.
352 : *
353 : * This function checks for incoming messages and calls process_message()
354 : * on each one of them. If no messages are found on the pipe, then the
355 : * function returns immediately.
356 : *
357 : * \note
358 : * Internally, the function actually calls process_line() which transforms
359 : * the line in a message and in turn calls process_message().
360 : */
361 0 : void tcp_blocking_client_message_connection::peek()
362 : {
363 0 : do
364 : {
365 0 : for(;;)
366 : {
367 : pollfd fd;
368 0 : fd.events = POLLIN | POLLPRI | POLLRDHUP;
369 0 : fd.fd = get_socket();
370 0 : if(fd.fd < 0
371 0 : || !is_enabled())
372 : {
373 : // invalid socket
374 0 : process_error();
375 0 : return;
376 : }
377 :
378 0 : errno = 0;
379 0 : fd.revents = 0; // probably useless... (kernel should clear those)
380 0 : int const r(::poll(&fd, 1, 0));
381 0 : if(r < 0)
382 : {
383 : // r < 0 means an error occurred
384 : //
385 0 : if(errno == EINTR)
386 : {
387 : // Note: if the user wants to prevent this error, he should
388 : // use the snap_signal with the Unix signals that may
389 : // happen while calling poll().
390 : //
391 : throw event_dispatcher_runtime_error(
392 : "tcp_blocking_client_message_connection::run():"
393 : " EINTR occurred while in poll() -- interrupts"
394 0 : " are not supported yet though");
395 : }
396 0 : if(errno == EFAULT)
397 : {
398 : throw event_dispatcher_invalid_parameter(
399 : "tcp_blocking_client_message_connection::run():"
400 0 : " buffer was moved out of our address space?");
401 : }
402 0 : if(errno == EINVAL)
403 : {
404 : // if this is really because nfds is too large then it may be
405 : // a "soft" error that can be fixed; that being said, my
406 : // current version is 16K files which frankly when we reach
407 : // that level we have a problem...
408 : //
409 : struct rlimit rl;
410 0 : getrlimit(RLIMIT_NOFILE, &rl);
411 : throw event_dispatcher_invalid_parameter(
412 : "tcp_blocking_client_message_connection::run():"
413 : " too many file fds for poll, limit is currently "
414 0 : + std::to_string(rl.rlim_cur)
415 0 : + ", your kernel top limit is "
416 0 : + std::to_string(rl.rlim_max));
417 : }
418 0 : if(errno == ENOMEM)
419 : {
420 : throw event_dispatcher_runtime_error(
421 : "tcp_blocking_client_message_connection::run():"
422 0 : " poll() failed because of memory");
423 : }
424 0 : int const e(errno);
425 : throw event_dispatcher_runtime_error(
426 : "tcp_blocking_client_message_connection::run():"
427 : " poll() failed with error "
428 0 : + std::to_string(e)
429 0 : + " -- "
430 0 : + strerror(e));
431 : }
432 :
433 0 : if(r == 0)
434 : {
435 0 : return;
436 : }
437 :
438 0 : if((fd.revents & (POLLIN | POLLPRI)) != 0)
439 : {
440 : // read one character at a time otherwise we would be
441 : // blocked forever
442 : //
443 : char buf[2];
444 0 : int const size(::read(fd.fd, buf, 1));
445 0 : if(size != 1)
446 : {
447 : // invalid read
448 0 : process_error();
449 : throw event_dispatcher_runtime_error(
450 : "tcp_blocking_client_message_connection::run():"
451 : " read() failed reading data from socket (return"
452 : " value = "
453 0 : + std::to_string(size)
454 0 : + ")");
455 : }
456 0 : if(buf[0] == '\n')
457 : {
458 : // end of a line, we got a whole message in our buffer
459 : // notice that we do not add the '\n' to line
460 0 : break;
461 : }
462 0 : buf[1] = '\0';
463 0 : f_line += buf;
464 : }
465 0 : if((fd.revents & POLLERR) != 0)
466 : {
467 0 : process_error();
468 0 : return;
469 : }
470 0 : if((fd.revents & (POLLHUP | POLLRDHUP)) != 0)
471 : {
472 0 : process_hup();
473 0 : return;
474 : }
475 0 : if((fd.revents & POLLNVAL) != 0)
476 : {
477 0 : process_invalid();
478 0 : return;
479 : }
480 : }
481 0 : process_line(f_line);
482 0 : f_line.clear();
483 : }
484 0 : while(!is_done());
485 : }
486 :
487 :
488 : /** \brief Send the specified message to the connection on the other end.
489 : *
490 : * This function sends the specified message to the other side of the
491 : * socket connection. If the write somehow fails, then the function
492 : * returns false.
493 : *
494 : * The function blocks until the entire message was written to the
495 : * socket.
496 : *
497 : * \param[in] message The message to send to the connection.
498 : * \param[in] cache Whether to cache the message if it cannot be sent
499 : * immediately (ignored at the moment.)
500 : *
501 : * \return true if the message was sent succesfully, false otherwise.
502 : */
503 0 : bool tcp_blocking_client_message_connection::send_message(message const & msg, bool cache)
504 : {
505 0 : snap::NOTUSED(cache);
506 :
507 0 : int const s(get_socket());
508 0 : if(s >= 0)
509 : {
510 : // transform the message to a string and write to the socket
511 : // the writing is blocking and thus fully synchronous so the
512 : // function blocks until the message gets fully sent
513 : //
514 : // WARNING: we cannot use f_connection.write() because that one
515 : // is asynchronous (at least, it writes to a buffer
516 : // and not directly to the socket!)
517 : //
518 0 : std::string buf(msg.to_message());
519 0 : buf += '\n';
520 0 : return ::write(s, buf.c_str(), buf.length()) == static_cast<ssize_t>(buf.length());
521 : }
522 :
523 0 : return false;
524 : }
525 :
526 :
527 : /** \brief Overridden callback.
528 : *
529 : * This function is overriding the lower level process_error() to make
530 : * (mostly) sure that the remove_from_communicator() function does not
531 : * get called because that would generate the creation of a
532 : * snap_communicator object which we do not want with blocking
533 : * clients.
534 : */
535 0 : void tcp_blocking_client_message_connection::process_error()
536 : {
537 0 : }
538 :
539 :
540 :
541 6 : } // namespace ed
542 : // vim: ts=4 sw=4 et
|