Line data Source code
1 : // Copyright (c) 2012-2019 Made to Order Software Corp. All Rights Reserved
2 : //
3 : // https://snapwebsites.org/project/eventdispatcher
4 : // contact@m2osw.com
5 : //
6 : // This program is free software; you can redistribute it and/or modify
7 : // it under the terms of the GNU General Public License as published by
8 : // the Free Software Foundation; either version 2 of the License, or
9 : // (at your option) any later version.
10 : //
11 : // This program is distributed in the hope that it will be useful,
12 : // but WITHOUT ANY WARRANTY; without even the implied warranty of
13 : // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 : // GNU General Public License for more details.
15 : //
16 : // You should have received a copy of the GNU General Public License
17 : // along with this program; if not, write to the Free Software
18 : // Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
19 :
20 : /** \file
21 : * \brief Implementation of the Snap Communicator class.
22 : *
23 : * This class wraps the C poll() interface in a C++ object with many types
24 : * of objects:
25 : *
26 : * \li Server Connections; for software that want to offer a port to
27 : * which clients can connect to; the server will call accept()
28 : * once a new client connection is ready; this results in a
29 : * Server/Client connection object
30 : * \li Client Connections; for software that want to connect to
31 : * a server; these expect the IP address and port to connect to
32 : * \li Server/Client Connections; for the server when it accepts a new
33 : * connection; in this case the server gets a socket from accept()
34 : * and creates one of these objects to handle the connection
35 : *
36 : * Using the poll() function is the easiest and allows us to listen
37 : * on pretty much any number of sockets (on my server it is limited
38 : * at 16,768 and frankly over 1,000 we probably will start to have
39 : * real slowness issues on small VPN servers.)
40 : */
41 :
42 : // to get the POLLRDHUP definition
43 : #ifndef _GNU_SOURCE
44 : #define _GNU_SOURCE
45 : #endif
46 :
47 :
48 : // self
49 : //
50 : #include "eventdispatcher/inter_thread_message_connection.h"
51 :
52 : #include "eventdispatcher/exception.h"
53 :
54 :
55 : // cppthread lib
56 : //
57 : #include <cppthread/thread.h>
58 :
59 :
60 : // snapdev lib
61 : //
62 : #include <snapdev/not_reached.h>
63 :
64 :
65 : // C lib
66 : //
67 : #include <poll.h>
68 : #include <sys/eventfd.h>
69 : #include <sys/resource.h>
70 : #include <string.h>
71 :
72 :
73 : // last include
74 : //
75 : #include <snapdev/poison.h>
76 :
77 :
78 :
79 :
80 : namespace ed
81 : {
82 :
83 :
84 :
85 : /** \brief Initializes the inter-thread connection.
86 : *
87 : * This function creates two queues to communicate between two threads.
88 : * At this point, we expect such connections to only be used between
89 : * two threads because we cannot listen on more than one socket.
90 : *
91 : * The connection is expected to be created by "thread A". This means
92 : * the send_message() for "thread A" adds messages to the queue of
93 : * "thread B" and the process_message() for "thread A" reads
94 : * messages from the "thread A" queue, and vice versa.
95 : *
96 : * In order to know whether a queue has data in it, we use an eventfd().
97 : * One of them is for "thread A" and the other is for "thread B".
98 : *
99 : * \todo
100 : * To support all the features of a snap_connection on both sides
101 : * we would have to allocate a sub-connection object for thread B.
102 : * That sub-connection object would then be used just like a full
103 : * regular connection with all of its own parameters. Actually the
104 : * FIFO of messages could then clearly be segregated in each object.
105 : *
106 : * \exception event_dispatcher_initialization_error
107 : * This exception is raised if the pipes (socketpair) cannot be created.
108 : */
109 0 : inter_thread_message_connection::inter_thread_message_connection()
110 : {
111 0 : f_creator_id = cppthread::gettid();
112 :
113 0 : f_thread_a.reset(eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK | EFD_SEMAPHORE));
114 0 : if(!f_thread_a)
115 : {
116 : // eventfd could not be created
117 : //
118 0 : throw event_dispatcher_initialization_error("could not create eventfd for thread A");
119 : }
120 :
121 0 : f_thread_b.reset(eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK | EFD_SEMAPHORE));
122 0 : if(!f_thread_b)
123 : {
124 0 : f_thread_a.reset();
125 :
126 : // eventfd could not be created
127 : //
128 0 : throw event_dispatcher_initialization_error("could not create eventfd for thread B");
129 : }
130 0 : }
131 :
132 :
133 : /** \brief Make sure to close the eventfd objects.
134 : *
135 : * The destructor ensures that the eventfd objects allocated by the
136 : * constructor get closed.
137 : */
138 0 : inter_thread_message_connection::~inter_thread_message_connection()
139 : {
140 0 : }
141 :
142 :
143 : /** \brief Close the thread communication early.
144 : *
145 : * This function closes the pair of eventfd managed by this
146 : * inter-thread connection object.
147 : *
148 : * After this call, the inter-thread connection is closed and cannot be
149 : * used anymore. The read and write functions will return immediately
150 : * if called.
151 : */
152 0 : void inter_thread_message_connection::close()
153 : {
154 0 : f_thread_a.reset();
155 0 : f_thread_b.reset();
156 0 : }
157 :
158 :
159 : /** \brief Poll the connection in the child.
160 : *
161 : * There can be only one snap_communicator, therefore, the thread
162 : * cannot make use of it since it is only for the main application.
163 : * This poll() function can be used by the child to wait on the
164 : * connection.
165 : *
166 : * You may specify a timeout as usual.
167 : *
168 : * \exception snap_communicator_runtime_error
169 : * If an interrupt happens and stops the poll() then this exception is
170 : * raised. If not enough memory is available to run the poll() function,
171 : * this errors is raised.
172 : *
173 : * \exception snap_communicator_parameter_error
174 : * Somehow a buffer was moved out of our client's space (really that one
175 : * is not likely to happen...). Too many file descriptors in the list of
176 : * fds (not likely to happen since we just have one!)
177 : *
178 : * \exception snap_communicator_parameter_error
179 : *
180 : * \param[in] timeout The maximum amount of time to wait in microseconds.
181 : * Use zero (0) to not block at all.
182 : *
183 : * \return -1 if an error occurs, 0 on success
184 : */
185 0 : int inter_thread_message_connection::poll(int timeout)
186 : {
187 0 : for(;;)
188 : {
189 : // are we even enabled?
190 : //
191 : struct pollfd fd;
192 0 : fd.events = POLLIN | POLLPRI | POLLRDHUP;
193 0 : fd.fd = get_socket();
194 :
195 0 : if(fd.fd < 0
196 0 : || !is_enabled())
197 : {
198 0 : return -1;
199 : }
200 :
201 : // we cannot use this connection timeout information; it would
202 : // otherwise be common to both threads; so instead we have
203 : // a parameter which is used by the caller to tell us how long
204 : // we have to wait
205 : //
206 : // convert microseconds to milliseconds for poll()
207 : //
208 0 : if(timeout > 0)
209 : {
210 0 : timeout /= 1000;
211 0 : if(timeout == 0)
212 : {
213 : // less than one is a waste of time (CPU intenssive
214 : // until the time is reached, we can be 1 ms off
215 : // instead...)
216 : //
217 0 : timeout = 1;
218 : }
219 : }
220 : else
221 : {
222 : // negative numbers are adjusted to zero.
223 : //
224 0 : timeout = 0;
225 : }
226 :
227 0 : int const r(::poll(&fd, 1, timeout));
228 0 : if(r < 0)
229 : {
230 : // r < 0 means an error occurred
231 : //
232 0 : int const e(errno);
233 :
234 0 : if(e == EINTR)
235 : {
236 : // Note: if the user wants to prevent this error, he should
237 : // use the snap_signal with the Unix signals that may
238 : // happen while calling poll().
239 : //
240 0 : throw event_dispatcher_runtime_error("EINTR occurred while in poll() -- interrupts are not supported yet though");
241 : }
242 0 : if(e == EFAULT)
243 : {
244 0 : throw event_dispatcher_parameter_error("buffer was moved out of our address space?");
245 : }
246 0 : if(e == EINVAL)
247 : {
248 : // if this is really because nfds is too large then it may be
249 : // a "soft" error that can be fixed; that being said, my
250 : // current version is 16K files which frankly when we reach
251 : // that level we have a problem...
252 : //
253 : struct rlimit rl;
254 0 : getrlimit(RLIMIT_NOFILE, &rl);
255 : throw event_dispatcher_parameter_error(
256 : "too many file fds for poll, limit is currently "
257 0 : + std::to_string(rl.rlim_cur)
258 0 : + ", your kernel top limit is "
259 0 : + std::to_string(rl.rlim_max));
260 : }
261 0 : if(e == ENOMEM)
262 : {
263 0 : throw event_dispatcher_runtime_error("poll() failed because of memory");
264 : }
265 : throw event_dispatcher_runtime_error(
266 : "poll() failed with error: "
267 0 : + std::to_string(e)
268 0 : + " -- "
269 0 : + strerror(e));
270 : }
271 :
272 0 : if(r == 0)
273 : {
274 : // poll() timed out, just return so the thread can do some
275 : // additional work
276 : //
277 0 : return 0;
278 : }
279 :
280 : // we reach here when there is something to read
281 : //
282 0 : if((fd.revents & (POLLIN | POLLPRI)) != 0)
283 : {
284 0 : process_read();
285 : }
286 : // at this point we do not request POLLOUT and assume that the
287 : // write() function will never fail
288 : //
289 : //if((fd.revents & POLLOUT) != 0)
290 : //{
291 : // process_write();
292 : //}
293 0 : if((fd.revents & POLLERR) != 0)
294 : {
295 0 : process_error();
296 : }
297 0 : if((fd.revents & (POLLHUP | POLLRDHUP)) != 0)
298 : {
299 0 : process_hup();
300 : }
301 0 : if((fd.revents & POLLNVAL) != 0)
302 : {
303 0 : process_invalid();
304 : }
305 : }
306 : snap::NOTREACHED();
307 : }
308 :
309 :
310 : /** \brief Pipe connections accept reads.
311 : *
312 : * This function returns true meaning that the pipe connection can be
313 : * used to read data.
314 : *
315 : * \return true since a pipe connection is a reader.
316 : */
317 0 : bool inter_thread_message_connection::is_reader() const
318 : {
319 0 : return true;
320 : }
321 :
322 :
323 : /** \brief This function returns the pipe we want to listen on.
324 : *
325 : * This function returns the file descriptor of one of the two
326 : * sockets. The parent process returns the descriptor of socket
327 : * number 0. The child process returns the descriptor of socket
328 : * number 1.
329 : *
330 : * \note
331 : * If the close() function was called, this function returns -1.
332 : *
333 : * \return A pipe descriptor to listen on with poll().
334 : */
335 0 : int inter_thread_message_connection::get_socket() const
336 : {
337 0 : if(f_creator_id == cppthread::gettid())
338 : {
339 0 : return f_thread_a.get();
340 : }
341 :
342 0 : return f_thread_b.get();
343 : }
344 :
345 :
346 : /** \brief Read one message from the FIFO.
347 : *
348 : * This function reads one message from the FIFO specific to this
349 : * thread. If the FIFO is empty,
350 : *
351 : * The function makes sure to use the correct socket for the calling
352 : * process (i.e. depending on whether this is the parent or child.)
353 : *
354 : * Just like the system write(2) function, errno is set to the error
355 : * that happened when the function returns -1.
356 : *
357 : * \warning
358 : * At the moment this class does not support the dispatcher
359 : * extension.
360 : *
361 : * \return The number of bytes written to this pipe socket, or -1 on errors.
362 : */
363 0 : void inter_thread_message_connection::process_read()
364 : {
365 0 : message msg;
366 :
367 0 : bool const is_thread_a(f_creator_id == cppthread::gettid());
368 :
369 : // retrieve the message
370 : //
371 0 : bool const got_message((is_thread_a ? f_message_a : f_message_b).pop_front(msg, 0));
372 :
373 : // "remove" that one object from the semaphore counter
374 : //
375 0 : uint64_t value(1);
376 : //#pragma GCC diagnostic push
377 : //#pragma GCC diagnostic ignored "-Wunused-result"
378 0 : if(read(is_thread_a ? f_thread_a.get() : f_thread_b.get(), &value, sizeof(value)) != sizeof(value))
379 : {
380 0 : throw event_dispatcher_runtime_error("an error occurred while reading from inter-thread eventfd description.");
381 : }
382 : //#pragma GCC diagnostic pop
383 :
384 : // send the message for processing
385 : // got_message should always be true, but just in case...
386 : //
387 0 : if(got_message)
388 : {
389 0 : if(is_thread_a)
390 : {
391 0 : process_message_a(msg);
392 : }
393 : else
394 : {
395 0 : process_message_b(msg);
396 : }
397 : }
398 0 : }
399 :
400 :
401 : /** \brief Send a message to the other end of this connection.
402 : *
403 : * This function sends the specified \p message to the thread
404 : * on the other side of the connection.
405 : *
406 : * \note
407 : * We are not a writer. We directly write to the corresponding
408 : * thread eventfd() so it can wake up and read the message we
409 : * just sent. There is only one reason for which the write
410 : * would not be available, we already sent 2^64-2 messages,
411 : * which is not likely to happen since memory would not support
412 : * that many messages.
413 : *
414 : * \todo
415 : * One day we probably will want to be able to have support for a
416 : * process_write() callback... Maybe we should do the write there.
417 : * Only we need to know where the write() would have to happen...
418 : * That's a bit complicated right now for a feature that would not
419 : * get tested well...
420 : *
421 : * \param[in] message The message to send to the other side.
422 : * \param[in] cache These messages are always cached so this is ignored.
423 : *
424 : * \return true of the message was sent, false if it was cached or failed.
425 : */
426 0 : bool inter_thread_message_connection::send_message(message const & msg, bool cache)
427 : {
428 0 : snap::NOTUSED(cache);
429 :
430 0 : if(f_creator_id == cppthread::gettid())
431 : {
432 0 : f_message_b.push_back(msg);
433 0 : uint64_t const value(1);
434 0 : return write(f_thread_b.get(), &value, sizeof(value)) == sizeof(value);
435 : }
436 : else
437 : {
438 0 : f_message_a.push_back(msg);
439 0 : uint64_t const value(1);
440 0 : return write(f_thread_a.get(), &value, sizeof(value)) == sizeof(value);
441 : }
442 : }
443 :
444 :
445 :
446 6 : } // namespace ed
447 : // vim: ts=4 sw=4 et
|