Line data Source code
1 : // Copyright (c) 2012-2021 Made to Order Software Corp. All Rights Reserved
2 : //
3 : // https://snapwebsites.org/project/eventdispatcher
4 : // contact@m2osw.com
5 : //
6 : // This program is free software; you can redistribute it and/or modify
7 : // it under the terms of the GNU General Public License as published by
8 : // the Free Software Foundation; either version 2 of the License, or
9 : // (at your option) any later version.
10 : //
11 : // This program is distributed in the hope that it will be useful,
12 : // but WITHOUT ANY WARRANTY; without even the implied warranty of
13 : // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 : // GNU General Public License for more details.
15 : //
16 : // You should have received a copy of the GNU General Public License
17 : // along with this program; if not, write to the Free Software
18 : // Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
19 :
20 : /** \file
21 : * \brief Implementation of the Snap Communicator class.
22 : *
23 : * This class wraps the C poll() interface in a C++ object with many types
24 : * of objects:
25 : *
26 : * \li Server Connections; for software that want to offer a port to
27 : * which clients can connect to; the server will call accept()
28 : * once a new client connection is ready; this results in a
29 : * Server/Client connection object
30 : * \li Client Connections; for software that want to connect to
31 : * a server; these expect the IP address and port to connect to
32 : * \li Server/Client Connections; for the server when it accepts a new
33 : * connection; in this case the server gets a socket from accept()
34 : * and creates one of these objects to handle the connection
35 : *
36 : * Using the poll() function is the easiest and allows us to listen
37 : * on pretty much any number of sockets (on my server it is limited
38 : * at 16,768 and frankly over 1,000 we probably will start to have
39 : * real slowness issues on small VPN servers.)
40 : */
41 :
42 :
43 : // self
44 : //
45 : #include "eventdispatcher/tcp_client_buffer_connection.h"
46 :
47 : #include "eventdispatcher/utils.h"
48 :
49 :
50 : // snaplogger lib
51 : //
52 : #include <snaplogger/message.h>
53 :
54 :
55 : // C++ lib
56 : //
57 : #include <algorithm>
58 : #include <cstring>
59 :
60 :
61 : // last include
62 : //
63 : #include <snapdev/poison.h>
64 :
65 :
66 :
67 : namespace ed
68 : {
69 :
70 :
71 :
72 : /** \brief Initialize a client socket.
73 : *
74 : * The client socket gets initialized with the specified 'socket'
75 : * parameter.
76 : *
77 : * This constructor creates a writer connection too. This gives you
78 : * a read/write connection. You can get the writer with the writer()
79 : * function. So you may write data with:
80 : *
81 : * \code
82 : * my_reader.writer().write(buf, buf_size);
83 : * \endcode
84 : *
85 : * \param[in] addr The address to connect to.
86 : * \param[in] port The port to connect to.
87 : * \param[in] mode The mode to connect as (PLAIN or SECURE).
88 : * \param[in] blocking If true, keep a blocking socket, other non-blocking.
89 : */
90 0 : tcp_client_buffer_connection::tcp_client_buffer_connection(
91 : std::string const & addr
92 : , int const port
93 : , mode_t const mode
94 0 : , bool const blocking)
95 0 : : tcp_client_connection(addr, port, mode)
96 : {
97 0 : if(!blocking)
98 : {
99 0 : non_blocking();
100 : }
101 0 : }
102 :
103 :
104 : /** \brief Check whether this connection still has some input in its buffer.
105 : *
106 : * This function returns true if there is partial incoming data in this
107 : * object's buffer.
108 : *
109 : * \return true if some buffered input is waiting for completion.
110 : */
111 0 : bool tcp_client_buffer_connection::has_input() const
112 : {
113 0 : return !f_line.empty();
114 : }
115 :
116 :
117 :
118 : /** \brief Check whether this connection still has some output in its buffer.
119 : *
120 : * This function returns true if there is still some output in the client
121 : * buffer. Output is added by the write() function, which is called by
122 : * the send_message() function.
123 : *
124 : * \return true if some buffered output is waiting to be sent out.
125 : */
126 0 : bool tcp_client_buffer_connection::has_output() const
127 : {
128 0 : return !f_output.empty();
129 : }
130 :
131 :
132 :
133 : /** \brief Write data to the connection.
134 : *
135 : * This function can be used to send data to this TCP/IP connection.
136 : * The data is bufferized and as soon as the connection can WRITE
137 : * to the socket, it will wake up and send the data. In other words,
138 : * we cannot just sleep and wait for an answer. The transfer will
139 : * be asynchronous.
140 : *
141 : * \todo
142 : * Optimization: look into writing the \p data buffer directly in
143 : * the socket if the f_output cache is empty. If that works then
144 : * we can completely bypass our intermediate cache. This works only
145 : * if we make sure that the socket is non-blocking, though.
146 : *
147 : * \todo
148 : * Determine whether we may end up with really large buffers that
149 : * grow for a long time. This function only inserts and the
150 : * process_signal() function only reads some of the bytes but it
151 : * does not reduce the size of the buffer until all the data was
152 : * sent.
153 : *
154 : * \param[in] data The pointer to the buffer of data to be sent.
155 : * \param[out] length The number of bytes to send.
156 : *
157 : * \return The number of bytes that were saved in our buffer, 0 if
158 : * no data was written to the buffer (i.e. length is zero or data
159 : * is a null pointer). Or -1 on an error (i.e. the socket is closed).
160 : */
161 0 : ssize_t tcp_client_buffer_connection::write(void const * data, size_t length)
162 : {
163 0 : if(get_socket() == -1)
164 : {
165 0 : errno = EBADF;
166 0 : return -1;
167 : }
168 :
169 0 : if(data != nullptr && length > 0)
170 : {
171 0 : char const * d(reinterpret_cast<char const *>(data));
172 0 : f_output.insert(f_output.end(), d, d + length);
173 0 : return length;
174 : }
175 :
176 0 : return 0;
177 : }
178 :
179 :
180 : /** \brief The buffer is a writer when the output buffer is not empty.
181 : *
182 : * This function returns true as long as the output buffer of this
183 : * client connection is not empty.
184 : *
185 : * \return true if the output buffer is not empty, false otherwise.
186 : */
187 0 : bool tcp_client_buffer_connection::is_writer() const
188 : {
189 0 : return get_socket() != -1 && !f_output.empty();
190 : }
191 :
192 :
193 : /** \brief Instantiation of process_read().
194 : *
195 : * This function reads incoming data from a socket.
196 : *
197 : * The function is what manages our low level TCP/IP connection protocol
198 : * which is to read one line of data (i.e. bytes up to the next '\n'
199 : * character; note that '\r' are not understood.)
200 : *
201 : * Once a complete line of data was read, it is converted to UTF-8 and
202 : * sent to the next layer using the process_line() function passing
203 : * the line it just read (without the '\n') to that callback.
204 : *
205 : * \sa process_write()
206 : * \sa process_line()
207 : */
208 0 : void tcp_client_buffer_connection::process_read()
209 : {
210 : // we read one character at a time until we get a '\n'
211 : // since we have a non-blocking socket we can read as
212 : // much as possible and then check for a '\n' and keep
213 : // any extra data in a cache.
214 : //
215 0 : if(get_socket() != -1)
216 : {
217 0 : int count_lines(0);
218 0 : std::int64_t const date_limit(get_current_date() + get_processing_time_limit());
219 0 : std::vector<char> buffer;
220 0 : buffer.resize(1024);
221 : for(;;)
222 : {
223 0 : errno = 0;
224 0 : ssize_t const r(read(&buffer[0], buffer.size()));
225 0 : if(r > 0)
226 : {
227 0 : for(ssize_t position(0); position < r; )
228 : {
229 0 : std::vector<char>::const_iterator it(std::find(buffer.begin() + position, buffer.begin() + r, '\n'));
230 0 : if(it == buffer.begin() + r)
231 : {
232 : // no newline, just add the whole thing
233 0 : f_line += std::string(&buffer[position], r - position);
234 0 : break; // do not waste time, we know we are done
235 : }
236 :
237 : // retrieve the characters up to the newline
238 : // character and process the line
239 : //
240 0 : f_line += std::string(&buffer[position], it - buffer.begin() - position);
241 0 : process_line(f_line);
242 0 : ++count_lines;
243 :
244 : // done with that line
245 : //
246 0 : f_line.clear();
247 :
248 : // we had a newline, we may still have some data
249 : // in that buffer; (+1 to skip the '\n' itself)
250 : //
251 0 : position = it - buffer.begin() + 1;
252 : }
253 :
254 : // when we reach here all the data read in `buffer` is
255 : // now either fully processed or in f_line
256 : //
257 : // TODO: change the way this works so we can test the
258 : // limit after each process_line() call
259 : //
260 0 : if(count_lines >= get_event_limit()
261 0 : || get_current_date() >= date_limit)
262 : {
263 : // we reach one or both limits, stop processing so
264 : // the other events have a chance to run
265 : //
266 0 : break;
267 : }
268 : }
269 0 : else if(r == 0 || errno == 0 || errno == EAGAIN || errno == EWOULDBLOCK)
270 : {
271 : // no more data available at this time
272 : break;
273 : }
274 : else //if(r < 0)
275 : {
276 : // TODO: do something about the error
277 0 : int const e(errno);
278 0 : SNAP_LOG_ERROR
279 0 : << "an error occurred while reading from socket (errno: "
280 : << e
281 : << " -- "
282 0 : << strerror(e)
283 : << ")."
284 : << SNAP_LOG_SEND;
285 0 : process_error();
286 0 : return;
287 : }
288 0 : }
289 : }
290 :
291 : // process next level too
292 0 : tcp_client_connection::process_read();
293 : }
294 :
295 :
296 : /** \brief Instantiation of process_write().
297 : *
298 : * This function writes outgoing data to a socket.
299 : *
300 : * This function manages our own internal cache, which we use to allow
301 : * for out of synchronization (non-blocking) output.
302 : *
303 : * When the output buffer goes empty, this function calls the
304 : * process_empty_buffer() callback.
305 : *
306 : * \sa write()
307 : * \sa process_read()
308 : * \sa process_empty_buffer()
309 : */
310 0 : void tcp_client_buffer_connection::process_write()
311 : {
312 0 : if(get_socket() != -1)
313 : {
314 0 : errno = 0;
315 0 : ssize_t const r(tcp_client_connection::write(&f_output[f_position], f_output.size() - f_position));
316 0 : if(r > 0)
317 : {
318 : // some data was written
319 0 : f_position += r;
320 0 : if(f_position >= f_output.size())
321 : {
322 0 : f_output.clear();
323 0 : f_position = 0;
324 0 : process_empty_buffer();
325 : }
326 : }
327 0 : else if(r < 0 && errno != 0 && errno != EAGAIN && errno != EWOULDBLOCK)
328 : {
329 : // connection is considered bad, generate an error
330 : //
331 0 : int const e(errno);
332 0 : SNAP_LOG_ERROR
333 0 : << "an error occurred while writing to socket of \""
334 0 : << get_name()
335 0 : << "\" (errno: "
336 : << e
337 : << " -- "
338 0 : << strerror(e)
339 : << ")."
340 : << SNAP_LOG_SEND;
341 0 : process_error();
342 0 : return;
343 : }
344 : }
345 :
346 : // process next level too
347 0 : tcp_client_connection::process_write();
348 : }
349 :
350 :
351 : /** \brief The hang up event occurred.
352 : *
353 : * This function closes the socket and then calls the previous level
354 : * hang up code which removes this connection from the communicator
355 : * object it was last added in.
356 : */
357 0 : void tcp_client_buffer_connection::process_hup()
358 : {
359 : // this connection is dead...
360 : //
361 0 : close();
362 :
363 : // process next level too
364 0 : tcp_client_connection::process_hup();
365 0 : }
366 :
367 :
368 : /** \fn tcp_client_buffer_connection::process_line(std::string const & line);
369 : * \brief Process a line of data.
370 : *
371 : * This is the default virtual class that can be overridden to implement
372 : * your own processing. By default this function does nothing.
373 : *
374 : * \note
375 : * At this point I implemented this function so one can instantiate
376 : * a tcp_server_client_buffer_connection without having to
377 : * derive it, although I do not think that is 100% proper.
378 : *
379 : * \param[in] line The line of data that was just read from the input
380 : * socket.
381 : */
382 :
383 :
384 :
385 6 : } // namespace ed
386 : // vim: ts=4 sw=4 et
|