Line data Source code
1 : // Copyright (c) 2012-2021 Made to Order Software Corp. All Rights Reserved
2 : //
3 : // https://snapwebsites.org/project/eventdispatcher
4 : // contact@m2osw.com
5 : //
6 : // This program is free software; you can redistribute it and/or modify
7 : // it under the terms of the GNU General Public License as published by
8 : // the Free Software Foundation; either version 2 of the License, or
9 : // (at your option) any later version.
10 : //
11 : // This program is distributed in the hope that it will be useful,
12 : // but WITHOUT ANY WARRANTY; without even the implied warranty of
13 : // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 : // GNU General Public License for more details.
15 : //
16 : // You should have received a copy of the GNU General Public License
17 : // along with this program; if not, write to the Free Software
18 : // Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
19 :
20 : /** \file
21 : * \brief Implementation of the Snap Communicator class.
22 : *
23 : * This class wraps the C poll() interface in a C++ object with many types
24 : * of objects:
25 : *
26 : * \li Server Connections; for software that want to offer a port to
27 : * which clients can connect to; the server will call accept()
28 : * once a new client connection is ready; this results in a
29 : * Server/Client connection object
30 : * \li Client Connections; for software that want to connect to
31 : * a server; these expect the IP address and port to connect to
32 : * \li Server/Client Connections; for the server when it accepts a new
33 : * connection; in this case the server gets a socket from accept()
34 : * and creates one of these objects to handle the connection
35 : *
36 : * Using the poll() function is the easiest and allows us to listen
37 : * on pretty much any number of sockets (on my server it is limited
38 : * at 16,768 and frankly over 1,000 we probably will start to have
39 : * real slowness issues on small VPN servers.)
40 : */
41 :
42 :
43 : // self
44 : //
45 : #include "eventdispatcher/fd_buffer_connection.h"
46 :
47 : #include "eventdispatcher/utils.h"
48 :
49 :
50 : // snaplogger lib
51 : //
52 : #include <snaplogger/message.h>
53 :
54 :
55 : // C++ lib
56 : //
57 : #include <algorithm>
58 : #include <cstring>
59 :
60 :
61 : // last include
62 : //
63 : #include <snapdev/poison.h>
64 :
65 :
66 :
67 : namespace ed
68 : {
69 :
70 :
71 :
72 : /** \brief Initialize an fd connection with a buffer.
73 : *
74 : * The connection is initialized with the specified fd parameter.
75 : *
76 : * This initialization, so things work as expected in our environment,
77 : * marks the file descriptor as non-blocking. This is important for
78 : * the reader and writer capabilities.
79 : *
80 : * \param[in] fd The file descriptor (often a pipe).
81 : * \param[in] mode The mode describing the file descriptor (i.e. read-only
82 : * write-only, or read/write.)
83 : */
84 0 : fd_buffer_connection::fd_buffer_connection(int fd, mode_t mode)
85 0 : : fd_connection(fd, mode)
86 : {
87 0 : non_blocking();
88 0 : }
89 :
90 :
91 : /** \brief Check whether this file descriptor still has some buffered input.
92 : *
93 : * This function returns true if there is partial incoming data in this
94 : * object's buffer.
95 : *
96 : * \return true if some buffered input is waiting for completion.
97 : */
98 0 : bool fd_buffer_connection::has_input() const
99 : {
100 0 : return !f_line.empty();
101 : }
102 :
103 :
104 :
105 : /** \brief Check whether this file descriptor still has some buffered output.
106 : *
107 : * This function returns true if there is still some output data in the
108 : * output cache buffer. Output is added by the write() function, which is
109 : * called, for example, by the send_message() function.
110 : *
111 : * Note that if the fd was already closed, this function may still return
112 : * true in the event we have some cached data.
113 : *
114 : * \return true if some buffered output is waiting to be sent out.
115 : */
116 0 : bool fd_buffer_connection::has_output() const
117 : {
118 0 : return !f_output.empty();
119 : }
120 :
121 :
122 :
123 : /** \brief Tells that this file descriptor is a writer when we have data.
124 : *
125 : * This function checks to know whether there is output data to be written
126 : * to this file descriptor. If so then the function returns true. Otherwise
127 : * it just returns false.
128 : *
129 : * This happens whenever you called the write() function and the connection
130 : * cache is not empty yet.
131 : *
132 : * Note that if the connection was closed or it not writable (as per the
133 : * fd mode specified when creating this connection) then this function
134 : * returns false.
135 : *
136 : * \return true if there is data to write to the fd, false otherwise.
137 : */
138 0 : bool fd_buffer_connection::is_writer() const
139 : {
140 0 : return !f_output.empty() && fd_connection::is_writer();
141 : }
142 :
143 :
144 : /** \brief Write data to the connection.
145 : *
146 : * This function can be used to send data to this file descriptor.
147 : * The data is bufferized and as soon as the connection file descriptor
148 : * can accept more data it gets written there. In other words, we cannot
149 : * just sleep and wait for an answer. The transfer of the data is therefore
150 : * asynchronous.
151 : *
152 : * \todo
153 : * Determine whether we may end up with really large buffers that
154 : * grow for a long time. This function only inserts and the
155 : * process_signal() function only reads some of the bytes but it
156 : * does not reduce the size of the buffer until all the data was
157 : * sent.
158 : *
159 : * \note
160 : * The function returns -1 and sets errno to EBADF if the file
161 : * descriptor was closed (-1) or if it is not marked as a writer.
162 : *
163 : * \param[in] data The pointer to the buffer of data to be sent.
164 : * \param[out] length The number of bytes to send.
165 : *
166 : * \return The number of bytes written, either -1, 0, or \p length
167 : */
168 0 : ssize_t fd_buffer_connection::write(void const * data, size_t const length)
169 : {
170 : // WARNING: We MUST call the fd_connection version of the is_writer(),
171 : // because the fd_buffer_connection::is_writer() also checks
172 : // the f_output buffer which has unwanted side effects
173 : //
174 0 : if(get_socket() == -1
175 0 : || !fd_connection::is_writer()) // WARNING: see above
176 : {
177 0 : errno = EBADF;
178 0 : return -1;
179 : }
180 :
181 0 : if(data != nullptr
182 0 : && length > 0)
183 : {
184 0 : char const * d(reinterpret_cast<char const *>(data));
185 0 : f_output.insert(f_output.end(), d, d + length);
186 0 : return length;
187 : }
188 :
189 0 : return 0;
190 : }
191 :
192 :
193 : /** \brief Read and process as much data as possible.
194 : *
195 : * This function reads as much incoming data as possible and processes
196 : * it.
197 : *
198 : * If the input includes a newline character ('\n') then this function
199 : * calls the process_line() callback which can further process that
200 : * line of data.
201 : *
202 : * \todo
203 : * Look into a way, if possible, to have a single instantiation since
204 : * as far as I know this code matches the one written in the
205 : * process_read() of the tcp_client_buffer_connection and
206 : * the pipe_buffer_connection classes (and now fd_buffer_connection).
207 : */
208 0 : void fd_buffer_connection::process_read()
209 : {
210 : // we read one character at a time until we get a '\n'
211 : // since we have a non-blocking socket we can read as
212 : // much as possible and then check for a '\n' and keep
213 : // any extra data in a cache.
214 : //
215 0 : if(get_socket() != -1)
216 : {
217 0 : int count_lines(0);
218 0 : std::int64_t const date_limit(get_current_date() + get_processing_time_limit());
219 0 : std::vector<char> buffer;
220 0 : buffer.resize(1024);
221 : for(;;)
222 : {
223 0 : errno = 0;
224 0 : ssize_t const r(read(&buffer[0], buffer.size()));
225 0 : if(r > 0)
226 : {
227 0 : for(ssize_t position(0); position < r; )
228 : {
229 0 : std::vector<char>::const_iterator it(std::find(buffer.begin() + position, buffer.begin() + r, '\n'));
230 0 : if(it == buffer.begin() + r)
231 : {
232 : // no newline, just add the whole thing
233 0 : f_line += std::string(&buffer[position], r - position);
234 0 : break; // do not waste time, we know we are done
235 : }
236 :
237 : // retrieve the characters up to the newline
238 : // character and process the line
239 : //
240 0 : f_line += std::string(&buffer[position], it - buffer.begin() - position);
241 0 : process_line(f_line);
242 0 : ++count_lines;
243 :
244 : // done with that line
245 : //
246 0 : f_line.clear();
247 :
248 : // we had a newline, we may still have some data
249 : // in that buffer; (+1 to skip the '\n' itself)
250 : //
251 0 : position = it - buffer.begin() + 1;
252 : }
253 :
254 : // when we reach here all the data read in `buffer` is
255 : // now either fully processed or in f_line
256 : //
257 : // TODO: change the way this works so we can test the
258 : // limit after each process_line() call
259 : //
260 0 : if(count_lines >= get_event_limit()
261 0 : || get_current_date() >= date_limit)
262 : {
263 : // we reach one or both limits, stop processing so
264 : // the other events have a chance to run
265 : //
266 0 : break;
267 : }
268 : }
269 0 : else if(r == 0 || errno == 0 || errno == EAGAIN || errno == EWOULDBLOCK)
270 : {
271 : // no more data available at this time
272 : break;
273 : }
274 : else //if(r < 0)
275 : {
276 0 : int const e(errno);
277 0 : SNAP_LOG_WARNING
278 0 : << "an error occurred while reading from socket (errno: "
279 : << e
280 : << " -- "
281 0 : << strerror(e)
282 : << ")."
283 : << SNAP_LOG_SEND;
284 0 : process_error();
285 0 : return;
286 : }
287 0 : }
288 : }
289 :
290 : // process next level too
291 0 : fd_connection::process_read();
292 : }
293 :
294 :
295 : /** \brief Write to the connection's socket.
296 : *
297 : * This function implementation writes as much data as possible to the
298 : * connection's socket.
299 : *
300 : * This function calls the process_empty_buffer() callback whenever the
301 : * output buffer goes empty.
302 : */
303 0 : void fd_buffer_connection::process_write()
304 : {
305 0 : if(get_socket() != -1)
306 : {
307 0 : errno = 0;
308 0 : ssize_t const r(fd_connection::write(&f_output[f_position], f_output.size() - f_position));
309 0 : if(r > 0)
310 : {
311 : // some data was written
312 0 : f_position += r;
313 0 : if(f_position >= f_output.size())
314 : {
315 0 : f_output.clear();
316 0 : f_position = 0;
317 0 : process_empty_buffer();
318 : }
319 : }
320 0 : else if(r != 0 && errno != 0 && errno != EAGAIN && errno != EWOULDBLOCK)
321 : {
322 : // connection is considered bad, get rid of it
323 : //
324 0 : int const e(errno);
325 0 : SNAP_LOG_ERROR
326 0 : << "an error occurred while writing to socket of \""
327 0 : << get_name()
328 0 : << "\" (errno: "
329 : << e
330 : << " -- "
331 0 : << strerror(e)
332 : << ")."
333 : << SNAP_LOG_SEND;
334 0 : process_error();
335 0 : return;
336 : }
337 : }
338 :
339 : // process next level too
340 0 : fd_connection::process_write();
341 : }
342 :
343 :
344 : /** \brief The remote used hanged up.
345 : *
346 : * This function makes sure that the connection gets closed properly.
347 : */
348 0 : void fd_buffer_connection::process_hup()
349 : {
350 : // this connection is dead...
351 : //
352 : //close(); -- we are not currently responsible for closing this FD
353 :
354 0 : fd_connection::process_hup();
355 0 : }
356 :
357 :
358 :
359 6 : } // namespace ed
360 : // vim: ts=4 sw=4 et
|