Line data Source code
1 : // Copyright (c) 2012-2021 Made to Order Software Corp. All Rights Reserved
2 : //
3 : // https://snapwebsites.org/project/eventdispatcher
4 : // contact@m2osw.com
5 : //
6 : // This program is free software; you can redistribute it and/or modify
7 : // it under the terms of the GNU General Public License as published by
8 : // the Free Software Foundation; either version 2 of the License, or
9 : // (at your option) any later version.
10 : //
11 : // This program is distributed in the hope that it will be useful,
12 : // but WITHOUT ANY WARRANTY; without even the implied warranty of
13 : // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 : // GNU General Public License for more details.
15 : //
16 : // You should have received a copy of the GNU General Public License
17 : // along with this program; if not, write to the Free Software
18 : // Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
19 :
20 : /** \file
21 : * \brief Implementation of the Snap Communicator class.
22 : *
23 : * This class wraps the C poll() interface in a C++ object with many types
24 : * of objects:
25 : *
26 : * \li Server Connections; for software that want to offer a port to
27 : * which clients can connect to; the server will call accept()
28 : * once a new client connection is ready; this results in a
29 : * Server/Client connection object
30 : * \li Client Connections; for software that want to connect to
31 : * a server; these expect the IP address and port to connect to
32 : * \li Server/Client Connections; for the server when it accepts a new
33 : * connection; in this case the server gets a socket from accept()
34 : * and creates one of these objects to handle the connection
35 : *
36 : * Using the poll() function is the easiest and allows us to listen
37 : * on pretty much any number of sockets (on my server it is limited
38 : * at 16,768 and frankly over 1,000 we probably will start to have
39 : * real slowness issues on small VPN servers.)
40 : */
41 :
42 : // to get the POLLRDHUP definition
43 : #ifndef _GNU_SOURCE
44 : #define _GNU_SOURCE
45 : #endif
46 :
47 :
48 : // self
49 : //
50 : #include "eventdispatcher/inter_thread_message_connection.h"
51 :
52 : #include "eventdispatcher/exception.h"
53 :
54 :
55 : // cppthread lib
56 : //
57 : #include <cppthread/thread.h>
58 :
59 :
60 : // snapdev lib
61 : //
62 : #include <snapdev/not_reached.h>
63 :
64 :
65 : // C lib
66 : //
67 : #include <poll.h>
68 : #include <sys/eventfd.h>
69 : #include <sys/resource.h>
70 : #include <string.h>
71 :
72 :
73 : // last include
74 : //
75 : #include <snapdev/poison.h>
76 :
77 :
78 :
79 :
80 : namespace ed
81 : {
82 :
83 :
84 :
85 : /** \brief Initializes the inter-thread connection.
86 : *
87 : * This function creates two queues to communicate between two threads.
88 : * At this point, we expect such connections to only be used between
89 : * two threads because we cannot listen on more than one socket.
90 : *
91 : * The connection is expected to be created by "thread A". This means
92 : * the send_message() for "thread A" adds messages to the queue of
93 : * "thread B" and the process_message() for "thread A" reads
94 : * messages from the "thread A" queue, and vice versa.
95 : *
96 : * In order to know whether a queue has data in it, we use an eventfd().
97 : * One of them is for "thread A" and the other is for "thread B".
98 : *
99 : * \todo
100 : * To support all the features of a connection on both sides
101 : * we would have to allocate a sub-connection object for thread B.
102 : * That sub-connection object would then be used just like a full
103 : * regular connection with all of its own parameters. Actually the
104 : * FIFO of messages could then clearly be segregated in each object.
105 : *
106 : * \exception event_dispatcher_initialization_error
107 : * This exception is raised if the pipes (socketpair) cannot be created.
108 : */
109 0 : inter_thread_message_connection::inter_thread_message_connection()
110 : {
111 0 : f_creator_id = cppthread::gettid();
112 :
113 0 : f_thread_a.reset(eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK | EFD_SEMAPHORE));
114 0 : if(!f_thread_a)
115 : {
116 : // eventfd could not be created
117 : //
118 0 : throw event_dispatcher_initialization_error("could not create eventfd for thread A");
119 : }
120 :
121 0 : f_thread_b.reset(eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK | EFD_SEMAPHORE));
122 0 : if(!f_thread_b)
123 : {
124 0 : f_thread_a.reset();
125 :
126 : // eventfd could not be created
127 : //
128 0 : throw event_dispatcher_initialization_error("could not create eventfd for thread B");
129 : }
130 0 : }
131 :
132 :
133 : /** \brief Make sure to close the eventfd objects.
134 : *
135 : * The destructor ensures that the eventfd objects allocated by the
136 : * constructor get closed.
137 : */
138 0 : inter_thread_message_connection::~inter_thread_message_connection()
139 : {
140 0 : }
141 :
142 :
143 : /** \brief Close the thread communication early.
144 : *
145 : * This function closes the pair of eventfd managed by this
146 : * inter-thread connection object.
147 : *
148 : * After this call, the inter-thread connection is closed and cannot be
149 : * used anymore. The read and write functions will return immediately
150 : * if called.
151 : */
152 0 : void inter_thread_message_connection::close()
153 : {
154 0 : f_thread_a.reset();
155 0 : f_thread_b.reset();
156 0 : }
157 :
158 :
159 : /** \brief Poll the connection in the child.
160 : *
161 : * There can be only one communicator, therefore, the thread
162 : * cannot make use of it since it is only for the main application.
163 : * This poll() function can be used by the child to wait on the
164 : * connection.
165 : *
166 : * You may specify a timeout as usual.
167 : *
168 : * \exception event_dispatcher_runtime_error
169 : * If an interrupt happens and stops the poll() then this exception is
170 : * raised. If not enough memory is available to run the poll() function,
171 : * this errors is raised.
172 : *
173 : * \exception event_dispatcher_parameter_error
174 : * Somehow a buffer was moved out of our client's space (really that one
175 : * is not likely to happen...). Too many file descriptors in the list of
176 : * fds (not likely to happen since we just have one!)
177 : *
178 : * \param[in] timeout The maximum amount of time to wait in microseconds.
179 : * Use zero (0) to not block at all.
180 : *
181 : * \return -1 if an error occurs, 0 on success
182 : */
183 0 : int inter_thread_message_connection::poll(int timeout)
184 : {
185 : for(;;)
186 : {
187 : // are we even enabled?
188 : //
189 0 : struct pollfd fd;
190 0 : fd.events = POLLIN | POLLPRI | POLLRDHUP;
191 0 : fd.fd = get_socket();
192 :
193 0 : if(fd.fd < 0
194 0 : || !is_enabled())
195 : {
196 0 : return -1;
197 : }
198 :
199 : // we cannot use this connection timeout information; it would
200 : // otherwise be common to both threads; so instead we have
201 : // a parameter which is used by the caller to tell us how long
202 : // we have to wait
203 : //
204 : // convert microseconds to milliseconds for poll()
205 : //
206 0 : if(timeout > 0)
207 : {
208 0 : timeout /= 1000;
209 0 : if(timeout == 0)
210 : {
211 : // less than one is a waste of time (CPU intensive
212 : // until the time is reached, we can be 1 ms off
213 : // instead...)
214 : //
215 0 : timeout = 1;
216 : }
217 : }
218 : else
219 : {
220 : // negative numbers are adjusted to zero.
221 : //
222 0 : timeout = 0;
223 : }
224 :
225 0 : int const r(::poll(&fd, 1, timeout));
226 0 : if(r < 0)
227 : {
228 : // r < 0 means an error occurred
229 : //
230 0 : int const e(errno);
231 :
232 0 : if(e == EINTR)
233 : {
234 : // Note: if the user wants to prevent this error, he should
235 : // use the signal with the Unix signals that may
236 : // happen while calling poll().
237 : //
238 0 : throw event_dispatcher_runtime_error("EINTR occurred while in poll() -- interrupts are not supported yet though");
239 : }
240 0 : if(e == EFAULT)
241 : {
242 0 : throw event_dispatcher_parameter_error("buffer was moved out of our address space?");
243 : }
244 0 : if(e == EINVAL)
245 : {
246 : // if this is really because nfds is too large then it may be
247 : // a "soft" error that can be fixed; that being said, my
248 : // current version is 16K files which frankly when we reach
249 : // that level we have a problem...
250 : //
251 0 : struct rlimit rl;
252 0 : getrlimit(RLIMIT_NOFILE, &rl);
253 : throw event_dispatcher_parameter_error(
254 : "too many file fds for poll, limit is currently "
255 0 : + std::to_string(rl.rlim_cur)
256 0 : + ", your kernel top limit is "
257 0 : + std::to_string(rl.rlim_max));
258 : }
259 0 : if(e == ENOMEM)
260 : {
261 0 : throw event_dispatcher_runtime_error("poll() failed because of memory");
262 : }
263 : throw event_dispatcher_runtime_error(
264 : "poll() failed with error: "
265 0 : + std::to_string(e)
266 0 : + " -- "
267 0 : + strerror(e));
268 : }
269 :
270 0 : if(r == 0)
271 : {
272 : // poll() timed out, just return so the thread can do some
273 : // additional work
274 : //
275 0 : return 0;
276 : }
277 :
278 : // we reach here when there is something to read
279 : //
280 0 : if((fd.revents & (POLLIN | POLLPRI)) != 0)
281 : {
282 0 : process_read();
283 : }
284 : // at this point we do not request POLLOUT and assume that the
285 : // write() function will never fail
286 : //
287 : //if((fd.revents & POLLOUT) != 0)
288 : //{
289 : // process_write();
290 : //}
291 0 : if((fd.revents & POLLERR) != 0)
292 : {
293 0 : process_error();
294 : }
295 0 : if((fd.revents & (POLLHUP | POLLRDHUP)) != 0)
296 : {
297 0 : process_hup();
298 : }
299 0 : if((fd.revents & POLLNVAL) != 0)
300 : {
301 0 : process_invalid();
302 : }
303 0 : }
304 : snap::NOT_REACHED();
305 : }
306 :
307 :
308 : /** \brief Pipe connections accept reads.
309 : *
310 : * This function returns true meaning that the pipe connection can be
311 : * used to read data.
312 : *
313 : * \return true since a pipe connection is a reader.
314 : */
315 0 : bool inter_thread_message_connection::is_reader() const
316 : {
317 0 : return true;
318 : }
319 :
320 :
321 : /** \brief This function returns the pipe we want to listen on.
322 : *
323 : * This function returns the file descriptor of one of the two
324 : * sockets. The parent process returns the descriptor of socket
325 : * number 0. The child process returns the descriptor of socket
326 : * number 1.
327 : *
328 : * \note
329 : * If the close() function was called, this function returns -1.
330 : *
331 : * \return A pipe descriptor to listen on with poll().
332 : */
333 0 : int inter_thread_message_connection::get_socket() const
334 : {
335 0 : if(f_creator_id == cppthread::gettid())
336 : {
337 0 : return f_thread_a.get();
338 : }
339 :
340 0 : return f_thread_b.get();
341 : }
342 :
343 :
344 : /** \brief Read one message from the FIFO.
345 : *
346 : * This function reads one message from the FIFO specific to this
347 : * thread. If the FIFO is empty,
348 : *
349 : * The function makes sure to use the correct socket for the calling
350 : * process (i.e. depending on whether this is the parent or child.)
351 : *
352 : * Just like the system write(2) function, errno is set to the error
353 : * that happened when the function returns -1.
354 : *
355 : * \warning
356 : * At the moment this class does not support the dispatcher
357 : * extension.
358 : *
359 : * \return The number of bytes written to this pipe socket, or -1 on errors.
360 : */
361 0 : void inter_thread_message_connection::process_read()
362 : {
363 0 : message msg;
364 :
365 0 : bool const is_thread_a(f_creator_id == cppthread::gettid());
366 :
367 : // retrieve the message
368 : //
369 0 : bool const got_message((is_thread_a ? f_message_a : f_message_b).pop_front(msg, 0));
370 :
371 : // "remove" that one object from the semaphore counter
372 : //
373 0 : uint64_t value(1);
374 : //#pragma GCC diagnostic push
375 : //#pragma GCC diagnostic ignored "-Wunused-result"
376 0 : if(read(is_thread_a ? f_thread_a.get() : f_thread_b.get(), &value, sizeof(value)) != sizeof(value))
377 : {
378 0 : throw event_dispatcher_runtime_error("an error occurred while reading from inter-thread eventfd description.");
379 : }
380 : //#pragma GCC diagnostic pop
381 :
382 : // send the message for processing
383 : // got_message should always be true, but just in case...
384 : //
385 0 : if(got_message)
386 : {
387 0 : if(is_thread_a)
388 : {
389 0 : process_message_a(msg);
390 : }
391 : else
392 : {
393 0 : process_message_b(msg);
394 : }
395 : }
396 0 : }
397 :
398 :
399 : /** \brief Send a message to the other end of this connection.
400 : *
401 : * This function sends the specified \p message to the thread
402 : * on the other side of the connection.
403 : *
404 : * \note
405 : * We are not a writer. We directly write to the corresponding
406 : * thread eventfd() so it can wake up and read the message we
407 : * just sent. There is only one reason for which the write
408 : * would not be available, we already sent 2^64-2 messages,
409 : * which is not likely to happen since memory would not support
410 : * that many messages.
411 : *
412 : * \todo
413 : * One day we probably will want to be able to have support for a
414 : * process_write() callback... Maybe we should do the write there.
415 : * Only we need to know where the write() would have to happen...
416 : * That's a bit complicated right now for a feature that would not
417 : * get tested well...
418 : *
419 : * \param[in] message The message to send to the other side.
420 : * \param[in] cache These messages are always cached so this is ignored.
421 : *
422 : * \return true of the message was sent, false if it was cached or failed.
423 : */
424 0 : bool inter_thread_message_connection::send_message(message const & msg, bool cache)
425 : {
426 0 : snap::NOT_USED(cache);
427 :
428 0 : if(f_creator_id == cppthread::gettid())
429 : {
430 0 : f_message_b.push_back(msg);
431 0 : uint64_t const value(1);
432 0 : return write(f_thread_b.get(), &value, sizeof(value)) == sizeof(value);
433 : }
434 : else
435 : {
436 0 : f_message_a.push_back(msg);
437 0 : uint64_t const value(1);
438 0 : return write(f_thread_a.get(), &value, sizeof(value)) == sizeof(value);
439 : }
440 : }
441 :
442 :
443 :
444 6 : } // namespace ed
445 : // vim: ts=4 sw=4 et
|