Line data Source code
1 : // Copyright (c) 2012-2019 Made to Order Software Corp. All Rights Reserved
2 : //
3 : // This program is free software; you can redistribute it and/or modify
4 : // it under the terms of the GNU General Public License as published by
5 : // the Free Software Foundation; either version 2 of the License, or
6 : // (at your option) any later version.
7 : //
8 : // This program is distributed in the hope that it will be useful,
9 : // but WITHOUT ANY WARRANTY; without even the implied warranty of
10 : // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 : // GNU General Public License for more details.
12 : //
13 : // You should have received a copy of the GNU General Public License
14 : // along with this program; if not, write to the Free Software
15 : // Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
16 :
17 : /** \file
18 : * \brief Implementation of the Snap Communicator class.
19 : *
20 : * This class wraps the C poll() interface in a C++ object with many types
21 : * of objects:
22 : *
23 : * \li Server Connections; for software that want to offer a port to
24 : * which clients can connect to; the server will call accept()
25 : * once a new client connection is ready; this results in a
26 : * Server/Client connection object
27 : * \li Client Connections; for software that want to connect to
28 : * a server; these expect the IP address and port to connect to
29 : * \li Server/Client Connections; for the server when it accepts a new
30 : * connection; in this case the server gets a socket from accept()
31 : * and creates one of these objects to handle the connection
32 : *
33 : * Using the poll() function is the easiest and allows us to listen
34 : * on pretty much any number of sockets (on my server it is limited
35 : * at 16,768 and frankly over 1,000 we probably will start to have
36 : * real slowness issues on small VPN servers.)
37 : */
38 :
39 : // to get the POLLRDHUP definition
40 : #ifndef _GNU_SOURCE
41 : #define _GNU_SOURCE
42 : #endif
43 :
44 :
45 : // self
46 : //
47 : #include "eventdispatcher/fd_buffer_connection.h"
48 :
49 : #include "eventdispatcher/utils.h"
50 :
51 :
52 : // snaplogger lib
53 : //
54 : #include "snaplogger/message.h"
55 :
56 :
57 : //// snapdev lib
58 : ////
59 : //#include "snapdev/not_reached.h"
60 : //#include "snapdev/not_used.h"
61 : //#include "snapdev/string_replace_many.h"
62 : //
63 : //
64 : //// libaddr lib
65 : ////
66 : //#include "libaddr/addr_parser.h"
67 : //
68 : //
69 : //// C++ lib
70 : ////
71 : //#include <sstream>
72 : //#include <limits>
73 : //#include <atomic>
74 :
75 :
76 : // C lib
77 : //
78 : //#include <fcntl.h>
79 : #include <string.h>
80 : //#include <unistd.h>
81 : //#include <sys/eventfd.h>
82 : //#include <sys/inotify.h>
83 : //#include <sys/ioctl.h>
84 : //#include <sys/resource.h>
85 : //#include <sys/syscall.h>
86 : //#include <sys/time.h>
87 :
88 :
89 : // last include
90 : //
91 : #include <snapdev/poison.h>
92 :
93 :
94 :
95 : namespace ed
96 : {
97 :
98 :
99 :
100 : /** \brief Initialize an fd connection with a buffer.
101 : *
102 : * The connection is initialized with the specified fd parameter.
103 : *
104 : * This initialization, so things work as expected in our environment,
105 : * marks the file descriptor as non-blocking. This is important for
106 : * the reader and writer capabilities.
107 : *
108 : * \param[in] fd The file descriptor (often a pipe).
109 : * \param[in] mode The mode describing the file descriptor (i.e. read-only
110 : * write-only, or read/write.)
111 : */
112 0 : fd_buffer_connection::fd_buffer_connection(int fd, mode_t mode)
113 0 : : fd_connection(fd, mode)
114 : {
115 0 : non_blocking();
116 0 : }
117 :
118 :
119 : /** \brief Check whether this file descriptor still has some buffered input.
120 : *
121 : * This function returns true if there is partial incoming data in this
122 : * object's buffer.
123 : *
124 : * \return true if some buffered input is waiting for completion.
125 : */
126 0 : bool fd_buffer_connection::has_input() const
127 : {
128 0 : return !f_line.empty();
129 : }
130 :
131 :
132 :
133 : /** \brief Check whether this file descriptor still has some buffered output.
134 : *
135 : * This function returns true if there is still some output data in the
136 : * output cache buffer. Output is added by the write() function, which is
137 : * called, for example, by the send_message() function.
138 : *
139 : * Note that if the fd was already closed, this function may still return
140 : * true in the event we have some cached data.
141 : *
142 : * \return true if some buffered output is waiting to be sent out.
143 : */
144 0 : bool fd_buffer_connection::has_output() const
145 : {
146 0 : return !f_output.empty();
147 : }
148 :
149 :
150 :
151 : /** \brief Tells that this file descriptor is a writer when we have data.
152 : *
153 : * This function checks to know whether there is output data to be writen
154 : * to this file descriptor. If so then the function returns true. Otherwise
155 : * it just returns false.
156 : *
157 : * This happens whenever you called the write() function and the connection
158 : * cache is not empty yet.
159 : *
160 : * Note that if the connection was closed or it not writable (as per the
161 : * fd mode specified when creating this connection) then this function
162 : * returns false.
163 : *
164 : * \return true if there is data to write to the fd, false otherwise.
165 : */
166 0 : bool fd_buffer_connection::is_writer() const
167 : {
168 0 : return !f_output.empty() && fd_connection::is_writer();
169 : }
170 :
171 :
172 : /** \brief Write data to the connection.
173 : *
174 : * This function can be used to send data to this file descriptor.
175 : * The data is bufferized and as soon as the connection file descriptor
176 : * can accept more data it gets written there. In other words, we cannot
177 : * just sleep and wait for an answer. The transfer of the data is therefore
178 : * asynchroneous.
179 : *
180 : * \todo
181 : * Determine whether we may end up with really large buffers that
182 : * grow for a long time. This function only inserts and the
183 : * process_signal() function only reads some of the bytes but it
184 : * does not reduce the size of the buffer until all the data was
185 : * sent.
186 : *
187 : * \note
188 : * The function returns -1 and sets errno to EBADF if the file
189 : * descriptor was closed (-1) or if it is not marked as a writer.
190 : *
191 : * \param[in] data The pointer to the buffer of data to be sent.
192 : * \param[out] length The number of bytes to send.
193 : *
194 : * \return The number of bytes written, either -1, 0, or \p length
195 : */
196 0 : ssize_t fd_buffer_connection::write(void const * data, size_t const length)
197 : {
198 : // WARNING: We MUST call the fd_connection version of the is_writer(),
199 : // because the fd_buffer_connection::is_writer() also checks
200 : // the f_output buffer which has unwanted side effects
201 : //
202 0 : if(get_socket() == -1
203 0 : || !fd_connection::is_writer()) // WARNING: see above
204 : {
205 0 : errno = EBADF;
206 0 : return -1;
207 : }
208 :
209 0 : if(data != nullptr
210 0 : && length > 0)
211 : {
212 0 : char const * d(reinterpret_cast<char const *>(data));
213 0 : f_output.insert(f_output.end(), d, d + length);
214 0 : return length;
215 : }
216 :
217 0 : return 0;
218 : }
219 :
220 :
221 : /** \brief Read and process as much data as possible.
222 : *
223 : * This function reads as much incoming data as possible and processes
224 : * it.
225 : *
226 : * If the input includes a newline character ('\n') then this function
227 : * calls the process_line() callback which can further process that
228 : * line of data.
229 : *
230 : * \todo
231 : * Look into a way, if possible, to have a single instantiation since
232 : * as far as I know this code matches the one written in the
233 : * process_read() of the snap_tcp_client_buffer_connection and
234 : * the snap_pipe_buffer_connection classes (and now snap_fd_buffer_connection).
235 : */
236 0 : void fd_buffer_connection::process_read()
237 : {
238 : // we read one character at a time until we get a '\n'
239 : // since we have a non-blocking socket we can read as
240 : // much as possible and then check for a '\n' and keep
241 : // any extra data in a cache.
242 : //
243 0 : if(get_socket() != -1)
244 : {
245 0 : int count_lines(0);
246 0 : std::int64_t const date_limit(get_current_date() + get_processing_time_limit());
247 0 : std::vector<char> buffer;
248 0 : buffer.resize(1024);
249 0 : for(;;)
250 : {
251 0 : errno = 0;
252 0 : ssize_t const r(read(&buffer[0], buffer.size()));
253 0 : if(r > 0)
254 : {
255 0 : for(ssize_t position(0); position < r; )
256 : {
257 0 : std::vector<char>::const_iterator it(std::find(buffer.begin() + position, buffer.begin() + r, '\n'));
258 0 : if(it == buffer.begin() + r)
259 : {
260 : // no newline, just add the whole thing
261 0 : f_line += std::string(&buffer[position], r - position);
262 0 : break; // do not waste time, we know we are done
263 : }
264 :
265 : // retrieve the characters up to the newline
266 : // character and process the line
267 : //
268 0 : f_line += std::string(&buffer[position], it - buffer.begin() - position);
269 0 : process_line(f_line);
270 0 : ++count_lines;
271 :
272 : // done with that line
273 : //
274 0 : f_line.clear();
275 :
276 : // we had a newline, we may still have some data
277 : // in that buffer; (+1 to skip the '\n' itself)
278 : //
279 0 : position = it - buffer.begin() + 1;
280 : }
281 :
282 : // when we reach here all the data read in `buffer` is
283 : // now either fully processed or in f_line
284 : //
285 : // TODO: change the way this works so we can test the
286 : // limit after each process_line() call
287 : //
288 0 : if(count_lines >= get_event_limit()
289 0 : || get_current_date() >= date_limit)
290 : {
291 : // we reach one or both limits, stop processing so
292 : // the other events have a chance to run
293 : //
294 0 : break;
295 : }
296 : }
297 0 : else if(r == 0 || errno == 0 || errno == EAGAIN || errno == EWOULDBLOCK)
298 : {
299 : // no more data available at this time
300 : break;
301 : }
302 : else //if(r < 0)
303 : {
304 0 : int const e(errno);
305 : SNAP_LOG_WARNING
306 0 : << "an error occurred while reading from socket (errno: "
307 0 : << e
308 0 : << " -- "
309 0 : << strerror(e)
310 0 : << ").";
311 0 : process_error();
312 0 : return;
313 : }
314 : }
315 : }
316 :
317 : // process next level too
318 0 : fd_connection::process_read();
319 : }
320 :
321 :
322 : /** \brief Write to the connection's socket.
323 : *
324 : * This function implementation writes as much data as possible to the
325 : * connection's socket.
326 : *
327 : * This function calls the process_empty_buffer() callback whenever the
328 : * output buffer goes empty.
329 : */
330 0 : void fd_buffer_connection::process_write()
331 : {
332 0 : if(get_socket() != -1)
333 : {
334 0 : errno = 0;
335 0 : ssize_t const r(fd_connection::write(&f_output[f_position], f_output.size() - f_position));
336 0 : if(r > 0)
337 : {
338 : // some data was written
339 0 : f_position += r;
340 0 : if(f_position >= f_output.size())
341 : {
342 0 : f_output.clear();
343 0 : f_position = 0;
344 0 : process_empty_buffer();
345 : }
346 : }
347 0 : else if(r != 0 && errno != 0 && errno != EAGAIN && errno != EWOULDBLOCK)
348 : {
349 : // connection is considered bad, get rid of it
350 : //
351 0 : int const e(errno);
352 : SNAP_LOG_ERROR
353 0 : << "an error occurred while writing to socket of \""
354 0 : << get_name()
355 0 : << "\" (errno: "
356 0 : << e
357 0 : << " -- "
358 0 : << strerror(e)
359 0 : << ").";
360 0 : process_error();
361 0 : return;
362 : }
363 : }
364 :
365 : // process next level too
366 0 : fd_connection::process_write();
367 : }
368 :
369 :
370 : /** \brief The remote used hanged up.
371 : *
372 : * This function makes sure that the connection gets closed properly.
373 : */
374 0 : void fd_buffer_connection::process_hup()
375 : {
376 : // this connection is dead...
377 : //
378 : //close(); -- we are not currently responsible for closing this FD
379 :
380 0 : fd_connection::process_hup();
381 0 : }
382 :
383 :
384 :
385 6 : } // namespace ed
386 : // vim: ts=4 sw=4 et
|