Line data Source code
1 : // Copyright (c) 2012-2019 Made to Order Software Corp. All Rights Reserved
2 : //
3 : // This program is free software; you can redistribute it and/or modify
4 : // it under the terms of the GNU General Public License as published by
5 : // the Free Software Foundation; either version 2 of the License, or
6 : // (at your option) any later version.
7 : //
8 : // This program is distributed in the hope that it will be useful,
9 : // but WITHOUT ANY WARRANTY; without even the implied warranty of
10 : // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 : // GNU General Public License for more details.
12 : //
13 : // You should have received a copy of the GNU General Public License
14 : // along with this program; if not, write to the Free Software
15 : // Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
16 :
17 : /** \file
18 : * \brief Implementation of the Snap Communicator class.
19 : *
20 : * This class wraps the C poll() interface in a C++ object with many types
21 : * of objects:
22 : *
23 : * \li Server Connections; for software that wants to offer a port to
24 : * which clients can connect; the server will call accept()
25 : * once a new client connection is ready; this results in a
26 : * Server/Client connection object
27 : * \li Client Connections; for software that wants to connect to
28 : * a server; these expect the IP address and port to connect to
29 : * \li Server/Client Connections; for the server when it accepts a new
30 : * connection; in this case the server gets a socket from accept()
31 : * and creates one of these objects to handle the connection
32 : *
33 : * Using the poll() function is the easiest and allows us to listen
34 : * on pretty much any number of sockets (on my server it is limited
35 : * at 16,768 and frankly over 1,000 we probably will start to have
36 : * real slowness issues on small VPN servers.)
37 : */
38 :
39 : // to get the POLLRDHUP definition
40 : #ifndef _GNU_SOURCE
41 : #define _GNU_SOURCE
42 : #endif
43 :
44 :
45 : // self
46 : //
47 : #include "eventdispatcher/inter_thread_message_connection.h"
48 :
49 : #include "eventdispatcher/exception.h"
50 :
51 :
52 : // cppthread lib
53 : //
54 : #include "cppthread/thread.h"
55 :
56 :
57 : // snapdev lib
58 : //
59 : #include "snapdev/not_reached.h"
60 :
61 :
62 : // C lib
63 : //
64 : #include <poll.h>
65 : #include <sys/eventfd.h>
66 : #include <sys/resource.h>
67 : #include <string.h>
68 :
69 :
70 : // last include
71 : //
72 : #include <snapdev/poison.h>
73 :
74 :
75 :
76 :
77 : namespace ed
78 : {
79 :
80 :
81 :
82 : /** \brief Initializes the inter-thread connection.
83 : *
84 : * This function creates two queues to communicate between two threads.
85 : * At this point, we expect such connections to only be used between
86 : * two threads because we cannot listen on more than one socket.
87 : *
88 : * The connection is expected to be created by "thread A". This means
89 : * the send_message() for "thread A" adds messages to the queue of
90 : * "thread B" and the process_message() for "thread A" reads
91 : * messages from the "thread A" queue, and vice versa.
92 : *
93 : * In order to know whether a queue has data in it, we use an eventfd().
94 : * One of them is for "thread A" and the other is for "thread B".
95 : *
96 : * \todo
97 : * To support all the features of a snap_connection on both sides
98 : * we would have to allocate a sub-connection object for thread B.
99 : * That sub-connection object would then be used just like a full
100 : * regular connection with all of its own parameters. Actually the
101 : * FIFO of messages could then clearly be segregated in each object.
102 : *
103 : * \exception event_dispatcher_initialization_error
104 : * This exception is raised if the pipes (socketpair) cannot be created.
105 : */
106 0 : inter_thread_message_connection::inter_thread_message_connection()
107 : {
108 0 : f_creator_id = cppthread::gettid();
109 :
110 0 : f_thread_a.reset(eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK | EFD_SEMAPHORE));
111 0 : if(!f_thread_a)
112 : {
113 : // eventfd could not be created
114 : //
115 0 : throw event_dispatcher_initialization_error("could not create eventfd for thread A");
116 : }
117 :
118 0 : f_thread_b.reset(eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK | EFD_SEMAPHORE));
119 0 : if(!f_thread_b)
120 : {
121 0 : f_thread_a.reset();
122 :
123 : // eventfd could not be created
124 : //
125 0 : throw event_dispatcher_initialization_error("could not create eventfd for thread B");
126 : }
127 0 : }
128 :
129 :
130 : /** \brief Make sure to close the eventfd objects.
131 : *
132 : * The destructor ensures that the eventfd objects allocated by the
133 : * constructor get closed.
134 : */
135 0 : inter_thread_message_connection::~inter_thread_message_connection()
136 : {
137 0 : }
138 :
139 :
140 : /** \brief Close the thread communication early.
141 : *
142 : * This function closes the pair of eventfd managed by this
143 : * inter-thread connection object.
144 : *
145 : * After this call, the inter-thread connection is closed and cannot be
146 : * used anymore. The read and write functions will return immediately
147 : * if called.
148 : */
149 0 : void inter_thread_message_connection::close()
150 : {
151 0 : f_thread_a.reset();
152 0 : f_thread_b.reset();
153 0 : }
154 :
155 :
156 : /** \brief Poll the connection in the child.
157 : *
158 : * There can be only one snap_communicator, therefore, the thread
159 : * cannot make use of it since it is only for the main application.
160 : * This poll() function can be used by the child to wait on the
161 : * connection.
162 : *
163 : * You may specify a timeout as usual.
164 : *
165 : * \exception snap_communicator_runtime_error
166 : * If an interrupt happens and stops the poll() then this exception is
167 : * raised. If not enough memory is available to run the poll() function,
168 : * this errors is raised.
169 : *
170 : * \exception snap_communicator_parameter_error
171 : * Somehow a buffer was moved out of our client's space (really that one
172 : * is not likely to happen...). Too many file descriptors in the list of
173 : * fds (not likely to happen since we just have one!)
174 : *
175 : * \exception snap_communicator_parameter_error
176 : *
177 : * \param[in] timeout The maximum amount of time to wait in microseconds.
178 : * Use zero (0) to not block at all.
179 : *
180 : * \return -1 if an error occurs, 0 on success
181 : */
182 0 : int inter_thread_message_connection::poll(int timeout)
183 : {
184 0 : for(;;)
185 : {
186 : // are we even enabled?
187 : //
188 : struct pollfd fd;
189 0 : fd.events = POLLIN | POLLPRI | POLLRDHUP;
190 0 : fd.fd = get_socket();
191 :
192 0 : if(fd.fd < 0
193 0 : || !is_enabled())
194 : {
195 0 : return -1;
196 : }
197 :
198 : // we cannot use this connection timeout information; it would
199 : // otherwise be common to both threads; so instead we have
200 : // a parameter which is used by the caller to tell us how long
201 : // we have to wait
202 : //
203 : // convert microseconds to milliseconds for poll()
204 : //
205 0 : if(timeout > 0)
206 : {
207 0 : timeout /= 1000;
208 0 : if(timeout == 0)
209 : {
210 : // less than one is a waste of time (CPU intenssive
211 : // until the time is reached, we can be 1 ms off
212 : // instead...)
213 : //
214 0 : timeout = 1;
215 : }
216 : }
217 : else
218 : {
219 : // negative numbers are adjusted to zero.
220 : //
221 0 : timeout = 0;
222 : }
223 :
224 0 : int const r(::poll(&fd, 1, timeout));
225 0 : if(r < 0)
226 : {
227 : // r < 0 means an error occurred
228 : //
229 0 : int const e(errno);
230 :
231 0 : if(e == EINTR)
232 : {
233 : // Note: if the user wants to prevent this error, he should
234 : // use the snap_signal with the Unix signals that may
235 : // happen while calling poll().
236 : //
237 0 : throw event_dispatcher_runtime_error("EINTR occurred while in poll() -- interrupts are not supported yet though");
238 : }
239 0 : if(e == EFAULT)
240 : {
241 0 : throw event_dispatcher_parameter_error("buffer was moved out of our address space?");
242 : }
243 0 : if(e == EINVAL)
244 : {
245 : // if this is really because nfds is too large then it may be
246 : // a "soft" error that can be fixed; that being said, my
247 : // current version is 16K files which frankly when we reach
248 : // that level we have a problem...
249 : //
250 : struct rlimit rl;
251 0 : getrlimit(RLIMIT_NOFILE, &rl);
252 : throw event_dispatcher_parameter_error(
253 : "too many file fds for poll, limit is currently "
254 0 : + std::to_string(rl.rlim_cur)
255 0 : + ", your kernel top limit is "
256 0 : + std::to_string(rl.rlim_max));
257 : }
258 0 : if(e == ENOMEM)
259 : {
260 0 : throw event_dispatcher_runtime_error("poll() failed because of memory");
261 : }
262 : throw event_dispatcher_runtime_error(
263 : "poll() failed with error: "
264 0 : + std::to_string(e)
265 0 : + " -- "
266 0 : + strerror(e));
267 : }
268 :
269 0 : if(r == 0)
270 : {
271 : // poll() timed out, just return so the thread can do some
272 : // additional work
273 : //
274 0 : return 0;
275 : }
276 :
277 : // we reach here when there is something to read
278 : //
279 0 : if((fd.revents & (POLLIN | POLLPRI)) != 0)
280 : {
281 0 : process_read();
282 : }
283 : // at this point we do not request POLLOUT and assume that the
284 : // write() function will never fail
285 : //
286 : //if((fd.revents & POLLOUT) != 0)
287 : //{
288 : // process_write();
289 : //}
290 0 : if((fd.revents & POLLERR) != 0)
291 : {
292 0 : process_error();
293 : }
294 0 : if((fd.revents & (POLLHUP | POLLRDHUP)) != 0)
295 : {
296 0 : process_hup();
297 : }
298 0 : if((fd.revents & POLLNVAL) != 0)
299 : {
300 0 : process_invalid();
301 : }
302 : }
303 : snap::NOTREACHED();
304 : }
305 :
306 :
307 : /** \brief Pipe connections accept reads.
308 : *
309 : * This function returns true meaning that the pipe connection can be
310 : * used to read data.
311 : *
312 : * \return true since a pipe connection is a reader.
313 : */
314 0 : bool inter_thread_message_connection::is_reader() const
315 : {
316 0 : return true;
317 : }
318 :
319 :
320 : /** \brief This function returns the pipe we want to listen on.
321 : *
322 : * This function returns the file descriptor of one of the two
323 : * sockets. The parent process returns the descriptor of socket
324 : * number 0. The child process returns the descriptor of socket
325 : * number 1.
326 : *
327 : * \note
328 : * If the close() function was called, this function returns -1.
329 : *
330 : * \return A pipe descriptor to listen on with poll().
331 : */
332 0 : int inter_thread_message_connection::get_socket() const
333 : {
334 0 : if(f_creator_id == cppthread::gettid())
335 : {
336 0 : return f_thread_a.get();
337 : }
338 :
339 0 : return f_thread_b.get();
340 : }
341 :
342 :
343 : /** \brief Read one message from the FIFO.
344 : *
345 : * This function reads one message from the FIFO specific to this
346 : * thread. If the FIFO is empty,
347 : *
348 : * The function makes sure to use the correct socket for the calling
349 : * process (i.e. depending on whether this is the parent or child.)
350 : *
351 : * Just like the system write(2) function, errno is set to the error
352 : * that happened when the function returns -1.
353 : *
354 : * \warning
355 : * At the moment this class does not support the dispatcher
356 : * extension.
357 : *
358 : * \return The number of bytes written to this pipe socket, or -1 on errors.
359 : */
360 0 : void inter_thread_message_connection::process_read()
361 : {
362 0 : message msg;
363 :
364 0 : bool const is_thread_a(f_creator_id == cppthread::gettid());
365 :
366 : // retrieve the message
367 : //
368 0 : bool const got_message((is_thread_a ? f_message_a : f_message_b).pop_front(msg, 0));
369 :
370 : // "remove" that one object from the semaphore counter
371 : //
372 0 : uint64_t value(1);
373 : //#pragma GCC diagnostic push
374 : //#pragma GCC diagnostic ignored "-Wunused-result"
375 0 : if(read(is_thread_a ? f_thread_a.get() : f_thread_b.get(), &value, sizeof(value)) != sizeof(value))
376 : {
377 0 : throw event_dispatcher_runtime_error("an error occurred while reading from inter-thread eventfd description.");
378 : }
379 : //#pragma GCC diagnostic pop
380 :
381 : // send the message for processing
382 : // got_message should always be true, but just in case...
383 : //
384 0 : if(got_message)
385 : {
386 0 : if(is_thread_a)
387 : {
388 0 : process_message_a(msg);
389 : }
390 : else
391 : {
392 0 : process_message_b(msg);
393 : }
394 : }
395 0 : }
396 :
397 :
398 : /** \brief Send a message to the other end of this connection.
399 : *
400 : * This function sends the specified \p message to the thread
401 : * on the other side of the connection.
402 : *
403 : * \note
404 : * We are not a writer. We directly write to the corresponding
405 : * thread eventfd() so it can wake up and read the message we
406 : * just sent. There is only one reason for which the write
407 : * would not be available, we already sent 2^64-2 messages,
408 : * which is not likely to happen since memory would not support
409 : * that many messages.
410 : *
411 : * \todo
412 : * One day we probably will want to be able to have support for a
413 : * process_write() callback... Maybe we should do the write there.
414 : * Only we need to know where the write() would have to happen...
415 : * That's a bit complicated right now for a feature that would not
416 : * get tested well...
417 : *
418 : * \param[in] message The message to send to the other side.
419 : * \param[in] cache These messages are always cached so this is ignored.
420 : *
421 : * \return true of the message was sent, false if it was cached or failed.
422 : */
423 0 : bool inter_thread_message_connection::send_message(message const & msg, bool cache)
424 : {
425 0 : snap::NOTUSED(cache);
426 :
427 0 : if(f_creator_id == cppthread::gettid())
428 : {
429 0 : f_message_b.push_back(msg);
430 0 : uint64_t const value(1);
431 0 : return write(f_thread_b.get(), &value, sizeof(value)) == sizeof(value);
432 : }
433 : else
434 : {
435 0 : f_message_a.push_back(msg);
436 0 : uint64_t const value(1);
437 0 : return write(f_thread_a.get(), &value, sizeof(value)) == sizeof(value);
438 : }
439 : }
440 :
441 :
442 :
443 6 : } // namespace ed
444 : // vim: ts=4 sw=4 et
|