Line data Source code
1 : // Copyright (c) 2012-2021 Made to Order Software Corp. All Rights Reserved
2 : //
3 : // https://snapwebsites.org/project/eventdispatcher
4 : // contact@m2osw.com
5 : //
6 : // This program is free software; you can redistribute it and/or modify
7 : // it under the terms of the GNU General Public License as published by
8 : // the Free Software Foundation; either version 2 of the License, or
9 : // (at your option) any later version.
10 : //
11 : // This program is distributed in the hope that it will be useful,
12 : // but WITHOUT ANY WARRANTY; without even the implied warranty of
13 : // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 : // GNU General Public License for more details.
15 : //
16 : // You should have received a copy of the GNU General Public License
17 : // along with this program; if not, write to the Free Software
18 : // Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
19 :
20 : /** \file
21 : * \brief Implementation of the Snap Communicator class.
22 : *
23 : * This class wraps the C poll() interface in a C++ object with many types
24 : * of objects:
25 : *
26 : * \li Server Connections; for software that want to offer a port to
27 : * which clients can connect to; the server will call accept()
28 : * once a new client connection is ready; this results in a
29 : * Server/Client connection object
30 : * \li Client Connections; for software that want to connect to
31 : * a server; these expect the IP address and port to connect to
32 : * \li Server/Client Connections; for the server when it accepts a new
33 : * connection; in this case the server gets a socket from accept()
34 : * and creates one of these objects to handle the connection
35 : *
36 : * Using the poll() function is the easiest and allows us to listen
37 : * on pretty much any number of sockets (on my server it is limited
38 : * at 16,768 and frankly over 1,000 we probably will start to have
39 : * real slowness issues on small VPN servers.)
40 : */
41 :
42 :
43 : // self
44 : //
45 : #include "eventdispatcher/pipe_buffer_connection.h"
46 :
47 : #include "eventdispatcher/utils.h"
48 :
49 :
50 : // snaplogger lib
51 : //
52 : #include <snaplogger/message.h>
53 :
54 :
55 : // C++ lib
56 : //
57 : #include <algorithm>
58 : #include <cstring>
59 :
60 :
61 : // last include
62 : //
63 : #include <snapdev/poison.h>
64 :
65 :
66 :
67 : namespace ed
68 : {
69 :
70 :
71 :
72 : /** \brief Pipe connections accept writes.
73 : *
74 : * This function returns true when there is some data in the pipe
75 : * connection buffer meaning that the pipe connection needs to
76 : * send data to the other side of the pipe.
77 : *
78 : * \return true if some data has to be written to the pipe.
79 : */
80 0 : bool pipe_buffer_connection::is_writer() const
81 : {
82 0 : return get_socket() != -1 && !f_output.empty();
83 : }
84 :
85 :
86 : /** \brief Write the specified data to the pipe buffer.
87 : *
88 : * This function writes the data specified by \p data to the pipe buffer.
89 : * Note that the data is not sent immediately. This will only happen
90 : * when the Snap Communicator loop is re-entered.
91 : *
92 : * \param[in] data The pointer to the data to write to the pipe.
93 : * \param[in] length The size of the data buffer.
94 : *
95 : * \return The number of bytes written. The function returns 0 when no
96 : * data can be written to that connection (i.e. it was already
97 : * closed or data is a null pointer.)
98 : */
99 0 : ssize_t pipe_buffer_connection::write(void const * data, size_t length)
100 : {
101 0 : if(get_socket() == -1)
102 : {
103 0 : errno = EBADF;
104 0 : return -1;
105 : }
106 :
107 0 : if(data != nullptr && length > 0)
108 : {
109 0 : char const * d(reinterpret_cast<char const *>(data));
110 0 : f_output.insert(f_output.end(), d, d + length);
111 0 : return length;
112 : }
113 :
114 0 : return 0;
115 : }
116 :
117 :
118 : /** \brief Read data that was received on this pipe.
119 : *
120 : * This function is used to read data whenever the process on
121 : * the other side sent us a message.
122 : */
123 0 : void pipe_buffer_connection::process_read()
124 : {
125 0 : if(get_socket() != -1)
126 : {
127 : // we could read one character at a time until we get a '\n'
128 : // but since we have a non-blocking socket we can read as
129 : // much as possible and then check for a '\n' and keep
130 : // any extra data in a cache.
131 : //
132 0 : int count_lines(0);
133 0 : int64_t const date_limit(get_current_date() + get_processing_time_limit());
134 0 : std::vector<char> buffer;
135 0 : buffer.resize(1024);
136 : for(;;)
137 : {
138 0 : errno = 0;
139 0 : ssize_t const r(read(&buffer[0], buffer.size()));
140 0 : if(r > 0)
141 : {
142 0 : for(ssize_t position(0); position < r; )
143 : {
144 0 : std::vector<char>::const_iterator it(std::find(buffer.begin() + position, buffer.begin() + r, '\n'));
145 0 : if(it == buffer.begin() + r)
146 : {
147 : // no newline, just add the whole thing
148 0 : f_line += std::string(&buffer[position], r - position);
149 0 : break; // do not waste time, we know we are done
150 : }
151 :
152 : // retrieve the characters up to the newline
153 : // character and process the line
154 : //
155 0 : f_line += std::string(&buffer[position], it - buffer.begin() - position);
156 0 : process_line(f_line);
157 0 : ++count_lines;
158 :
159 : // done with that line
160 : //
161 0 : f_line.clear();
162 :
163 : // we had a newline, we may still have some data
164 : // in that buffer; (+1 to skip the '\n' itself)
165 : //
166 0 : position = it - buffer.begin() + 1;
167 : }
168 :
169 : // when we reach here all the data read in `buffer` is
170 : // now either fully processed or in f_line
171 : //
172 : // TODO: change the way this works so we can test the
173 : // limit after each process_line() call
174 : //
175 0 : if(count_lines >= get_event_limit()
176 0 : || get_current_date() >= date_limit)
177 : {
178 : // we reach one or both limits, stop processing so
179 : // the other events have a chance to run
180 : //
181 0 : break;
182 : }
183 : }
184 0 : else if(r == 0 || errno == 0 || errno == EAGAIN || errno == EWOULDBLOCK)
185 : {
186 : // no more data available at this time
187 : //
188 : break;
189 : }
190 : else //if(r < 0)
191 : {
192 : // this happens all the time (i.e. another process quits)
193 : // so we make it a debug and not a warning or an error...
194 : //
195 0 : int const e(errno);
196 0 : SNAP_LOG_DEBUG
197 0 : << "an error occurred while reading from socket (errno: "
198 : << e
199 : << " -- "
200 0 : << strerror(e)
201 : << ")."
202 : << SNAP_LOG_SEND;
203 0 : process_error();
204 0 : return;
205 : }
206 0 : }
207 : }
208 : //else -- TBD: should we at least log an error when process_read() is called without a valid socket?
209 :
210 : // process the next level
211 0 : pipe_connection::process_read();
212 : }
213 :
214 :
215 : /** \brief Write as much data as we can to the pipe.
216 : *
217 : * This function writes the data that was cached in our f_output
218 : * buffer to the pipe, as much as possible, then it returns.
219 : *
220 : * The is_writer() function takes care of returning true if more
221 : * data is present in the f_output buffer and thus the process_write()
222 : * needs to be called again.
223 : *
224 : * Once the write buffer goes empty, this function calls the
225 : * process_empty_buffer() callback.
226 : */
227 0 : void pipe_buffer_connection::process_write()
228 : {
229 0 : if(get_socket() != -1)
230 : {
231 0 : errno = 0;
232 0 : ssize_t const r(pipe_connection::write(&f_output[f_position], f_output.size() - f_position));
233 0 : if(r > 0)
234 : {
235 : // some data was written
236 0 : f_position += r;
237 0 : if(f_position >= f_output.size())
238 : {
239 0 : f_output.clear();
240 0 : f_position = 0;
241 0 : process_empty_buffer();
242 : }
243 : }
244 0 : else if(r != 0 && errno != 0 && errno != EAGAIN && errno != EWOULDBLOCK)
245 : {
246 : // connection is considered bad, get rid of it
247 : //
248 0 : int const e(errno);
249 0 : SNAP_LOG_ERROR
250 0 : << "an error occurred while writing to socket of \""
251 0 : << get_name()
252 0 : << "\" (errno: "
253 : << e
254 : << " -- "
255 0 : << strerror(e)
256 : << ")."
257 : << SNAP_LOG_SEND;
258 0 : process_error();
259 0 : return;
260 : }
261 : }
262 : //else -- TBD: should we generate an error when the socket is not valid?
263 :
264 : // process next level too
265 0 : pipe_connection::process_write();
266 : }
267 :
268 :
269 : /** \brief The process received a hanged up pipe.
270 : *
271 : * The pipe on the other end was closed, somehow.
272 : */
273 0 : void pipe_buffer_connection::process_hup()
274 : {
275 0 : close();
276 :
277 0 : pipe_connection::process_hup();
278 0 : }
279 :
280 :
281 :
282 6 : } // namespace ed
283 : // vim: ts=4 sw=4 et
|