Line data Source code
1 : // Copyright (c) 2012-2022 Made to Order Software Corp. All Rights Reserved
2 : //
3 : // https://snapwebsites.org/project/eventdispatcher
4 : // contact@m2osw.com
5 : //
6 : // This program is free software; you can redistribute it and/or modify
7 : // it under the terms of the GNU General Public License as published by
8 : // the Free Software Foundation; either version 2 of the License, or
9 : // (at your option) any later version.
10 : //
11 : // This program is distributed in the hope that it will be useful,
12 : // but WITHOUT ANY WARRANTY; without even the implied warranty of
13 : // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 : // GNU General Public License for more details.
15 : //
16 : // You should have received a copy of the GNU General Public License along
17 : // with this program; if not, write to the Free Software Foundation, Inc.,
18 : // 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
19 :
20 : /** \file
21 : * \brief Implementation of the Snap Communicator class.
22 : *
23 : * This class wraps the C poll() interface in a C++ object with many types
24 : * of objects:
25 : *
26 : * \li Server Connections; for software that want to offer a port to
27 : * which clients can connect to; the server will call accept()
28 : * once a new client connection is ready; this results in a
29 : * Server/Client connection object
30 : * \li Client Connections; for software that want to connect to
31 : * a server; these expect the IP address and port to connect to
32 : * \li Server/Client Connections; for the server when it accepts a new
33 : * connection; in this case the server gets a socket from accept()
34 : * and creates one of these objects to handle the connection
35 : *
36 : * Using the poll() function is the easiest and allows us to listen
37 : * on pretty much any number of sockets (on my server it is limited
38 : * at 16,768 and frankly over 1,000 we probably will start to have
39 : * real slowness issues on small VPN servers.)
40 : */
41 :
42 : // self
43 : //
44 : #include "eventdispatcher/local_stream_server_client_buffer_connection.h"
45 :
46 : #include "eventdispatcher/utils.h"
47 :
48 :
49 : // snaplogger lib
50 : //
51 : #include <snaplogger/message.h>
52 :
53 :
54 : // C++ lib
55 : //
56 : #include <algorithm>
57 : #include <cstring>
58 :
59 :
60 : // last include
61 : //
62 : #include <snapdev/poison.h>
63 :
64 :
65 :
66 : namespace ed
67 : {
68 :
69 :
70 :
71 : /** \brief Initialize a client socket.
72 : *
73 : * The client socket gets initialized with the specified 'socket'
74 : * parameter.
75 : *
76 : * If you are a pure client (opposed to a client that was just accepted)
77 : * you may want to consider using the local_stream_client_buffer_connection
78 : * instead. That gives you a way to open the socket from a set of address
79 : * and port definitions among other things.
80 : *
81 : * This initialization, so things work as expected in our environment,
82 : * the function marks the socket as non-blocking. This is important for
83 : * the reader and writer capabilities.
84 : *
85 : * \param[in] client The client to be used for reading and writing.
86 : */
87 1 : local_stream_server_client_buffer_connection::local_stream_server_client_buffer_connection(snapdev::raii_fd_t client)
88 1 : : local_stream_server_client_connection(std::move(client))
89 : {
90 1 : non_blocking();
91 1 : }
92 :
93 :
94 : /** \brief Check whether this connection still has some input in its buffer.
95 : *
96 : * This function returns true if there is partial incoming data in this
97 : * object's buffer.
98 : *
99 : * \return true if some buffered input is waiting for completion.
100 : */
101 0 : bool local_stream_server_client_buffer_connection::has_input() const
102 : {
103 0 : return !f_line.empty();
104 : }
105 :
106 :
107 :
108 : /** \brief Check whether this connection still has some output in its buffer.
109 : *
110 : * This function returns true if there is still some output in the client
111 : * buffer. Output is added by the write() function, which is called by
112 : * the send_message() function.
113 : *
114 : * \return true if some buffered output is waiting to be sent out.
115 : */
116 0 : bool local_stream_server_client_buffer_connection::has_output() const
117 : {
118 0 : return !f_output.empty();
119 : }
120 :
121 :
122 :
123 : /** \brief Tells that this connection is a writer when we have data to write.
124 : *
125 : * This function checks to know whether there is data to be written to
126 : * this connection socket. If so then the function returns true. Otherwise
127 : * it just returns false.
128 : *
129 : * This happens whenever you called the write() function and our cache
130 : * is not empty yet.
131 : *
132 : * \return true if there is data to write to the socket, false otherwise.
133 : */
134 7 : bool local_stream_server_client_buffer_connection::is_writer() const
135 : {
136 7 : return get_socket() != -1 && !f_output.empty();
137 : }
138 :
139 :
140 : /** \brief Write data to the connection.
141 : *
142 : * This function can be used to send data to this TCP/IP connection.
143 : * The data is bufferized and as soon as the connection can WRITE
144 : * to the socket, it will wake up and send the data. In other words,
145 : * we cannot just sleep and wait for an answer. The transfer will
146 : * be asynchronous.
147 : *
148 : * \todo
149 : * Determine whether we may end up with really large buffers that
150 : * grow for a long time. This function only inserts and the
151 : * process_signal() function only reads some of the bytes but it
152 : * does not reduce the size of the buffer until all the data was
153 : * sent.
154 : *
155 : * \param[in] data The pointer to the buffer of data to be sent.
156 : * \param[out] length The number of bytes to send.
157 : *
158 : * \return The number of bytes written or -1 if the connection is closed.
159 : */
160 1 : ssize_t local_stream_server_client_buffer_connection::write(void const * data, size_t const length)
161 : {
162 1 : if(get_socket() == -1)
163 : {
164 0 : errno = EBADF;
165 0 : return -1;
166 : }
167 :
168 1 : if(data != nullptr && length > 0)
169 : {
170 1 : char const * d(reinterpret_cast<char const *>(data));
171 1 : f_output.insert(f_output.end(), d, d + length);
172 1 : return length;
173 : }
174 :
175 0 : return 0;
176 : }
177 :
178 :
179 : /** \brief Read and process as much data as possible.
180 : *
181 : * This function reads as much incoming data as possible and processes
182 : * it.
183 : *
184 : * If the input includes a newline character ('\n') then this function
185 : * calls the process_line() callback which can further process that
186 : * line of data.
187 : *
188 : * \todo
189 : * Look into a way, if possible, to have a single instantiation since
190 : * as far as I know this code matches the one written in the
191 : * process_read() of the local_stream_client_buffer_connection and
192 : * the pipe_buffer_connection classes.
193 : */
194 2 : void local_stream_server_client_buffer_connection::process_read()
195 : {
196 : // we read one character at a time until we get a '\n'
197 : // since we have a non-blocking socket we can read as
198 : // much as possible and then check for a '\n' and keep
199 : // any extra data in a cache.
200 : //
201 2 : if(get_socket() != -1)
202 : {
203 2 : int count_lines(0);
204 2 : std::int64_t const date_limit(get_current_date() + get_processing_time_limit());
205 4 : std::vector<char> buffer;
206 2 : buffer.resize(1024);
207 : for(;;)
208 : {
209 4 : errno = 0;
210 4 : ssize_t const r(read(&buffer[0], buffer.size()));
211 4 : if(r > 0)
212 : {
213 4 : for(ssize_t position(0); position < r; )
214 : {
215 2 : std::vector<char>::const_iterator it(std::find(buffer.begin() + position, buffer.begin() + r, '\n'));
216 2 : if(it == buffer.begin() + r)
217 : {
218 : // no newline, just add the whole thing
219 0 : f_line += std::string(&buffer[position], r - position);
220 0 : break; // do not waste time, we know we are done
221 : }
222 :
223 : // retrieve the characters up to the newline
224 : // character and process the line
225 : //
226 2 : f_line += std::string(&buffer[position], it - buffer.begin() - position);
227 2 : process_line(f_line);
228 2 : ++count_lines;
229 :
230 : // done with that line
231 : //
232 2 : f_line.clear();
233 :
234 : // we had a newline, we may still have some data
235 : // in that buffer; (+1 to skip the '\n' itself)
236 : //
237 2 : position = it - buffer.begin() + 1;
238 : }
239 :
240 : // when we reach here all the data read in `buffer` is
241 : // now either fully processed or in f_line
242 : //
243 : // TODO: change the way this works so we can test the
244 : // limit after each process_line() call
245 : //
246 4 : if(count_lines >= get_event_limit()
247 2 : || get_current_date() >= date_limit)
248 : {
249 : // we reach one or both limits, stop processing so
250 : // the other events have a chance to run
251 : //
252 0 : break;
253 : }
254 : }
255 2 : else if(r == 0 || errno == 0 || errno == EAGAIN || errno == EWOULDBLOCK)
256 : {
257 : // no more data available at this time
258 : break;
259 : }
260 : else //if(r < 0)
261 : {
262 0 : int const e(errno);
263 0 : SNAP_LOG_WARNING
264 0 : << "an error occurred while reading from socket (errno: "
265 : << e
266 : << " -- "
267 0 : << strerror(e)
268 : << ")."
269 : << SNAP_LOG_SEND;
270 0 : process_error();
271 0 : return;
272 : }
273 2 : }
274 : }
275 :
276 : // process next level too
277 2 : local_stream_server_client_connection::process_read();
278 : }
279 :
280 :
281 : /** \brief Write to the connection's socket.
282 : *
283 : * This function implementation writes as much data as possible to the
284 : * connection's socket.
285 : *
286 : * This function calls the process_empty_buffer() callback whenever the
287 : * output buffer goes empty.
288 : */
289 1 : void local_stream_server_client_buffer_connection::process_write()
290 : {
291 1 : if(get_socket() != -1)
292 : {
293 1 : errno = 0;
294 1 : ssize_t const r(local_stream_server_client_connection::write(&f_output[f_position], f_output.size() - f_position));
295 1 : if(r > 0)
296 : {
297 : // some data was written
298 1 : f_position += r;
299 1 : if(f_position >= f_output.size())
300 : {
301 1 : f_output.clear();
302 1 : f_position = 0;
303 1 : process_empty_buffer();
304 : }
305 : }
306 0 : else if(r != 0 && errno != 0 && errno != EAGAIN && errno != EWOULDBLOCK)
307 : {
308 : // connection is considered bad, get rid of it
309 : //
310 0 : int const e(errno);
311 0 : SNAP_LOG_ERROR
312 : << "an error occurred while writing to socket of \""
313 0 : << get_name()
314 0 : << "\" (errno: "
315 : << e
316 : << " -- "
317 0 : << strerror(e)
318 : << ")."
319 : << SNAP_LOG_SEND;
320 0 : process_error();
321 0 : return;
322 : }
323 : }
324 :
325 : // process next level too
326 1 : local_stream_server_client_connection::process_write();
327 : }
328 :
329 :
330 : /** \brief The remote hanged up.
331 : *
332 : * This function makes sure that the local connection gets closed properly.
333 : */
334 0 : void local_stream_server_client_buffer_connection::process_hup()
335 : {
336 : // this connection is dead...
337 : //
338 0 : close();
339 :
340 0 : local_stream_server_client_connection::process_hup();
341 0 : }
342 :
343 :
344 :
345 6 : } // namespace ed
346 : // vim: ts=4 sw=4 et
|