Line data Source code
1 : // Copyright (c) 2012-2021 Made to Order Software Corp. All Rights Reserved
2 : //
3 : // https://snapwebsites.org/project/eventdispatcher
4 : // contact@m2osw.com
5 : //
6 : // This program is free software; you can redistribute it and/or modify
7 : // it under the terms of the GNU General Public License as published by
8 : // the Free Software Foundation; either version 2 of the License, or
9 : // (at your option) any later version.
10 : //
11 : // This program is distributed in the hope that it will be useful,
12 : // but WITHOUT ANY WARRANTY; without even the implied warranty of
13 : // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 : // GNU General Public License for more details.
15 : //
16 : // You should have received a copy of the GNU General Public License
17 : // along with this program; if not, write to the Free Software
18 : // Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
19 :
20 : /** \file
 * \brief Implementation of the socket events class.
22 : *
23 : * This class wraps the C poll() interface in a C++ object with many types
24 : * of objects:
25 : *
 * \li Server Connections; for software that wants to offer a port to
 *     which clients can connect; the server will call accept()
 *     once a new client connection is ready; this results in a
 *     Server/Client connection object
 * \li Client Connections; for software that wants to connect to
 *     a server; these expect the IP address and port to connect to
32 : * \li Server/Client Connections; for the server when it accepts a new
33 : * connection; in this case the server gets a socket from accept()
34 : * and creates one of these objects to handle the connection
35 : *
36 : * Using the poll() function is the easiest and allows us to listen
37 : * on pretty much any number of sockets (on my server it is limited
38 : * at 16,768 and frankly over 1,000 we probably will start to have
39 : * real slowness issues on small VPN servers.)
40 : */
41 :
42 :
43 : // self
44 : //
45 : #include "eventdispatcher/socket_events.h"
46 :
47 : #include "eventdispatcher/communicator.h"
48 : #include "eventdispatcher/exception.h"
49 : #include "eventdispatcher/timer.h"
50 :
51 : // #include "eventdispatcher/tcp_bio_client.h"
52 : // #include "eventdispatcher/tcp_server_client_message_connection.h"
53 : // #include "eventdispatcher/thread_done_signal.h"
54 :
55 :
56 : // snaplogger lib
57 : //
58 : #include <snaplogger/message.h>
59 :
60 :
61 : // snapdev lib
62 : //
63 : #include <snapdev/raii_generic_deleter.h>
64 :
65 :
66 : // cppthread lib
67 : //
68 : // #include <cppthread/exception.h>
69 : #include <cppthread/guard.h>
70 : #include <cppthread/mutex.h>
71 : // #include <cppthread/thread.h>
72 :
73 :
74 : // libaddr lib
75 : //
76 : #include <libaddr/addr_parser.h>
77 :
78 :
79 : // C++ lib
80 : //
81 : #include <algorithm>
82 : #include <deque>
83 :
84 :
85 : // C lib
86 : //
87 : // #include <sys/socket.h>
88 : #include <linux/inet_diag.h>
89 : #include <linux/netlink.h>
90 : #include <linux/sock_diag.h>
91 :
92 :
93 : // last include
94 : //
95 : #include <snapdev/poison.h>
96 :
97 :
98 :
99 : namespace ed
100 : {
101 :
102 :
103 :
104 : namespace
105 : {
106 :
107 :
108 :
109 : #pragma GCC diagnostic push
110 : //#pragma GCC diagnostic ignored "-Weff-c++"
111 0 : struct socket_evt
112 : {
113 : typedef std::shared_ptr<socket_evt> pointer_t;
114 : typedef std::deque<pointer_t> deque_t;
115 :
116 : bool f_listening = false;
117 : socket_events * f_socket_events = nullptr;
118 : };
119 : #pragma GCC diagnostic pop
120 :
121 :
122 :
123 : class socket_listener
124 : : public timer
125 : {
126 : public:
127 : typedef std::shared_ptr<socket_listener> pointer_t;
128 :
129 : static constexpr std::size_t const RECEIVE_BUFFER_SIZE = 1'000 * (sizeof(nlmsghdr) + sizeof(inet_diag_msg));
130 : static constexpr int TCP_LISTEN_STATE = 10;
131 :
132 : socket_listener(cppthread::mutex & socket_mutex);
133 : virtual ~socket_listener();
134 :
135 : static pointer_t instance();
136 :
137 : void add_socket_events(socket_events * evts);
138 : void lost_connection(socket_events * evts);
139 : void remove_socket_events(socket_events * evts);
140 :
141 : // connection implementation
142 : virtual bool is_reader() const override;
143 : virtual bool is_writer() const override;
144 : virtual int get_socket() const override;
145 : virtual void process_timeout() override;
146 : virtual void process_read() override;
147 : virtual void process_write() override;
148 : virtual void process_error() override;
149 : virtual void process_hup() override;
150 : virtual void process_invalid() override;
151 :
152 : private:
153 : cppthread::mutex & f_socket_mutex;
154 : snap::raii_fd_t f_netlink_socket = snap::raii_fd_t();
155 : socket_evt::deque_t f_socket_events = socket_evt::deque_t();
156 : };
157 :
158 :
159 2 : socket_listener::pointer_t g_socket_listener = socket_listener::pointer_t();
160 :
161 :
162 0 : socket_listener::socket_listener(cppthread::mutex & socket_mutex)
163 : : timer(1'000'000)
164 : , f_socket_mutex(socket_mutex)
165 : , f_netlink_socket(socket(
166 : AF_NETLINK
167 : , SOCK_RAW | SOCK_CLOEXEC | SOCK_NONBLOCK
168 0 : , NETLINK_SOCK_DIAG))
169 : {
170 0 : if(f_netlink_socket < 0)
171 : {
172 0 : throw event_dispatcher_runtime_error("opening SOCK_RAW failed in socket_listener.");
173 : }
174 :
175 : // increase our changes to avoid memory issues
176 : //
177 0 : int const sndbuf(32 * 1'024);
178 0 : if(setsockopt(
179 0 : f_netlink_socket.get()
180 : , SOL_SOCKET
181 : , SO_SNDBUF
182 : , &sndbuf
183 0 : , sizeof(sndbuf)) != 0)
184 : {
185 0 : SNAP_LOG_WARNING
186 0 : << "the SO_SNDBUF failed against the NETLINK socket."
187 : << SNAP_LOG_SEND;
188 : }
189 :
190 : // enough space to support up to about 1,000 messages max.
191 : //
192 0 : int const rcvbuf(RECEIVE_BUFFER_SIZE);
193 0 : if(setsockopt(
194 0 : f_netlink_socket.get()
195 : , SOL_SOCKET
196 : , SO_RCVBUF
197 : , &rcvbuf
198 0 : , sizeof(rcvbuf)) != 0)
199 : {
200 0 : SNAP_LOG_WARNING
201 0 : << "the SO_RCVBUF failed against the NETLINK socket."
202 : << SNAP_LOG_SEND;
203 : }
204 :
205 : #if 0
206 : struct sockaddr_nl addr = {};
207 : addr.nl_family = AF_NETLINK;
208 : addr.nl_pid = getpid();
209 :
210 : // You can find the "groups" flags in Linux source:
211 : // (change the version as required with your current version)
212 : //
213 : // "/usr/src/linux-headers-4.15.0-147/include/net/tcp_states.h
214 : //
215 : addr.nl_groups = TCPF_LISTEN;
216 :
217 : if(bind(d, reinterpret_cast<struct sockaddr *>(&addr), sizeof(addr)) != 0)
218 : {
219 : throw event_dispatcher_runtime_error("could not bind() the SOCK_RAW of socket_listener.");
220 : }
221 : #endif
222 0 : }
223 :
224 :
225 0 : socket_listener::~socket_listener()
226 : {
227 0 : }
228 :
229 :
230 0 : socket_listener::pointer_t socket_listener::instance()
231 : {
232 0 : static cppthread::mutex g_mutex;
233 :
234 0 : cppthread::guard g(g_mutex);
235 :
236 0 : if(g_socket_listener == nullptr)
237 : {
238 0 : g_socket_listener.reset(new socket_listener(g_mutex));
239 :
240 0 : communicator::instance()->add_connection(g_socket_listener);
241 : }
242 :
243 0 : return g_socket_listener;
244 : }
245 :
246 :
247 0 : void socket_listener::add_socket_events(socket_events * evts)
248 : {
249 0 : if(!evts->get_addr().is_ipv4())
250 : {
251 0 : throw event_dispatcher_invalid_parameter("at this time, the socket listener is limited to IPv4 addresses.");
252 : }
253 :
254 0 : cppthread::guard g(f_socket_mutex);
255 :
256 0 : socket_evt::pointer_t evt(std::make_shared<socket_evt>());
257 0 : evt->f_socket_events = evts;
258 :
259 0 : f_socket_events.push_back(evt);
260 :
261 0 : set_enable(true);
262 0 : }
263 :
264 :
265 0 : void socket_listener::lost_connection(socket_events * evts)
266 : {
267 0 : cppthread::guard g(f_socket_mutex);
268 :
269 0 : auto it(std::find_if(
270 0 : f_socket_events.begin()
271 0 : , f_socket_events.end()
272 0 : , [&evts](socket_evt::pointer_t evt)
273 0 : {
274 0 : return evt->f_socket_events == evts;
275 0 : }));
276 0 : if(it != f_socket_events.end())
277 : {
278 : // if we lost the connection we assume that the other end is not
279 : // listening
280 : //
281 0 : (*it)->f_listening = false;
282 : }
283 :
284 0 : set_enable(true);
285 0 : }
286 :
287 :
288 0 : void socket_listener::remove_socket_events(socket_events * evts)
289 : {
290 0 : cppthread::guard g(f_socket_mutex);
291 :
292 0 : auto it(std::find_if(
293 0 : f_socket_events.begin()
294 0 : , f_socket_events.end()
295 0 : , [&evts](socket_evt::pointer_t evt)
296 0 : {
297 0 : return evt->f_socket_events == evts;
298 0 : }));
299 0 : if(it != f_socket_events.end())
300 : {
301 0 : f_socket_events.erase(it);
302 :
303 0 : if(f_socket_events.empty())
304 : {
305 0 : communicator::instance()->remove_connection(g_socket_listener);
306 :
307 0 : g_socket_listener.reset();
308 : }
309 : }
310 0 : }
311 :
312 :
313 0 : bool socket_listener::is_reader() const
314 : {
315 0 : cppthread::guard g(f_socket_mutex);
316 :
317 0 : return true;
318 : }
319 :
320 :
321 0 : bool socket_listener::is_writer() const
322 : {
323 0 : cppthread::guard g(f_socket_mutex);
324 :
325 0 : for(auto it : f_socket_events)
326 : {
327 0 : if(!it->f_listening)
328 : {
329 0 : return true;
330 : }
331 : }
332 :
333 0 : return false;
334 : }
335 :
336 :
337 0 : int socket_listener::get_socket() const
338 : {
339 0 : return f_netlink_socket.get();
340 : }
341 :
342 :
343 0 : void socket_listener::process_timeout()
344 : {
345 0 : cppthread::guard g(f_socket_mutex);
346 :
347 0 : for(auto it : f_socket_events)
348 : {
349 0 : if(!it->f_listening)
350 : {
351 0 : return;
352 : }
353 : }
354 :
355 : // nothing to check, go to sleep
356 : //
357 0 : set_enable(false);
358 : }
359 :
360 :
361 0 : void socket_listener::process_read()
362 : {
363 0 : sockaddr_nl nladdr = {};
364 :
365 0 : nladdr.nl_family = AF_NETLINK;
366 :
367 0 : char buf[RECEIVE_BUFFER_SIZE * 2];
368 0 : struct iovec vec = {};
369 0 : vec.iov_base = buf;
370 0 : vec.iov_len = sizeof(buf);
371 :
372 : for(;;)
373 : {
374 0 : struct msghdr msg = {};
375 :
376 0 : msg.msg_name = &nladdr;
377 0 : msg.msg_namelen = sizeof(nladdr);
378 0 : msg.msg_iov = &vec;
379 0 : msg.msg_iovlen = 1;
380 :
381 0 : ssize_t size(recvmsg(f_netlink_socket.get(), &msg, 0));
382 0 : if(size < 0)
383 : {
384 0 : int const e(errno);
385 0 : SNAP_LOG_ERROR
386 0 : << "recvmsg() returned with an error: "
387 : << e
388 : << " ("
389 0 : << strerror(e)
390 : << ")."
391 : << SNAP_LOG_SEND;
392 0 : return;
393 : }
394 :
395 0 : if(size == 0)
396 : {
397 : // found end of message stream for now
398 : //
399 0 : return;
400 : }
401 :
402 0 : for(nlmsghdr * h(reinterpret_cast<nlmsghdr *>(buf));
403 0 : NLMSG_OK(h, size);
404 0 : h = NLMSG_NEXT(h, size))
405 : {
406 0 : switch(h->nlmsg_type)
407 : {
408 0 : case NLMSG_DONE:
409 0 : return;
410 :
411 0 : case NLMSG_ERROR:
412 0 : if(h->nlmsg_len < NLMSG_LENGTH(sizeof(nlmsgerr)))
413 : {
414 0 : SNAP_LOG_ERROR
415 0 : << "unknown NLMSG_ERROR received (data buffer too small)."
416 : << SNAP_LOG_SEND;
417 : }
418 : else
419 : {
420 0 : nlmsgerr const * err(reinterpret_cast<nlmsgerr const *>(NLMSG_DATA(h)));
421 0 : int const e(-err->error);
422 0 : if(e != ENOENT)
423 : {
424 0 : SNAP_LOG_ERROR
425 0 : << "NETLINK error: "
426 : << e
427 : << " ("
428 0 : << strerror(e)
429 : << ")."
430 : << SNAP_LOG_SEND;
431 : }
432 : }
433 0 : break;
434 :
435 0 : case SOCK_DIAG_BY_FAMILY:
436 0 : if(h->nlmsg_len < NLMSG_LENGTH(sizeof(inet_diag_msg)))
437 : {
438 0 : SNAP_LOG_WARNING
439 0 : << "NETLINK length (h->nlmsg_len = "
440 0 : << h->nlmsg_len
441 0 : << ", expected at least "
442 0 : << sizeof(inet_diag_msg)
443 : << ") too small for a SOCK_DIAG_BY_FAMILY object."
444 : << SNAP_LOG_SEND;
445 0 : return;
446 : }
447 : else
448 : {
449 0 : inet_diag_msg const * diag(reinterpret_cast<inet_diag_msg const *>(NLMSG_DATA(h)));
450 0 : if(diag->idiag_state == TCP_LISTEN_STATE)
451 : {
452 : // got a listen(), look for which connection this is
453 : // and mark it as valid (open/listening)
454 : //
455 0 : for(auto it : f_socket_events)
456 : {
457 0 : if(!it->f_listening)
458 : {
459 0 : addr::addr a(it->f_socket_events->get_addr());
460 0 : if(a.get_port() == diag->id.idiag_sport)
461 : {
462 0 : sockaddr_in in = {};
463 0 : a.get_ipv4(in);
464 0 : if(in.sin_addr.s_addr == diag->id.idiag_src[0])
465 : {
466 : // got it!
467 : //
468 0 : it->f_socket_events->process_listening();
469 0 : it->f_listening = true;
470 :
471 : // TBD: if we add two connections with the same IP:port combo,
472 : // we get two separate socket_events but I do not know
473 : // whether we'll get one or two replies... so at this
474 : // time do not break this loop
475 : //break;
476 : }
477 : }
478 : }
479 : }
480 : }
481 : }
482 0 : break;
483 :
484 0 : default:
485 0 : SNAP_LOG_WARNING
486 0 : << "unexpected message type (h->nlmsg_type) "
487 0 : << h->nlmsg_type
488 : << SNAP_LOG_SEND;
489 0 : break;
490 :
491 : }
492 : }
493 0 : }
494 : }
495 :
496 :
497 0 : void socket_listener::process_write()
498 : {
499 0 : cppthread::guard g(f_socket_mutex);
500 :
501 : // count the number of requests we have to send
502 : //
503 0 : int const count(std::count_if(
504 0 : f_socket_events.begin()
505 0 : , f_socket_events.end()
506 0 : , [](auto const & evt)
507 : {
508 0 : return !evt->f_listening;
509 0 : }));
510 :
511 : struct nl_request
512 : {
513 : struct nlmsghdr f_nlh;
514 : struct inet_diag_req_v2 f_inet;
515 : };
516 :
517 : // preallocation means that the pointers won't change
518 : // which is important here
519 : //
520 0 : std::vector<nl_request> req(count);
521 0 : std::vector<iovec> vec(count);
522 :
523 0 : int idx(0);
524 0 : for(auto it : f_socket_events)
525 : {
526 0 : if(!it->f_listening)
527 : {
528 0 : addr::addr const & a(it->f_socket_events->get_addr());
529 :
530 0 : sockaddr_in in = {};
531 0 : a.get_ipv4(in);
532 :
533 0 : req[idx].f_nlh.nlmsg_len = sizeof(nl_request);
534 0 : req[idx].f_nlh.nlmsg_type = SOCK_DIAG_BY_FAMILY;
535 0 : req[idx].f_nlh.nlmsg_flags = NLM_F_REQUEST;
536 0 : req[idx].f_inet.sdiag_family = AF_INET;
537 0 : req[idx].f_inet.sdiag_protocol = IPPROTO_TCP;
538 0 : req[idx].f_inet.idiag_ext = 0;
539 0 : req[idx].f_inet.pad = 0;
540 0 : req[idx].f_inet.idiag_states = 0;
541 0 : req[idx].f_inet.id.idiag_sport = in.sin_port;
542 0 : req[idx].f_inet.id.idiag_dport = 0;
543 0 : req[idx].f_inet.id.idiag_src[0] = in.sin_addr.s_addr;
544 0 : req[idx].f_inet.id.idiag_dst[0] = 0;
545 0 : req[idx].f_inet.id.idiag_if = 0;
546 0 : req[idx].f_inet.id.idiag_cookie[0] = INET_DIAG_NOCOOKIE;
547 0 : req[idx].f_inet.id.idiag_cookie[1] = INET_DIAG_NOCOOKIE;
548 :
549 0 : vec[idx].iov_base = &req[idx];
550 0 : vec[idx].iov_len = sizeof(nl_request);
551 :
552 0 : ++idx;
553 : }
554 : }
555 0 : if(idx != count)
556 : {
557 : throw event_dispatcher_implementation_error(
558 : "somehow the number of requests counted ("
559 0 : + std::to_string(count)
560 0 : + ") did not match the number of request created ("
561 0 : + std::to_string(idx)
562 0 : + ").");
563 : }
564 :
565 0 : sockaddr_nl nladdr = {};
566 :
567 0 : nladdr.nl_family = AF_NETLINK;
568 :
569 0 : msghdr msg = {};
570 :
571 0 : msg.msg_name = &nladdr;
572 0 : msg.msg_namelen = sizeof(nladdr);
573 0 : msg.msg_iov = vec.data();
574 0 : msg.msg_iovlen = count;
575 :
576 0 : int const r(sendmsg(f_netlink_socket.get(), &msg, 0));
577 0 : if(r < 0)
578 : {
579 0 : process_error();
580 0 : return;
581 : }
582 : }
583 :
584 :
585 0 : void socket_listener::process_error()
586 : {
587 0 : cppthread::guard g(f_socket_mutex);
588 :
589 0 : socket_evt::deque_t evts(f_socket_events);
590 0 : for(auto it : evts)
591 : {
592 0 : if(!it->f_listening)
593 : {
594 0 : it->f_socket_events->process_error();
595 : }
596 : }
597 0 : }
598 :
599 :
600 0 : void socket_listener::process_hup()
601 : {
602 0 : cppthread::guard g(f_socket_mutex);
603 :
604 0 : for(auto it : f_socket_events)
605 : {
606 0 : if(!it->f_listening)
607 : {
608 0 : it->f_socket_events->process_hup();
609 : }
610 : }
611 0 : }
612 :
613 :
614 0 : void socket_listener::process_invalid()
615 : {
616 0 : cppthread::guard g(f_socket_mutex);
617 :
618 0 : for(auto it : f_socket_events)
619 : {
620 0 : if(!it->f_listening)
621 : {
622 0 : it->f_socket_events->process_invalid();
623 : }
624 : }
625 0 : }
626 :
627 :
628 :
629 :
630 :
631 :
632 : }
633 : // no name detail
634 :
635 :
636 :
637 : /** \brief Initializes this socket event object.
638 : *
639 : * This function initializes the socket_events with the specified address.
640 : * The address will be used to listen for a `listen()` call from any process
641 : * on this system.
642 : *
643 : * \warning
644 : * This only works for local services. Services that run on a remote computer
645 : * must attempt to connect and fail on the connect until the remote service
646 : * is available.
647 : *
648 : * \param[in] a The address and port to poll for a `listen()`.
649 : */
650 0 : socket_events::socket_events(addr::addr const & a)
651 0 : : f_addr(a)
652 : {
653 0 : socket_listener::instance()->add_socket_events(this);
654 0 : }
655 :
656 :
657 : /** \brief Initializes this socket events object.
658 : *
659 : * Initialize a socket_events object to listen to the specified address
660 : * and port for a new connection (i.e. for a process to call `listen()`
661 : * on that address:port combo).
662 : *
663 : * If this is the first socket_events created, then a new socket listener
664 : * is also created.
665 : *
666 : * Then this socket_events object gets added to that socket listener.
667 : *
668 : * \param[in] address The address of the port to wait on.
669 : * \param[in] port The port to poll for a `listen()`.
670 : */
671 0 : socket_events::socket_events(
672 : std::string const & address
673 0 : , int port)
674 : : f_addr(addr::string_to_addr(
675 : address
676 : , "127.0.0.1"
677 : , port
678 0 : , "tcp")) // we really only support TCP at the moment
679 : {
680 0 : socket_listener::instance()->add_socket_events(this);
681 0 : }
682 :
683 :
684 : /** \brief Destroy instance.
685 : *
686 : * This function cleans up this socket event instance. This means the socket
687 : * address and port are removed from the actual socket listener and if that
688 : * was the last socket_events object, the socket listener is also destroyed.
689 : */
690 0 : socket_events::~socket_events()
691 : {
692 0 : socket_listener::instance()->remove_socket_events(this);
693 0 : }
694 :
695 :
696 : /** \brief This higher level connection has no socket.
697 : *
698 : * This function always returns -1 as there is no socket in this connection.
699 : *
700 : * \note
701 : * The one socket is found in the socket_listener which gets created with
702 : * the first socket_events (and destroyed with the last deleted
703 : * socket_events).
704 : *
705 : * \return Always -1.
706 : */
707 0 : int socket_events::get_socket() const
708 : {
709 0 : return -1;
710 : }
711 :
712 :
713 : /** \brief This function gives you access to the address of this connection.
714 : *
715 : * The loops used to create the NETLINK_SOCK_DIAG requests makes use of this
716 : * function to filter on this specific IP address and port.
717 : *
718 : * \return The address we are listening on.
719 : */
720 0 : addr::addr const & socket_events::get_addr() const
721 : {
722 0 : return f_addr;
723 : }
724 :
725 :
726 : /** \brief This function is called whenever you lose a connection.
727 : *
728 : * In most cases, you lose a connection because the service breaks (crashes
729 : * or was restarted) so you need to poll for a `listen()` again. This
730 : * function lets the socket_listener internal object know that you expect
731 : * a call to the process_listening() once the service is available again.
732 : */
733 0 : void socket_events::lost_connection()
734 : {
735 0 : socket_listener::instance()->lost_connection(this);
736 0 : }
737 :
738 :
739 :
740 6 : } // namespace ed
741 : // vim: ts=4 sw=4 et
|