Line data Source code
1 : // Copyright (c) 2012-2019 Made to Order Software Corp. All Rights Reserved
2 : //
3 : // This program is free software; you can redistribute it and/or modify
4 : // it under the terms of the GNU General Public License as published by
5 : // the Free Software Foundation; either version 2 of the License, or
6 : // (at your option) any later version.
7 : //
8 : // This program is distributed in the hope that it will be useful,
9 : // but WITHOUT ANY WARRANTY; without even the implied warranty of
10 : // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 : // GNU General Public License for more details.
12 : //
13 : // You should have received a copy of the GNU General Public License
14 : // along with this program; if not, write to the Free Software
15 : // Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
16 :
17 : /** \file
18 : * \brief Implementation of the pipe buffer connection class.
19 : *
20 : * This class wraps the C poll() interface in a C++ object with many types
21 : * of objects:
22 : *
23 : * \li Server Connections; for software that want to offer a port to
24 : * which clients can connect to; the server will call accept()
25 : * once a new client connection is ready; this results in a
26 : * Server/Client connection object
27 : * \li Client Connections; for software that want to connect to
28 : * a server; these expect the IP address and port to connect to
29 : * \li Server/Client Connections; for the server when it accepts a new
30 : * connection; in this case the server gets a socket from accept()
31 : * and creates one of these objects to handle the connection
32 : *
33 : * Using the poll() function is the easiest and allows us to listen
34 : * on pretty much any number of sockets (on my server it is limited
35 : * at 16,768 and frankly over 1,000 we probably will start to have
36 : * real slowness issues on small VPN servers.)
37 : */
38 :
39 :
40 : // self
41 : //
42 : #include "eventdispatcher/pipe_buffer_connection.h"
43 :
44 : #include "eventdispatcher/utils.h"
45 :
46 :
47 : // snaplogger lib
48 : //
49 : #include "snaplogger/message.h"
50 :
51 :
52 : //// snapdev lib
53 : ////
54 : //#include "snapdev/not_reached.h"
55 : //#include "snapdev/not_used.h"
56 : //#include "snapdev/string_replace_many.h"
57 : //
58 : //
59 : //// libaddr lib
60 : ////
61 : //#include "libaddr/addr_parser.h"
62 : //
63 : //
64 : // C++ lib
65 : //
66 : #include <algorithm>
67 : //#include <limits>
68 : //#include <atomic>
69 : //
70 : //
71 : //// C lib
72 : ////
73 : //#include <fcntl.h>
74 : //#include <poll.h>
75 : //#include <unistd.h>
76 : //#include <sys/eventfd.h>
77 : //#include <sys/inotify.h>
78 : //#include <sys/ioctl.h>
79 : //#include <sys/resource.h>
80 : //#include <sys/syscall.h>
81 : //#include <sys/time.h>
82 : //
83 : //
84 : //// last include
85 : ////
86 : //#include <snapdev/poison.h>
87 :
88 :
89 :
90 :
91 : namespace ed
92 : {
93 :
94 :
95 :
96 : /** \brief Pipe connections accept writes.
97 : *
98 : * This function returns true when there is some data in the pipe
99 : * connection buffer meaning that the pipe connection needs to
100 : * send data to the other side of the pipe.
101 : *
102 : * \return true if some data has to be written to the pipe.
103 : */
104 0 : bool pipe_buffer_connection::is_writer() const
105 : {
106 0 : return get_socket() != -1 && !f_output.empty();
107 : }
108 :
109 :
110 : /** \brief Write the specified data to the pipe buffer.
111 : *
112 : * This function writes the data specified by \p data to the pipe buffer.
113 : * Note that the data is not sent immediately. This will only happen
114 : * when the Snap Communicator loop is re-entered.
115 : *
116 : * \param[in] data The pointer to the data to write to the pipe.
117 : * \param[in] length The size of the data buffer.
118 : *
119 : * \return The number of bytes written. The function returns 0 when no
120 : * data can be written to that connection (i.e. it was already
121 : * closed or data is a null pointer.)
122 : */
123 0 : ssize_t pipe_buffer_connection::write(void const * data, size_t length)
124 : {
125 0 : if(get_socket() == -1)
126 : {
127 0 : errno = EBADF;
128 0 : return -1;
129 : }
130 :
131 0 : if(data != nullptr && length > 0)
132 : {
133 0 : char const * d(reinterpret_cast<char const *>(data));
134 0 : f_output.insert(f_output.end(), d, d + length);
135 0 : return length;
136 : }
137 :
138 0 : return 0;
139 : }
140 :
141 :
142 : /** \brief Read data that was received on this pipe.
143 : *
144 : * This function is used to read data whenever the process on
145 : * the other side sent us a message.
146 : */
147 0 : void pipe_buffer_connection::process_read()
148 : {
149 0 : if(get_socket() != -1)
150 : {
151 : // we could read one character at a time until we get a '\n'
152 : // but since we have a non-blocking socket we can read as
153 : // much as possible and then check for a '\n' and keep
154 : // any extra data in a cache.
155 : //
156 0 : int count_lines(0);
157 0 : int64_t const date_limit(get_current_date() + get_processing_time_limit());
158 0 : std::vector<char> buffer;
159 0 : buffer.resize(1024);
160 0 : for(;;)
161 : {
162 0 : errno = 0;
163 0 : ssize_t const r(read(&buffer[0], buffer.size()));
164 0 : if(r > 0)
165 : {
166 0 : for(ssize_t position(0); position < r; )
167 : {
168 0 : std::vector<char>::const_iterator it(std::find(buffer.begin() + position, buffer.begin() + r, '\n'));
169 0 : if(it == buffer.begin() + r)
170 : {
171 : // no newline, just add the whole thing
172 0 : f_line += std::string(&buffer[position], r - position);
173 0 : break; // do not waste time, we know we are done
174 : }
175 :
176 : // retrieve the characters up to the newline
177 : // character and process the line
178 : //
179 0 : f_line += std::string(&buffer[position], it - buffer.begin() - position);
180 0 : process_line(f_line);
181 0 : ++count_lines;
182 :
183 : // done with that line
184 : //
185 0 : f_line.clear();
186 :
187 : // we had a newline, we may still have some data
188 : // in that buffer; (+1 to skip the '\n' itself)
189 : //
190 0 : position = it - buffer.begin() + 1;
191 : }
192 :
193 : // when we reach here all the data read in `buffer` is
194 : // now either fully processed or in f_line
195 : //
196 : // TODO: change the way this works so we can test the
197 : // limit after each process_line() call
198 : //
199 0 : if(count_lines >= get_event_limit()
200 0 : || get_current_date() >= date_limit)
201 : {
202 : // we reach one or both limits, stop processing so
203 : // the other events have a chance to run
204 : //
205 0 : break;
206 : }
207 : }
208 0 : else if(r == 0 || errno == 0 || errno == EAGAIN || errno == EWOULDBLOCK)
209 : {
210 : // no more data available at this time
211 : //
212 : break;
213 : }
214 : else //if(r < 0)
215 : {
216 : // this happens all the time (i.e. another process quits)
217 : // so we make it a debug and not a warning or an error...
218 : //
219 0 : int const e(errno);
220 : SNAP_LOG_DEBUG
221 0 : << "an error occurred while reading from socket (errno: "
222 0 : << e
223 0 : << " -- "
224 0 : << strerror(e)
225 0 : << ").";
226 0 : process_error();
227 0 : return;
228 : }
229 : }
230 : }
231 : //else -- TBD: should we at least log an error when process_read() is called without a valid socket?
232 :
233 : // process the next level
234 0 : pipe_connection::process_read();
235 : }
236 :
237 :
238 : /** \brief Write as much data as we can to the pipe.
239 : *
240 : * This function writes the data that was cached in our f_output
241 : * buffer to the pipe, as much as possible, then it returns.
242 : *
243 : * The is_writer() function takes care of returning true if more
244 : * data is present in the f_output buffer and thus the process_write()
245 : * needs to be called again.
246 : *
247 : * Once the write buffer goes empty, this function calls the
248 : * process_empty_buffer() callback.
249 : */
250 0 : void pipe_buffer_connection::process_write()
251 : {
252 0 : if(get_socket() != -1)
253 : {
254 0 : errno = 0;
255 0 : ssize_t const r(pipe_connection::write(&f_output[f_position], f_output.size() - f_position));
256 0 : if(r > 0)
257 : {
258 : // some data was written
259 0 : f_position += r;
260 0 : if(f_position >= f_output.size())
261 : {
262 0 : f_output.clear();
263 0 : f_position = 0;
264 0 : process_empty_buffer();
265 : }
266 : }
267 0 : else if(r != 0 && errno != 0 && errno != EAGAIN && errno != EWOULDBLOCK)
268 : {
269 : // connection is considered bad, get rid of it
270 : //
271 0 : int const e(errno);
272 : SNAP_LOG_ERROR
273 0 : << "an error occurred while writing to socket of \""
274 0 : << get_name()
275 0 : << "\" (errno: "
276 0 : << e
277 0 : << " -- "
278 0 : << strerror(e)
279 0 : << ").";
280 0 : process_error();
281 0 : return;
282 : }
283 : }
284 : //else -- TBD: should we generate an error when the socket is not valid?
285 :
286 : // process next level too
287 0 : pipe_connection::process_write();
288 : }
289 :
290 :
291 : /** \brief The process received a hanged up pipe.
292 : *
293 : * The pipe on the other end was closed, somehow.
294 : */
295 0 : void pipe_buffer_connection::process_hup()
296 : {
297 0 : close();
298 :
299 0 : pipe_connection::process_hup();
300 0 : }
301 :
302 :
303 :
304 6 : } // namespace ed
305 : // vim: ts=4 sw=4 et
|