Line data Source code
1 : // Copyright (c) 2012-2019 Made to Order Software Corp. All Rights Reserved
2 : //
3 : // This program is free software; you can redistribute it and/or modify
4 : // it under the terms of the GNU General Public License as published by
5 : // the Free Software Foundation; either version 2 of the License, or
6 : // (at your option) any later version.
7 : //
8 : // This program is distributed in the hope that it will be useful,
9 : // but WITHOUT ANY WARRANTY; without even the implied warranty of
10 : // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 : // GNU General Public License for more details.
12 : //
13 : // You should have received a copy of the GNU General Public License
14 : // along with this program; if not, write to the Free Software
15 : // Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
16 :
17 : /** \file
18 : * \brief Implementation of the Snap Communicator class.
19 : *
20 : * This class wraps the C poll() interface in a C++ object with many types
21 : * of objects:
22 : *
23 : * \li Server Connections; for software that want to offer a port to
24 : * which clients can connect to; the server will call accept()
25 : * once a new client connection is ready; this results in a
26 : * Server/Client connection object
27 : * \li Client Connections; for software that want to connect to
28 : * a server; these expect the IP address and port to connect to
29 : * \li Server/Client Connections; for the server when it accepts a new
30 : * connection; in this case the server gets a socket from accept()
31 : * and creates one of these objects to handle the connection
32 : *
33 : * Using the poll() function is the easiest and allows us to listen
34 : * on pretty much any number of sockets (on my server it is limited
35 : * at 16,768 and frankly over 1,000 we probably will start to have
36 : * real slowness issues on small VPN servers.)
37 : */
38 :
39 : //// to get the POLLRDHUP definition
40 : //#ifndef _GNU_SOURCE
41 : //#define _GNU_SOURCE
42 : //#endif
43 :
44 :
45 : // self
46 : //
47 : #include "eventdispatcher/tcp_client_buffer_connection.h"
48 :
49 : #include "eventdispatcher/utils.h"
50 :
51 :
52 : // snaplogger lib
53 : //
54 : #include "snaplogger/message.h"
55 :
56 :
57 : //// snapdev lib
58 : ////
59 : //#include "snapdev/not_reached.h"
60 : //#include "snapdev/not_used.h"
61 : //#include "snapdev/string_replace_many.h"
62 : //
63 : //
64 : //// libaddr lib
65 : ////
66 : //#include "libaddr/addr_parser.h"
67 : //
68 : //
69 : //// C++ lib
70 : ////
71 : //#include <sstream>
72 : //#include <limits>
73 : //#include <atomic>
74 : //
75 : //
76 : //// C lib
77 : ////
78 : //#include <fcntl.h>
79 : //#include <poll.h>
80 : //#include <unistd.h>
81 : //#include <sys/eventfd.h>
82 : //#include <sys/inotify.h>
83 : //#include <sys/ioctl.h>
84 : //#include <sys/resource.h>
85 : //#include <sys/syscall.h>
86 : //#include <sys/time.h>
87 :
88 :
89 : // last include
90 : //
91 : #include <snapdev/poison.h>
92 :
93 :
94 :
95 : namespace ed
96 : {
97 :
98 :
99 :
100 : /** \brief Initialize a client socket.
101 : *
102 : * The client socket gets initialized with the specified 'socket'
103 : * parameter.
104 : *
105 : * This constructor creates a writer connection too. This gives you
106 : * a read/write connection. You can get the writer with the writer()
107 : * function. So you may write data with:
108 : *
109 : * \code
110 : * my_reader.writer().write(buf, buf_size);
111 : * \endcode
112 : *
113 : * \param[in] addr The address to connect to.
114 : * \param[in] port The port to connect to.
115 : * \param[in] mode The mode to connect as (PLAIN or SECURE).
116 : * \param[in] blocking If true, keep a blocking socket, other non-blocking.
117 : */
118 0 : tcp_client_buffer_connection::tcp_client_buffer_connection(
119 : std::string const & addr
120 : , int const port
121 : , mode_t const mode
122 : , bool const blocking)
123 0 : : tcp_client_connection(addr, port, mode)
124 : {
125 0 : if(!blocking)
126 : {
127 0 : non_blocking();
128 : }
129 0 : }
130 :
131 :
132 : /** \brief Check whether this connection still has some input in its buffer.
133 : *
134 : * This function returns true if there is partial incoming data in this
135 : * object's buffer.
136 : *
137 : * \return true if some buffered input is waiting for completion.
138 : */
139 0 : bool tcp_client_buffer_connection::has_input() const
140 : {
141 0 : return !f_line.empty();
142 : }
143 :
144 :
145 :
146 : /** \brief Check whether this connection still has some output in its buffer.
147 : *
148 : * This function returns true if there is still some output in the client
149 : * buffer. Output is added by the write() function, which is called by
150 : * the send_message() function.
151 : *
152 : * \return true if some buffered output is waiting to be sent out.
153 : */
154 0 : bool tcp_client_buffer_connection::has_output() const
155 : {
156 0 : return !f_output.empty();
157 : }
158 :
159 :
160 :
161 : /** \brief Write data to the connection.
162 : *
163 : * This function can be used to send data to this TCP/IP connection.
164 : * The data is bufferized and as soon as the connection can WRITE
165 : * to the socket, it will wake up and send the data. In other words,
166 : * we cannot just sleep and wait for an answer. The transfer will
167 : * be asynchroneous.
168 : *
169 : * \todo
170 : * Optimization: look into writing the \p data buffer directly in
171 : * the socket if the f_output cache is empty. If that works then
172 : * we can completely bypass our intermediate cache. This works only
173 : * if we make sure that the socket is non-blocking, though.
174 : *
175 : * \todo
176 : * Determine whether we may end up with really large buffers that
177 : * grow for a long time. This function only inserts and the
178 : * process_signal() function only reads some of the bytes but it
179 : * does not reduce the size of the buffer until all the data was
180 : * sent.
181 : *
182 : * \param[in] data The pointer to the buffer of data to be sent.
183 : * \param[out] length The number of bytes to send.
184 : *
185 : * \return The number of bytes that were saved in our buffer, 0 if
186 : * no data was written to the buffer (i.e. the socket is
187 : * closed, length is zero, or data is a null pointer.)
188 : */
189 0 : ssize_t tcp_client_buffer_connection::write(void const * data, size_t length)
190 : {
191 0 : if(get_socket() == -1)
192 : {
193 0 : errno = EBADF;
194 0 : return -1;
195 : }
196 :
197 0 : if(data != nullptr && length > 0)
198 : {
199 0 : char const * d(reinterpret_cast<char const *>(data));
200 0 : f_output.insert(f_output.end(), d, d + length);
201 0 : return length;
202 : }
203 :
204 0 : return 0;
205 : }
206 :
207 :
208 : /** \brief The buffer is a writer when the output buffer is not empty.
209 : *
210 : * This function returns true as long as the output buffer of this
211 : * client connection is not empty.
212 : *
213 : * \return true if the output buffer is not empty, false otherwise.
214 : */
215 0 : bool tcp_client_buffer_connection::is_writer() const
216 : {
217 0 : return get_socket() != -1 && !f_output.empty();
218 : }
219 :
220 :
221 : /** \brief Instantiation of process_read().
222 : *
223 : * This function reads incoming data from a socket.
224 : *
225 : * The function is what manages our low level TCP/IP connection protocol
226 : * which is to read one line of data (i.e. bytes up to the next '\n'
227 : * character; note that '\r' are not understood.)
228 : *
229 : * Once a complete line of data was read, it is converted to UTF-8 and
230 : * sent to the next layer using the process_line() function passing
231 : * the line it just read (without the '\n') to that callback.
232 : *
233 : * \sa process_write()
234 : * \sa process_line()
235 : */
236 0 : void tcp_client_buffer_connection::process_read()
237 : {
238 : // we read one character at a time until we get a '\n'
239 : // since we have a non-blocking socket we can read as
240 : // much as possible and then check for a '\n' and keep
241 : // any extra data in a cache.
242 : //
243 0 : if(get_socket() != -1)
244 : {
245 0 : int count_lines(0);
246 0 : std::int64_t const date_limit(get_current_date() + get_processing_time_limit());
247 0 : std::vector<char> buffer;
248 0 : buffer.resize(1024);
249 0 : for(;;)
250 : {
251 0 : errno = 0;
252 0 : ssize_t const r(read(&buffer[0], buffer.size()));
253 0 : if(r > 0)
254 : {
255 0 : for(ssize_t position(0); position < r; )
256 : {
257 0 : std::vector<char>::const_iterator it(std::find(buffer.begin() + position, buffer.begin() + r, '\n'));
258 0 : if(it == buffer.begin() + r)
259 : {
260 : // no newline, just add the whole thing
261 0 : f_line += std::string(&buffer[position], r - position);
262 0 : break; // do not waste time, we know we are done
263 : }
264 :
265 : // retrieve the characters up to the newline
266 : // character and process the line
267 : //
268 0 : f_line += std::string(&buffer[position], it - buffer.begin() - position);
269 0 : process_line(f_line);
270 0 : ++count_lines;
271 :
272 : // done with that line
273 : //
274 0 : f_line.clear();
275 :
276 : // we had a newline, we may still have some data
277 : // in that buffer; (+1 to skip the '\n' itself)
278 : //
279 0 : position = it - buffer.begin() + 1;
280 : }
281 :
282 : // when we reach here all the data read in `buffer` is
283 : // now either fully processed or in f_line
284 : //
285 : // TODO: change the way this works so we can test the
286 : // limit after each process_line() call
287 : //
288 0 : if(count_lines >= get_event_limit()
289 0 : || get_current_date() >= date_limit)
290 : {
291 : // we reach one or both limits, stop processing so
292 : // the other events have a chance to run
293 : //
294 0 : break;
295 : }
296 : }
297 0 : else if(r == 0 || errno == 0 || errno == EAGAIN || errno == EWOULDBLOCK)
298 : {
299 : // no more data available at this time
300 : break;
301 : }
302 : else //if(r < 0)
303 : {
304 : // TODO: do something about the error
305 0 : int const e(errno);
306 : SNAP_LOG_ERROR
307 0 : << "an error occurred while reading from socket (errno: "
308 0 : << e
309 0 : << " -- "
310 0 : << strerror(e)
311 0 : << ").";
312 0 : process_error();
313 0 : return;
314 : }
315 : }
316 : }
317 :
318 : // process next level too
319 0 : tcp_client_connection::process_read();
320 : }
321 :
322 :
323 : /** \brief Instantiation of process_write().
324 : *
325 : * This function writes outgoing data to a socket.
326 : *
327 : * This function manages our own internal cache, which we use to allow
328 : * for out of synchronization (non-blocking) output.
329 : *
330 : * When the output buffer goes empty, this function calls the
331 : * process_empty_buffer() callback.
332 : *
333 : * \sa write()
334 : * \sa process_read()
335 : * \sa process_empty_buffer()
336 : */
337 0 : void tcp_client_buffer_connection::process_write()
338 : {
339 0 : if(get_socket() != -1)
340 : {
341 0 : errno = 0;
342 0 : ssize_t const r(tcp_client_connection::write(&f_output[f_position], f_output.size() - f_position));
343 0 : if(r > 0)
344 : {
345 : // some data was written
346 0 : f_position += r;
347 0 : if(f_position >= f_output.size())
348 : {
349 0 : f_output.clear();
350 0 : f_position = 0;
351 0 : process_empty_buffer();
352 : }
353 : }
354 0 : else if(r < 0 && errno != 0 && errno != EAGAIN && errno != EWOULDBLOCK)
355 : {
356 : // connection is considered bad, generate an error
357 : //
358 0 : int const e(errno);
359 : SNAP_LOG_ERROR
360 0 : << "an error occurred while writing to socket of \""
361 0 : << get_name()
362 0 : << "\" (errno: "
363 0 : << e
364 0 : << " -- "
365 0 : << strerror(e)
366 0 : << ").";
367 0 : process_error();
368 0 : return;
369 : }
370 : }
371 :
372 : // process next level too
373 0 : tcp_client_connection::process_write();
374 : }
375 :
376 :
377 : /** \brief The hang up event occurred.
378 : *
379 : * This function closes the socket and then calls the previous level
380 : * hang up code which removes this connection from the snap_communicator
381 : * object it was last added in.
382 : */
383 0 : void tcp_client_buffer_connection::process_hup()
384 : {
385 : // this connection is dead...
386 : //
387 0 : close();
388 :
389 : // process next level too
390 0 : tcp_client_connection::process_hup();
391 0 : }
392 :
393 :
/** \fn tcp_client_buffer_connection::process_line(std::string const & line);
 * \brief Process a line of data.
 *
 * This is the default virtual function that can be overridden to implement
 * your own processing. By default this function does nothing.
 *
 * \note
 * At this point I implemented this function so one can instantiate
 * a tcp_client_buffer_connection without having to
 * derive it, although I do not think that is 100% proper.
 *
 * \param[in] line  The line of data that was just read from the input
 *                  socket.
 */
408 :
409 :
410 :
411 6 : } // namespace ed
412 : // vim: ts=4 sw=4 et
|