Line data Source code
1 : // Copyright (c) 2012-2021 Made to Order Software Corp. All Rights Reserved
2 : //
3 : // https://snapwebsites.org/project/eventdispatcher
4 : // contact@m2osw.com
5 : //
6 : // This program is free software; you can redistribute it and/or modify
7 : // it under the terms of the GNU General Public License as published by
8 : // the Free Software Foundation; either version 2 of the License, or
9 : // (at your option) any later version.
10 : //
11 : // This program is distributed in the hope that it will be useful,
12 : // but WITHOUT ANY WARRANTY; without even the implied warranty of
13 : // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 : // GNU General Public License for more details.
15 : //
16 : // You should have received a copy of the GNU General Public License
17 : // along with this program; if not, write to the Free Software
18 : // Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
19 :
20 : /** \file
21 : * \brief Implementation of the Snap Communicator class.
22 : *
23 : * This class wraps the C poll() interface in a C++ object with many types
24 : * of objects:
25 : *
26 : * \li Server Connections; for software that want to offer a port to
27 : * which clients can connect to; the server will call accept()
28 : * once a new client connection is ready; this results in a
29 : * Server/Client connection object
30 : * \li Client Connections; for software that want to connect to
31 : * a server; these expect the IP address and port to connect to
32 : * \li Server/Client Connections; for the server when it accepts a new
33 : * connection; in this case the server gets a socket from accept()
34 : * and creates one of these objects to handle the connection
35 : *
36 : * Using the poll() function is the easiest and allows us to listen
37 : * on pretty much any number of sockets (on my server it is limited
38 : * at 16,768 and frankly over 1,000 we probably will start to have
39 : * real slowness issues on small VPN servers.)
40 : */
41 :
42 :
43 : // self
44 : //
45 : #include "eventdispatcher/pipe_buffer_connection.h"
46 :
47 : #include "eventdispatcher/utils.h"
48 :
49 :
50 : // snaplogger lib
51 : //
52 : #include <snaplogger/message.h>
53 :
54 :
55 : // C++ lib
56 : //
57 : #include <algorithm>
58 : #include <cstring>
59 :
60 :
61 : // last include
62 : //
63 : #include <snapdev/poison.h>
64 :
65 :
66 :
67 : namespace ed
68 : {
69 :
70 :
71 :
72 : /** \brief Create a pipe with a buffer.
73 : *
74 : * This function creates a pipe with a buffer which can then be used to
75 : * manage lines of data. This version of the pipe forcibly uses the
76 : * bidirectional pipe. This is so because the buffered and message
77 : * classes are expected to be able to send and receive. The input
78 : * or ouptut only pipes are expected to be used as replacements to the
79 : * stdin, stdout, and stderr streams which are unidirectional.
80 : */
81 0 : pipe_buffer_connection::pipe_buffer_connection()
82 0 : : pipe_connection(pipe_t::PIPE_BIDIRECTIONAL)
83 : {
84 0 : }
85 :
86 :
87 : /** \brief Pipe connections accept writes.
88 : *
89 : * This function returns true when there is some data in the pipe
90 : * connection buffer meaning that the pipe connection needs to
91 : * send data to the other side of the pipe.
92 : *
93 : * \return true if some data has to be written to the pipe.
94 : */
95 0 : bool pipe_buffer_connection::is_writer() const
96 : {
97 0 : return get_socket() != -1 && !f_output.empty();
98 : }
99 :
100 :
101 : /** \brief Write the specified data to the pipe buffer.
102 : *
103 : * This function writes the data specified by \p data to the pipe buffer.
104 : * Note that the data is not sent immediately. This will only happen
105 : * when the Snap Communicator loop is re-entered.
106 : *
107 : * \param[in] data The pointer to the data to write to the pipe.
108 : * \param[in] length The size of the data buffer.
109 : *
110 : * \return The number of bytes written. The function returns 0 when no
111 : * data can be written to that connection (i.e. it was already
112 : * closed or data is a null pointer.)
113 : */
114 0 : ssize_t pipe_buffer_connection::write(void const * data, size_t length)
115 : {
116 0 : if(get_socket() == -1)
117 : {
118 0 : errno = EBADF;
119 0 : return -1;
120 : }
121 :
122 0 : if(data != nullptr && length > 0)
123 : {
124 0 : char const * d(reinterpret_cast<char const *>(data));
125 0 : f_output.insert(f_output.end(), d, d + length);
126 0 : return length;
127 : }
128 :
129 0 : return 0;
130 : }
131 :
132 :
133 : /** \brief Read data that was received on this pipe.
134 : *
135 : * This function is used to read data whenever the process on
136 : * the other side sent us a message.
137 : */
138 0 : void pipe_buffer_connection::process_read()
139 : {
140 0 : if(get_socket() != -1)
141 : {
142 : // we could read one character at a time until we get a '\n'
143 : // but since we have a non-blocking socket we can read as
144 : // much as possible and then check for a '\n' and keep
145 : // any extra data in a cache.
146 : //
147 0 : int count_lines(0);
148 0 : int64_t const date_limit(get_current_date() + get_processing_time_limit());
149 0 : std::vector<char> buffer;
150 0 : buffer.resize(1024);
151 : for(;;)
152 : {
153 0 : errno = 0;
154 0 : ssize_t const r(read(&buffer[0], buffer.size()));
155 0 : if(r > 0)
156 : {
157 0 : for(ssize_t position(0); position < r; )
158 : {
159 0 : std::vector<char>::const_iterator it(std::find(buffer.begin() + position, buffer.begin() + r, '\n'));
160 0 : if(it == buffer.begin() + r)
161 : {
162 : // no newline, just add the whole thing
163 0 : f_line += std::string(&buffer[position], r - position);
164 0 : break; // do not waste time, we know we are done
165 : }
166 :
167 : // retrieve the characters up to the newline
168 : // character and process the line
169 : //
170 0 : f_line += std::string(&buffer[position], it - buffer.begin() - position);
171 0 : process_line(f_line);
172 0 : ++count_lines;
173 :
174 : // done with that line
175 : //
176 0 : f_line.clear();
177 :
178 : // we had a newline, we may still have some data
179 : // in that buffer; (+1 to skip the '\n' itself)
180 : //
181 0 : position = it - buffer.begin() + 1;
182 : }
183 :
184 : // when we reach here all the data read in `buffer` is
185 : // now either fully processed or in f_line
186 : //
187 : // TODO: change the way this works so we can test the
188 : // limit after each process_line() call
189 : //
190 0 : if(count_lines >= get_event_limit()
191 0 : || get_current_date() >= date_limit)
192 : {
193 : // we reach one or both limits, stop processing so
194 : // the other events have a chance to run
195 : //
196 0 : break;
197 : }
198 : }
199 0 : else if(r == 0 || errno == 0 || errno == EAGAIN || errno == EWOULDBLOCK)
200 : {
201 : // no more data available at this time
202 : //
203 : break;
204 : }
205 : else //if(r < 0)
206 : {
207 : // this happens all the time (i.e. another process quits)
208 : // so we make it a debug and not a warning or an error...
209 : //
210 0 : int const e(errno);
211 0 : SNAP_LOG_DEBUG
212 0 : << "an error occurred while reading from socket (errno: "
213 : << e
214 : << " -- "
215 0 : << strerror(e)
216 : << ")."
217 : << SNAP_LOG_SEND;
218 0 : process_error();
219 0 : return;
220 : }
221 0 : }
222 : }
223 : //else -- TBD: should we at least log an error when process_read() is called without a valid socket?
224 :
225 : // process the next level
226 0 : pipe_connection::process_read();
227 : }
228 :
229 :
230 : /** \brief Write as much data as we can to the pipe.
231 : *
232 : * This function writes the data that was cached in our f_output
233 : * buffer to the pipe, as much as possible, then it returns.
234 : *
235 : * The is_writer() function takes care of returning true if more
236 : * data is present in the f_output buffer and thus the process_write()
237 : * needs to be called again.
238 : *
239 : * Once the write buffer goes empty, this function calls the
240 : * process_empty_buffer() callback.
241 : */
242 0 : void pipe_buffer_connection::process_write()
243 : {
244 0 : if(get_socket() != -1)
245 : {
246 0 : errno = 0;
247 0 : ssize_t const r(pipe_connection::write(&f_output[f_position], f_output.size() - f_position));
248 0 : if(r > 0)
249 : {
250 : // some data was written
251 0 : f_position += r;
252 0 : if(f_position >= f_output.size())
253 : {
254 0 : f_output.clear();
255 0 : f_position = 0;
256 0 : process_empty_buffer();
257 : }
258 : }
259 0 : else if(r != 0 && errno != 0 && errno != EAGAIN && errno != EWOULDBLOCK)
260 : {
261 : // connection is considered bad, get rid of it
262 : //
263 0 : int const e(errno);
264 0 : SNAP_LOG_ERROR
265 0 : << "an error occurred while writing to socket of \""
266 0 : << get_name()
267 0 : << "\" (errno: "
268 : << e
269 : << " -- "
270 0 : << strerror(e)
271 : << ")."
272 : << SNAP_LOG_SEND;
273 0 : process_error();
274 0 : return;
275 : }
276 : }
277 : //else -- TBD: should we generate an error when the socket is not valid?
278 :
279 : // process next level too
280 0 : pipe_connection::process_write();
281 : }
282 :
283 :
284 : /** \brief The process received a hanged up pipe.
285 : *
286 : * The pipe on the other end was closed, somehow.
287 : */
288 0 : void pipe_buffer_connection::process_hup()
289 : {
290 0 : close();
291 :
292 0 : pipe_connection::process_hup();
293 0 : }
294 :
295 :
296 :
297 6 : } // namespace ed
298 : // vim: ts=4 sw=4 et
|