Line data Source code
1 : // Copyright (c) 2012-2021 Made to Order Software Corp. All Rights Reserved
2 : //
3 : // https://snapwebsites.org/project/eventdispatcher
4 : // contact@m2osw.com
5 : //
6 : // This program is free software; you can redistribute it and/or modify
7 : // it under the terms of the GNU General Public License as published by
8 : // the Free Software Foundation; either version 2 of the License, or
9 : // (at your option) any later version.
10 : //
11 : // This program is distributed in the hope that it will be useful,
12 : // but WITHOUT ANY WARRANTY; without even the implied warranty of
13 : // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 : // GNU General Public License for more details.
15 : //
16 : // You should have received a copy of the GNU General Public License along
17 : // with this program; if not, write to the Free Software Foundation, Inc.,
18 : // 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
19 :
20 : /** \file
21 : * \brief Implementation of the AF_UNIX bufferization.
22 : *
23 : * This class handles the bufferization of data send via a stream local
24 : * connection.
25 : */
26 :
27 :
28 : // self
29 : //
30 : #include "eventdispatcher/local_stream_client_buffer_connection.h"
31 :
32 : #include "eventdispatcher/utils.h"
33 :
34 :
35 : // snaplogger lib
36 : //
37 : #include <snaplogger/message.h>
38 :
39 :
40 : // C++ lib
41 : //
42 : #include <algorithm>
43 : #include <cstring>
44 :
45 :
46 : // last include
47 : //
48 : #include <snapdev/poison.h>
49 :
50 :
51 :
52 : namespace ed
53 : {
54 :
55 :
56 :
57 : /** \brief Initialize a bufferized local client socket.
58 : *
59 : * The client socket gets initialized with the specified address (\p u)
60 : * parameter.
61 : *
62 : * \param[in] addr The Unix address to connect to.
63 : * \param[in] blocking If true, keep a blocking socket, otherwise non-blocking.
64 : * \param[in] close_on_exec Whether the connection should automatically be
65 : * closed on an execve().
66 : */
67 1 : local_stream_client_buffer_connection::local_stream_client_buffer_connection(
68 : addr::unix const & address
69 : , bool const blocking
70 1 : , bool const close_on_exec)
71 1 : : local_stream_client_connection(address, blocking, close_on_exec)
72 : {
73 1 : }
74 :
75 :
76 : /** \brief Check whether this connection still has some input in its buffer.
77 : *
78 : * This function returns true if there is partial incoming data in this
79 : * object's buffer.
80 : *
81 : * \return true if some buffered input is waiting for completion.
82 : */
83 0 : bool local_stream_client_buffer_connection::has_input() const
84 : {
85 0 : return !f_line.empty();
86 : }
87 :
88 :
89 :
90 : /** \brief Check whether this connection still has some output in its buffer.
91 : *
92 : * This function returns true if there is still some output in the client
93 : * buffer. Output is added by the write() function, which is called by
94 : * the send_message() function.
95 : *
96 : * \return true if some buffered output is waiting to be sent out.
97 : */
98 0 : bool local_stream_client_buffer_connection::has_output() const
99 : {
100 0 : return !f_output.empty();
101 : }
102 :
103 :
104 :
105 : /** \brief Write data to the connection.
106 : *
107 : * This function can be used to send data to this local connection.
108 : * The data is bufferized and as soon as the connection can WRITE
109 : * to the socket, it will wake up and send the data. In other words,
110 : * we cannot just sleep and wait for an answer. The transfer will
111 : * be asynchronous.
112 : *
113 : * \todo
114 : * Optimization: look into writing the \p data buffer directly in
115 : * the socket if the f_output cache is empty. If that works then
116 : * we can completely bypass our intermediate cache. This works only
117 : * if we make sure that the socket is non-blocking, though.
118 : *
119 : * \todo
120 : * Determine whether we may end up with really large buffers that
121 : * grow for a long time. This function only inserts and the
122 : * process_signal() function only reads some of the bytes but it
123 : * does not reduce the size of the buffer until all the data was
124 : * sent.
125 : *
126 : * \param[in] data The pointer to the buffer of data to be sent.
127 : * \param[out] length The number of bytes to send.
128 : *
129 : * \return The number of bytes that were saved in our buffer, 0 if
130 : * no data was written to the buffer (i.e. length is zero or data
131 : * is a null pointer). Or -1 on an error (i.e. the socket is closed).
132 : */
133 2 : ssize_t local_stream_client_buffer_connection::write(void const * data, size_t length)
134 : {
135 2 : if(get_socket() == -1)
136 : {
137 0 : errno = EBADF;
138 0 : return -1;
139 : }
140 :
141 2 : if(data != nullptr && length > 0)
142 : {
143 2 : char const * d(reinterpret_cast<char const *>(data));
144 2 : f_output.insert(f_output.end(), d, d + length);
145 2 : return length;
146 : }
147 :
148 0 : return 0;
149 : }
150 :
151 :
152 : /** \brief The buffer is a writer when the output buffer is not empty.
153 : *
154 : * This function returns true as long as the output buffer of this
155 : * client connection is not empty.
156 : *
157 : * \return true if the output buffer is not empty, false otherwise.
158 : */
159 5 : bool local_stream_client_buffer_connection::is_writer() const
160 : {
161 5 : return get_socket() != -1 && !f_output.empty();
162 : }
163 :
164 :
165 : /** \brief Instantiation of process_read().
166 : *
167 : * This function reads incoming data from a socket.
168 : *
169 : * The function is what manages our low level TCP/IP connection protocol
170 : * which is to read one line of data (i.e. bytes up to the next '\n'
171 : * character; note that '\r' are not understood.)
172 : *
173 : * Once a complete line of data was read, it is converted to UTF-8 and
174 : * sent to the next layer using the process_line() function passing
175 : * the line it just read (without the '\n') to that callback.
176 : *
177 : * \sa process_write()
178 : * \sa process_line()
179 : */
180 1 : void local_stream_client_buffer_connection::process_read()
181 : {
182 : // we read one character at a time until we get a '\n'
183 : // since we have a non-blocking socket we can read as
184 : // much as possible and then check for a '\n' and keep
185 : // any extra data in a cache.
186 : //
187 1 : if(get_socket() != -1)
188 : {
189 1 : int count_lines(0);
190 1 : std::int64_t const date_limit(get_current_date() + get_processing_time_limit());
191 2 : std::vector<char> buffer(1024);
192 : for(;;)
193 : {
194 2 : errno = 0;
195 2 : ssize_t const r(read(&buffer[0], buffer.size()));
196 2 : if(r > 0)
197 : {
198 2 : for(ssize_t position(0); position < r; )
199 : {
200 1 : std::vector<char>::const_iterator it(std::find(buffer.begin() + position, buffer.begin() + r, '\n'));
201 1 : if(it == buffer.begin() + r)
202 : {
203 : // no newline, just add the whole thing
204 0 : f_line += std::string(&buffer[position], r - position);
205 0 : break; // do not waste time, we know we are done
206 : }
207 :
208 : // retrieve the characters up to the newline
209 : // character and process the line
210 : //
211 1 : f_line += std::string(&buffer[position], it - buffer.begin() - position);
212 1 : process_line(f_line);
213 1 : ++count_lines;
214 :
215 : // done with that line
216 : //
217 1 : f_line.clear();
218 :
219 : // we had a newline, we may still have some data
220 : // in that buffer; (+1 to skip the '\n' itself)
221 : //
222 1 : position = it - buffer.begin() + 1;
223 : }
224 :
225 : // when we reach here all the data read in `buffer` is
226 : // now either fully processed or in f_line
227 : //
228 : // TODO: change the way this works so we can test the
229 : // limit after each process_line() call
230 : //
231 2 : if(count_lines >= get_event_limit()
232 1 : || get_current_date() >= date_limit)
233 : {
234 : // we reach one or both limits, stop processing so
235 : // the other events have a chance to run
236 : //
237 0 : break;
238 : }
239 : }
240 1 : else if(r == 0 || errno == 0 || errno == EAGAIN || errno == EWOULDBLOCK)
241 : {
242 : // no more data available at this time
243 : //
244 : break;
245 : }
246 : else //if(r < 0)
247 : {
248 : // TODO: do something about the error
249 : //
250 0 : int const e(errno);
251 0 : SNAP_LOG_ERROR
252 0 : << "an error occurred while reading from socket (errno: "
253 : << e
254 : << " -- "
255 0 : << strerror(e)
256 : << ")."
257 : << SNAP_LOG_SEND;
258 0 : process_error();
259 0 : return;
260 : }
261 1 : }
262 : }
263 :
264 : // process next level too
265 : //
266 1 : local_stream_client_connection::process_read();
267 : }
268 :
269 :
270 : /** \brief Instantiation of process_write().
271 : *
272 : * This function writes outgoing data to a socket.
273 : *
274 : * This function manages our own internal cache, which we use to allow
275 : * for out of synchronization (non-blocking) output.
276 : *
277 : * When the output buffer goes empty, this function calls the
278 : * process_empty_buffer() callback.
279 : *
280 : * \sa write()
281 : * \sa process_read()
282 : * \sa process_empty_buffer()
283 : */
284 2 : void local_stream_client_buffer_connection::process_write()
285 : {
286 2 : if(get_socket() != -1)
287 : {
288 2 : errno = 0;
289 2 : ssize_t const r(local_stream_client_connection::write(&f_output[f_position], f_output.size() - f_position));
290 2 : if(r > 0)
291 : {
292 : // some data was written
293 2 : f_position += r;
294 2 : if(f_position >= f_output.size())
295 : {
296 2 : f_output.clear();
297 2 : f_position = 0;
298 2 : process_empty_buffer();
299 : }
300 : }
301 0 : else if(r < 0 && errno != 0 && errno != EAGAIN && errno != EWOULDBLOCK)
302 : {
303 : // connection is considered bad, generate an error
304 : //
305 0 : int const e(errno);
306 0 : SNAP_LOG_ERROR
307 0 : << "an error occurred while writing to socket of \""
308 0 : << get_name()
309 0 : << "\" (errno: "
310 : << e
311 : << " -- "
312 0 : << strerror(e)
313 : << ")."
314 : << SNAP_LOG_SEND;
315 0 : process_error();
316 0 : return;
317 : }
318 : }
319 :
320 : // process next level too
321 : //
322 2 : local_stream_client_connection::process_write();
323 : }
324 :
325 :
326 : /** \brief The hang up event occurred.
327 : *
328 : * This function closes the socket and then calls the previous level
329 : * hang up code which removes this connection from the communicator
330 : * object it was last added in.
331 : */
332 0 : void local_stream_client_buffer_connection::process_hup()
333 : {
334 : // this connection is dead...
335 : //
336 0 : close();
337 :
338 : // process next level too
339 : //
340 0 : local_stream_client_connection::process_hup();
341 0 : }
342 :
343 :
344 : /** \fn local_stream_client_buffer_connection::process_line(std::string const & line);
345 : * \brief Process a line of data.
346 : *
347 : * This is the default virtual class that can be overridden to implement
348 : * your own processing.
349 : *
350 : * \param[in] line The line of data that was just read from the input
351 : * socket.
352 : */
353 :
354 :
355 :
356 6 : } // namespace ed
357 : // vim: ts=4 sw=4 et
|