Line data Source code
1 : /*
2 : * Text:
3 : * libsnapwebsites/src/libdbproxy/proxy.cpp
4 : *
5 : * Description:
6 : * Handle sending CQL orders to the snapproxy and receiving the
7 : * Cassandra results.
8 : *
9 : * Documentation:
10 : * See each function below.
11 : *
12 : * License:
13 : * Copyright (c) 2011-2019 Made to Order Software Corp. All Rights Reserved
14 : *
15 : * https://snapwebsites.org/
16 : * contact@m2osw.com
17 : *
18 : * Permission is hereby granted, free of charge, to any person obtaining a
19 : * copy of this software and associated documentation files (the
20 : * "Software"), to deal in the Software without restriction, including
21 : * without limitation the rights to use, copy, modify, merge, publish,
22 : * distribute, sublicense, and/or sell copies of the Software, and to
23 : * permit persons to whom the Software is furnished to do so, subject to
24 : * the following conditions:
25 : *
26 : * The above copyright notice and this permission notice shall be included
27 : * in all copies or substantial portions of the Software.
28 : *
29 : * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
30 : * OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
31 : * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
32 : * IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
33 : * CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
34 : * TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
35 : * SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
36 : */
37 :
38 : // ourselves
39 : //
40 : #include "libdbproxy/proxy.h"
41 : #include "libdbproxy/libdbproxy.h"
42 :
43 : #include <snapwebsites/log.h>
44 :
45 : // 3rd party lib
46 : //
47 : #include <QtCore>
48 :
49 : // C++ lib
50 : //
51 : #include <iostream>
52 : #include <sstream>
53 :
54 : // OS libs
55 : //
56 : #include <openssl/err.h>
57 : #include <unistd.h>
58 :
59 :
60 : namespace libdbproxy
61 : {
62 :
63 :
64 : namespace
65 : {
66 :
67 :
68 : /** \brief Release a BIO object.
69 : *
70 : * This function is used with the bio shared pointer to make sure we
71 : * can automatically remove the bio.
72 : *
73 : * Also, it prevents the BIO_free_all() function from sending a shutdown()
74 : * to the other side. I'm not too sure why the programmers thought that
75 : * was a good idea.
76 : *
77 : * \param[in] bio The BIO object to be freed.
78 : */
79 0 : void bio_deleter(BIO * bio)
80 : {
81 : // IMPORTANT NOTE:
82 : //
83 : // The BIO_free_all() calls shutdown() on the socket. This is not
84 : // acceptable in a normal Unix application that makes use of fork().
85 : // So... instead we ask the BIO interface to not close the socket,
86 : // and instead we close it ourselves. This means the shutdown()
87 : // never gets called.
88 : //
89 0 : BIO_set_close(bio, BIO_NOCLOSE);
90 :
91 : int c;
92 : #pragma GCC diagnostic push
93 : #pragma GCC diagnostic ignored "-Wold-style-cast"
94 0 : BIO_get_fd(bio, &c);
95 : #pragma GCC diagnostic pop
96 0 : if(c != -1)
97 : {
98 0 : close(c);
99 : }
100 :
101 0 : BIO_free_all(bio);
102 0 : }
103 :
104 :
105 : bool g_bio_initialized = false;
106 0 : void bio_initialize()
107 : {
108 : // already initialized?
109 0 : if(g_bio_initialized)
110 : {
111 0 : return;
112 : }
113 0 : g_bio_initialized = true;
114 :
115 : // Make sure the SSL library gets initialized
116 : //SSL_library_init();
117 :
118 : // TBD: should we call the load string functions only when we
119 : // are about to generate an error?
120 0 : ERR_load_crypto_strings();
121 : //ERR_load_SSL_strings();
122 :
123 : // TODO: define a way to only define safe algorithm
124 : // (it looks like we can force TLSv1 below at least)
125 : //OpenSSL_add_all_algorithms();
126 :
127 : // TBD: need a PRNG seeding before creating a new SSL context?
128 : }
129 :
/** \brief Buffer that lives on the stack when small enough.
 *
 * This fast_buffer implements a very fast "let's use the stack if we do
 * not need too much memory" strategy so that way we can avoid many
 * malloc()'s.
 *
 * Many times, the reply will be very small (less than 1Kb) so using the
 * stack for that purpose is an incredible saver! (i.e. in assembly, the
 * stack allocation is one SUB SSP, 4096 and to release the memory,
 * ADD SSP, 4096, so 2 instructions which are anyway done at the start
 * and end of the function call.)
 *
 * Requests of up to sizeof(f_fast_alloc) bytes (4096) use the embedded
 * array; larger requests are allocated with new[] and released in the
 * destructor. The content of the buffer is not initialized.
 */
class fast_buffer
{
public:
    /** \brief Reserve \p size bytes.
     *
     * The constructor is explicit so a plain size never silently
     * converts into a buffer.
     *
     * \param[in] size  The number of bytes the caller needs.
     */
    explicit fast_buffer(size_t size)
        : f_ptr(size > sizeof(f_fast_alloc) ? new char[size] : f_fast_alloc)
    {
    }

    /** \brief Release the heap block if one was allocated.
     */
    ~fast_buffer()
    {
        if(f_ptr != f_fast_alloc)
        {
            delete [] f_ptr;
        }
    }

    // we don't offer copying at the moment, although it would be possible
    // we just don't use it so don't waste time developing it, but make sure
    // we're safe in case we wanted to be able to...
    //
    fast_buffer(fast_buffer const & rhs) = delete;
    fast_buffer & operator = (fast_buffer const & rhs) = delete;

    /** \brief Retrieve the start of the buffer.
     *
     * \return A pointer to either the embedded array or the heap block.
     */
    char * get()
    {
        return f_ptr;
    }

private:
    // f_fast_alloc must remain declared before f_ptr since the
    // initialization of f_ptr refers to it
    //
    char        f_fast_alloc[4096];     // default
    char *      f_ptr = nullptr;
};
173 :
174 :
175 : }
176 :
177 :
178 : /** \brief Creating a proxy from the daemon.
179 : *
180 : * This constructor is used whenever we build the proxy
181 : * object from the daemon side. In that case the daemon calls
182 : * the receiveOrder() and sendResult() with a socket as one of
183 : * the parameters.
184 : */
185 0 : proxy::proxy()
186 : {
187 0 : }
188 :
189 :
190 0 : proxy::proxy(QString const & host, int port)
191 : : f_host(host)
192 0 : , f_port(port)
193 : {
194 0 : }
195 :
196 :
197 0 : order_result proxy::sendOrder(order const & order)
198 : {
199 : // Note: by default result is marked as "failed"
200 : //
201 0 : order_result result;
202 :
203 0 : QByteArray const encoded(order.encodeOrder());
204 :
205 : // send the encoded buffer in one write
206 : //
207 0 : if(static_cast<int>(bio_write(encoded.data(), encoded.size())) != encoded.size())
208 : {
209 0 : SNAP_LOG_DEBUG("++++ bio_write() failed!");
210 0 : return result;
211 : }
212 :
213 : // then we want to wait for the result
214 : //
215 0 : if(order.blocking())
216 : {
217 : // results are very similar to what we send: 4 bytes of
218 : // what we are receiving, a size, and the result buffer
219 : // of data encoded as per the order_result scheme
220 : //
221 : unsigned char buf[8];
222 0 : int const r(bio_read(buf, sizeof(buf)));
223 0 : if(r != sizeof(buf)) // 4 letters + 4 bytes for size
224 : {
225 0 : SNAP_LOG_DEBUG("++++ bio_read() could not read ")(sizeof(buf))(" bytes of header, instead it got ")(r)(" bytes, so it failed!");
226 0 : return result;
227 : }
228 :
229 0 : std::string const command(reinterpret_cast<char const *>(buf), 4);
230 :
231 0 : uint32_t const reply_size(
232 0 : (buf[4] << 24)
233 0 : | (buf[5] << 16)
234 0 : | (buf[6] << 8)
235 0 : | (buf[7] << 0));
236 :
237 : // Coverity says we should have a test for largest size
238 : //
239 0 : if(reply_size > 200 * 1024 * 1024)
240 : {
241 0 : SNAP_LOG_DEBUG("++++ reply_size out of bounds! (max. 200Mb) size=")(reply_size);
242 0 : return result;
243 : }
244 :
245 0 : fast_buffer reply(reply_size);
246 0 : if(reply_size > 0)
247 : {
248 0 : if(static_cast<uint32_t>(bio_read(reply.get(), reply_size)) != reply_size)
249 : {
250 0 : SNAP_LOG_DEBUG("++++ reply_size not read! size=")(reply_size);
251 0 : return result;
252 : }
253 : }
254 :
255 0 : if(result.decodeResult(reinterpret_cast<unsigned char const *>(reply.get()), reply_size))
256 : {
257 : // right now we expect either SUCS or EROR
258 0 : result.setSucceeded(command == "SUCS");
259 : }
260 : }
261 : else
262 : {
263 0 : result.setSucceeded(true);
264 : }
265 :
266 0 : return result;
267 : }
268 :
269 :
270 :
271 : /** \brief Read the next incoming order.
272 : *
273 : * This function is called by snapdbproxy to listen for further data
274 : * store orders to forward to Cassandra.
275 : *
276 : * The function blocks reading on the input \p reader. The result of
277 : * the function is exactly one order. snapdbproxy takes care of
278 : * the rest which is in generate to send the order to Cassandra,
279 : * wait for the answer, encode the answer and reply to client
280 : * with an encoded result (unless the order says it is "non-blocking"
281 : * in which case no reply are expected.)
282 : *
283 : * \todo
284 : * Look into whether we should instead use a poll() on all the
285 : * sockets, but right now we expect the snapdbproxy to use one
286 : * thread per socket...
287 : *
288 : * \param[in] reader A proxy_io implementation that can
289 : * read data from somewhere.
290 : *
291 : * \return The order just read, or an invalid order.
292 : */
293 0 : order proxy::receiveOrder(proxy_io & io)
294 : {
295 0 : if(!f_host.isEmpty())
296 : {
297 0 : SNAP_LOG_DEBUG("++++ receiveOrder(): f_host is not empty!");
298 0 : throw exception("proxy::receiveOrder() called from the client...");
299 : }
300 :
301 : // create an invalid order by default
302 : //
303 0 : order order;
304 0 : order.setValidOrder(false);
305 :
306 : // each order starts with a 4 letter command
307 : //
308 : unsigned char buf[8];
309 0 : int const r(io.read(buf, sizeof(buf)));
310 0 : if(r != sizeof(buf)) // 4 letters + 4 bytes for size
311 : {
312 : // this one happens all the time when the client exits without
313 : // sending a clean disconnect to the snapdbproxy daemon
314 : //
315 0 : SNAP_LOG_DEBUG("++++ io.read() could not read ")
316 0 : (sizeof(buf))
317 0 : (" bytes of header, instead it got ")
318 0 : (r)
319 0 : (" bytes, so it failed!");
320 0 : return order;
321 : }
322 :
323 0 : uint32_t const order_size(
324 0 : (buf[4] << 24)
325 0 : | (buf[5] << 16)
326 0 : | (buf[6] << 8)
327 0 : | (buf[7] << 0));
328 :
329 : // Coverity says we should have a test for largest size
330 : //
331 0 : if(order_size > 200 * 1024 * 1024)
332 : {
333 0 : SNAP_LOG_DEBUG("++++ order_size out of bounds! (max. 200Mb) size=")(order_size);
334 0 : return order;
335 : }
336 :
337 0 : std::string const command(reinterpret_cast<char const *>(buf), 4);
338 0 : if(command != "CQLP")
339 : {
340 0 : SNAP_LOG_DEBUG("++++ wrong command!");
341 0 : return order;
342 : }
343 :
344 : // now we want to read the order itself, so we need a buffer
345 : //
346 0 : fast_buffer order_data(order_size);
347 0 : if(static_cast<uint32_t>(io.read(order_data.get(), order_size)) != order_size)
348 : {
349 0 : SNAP_LOG_DEBUG("++++ io.read() error!");
350 0 : return order;
351 : }
352 :
353 0 : if(!order.decodeOrder(reinterpret_cast<unsigned char const *>(order_data.get()), order_size))
354 : {
355 0 : SNAP_LOG_DEBUG("++++ decodeOrder() failed!");
356 0 : return order;
357 : }
358 :
359 : // it worked, the order is valid
360 : //
361 0 : order.setValidOrder(true);
362 :
363 0 : return order;
364 : }
365 :
366 :
367 : /** \brief Send a result back to a client.
368 : *
369 : * This function is used to send the specified \p result back to
370 : * the client that sent an order ealier.
371 : *
372 : * While writing to a socket, if the client closes the socket, it is
373 : * likely that the write() function will return an invalid size. As
374 : * a result, this function returns false. On a false, you should end
375 : * your loop immediately.
376 : *
377 : * \param[in] io An object capable of writing the result to a socket.
378 : * \param[in] result The result to be sent to the client.
379 : *
380 : * \return true if the message is sent as expected, false otherwise.
381 : */
382 0 : bool proxy::sendResult(proxy_io & io, order_result const & result)
383 : {
384 0 : if(!f_host.isEmpty())
385 : {
386 0 : throw exception("proxy::sendResult() called from the client...");
387 : }
388 :
389 0 : QByteArray const encoded(result.encodeResult());
390 :
391 : // now send the encoded buffer all at once
392 : //
393 0 : if(io.write(encoded.data(), encoded.size()) != encoded.size())
394 : {
395 0 : return false;
396 : }
397 :
398 : // result sent successfully
399 : //
400 0 : return true;
401 : }
402 :
403 :
404 0 : bool proxy::isConnected() const
405 : {
406 0 : return f_bio != nullptr;
407 : }
408 :
409 :
410 0 : void proxy::bio_get()
411 : {
412 0 : if(!f_bio)
413 : {
414 0 : if(f_host.isEmpty())
415 : {
416 0 : throw exception("proxy::bio_get(): server cannot call bio_get()...");
417 : }
418 :
419 0 : bio_initialize();
420 :
421 : // create a plain BIO connection
422 0 : std::shared_ptr<BIO> bio; // use reset(), see SNAP-507
423 0 : bio.reset(BIO_new(BIO_s_connect()), bio_deleter);
424 0 : if(!bio)
425 : {
426 0 : ERR_print_errors_fp(stderr);
427 0 : throw exception("proxy::bio_get(): failed initializing a BIO object");
428 : }
429 :
430 : // it is supposed to be non-blocking by default, but just in case,
431 : // mark that one as such; although even that does not prevent the
432 : // BIO_read() and BIO_write() functions from returning early!
433 : //
434 : // TODO: this is wrong, this function needs to be called before
435 : // the BIO_s_connect() call! (see man page for details)
436 : //
437 0 : BIO_set_nbio(bio.get(), 0);
438 :
439 : #pragma GCC diagnostic push
440 : #pragma GCC diagnostic ignored "-Wold-style-cast"
441 0 : BIO_set_conn_hostname(bio.get(), const_cast<char *>(f_host.toUtf8().data()));
442 0 : BIO_set_conn_port(bio.get(), const_cast<char *>(std::to_string(f_port).c_str()));
443 : #pragma GCC diagnostic pop
444 :
445 : // connect to the server (open the socket)
446 0 : if(BIO_do_connect(bio.get()) <= 0)
447 : {
448 0 : ERR_print_errors_fp(stderr);
449 0 : throw exception("proxy::bio_get(): failed connecting BIO object to server");
450 : }
451 :
452 : // it worked, save the results
453 0 : f_bio = bio;
454 : }
455 0 : }
456 :
457 :
458 0 : void proxy::bio_reset()
459 : {
460 0 : f_bio.reset();
461 0 : }
462 :
463 :
464 0 : int proxy::bio_read(void * buf, size_t size)
465 : {
466 0 : if(size == 0)
467 : {
468 0 : return 0;
469 : }
470 :
471 0 : if(!f_bio)
472 : {
473 0 : bio_get();
474 : }
475 :
476 : // even though we open the BIO as blocking, somehow, we can get
477 : // the BIO_read() to return too soon... so here we loop until
478 : // all of the expected data got transferred
479 : //
480 0 : int count(0);
481 0 : int r(static_cast<int>(BIO_read(f_bio.get(), buf, size)));
482 0 : while(r >= -1)
483 : {
484 0 : if(r <= 0)
485 : {
486 0 : if(!BIO_should_retry(f_bio.get()))
487 : {
488 0 : break;
489 : }
490 : }
491 : else
492 : {
493 0 : count += r;
494 0 : size -= r;
495 0 : if(size == 0)
496 : {
497 0 : return count;
498 : }
499 0 : buf = reinterpret_cast<char *>(buf) + r;
500 : }
501 0 : r = static_cast<int>(BIO_read(f_bio.get(), buf, size));
502 : }
503 :
504 : // the BIO generated an error (TBD should we check BIO_eof() too?)
505 : // or the BIO is not implemented
506 : // XXX: do we have to set errno?
507 0 : ERR_print_errors_fp(stderr);
508 0 : return -1;
509 : }
510 :
511 :
512 0 : int proxy::bio_write(void const * buf, size_t size)
513 : {
514 0 : if(size == 0)
515 : {
516 0 : return 0;
517 : }
518 :
519 0 : if(!f_bio)
520 : {
521 0 : bio_get();
522 : }
523 :
524 : // even though we open the BIO as blocking, somehow, we can get
525 : // the BIO_read() to return too soon... so here we loop until
526 : // all of the expected data got transferred
527 : //
528 0 : int count(0);
529 0 : int r(static_cast<int>(BIO_write(f_bio.get(), buf, size)));
530 0 : while(r >= -1)
531 : {
532 0 : if(r == -1 || r == 0)
533 : {
534 0 : if(!BIO_should_retry(f_bio.get()))
535 : {
536 0 : break;
537 : }
538 : }
539 : else
540 : {
541 0 : count += r;
542 0 : size -= r;
543 0 : if(size == 0)
544 : {
545 0 : BIO_flush(f_bio.get());
546 0 : return count;
547 : }
548 0 : buf = reinterpret_cast<char const *>(buf) + r;
549 0 : r = static_cast<int>(BIO_write(f_bio.get(), buf, size));
550 : }
551 : }
552 :
553 : // the BIO generated an error (TBD should we check BIO_eof() too?)
554 : // or the BIO is not implemented
555 : //
556 : // XXX: do we have to set errno?
557 : //
558 0 : ERR_print_errors_fp(stderr);
559 :
560 : // TBD: should we return `count` if it is not zero?
561 : // then on next call it will return -1, but then the user
562 : // knows part of the message was at least buffered if not
563 : // transmitted...
564 : //
565 0 : return -1;
566 : }
567 :
568 :
569 6 : } // namespace libdbproxy
570 : // vim: ts=4 sw=4 et
|