Line data Source code
1 : /*
2 : * Text:
3 : * libsnapwebsites/src/libdbproxy/table.cpp
4 : *
5 : * Description:
6 : * Handling of cassandra tables (Column Families).
7 : *
8 : * Documentation:
9 : * See each function below.
10 : *
11 : * License:
12 : * Copyright (c) 2011-2019 Made to Order Software Corp. All Rights Reserved
13 : *
14 : * https://snapwebsites.org/
15 : * contact@m2osw.com
16 : *
17 : * Permission is hereby granted, free of charge, to any person obtaining a
18 : * copy of this software and associated documentation files (the
19 : * "Software"), to deal in the Software without restriction, including
20 : * without limitation the rights to use, copy, modify, merge, publish,
21 : * distribute, sublicense, and/or sell copies of the Software, and to
22 : * permit persons to whom the Software is furnished to do so, subject to
23 : * the following conditions:
24 : *
25 : * The above copyright notice and this permission notice shall be included
26 : * in all copies or substantial portions of the Software.
27 : *
28 : * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
29 : * OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
30 : * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
31 : * IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
32 : * CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
33 : * TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
34 : * SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
35 : */
36 :
37 : #include "libdbproxy/libdbproxy.h"
38 : #include "libdbproxy/table.h"
39 : #include "libdbproxy/context.h"
40 :
41 : #include <casswrapper/schema.h>
42 :
43 : #include <snapwebsites/log.h>
44 :
45 : #include <iostream>
46 : #include <stdexcept>
47 : #include <sstream>
48 :
49 : #include <QtCore>
50 :
51 : #include <unistd.h>
52 :
53 :
54 : namespace libdbproxy
55 : {
56 :
57 :
58 : /** \class table
59 : * \brief Defines a table and may hold a Cassandra column family definition.
60 : *
61 : * In Cassandra, a table is called a column family. Mainly because
62 : * each row in a Cassandra table can have a different set of columns
63 : * whereas a table is usually viewed as a set of rows that all have
64 : * the exact same number of columns (but really, even in SQL the set
65 : * of columns can be viewed as varying since columns can be set to
66 : * NULL and that has [nearly] the same effect as not defining a column
67 : * in Cassandra.)
68 : *
69 : * This class defines objects that can hold all the column family
70 : * information so as to create new ones and read existing ones.
71 : *
72 : * The name of a table is very limited (i.e. letters, digits, and
73 : * underscores, and the name must start with a letter.) The maximum
74 : * length is not specified, but it is relatively short as it is
75 : * used as a filename that will hold the data of the table.
76 : *
77 : * Whenever trying to get a row, the default behavior is to create
78 : * a new row if it doesn't exist yet. If you need to know whether a
79 : * row exists, make sure you use the exists() function.
80 : *
81 : * \note
82 : * A table can be created, updated, and dropped. In all those cases, the
83 : * functions return once the Cassandra instance with which you are
84 : * connected is ready.
85 : *
86 : * \sa exists()
87 : */
88 :
89 :
90 : /** \var table::f_from_cassandra
91 : * \brief Whether the table is a memory table or a server table.
92 : *
93 : * A table read from the Cassandra server or created with the
94 : * create() function is marked as being from Cassandra.
95 : * All other tables are considered memory tables.
96 : *
97 : * A memory table can be used as a set of global variables
98 : * with a format similar to a Cassandra table.
99 : *
100 : * If you define a new table with the intent to call the
101 : * create() function, avoid saving data in the new table
102 : * as it won't make it to the database. (This may change
103 : * in the future though.)
104 : */
105 :
106 : /** \var table::f_schema
107 : * \brief The schema (meta data) describing this table.
108 : *
109 : * This pointer is assigned by parseTableDefinition() with the table
110 : * meta data it receives.
111 : *
112 : * The fields() functions and getTableOptions() read the fields of the
113 : * schema through this pointer.
114 : */
115 :
116 : /** \var table::f_context
117 : * \brief The context that created this table.
118 : *
119 : * A table is always part of a specific context. You can only create a
120 : * new table using a function from your context objects.
121 : *
122 : * This is a bare pointer since you cannot delete the context and hope
123 : * the table remains (i.e. when the context goes, the table goes!)
124 : * However, you may keep a shared pointer to a table after the table
125 : * was deleted. In that case, the f_context pointer is set to NULL
126 : * and calling functions on that table may result in an exception
127 : * being raised.
128 : */
129 :
130 : /** \var table::f_rows
131 : * \brief Set of rows.
132 : *
133 : * The system caches rows in this map. The map index is the row key. You can
134 : * clear the table using the clearCache() function.
135 : */
136 :
137 : /** \brief Initialize a table object.
138 : *
139 : * This function initializes a table object.
140 : *
141 : * All the parameters are set to the defaults as defined in the Cassandra
142 : * definition of the CfDef message. You can use the different functions to
143 : * change the default values.
144 : *
145 : * Note that the context and table name cannot be changed later. These
146 : * are fixed values that follow the Cassandra behavior.
147 : *
148 : * A table name must be composed of letters (A-Za-z), digits (0-9) and
149 : * underscores (_). It must start with a letter. The corresponding lexical
150 : * expression is: /^[A-Za-z][A-Za-z0-9_]*$\/
151 : *
152 : * The length of a table name is also restricted, however it is not specified
153 : * by Cassandra. I suggest you never use a name of more than 200 letters.
154 : * The length is apparently limited by the number of characters that can be
155 : * used to name a file on the file system you are using.
156 : *
157 : * \param[in] context The context where this table definition is created.
158 : * \param[in] table_name The name of the table definition being created.
159 : */
160 0 : table::table(context::pointer_t context, const QString& table_name)
161 0 : : f_context(context)
162 : {
163 : // cache the name because we need it for each other we send
164 : //
165 0 : f_context_name = context->contextName();
166 :
167 : // verify the name here (faster than waiting for the server and good documentation)
168 : //
169 : // Note: we support uppercase names, however, this is only because
170 : // there is still one system table that uses such... uppercase
171 : // requires us to use double quotes around names each time we
172 : // access a table so it is some extra overhead.
173 : //
174 0 : bool has_quotes(false);
175 0 : bool has_uppercase(false);
176 0 : bool quotes_are_valid(false);
177 0 : int const max(table_name.length());
178 0 : for(int idx(0); idx < max; ++idx)
179 : {
180 0 : ushort c(table_name.at(idx).unicode());
181 0 : switch(c)
182 : {
183 0 : case '"':
184 0 : if(idx == 0)
185 : {
186 0 : has_quotes = true;
187 : }
188 0 : else if(idx == max - 1)
189 : {
190 0 : if(!has_quotes)
191 : {
192 0 : throw exception(QString("'%1' is not a valid table name (it cannot end with a double quote (\") if it does not start with a double quote.)")
193 0 : .arg(table_name).toUtf8().data());
194 : }
195 0 : quotes_are_valid = true;
196 : }
197 : else
198 : {
199 0 : throw exception(QString("'%1' is not a valid table name (a table name can be surrounded by double quotes, but it cannot itself include a double quote.)")
200 0 : .arg(table_name).toUtf8().data());
201 : }
202 0 : break;
203 :
204 0 : case '0':
205 : case '1':
206 : case '2':
207 : case '3':
208 : case '4':
209 : case '5':
210 : case '6':
211 : case '7':
212 : case '8':
213 : case '9':
214 : case '_':
215 0 : if(idx == 0
216 0 : || (idx == 1 && has_quotes))
217 : {
218 0 : throw exception(QString("'%1' is not a valid table name (a table name cannot start with a digit or an underscore (_), even when quoted.)")
219 0 : .arg(table_name).toUtf8().data());
220 : }
221 0 : break;
222 :
223 0 : default:
224 : // lowercase are always fine
225 0 : if(c >= 'a' && c <= 'z')
226 : {
227 0 : break;
228 : }
229 0 : if(c >= 'A' && c <= 'Z')
230 : {
231 0 : has_uppercase = true;
232 0 : break;
233 : }
234 : // the regex shown in the error message is simplified, the double
235 : // quotes must not appear at all or be defined at the start AND
236 : // the end
237 : //
238 0 : throw exception(QString("'%1' is an invalid table name (does not match \"?[a-zA-Z][a-zA-Z0-9_]*\"?)").arg(table_name).toUtf8().data());
239 :
240 : }
241 : }
242 0 : if(has_quotes && !quotes_are_valid)
243 : {
244 0 : throw exception(QString("'%1' is not a valid table name (it cannot start with a double quote (\") if it does not end with a double quote.)")
245 0 : .arg(table_name).toUtf8().data());
246 : }
247 :
248 0 : if(has_uppercase && !has_quotes)
249 : {
250 : // surround the name with double quotes...
251 0 : f_table_name = QString("\"%1\"").arg(table_name);
252 : }
253 : else
254 : {
255 0 : f_table_name = table_name;
256 : }
257 0 : f_proxy = context->parentCassandra()->getProxy();
258 0 : }
259 :
260 :
261 : /** \brief Clean up the table object.
262 : *
263 : * This function ensures that all resources allocated by the
264 : * table are released.
265 : *
266 : * Note that does not in any way destroy the table in the
267 : * Cassandra cluster.
268 : */
269 0 : table::~table()
270 : {
271 : try
272 : {
273 : // do an explicit clearCache() so we can try/catch otherwise we
274 : // could get a throw in the destructor
275 : //
276 0 : clearCache();
277 : }
278 0 : catch(const exception&)
279 : {
280 : // ignore, not much else we can do in a destructor
281 : }
282 0 : }
283 :
284 :
285 : /** \brief Return the name of the context attached to this table definition.
286 : *
287 : * This function returns the name of the context attached to this table
288 : * definition.
289 : *
290 : * Note that it is not possible to rename a context and therefore this
291 : * name will never change.
292 : *
293 : * To get a pointer to the context, use the cluster function context()
294 : * with this name. Since each context is unique, it will always return
295 : * the correct pointer.
296 : *
297 : * \return The name of the context this table definition is defined in.
298 : */
299 0 : const QString& table::contextName() const
300 : {
301 0 : return f_context_name;
302 : }
303 :
304 :
305 : /** \brief Retrieve the name of this table.
306 : *
307 : * This function returns the name of this table. Note that the
308 : * name cannot be changed.
309 : *
310 : * \return The table name.
311 : */
312 0 : QString table::tableName() const
313 : {
314 0 : if(f_table_name[0] == '"')
315 : {
316 : // remove the quotes if present
317 0 : return f_table_name.mid(1, f_table_name.length() - 2);
318 : }
319 0 : return f_table_name;
320 : }
321 :
322 :
323 0 : const casswrapper::schema::Value::map_t& table::fields() const
324 : {
325 0 : return f_schema->getFields();
326 : }
327 :
328 :
329 0 : casswrapper::schema::Value::map_t& table::fields()
330 : {
331 0 : return f_schema->getFields();
332 : }
333 :
334 :
335 : /** \brief Mark this table as from Cassandra.
336 : *
337 : * This very case happens when the user creates a new context that,
338 : * at the time of calling context::create(), includes
339 : * a list of table definitions.
340 : *
341 : * In that case we know that the context is being created, but not
342 : * the tables because the server does it transparently in one go.
343 : */
344 0 : void table::setFromCassandra()
345 : {
346 0 : f_from_cassandra = true;
347 0 : }
348 :
349 : /** \brief This is an internal function used to parse a CfDef structure.
350 : *
351 : * This function is called internally to parse a CfDef object.
352 : * The data is saved in this table.
353 : *
354 : * \param[in] data The pointer to the CfDef object.
355 : */
356 0 : void table::parseTableDefinition( casswrapper::schema::TableMeta::pointer_t table_meta )
357 : {
358 0 : f_schema = table_meta;
359 0 : f_from_cassandra = true;
360 0 : }
361 :
362 : #if 0
363 : /** \brief Prepare a table definition.
364 : *
365 : * This function transforms a libdbproxy table definition into
366 : * a Cassandra CfDef structure.
367 : *
368 : * The parameter is passed as a void * because we do not want to define
369 : * the thrift types in our public headers.
370 : *
371 : * \param[in] cf The CfDef where the table is to be saved.
372 : */
373 : void table::prepareTableDefinition( CfDef* cf ) const
374 : {
375 : *cf = *f_private;
376 :
377 : // copy the columns
378 : cf->column_metadata.clear();
379 : for( auto c : f_column_definitions )
380 : {
381 : ColumnDef col;
382 : c->prepareColumnDefinition( &col );
383 : cf->column_metadata.push_back(col);
384 : }
385 : cf->__isset.column_metadata = !cf->column_metadata.empty();
386 : cf->__isset.compaction_strategy_options = !cf->compaction_strategy_options.empty();
387 : cf->__isset.compression_options = !cf->compression_options.empty();
388 : }
389 : #endif
390 :
391 :
392 : namespace
393 : {
394 0 : QString map_to_json( const std::map<std::string,std::string>& map )
395 : {
396 0 : QString ret;
397 0 : for( const auto& pair : map )
398 : {
399 0 : if( !ret.isEmpty() )
400 : {
401 0 : ret += ",";
402 : }
403 0 : ret += QString("'%1':'%2'").arg(pair.first.c_str()).arg(pair.second.c_str());
404 : }
405 0 : return ret;
406 : }
407 : }
408 :
409 :
410 0 : QString table::getTableOptions() const
411 : {
412 0 : QString query_string;
413 0 : for( const auto& pair : f_schema->getFields() )
414 : {
415 0 : query_string += QString("AND %1=%2\n")
416 0 : .arg(pair.first)
417 0 : .arg(pair.second.output())
418 : ;
419 : }
420 :
421 0 : return query_string;
422 : }
423 :
424 :
425 : /** \brief Create a Cassandra table.
426 : *
427 : * This function creates a Cassandra table in the context as specified
428 : * when you created the table object.
429 : *
430 : * If you want to declare a set of columns, this is a good time to do
431 : * it too (there is no QColumnDefinition::create() function!) By
432 : * default, columns use the default validation type as defined using
433 : * the setComparatorType() for their name and the
434 : * setDefaultValidationClass() for their data. It is not required to
435 : * define any column. In that case they all make use of the exact
436 : * same data.
437 : *
438 : * The table cannot already exist or an error will be thrown by the
439 : * Cassandra server. If the table is being updated, use the update()
440 : * function instead.
441 : *
442 : * Note that when you create a new context, you can create its tables
443 : * at once by defining tables before calling the context::create()
444 : * function.
445 : *
446 : * Creating a new table:
447 : *
448 : * \code
449 : * libdbproxy::table::pointer_t table(context->table("qt_cassandra_test_table"));
450 : * table->setComment("Our test table.");
451 : * table->setColumnType("Standard"); // Standard or Super
452 : * table->setKeyValidationClass("BytesType");
453 : * table->setDefaultValidationClass("BytesType");
454 : * table->setComparatorType("BytesType");
455 : * table->setKeyCacheSavePeriodInSeconds(14400); // unused in 1.1+
456 : * table->setMemtableFlushAfterMins(60); // unused in 1.1+
457 : * // Memtable defaults are dynamic and usually a better bet
458 : * //table->setMemtableThroughputInMb(247); // unused in 1.1+
459 : * //table->setMemtableOperationsInMillions(1.1578125); // unused in 1.1+
460 : * table->setGcGraceSeconds(864000);
461 : * table->setMinCompactionThreshold(4);
462 : * table->setMaxCompactionThreshold(22);
463 : * table->setReplicateOnWrite(1);
464 : * table->create();
465 : * \endcode
466 : *
467 : * \note
468 : * Once the table->create(); function returns, the table was created in the
469 : * Cassandra node you are connected with, but it was not yet replicated. In
470 : * order to use the table, the replication automatically happens behind the scenes.
471 : * In previous versions of Cassandra, it was necessary to wait for this replication
472 : * to finish, but now with modern versions, that is no longer an issue.
473 : *
474 : * \sa update()
475 : * \sa context::create()
476 : */
477 0 : void table::create()
478 : {
479 : // TODO: this is actually wrong because it only creates the table
480 : // it should be capable of either creating the table or altering
481 : // it because the libQtCassandra user may have changed some
482 : // parameters
483 : //
484 : // so if the table exists, we should switch to ALTER TABLE ...
485 : // command instead (for Snap! we do not ever tweak table
486 : // parameters dynamically, so we are good for now.)
487 : //
488 0 : QString query_string( QString( "CREATE TABLE IF NOT EXISTS %1.%2"
489 : "(key BLOB,column1 BLOB,value BLOB,PRIMARY KEY(key, column1))"
490 : "WITH COMPACT STORAGE"
491 : " AND CLUSTERING ORDER BY(column1 ASC)" )
492 0 : .arg(f_context_name)
493 0 : .arg(f_table_name)
494 0 : );
495 0 : query_string += getTableOptions();
496 :
497 : // 1) Load existing tables from the database,
498 : // 2) Create the table using the query string,
499 : // 3) Add this object into the list.
500 : //
501 0 : order create_table;
502 0 : create_table.setCql(query_string, order::type_of_result_t::TYPE_OF_RESULT_SUCCESS);
503 0 : create_table.setTimeout(5 * 60 * 1000);
504 0 : create_table.setClearClusterDescription(true);
505 0 : order_result const create_table_result(f_proxy->sendOrder(create_table));
506 0 : if(!create_table_result.succeeded())
507 : {
508 0 : throw exception("table creation failed");
509 : }
510 :
511 0 : f_from_cassandra = true;
512 0 : }
513 :
514 :
515 : /** \brief Truncate a Cassandra table.
516 : *
517 : * The truncate() function removes all the rows from a Cassandra table
518 : * and clear out the cached data (rows and cells.)
519 : *
520 : * If the table is not connected to Cassandra, then nothing happens with
521 : * the Cassandra server.
522 : *
523 : * If you want to keep a copy of the cache, you will have to retrieve a
524 : * copy of the rows map using the getRows() function.
525 : *
526 : * \sa getRows()
527 : * \sa clearCache()
528 : */
529 0 : void table::truncate()
530 : {
531 0 : if( !f_from_cassandra )
532 : {
533 0 : return;
534 : }
535 :
536 : const QString query_string(
537 0 : QString("TRUNCATE %1.%2")
538 0 : .arg(f_context_name)
539 0 : .arg(f_table_name)
540 0 : );
541 :
542 0 : order truncate_table;
543 0 : truncate_table.setCql(query_string, order::type_of_result_t::TYPE_OF_RESULT_SUCCESS);
544 0 : truncate_table.setClearClusterDescription(true);
545 0 : order_result const truncate_table_result(f_proxy->sendOrder(truncate_table));
546 0 : if(!truncate_table_result.succeeded())
547 : {
548 0 : throw exception("table truncation failed");
549 : }
550 :
551 0 : clearCache();
552 : }
553 :
554 :
555 : /** \brief Clear the memory cache.
556 : *
557 : * This function clears the memory cache. This means all the rows and
558 : * their cells will be deleted from this table. The memory cache doesn't
559 : * affect the Cassandra database.
560 : *
561 : * After a clear, you can retrieve fresh data (i.e. by directly loading the
562 : * data from the Cassandra database.)
563 : *
564 : * Note that if you kept shared pointers to rows and cells defined in
565 : * this table, accessing those is likely going to generate an exception.
566 : */
567 0 : void table::clearCache()
568 : {
569 0 : closeCursor();
570 :
571 0 : f_rows.clear();
572 0 : }
573 :
574 :
575 : /** \brief Close the current cursor.
576 : *
577 : * This function closes the current cursor (i.e. the cursor used
578 : * to gather a set of rows and their data from a table.)
579 : */
580 0 : void table::closeCursor()
581 : {
582 0 : if(f_cursor_index >= 0)
583 : {
584 : // Note: the "CLOSE" CQL string is ignored
585 : //
586 0 : order close_cursor;
587 0 : close_cursor.setCql("CLOSE", order::type_of_result_t::TYPE_OF_RESULT_CLOSE);
588 0 : close_cursor.setCursorIndex(f_cursor_index);
589 0 : f_cursor_index = -1;
590 0 : order_result close_cursor_result(f_proxy->sendOrder(close_cursor));
591 0 : if(!close_cursor_result.succeeded())
592 : {
593 0 : throw exception("table::closeCursor(): closing cursor failed.");
594 : }
595 : }
596 0 : }
597 :
598 :
599 0 : void table::addRow( const QByteArray& row_key, const QByteArray& column_key, const QByteArray& data )
600 : {
601 0 : row::pointer_t new_row ( new row( shared_from_this(), row_key ) );
602 0 : cell::pointer_t new_cell ( new_row->getCell( column_key ) );
603 0 : new_cell->assignValue( value(data) );
604 :
605 : // Now add to the map.
606 : //
607 0 : f_rows[row_key] = new_row;
608 0 : }
609 :
610 :
611 0 : void table::startBatch()
612 : {
613 0 : order start_batch;
614 0 : start_batch.setCql("START_BATCH", order::type_of_result_t::TYPE_OF_RESULT_BATCH_DECLARE);
615 :
616 0 : order_result start_batch_result(f_proxy->sendOrder(start_batch));
617 0 : start_batch_result.swap(start_batch_result);
618 0 : if(!start_batch_result.succeeded())
619 : {
620 0 : throw exception("start batch failed");
621 : }
622 :
623 0 : f_batch_index = int32Value(start_batch_result.result(0));
624 0 : if(f_batch_index < 0)
625 : {
626 0 : throw logic_exception("received a negative number as batch index!");
627 : }
628 0 : }
629 :
630 :
631 0 : void table::commitBatch()
632 : {
633 0 : if(f_batch_index >= 0)
634 : {
635 : // Note: the "CLOSE" CQL string is ignored
636 : //
637 0 : order commit_batch;
638 0 : commit_batch.setCql("COMMIT_BATCH", order::type_of_result_t::TYPE_OF_RESULT_BATCH_COMMIT);
639 0 : commit_batch.setBatchIndex(f_batch_index);
640 0 : f_batch_index = -1;
641 0 : order_result commit_batch_result(f_proxy->sendOrder(commit_batch));
642 0 : if(!commit_batch_result.succeeded())
643 : {
644 0 : throw exception("table::commitBatch(): batch submission failed.");
645 : }
646 : }
647 0 : }
648 :
649 :
650 0 : void table::rollbackBatch()
651 : {
652 0 : if(f_batch_index >= 0)
653 : {
654 0 : order batch;
655 0 : batch.setCql("ROLLBACK_BATCH", order::type_of_result_t::TYPE_OF_RESULT_BATCH_ROLLBACK);
656 0 : batch.setBatchIndex(f_batch_index);
657 0 : f_batch_index = -1;
658 0 : order_result batch_result(f_proxy->sendOrder(batch));
659 0 : if(!batch_result.succeeded())
660 : {
661 0 : throw exception("table::commitBatch(): batch submission failed.");
662 : }
663 : }
664 0 : }
665 :
666 :
667 : /** \brief Read a set of rows as defined by the row predicate.
668 : *
669 : * This function reads a set of rows as defined by the row predicate.
670 : *
671 : * To change the consistency for this read, check out the
672 : * cell_predicate::setConsistencyLevel() function.
673 : *
674 : * If the table is not connected to Cassandra (i.e. the table is
675 : * a memory table) then nothing happens.
676 : *
677 : * Remember that if you are querying without checking for any column
678 : * you will get "empty" rows in your results (see dropRow() function
679 : * for more information and search for TombStones in Cassandra.)
680 : * This was true in version 0.8.0 to 1.1.5. It may get fixed at some
681 : * point.
682 : *
683 : * Note that the function updates the predicate so the next call
684 : * returns the following rows as expected.
685 : *
686 : * \warning
687 : * This function MAY NOT "WORK RIGHT" if your cluster was defined using
688 : * the RandomPartitioner. Rows are not sorted by key when the
689 : * RandomPartitioner is used. Instead, the rows are sorted by their
690 : * MD5 sum. Also the system may add additional data before or
691 : * after that MD5 and the slice range cannot anyway provide that
692 : * MD5 to the system. If you want to query sorted slices of your
693 : * rows, you must create your cluster with another partitioner.
694 : * Search for partitioner in conf/cassandra.yaml in the
695 : * Cassandra tarball.
696 : * See also: http://ria101.wordpress.com/2010/02/22/cassandra-randompartitioner-vs-orderpreservingpartitioner/
697 : *
698 : * \param[in,out] row_predicate The row predicate, which has to be allocated on the heap using shared_ptr.
699 : *
700 : * \return The number of rows read.
701 : *
702 : * \sa row_predicate (see detailed description of row predicate for an example)
703 : * \sa cell_predicate::setConsistencyLevel()
704 : * \sa dropRow()
705 : */
706 0 : uint32_t table::readRows( row_predicate::pointer_t row_predicate )
707 : {
708 0 : if( !row_predicate )
709 : {
710 0 : throw exception("row_predicate is nullptr!");
711 : }
712 :
713 0 : size_t idx(0);
714 0 : order_result selected_rows_result;
715 :
716 0 : f_rows.clear();
717 :
718 0 : if( f_cursor_index != -1 )
719 : {
720 : // Note: the "FETCH" is ignored, only the type is used in this case
721 : //
722 0 : order select_more_rows;
723 0 : select_more_rows.setCql("FETCH", order::type_of_result_t::TYPE_OF_RESULT_FETCH);
724 0 : select_more_rows.setCursorIndex(f_cursor_index);
725 0 : order_result select_more_rows_result(f_proxy->sendOrder(select_more_rows));
726 0 : selected_rows_result.swap(select_more_rows_result);
727 0 : if(!selected_rows_result.succeeded())
728 : {
729 0 : throw exception("select rows failed");
730 : }
731 :
732 0 : if(selected_rows_result.resultCount() == 0)
733 : {
734 0 : closeCursor();
735 0 : return 0;
736 : }
737 : }
738 : else
739 : {
740 0 : QString query_string( QString("SELECT key,column1,value FROM %1.%2")
741 0 : .arg(f_context_name)
742 0 : .arg(f_table_name)
743 0 : );
744 : // Note: with the proxy we do not care about the bind_count
745 : // but the appendQuery() function does the same thing
746 : //
747 0 : int bind_count = 0;
748 0 : row_predicate->appendQuery( query_string, bind_count );
749 0 : if(row_predicate->allowFiltering())
750 : {
751 0 : query_string += " ALLOW FILTERING";
752 : }
753 : //
754 : //std::cerr << "query=[" << query_string.toUtf8().data() << "]" << std::endl;
755 :
756 : // setup the consistency level
757 0 : consistency_level_t consistency_level( parentContext()->parentCassandra()->defaultConsistencyLevel() );
758 0 : consistency_level_t const default_consistency_level(consistency_level);
759 0 : consistency_level = row_predicate->consistencyLevel();
760 0 : if( consistency_level == CONSISTENCY_LEVEL_DEFAULT )
761 : {
762 0 : consistency_level = row_predicate->cellPredicate()->consistencyLevel();
763 0 : if( consistency_level == CONSISTENCY_LEVEL_DEFAULT )
764 : {
765 0 : consistency_level = default_consistency_level;
766 : }
767 : }
768 :
769 : // create a CURSOR
770 0 : order select_rows;
771 0 : select_rows.setCql(query_string, order::type_of_result_t::TYPE_OF_RESULT_DECLARE);
772 0 : select_rows.setColumnCount(3);
773 0 : select_rows.setConsistencyLevel(consistency_level);
774 :
775 : //
776 0 : row_predicate->bindOrder( select_rows );
777 0 : select_rows.setPagingSize( row_predicate->count() );
778 :
779 0 : order_result select_rows_result(f_proxy->sendOrder(select_rows));
780 0 : selected_rows_result.swap(select_rows_result);
781 0 : if(!selected_rows_result.succeeded())
782 : {
783 0 : throw exception("select rows failed");
784 : }
785 :
786 0 : if(selected_rows_result.resultCount() < 1)
787 : {
788 0 : throw exception("select rows did not return a cursor index");
789 : }
790 0 : f_cursor_index = int32Value(selected_rows_result.result(0));
791 0 : if(f_cursor_index < 0)
792 : {
793 0 : throw logic_exception("received a negative number as cursor index");
794 : }
795 : //std::cerr << "got a cursor = " << f_cursor_index << "\n";
796 :
797 : // ignore parameter one, it is not a row of data
798 0 : idx = 1;
799 : }
800 :
801 0 : auto re(row_predicate->rowNameMatch());
802 :
803 0 : size_t const max_results(selected_rows_result.resultCount());
804 : #ifdef _DEBUG
805 0 : if((max_results - idx) % 3 != 0)
806 : {
807 : // the number of results must be a multiple of 3, although on
808 : // the SELECT (first time in) we expect one additional result
809 : // which represents the cursor index
810 0 : throw logic_exception("the number of results must be an exact multipled of 3!");
811 : }
812 : #endif
813 0 : size_t result_size = 0;
814 0 : for(; idx < max_results; idx += 3, ++result_size )
815 : {
816 0 : const QByteArray row_key( selected_rows_result.result( idx + 0 ) );
817 :
818 0 : if( !re.isEmpty() )
819 : {
820 0 : const QString row_name( QString::fromUtf8(row_key.data()) );
821 0 : if( re.indexIn(row_name) == -1 )
822 : {
823 0 : continue;
824 : }
825 : }
826 :
827 0 : const QByteArray column_key( selected_rows_result.result( idx + 1 ) );
828 0 : const QByteArray data ( selected_rows_result.result( idx + 2 ) );
829 :
830 0 : addRow( row_key, column_key, data );
831 : }
832 :
833 0 : return result_size;
834 : }
835 :
836 :
837 0 : row::pointer_t table::getRow(const char* row_name)
838 : {
839 0 : return getRow( QByteArray(row_name,qstrlen(row_name)) );
840 : }
841 :
842 :
843 : /** \brief Search for a row or create a new one.
844 : *
845 : * This function searches for a row or, if it doesn't exist, create
846 : * a new row.
847 : *
848 : * Note that unless you set the value of a column in this row, the
849 : * row will never appear in the Cassandra cluster.
850 : *
851 : * This function accepts a name for the row. The name is a UTF-8 string.
852 : *
853 : * \param[in] row_name The name of the row to search or create.
854 : *
855 : * \return A shared pointer to the matching row or a null pointer.
856 : */
857 0 : row::pointer_t table::getRow(const QString& row_name)
858 : {
859 0 : return getRow(row_name.toUtf8());
860 : }
861 :
862 : /** \brief Search for a row or create a new one.
863 : *
864 : * This function searches for a row or, if it doesn't exist, create
865 : * a new row.
866 : *
867 : * Note that unless you set the value of a column in this row, the
868 : * row will never appear in the Cassandra cluster.
869 : *
870 : * This function assigns the row a binary key.
871 : *
872 : * \param[in] row_key The name of the row to search or create.
873 : *
874 : * \return A shared pointer to the matching row. Throws if not found in the map.
875 : *
876 : * \sa readRows()
877 : */
878 0 : row::pointer_t table::getRow(const QByteArray& row_key)
879 : {
880 : // row already exists?
881 0 : rows::iterator ri(f_rows.find(row_key));
882 0 : if(ri != f_rows.end())
883 : {
884 0 : return ri.value();
885 : }
886 :
887 : // this is a new row, allocate it
888 0 : row::pointer_t c(new row(shared_from_this(), row_key));
889 0 : f_rows.insert(row_key, c);
890 0 : return c;
891 : }
892 :
893 :
894 : /** \brief Retrieve the entire set of rows defined in this table.
895 : *
896 : * This function returns a constant reference to the map listing all
897 : * the rows currently defined in memory for this table.
898 : *
899 : * This can be used to determine how many rows are defined in memory
900 : * and to scan all the data.
901 : *
902 : * \return A constant reference to a map of rows. Throws if readRows() has not first been called.
903 : */
904 0 : const rows& table::getRows()
905 : {
906 0 : return f_rows;
907 : }
908 :
909 :
910 : /** \brief Search for a row.
911 : *
912 : * This function searches for a row. If it doesn't exist, then a NULL
913 : * pointer is returned (use the operator bool() function on the shared pointer.)
914 : *
915 : * The function can be used to check whether a given row was already created
916 : * in memory without actually creating it.
917 : *
918 : * This function accepts a row name given as a null terminated C string; its bytes are used as the row key.
919 : *
920 : * \warning
921 : * This function does NOT attempt to read the row from the Cassandra database
922 : * system. It only checks whether the row already exists in memory. To check
923 : * whether the row exists in the database, use the exists() function instead.
924 : *
925 : * \param[in] row_name The name of the row to check for.
926 : *
927 : * \return A shared pointer to the row, may be NULL (operator bool() returning false)
928 : *
929 : * \sa exists()
930 : * \sa getRow()
931 : */
932 0 : row::pointer_t table::findRow(const char* row_name) const
933 : {
934 0 : return findRow( QByteArray(row_name, qstrlen(row_name)) ); // forwards to the QByteArray overload
935 : }
936 :
937 :
938 : /** \brief Search for a row.
939 : *
940 : * This function searches for a row. If it doesn't exist, then a NULL
941 : * pointer is returned (use the operator bool() function on the shared pointer.)
942 : *
943 : * The function can be used to check whether a given row was already created
944 : * in memory without actually creating it.
945 : *
946 : * This function accepts a row name which gets converted to UTF-8 to form the row key.
947 : *
948 : * \warning
949 : * This function does NOT attempt to read the row from the Cassandra database
950 : * system. It only checks whether the row already exists in memory. To check
951 : * whether the row exists in the database, use the exists() function instead.
952 : *
953 : * \param[in] row_name The name of the row to check for.
954 : *
955 : * \return A shared pointer to the row, may be NULL (operator bool() returning false)
956 : *
957 : * \sa exists()
958 : * \sa getRow()
959 : */
960 0 : row::pointer_t table::findRow(const QString& row_name) const
961 : {
962 0 : return findRow( row_name.toUtf8() ); // forwards to the QByteArray overload
963 : }
964 :
965 :
966 : /** \brief Search for a row.
967 : *
968 : * This function searches for a row. If it doesn't exist, then a NULL
969 : * pointer is returned (use the operator bool() function on the shared pointer.)
970 : *
971 : * The function can be used to check whether a given row was already created
972 : * in memory without actually creating it.
973 : *
974 : * This function accepts a row key which is a binary buffer.
975 : *
976 : * \warning
977 : * This function does NOT attempt to read the row from the Cassandra database
978 : * system. It only checks whether the row already exists in memory. To check
979 : * whether the row exists in the database, use the exists() function instead.
980 : *
981 : * \param[in] row_key The binary key of the row to search for.
982 : *
983 : * \return A shared pointer to the row, may be NULL (operator bool() returning false)
984 : *
985 : * \sa exists()
986 : * \sa getRow()
987 : */
988 0 : row::pointer_t table::findRow(const QByteArray& row_key) const
989 : {
990 0 : auto ri(f_rows.find(row_key)); // lookup in the in-memory map only
991 0 : if(ri == f_rows.end())
992 : {
993 0 : row::pointer_t null; // default constructed pointer, i.e. "not found"
994 0 : return null;
995 : }
996 0 : return *ri;
997 : }
998 :
999 :
1000 : /** \brief Check whether a row exists.
1001 : *
1002 : * This function checks whether the named row exists. It forwards to the QByteArray overload.
1003 : *
1004 : * \param[in] row_name The row name as a null terminated C string; its bytes are used as the row key.
1005 : *
1006 : * \return true if the row exists in memory or the Cassandra database.
1007 : */
1008 0 : bool table::exists(const char *row_name) const
1009 : {
1010 0 : return exists(QByteArray(row_name, qstrlen(row_name)));
1011 : }
1012 :
1013 :
1014 : /** \brief Check whether a row exists.
1015 : *
1016 : * This function checks whether the named row exists. It forwards to the QByteArray overload.
1017 : *
1018 : * \param[in] row_name The row name to transform to UTF-8.
1019 : *
1020 : * \return true if the row exists in memory or the Cassandra database.
1021 : */
1022 0 : bool table::exists(const QString& row_name) const
1023 : {
1024 0 : return exists(row_name.toUtf8());
1025 : }
1026 :
1027 :
1028 : /** \brief Check whether a row exists.
1029 : *
1030 : * This function checks whether a row exists. First it checks whether
1031 : * it exists in memory. If not, then it checks in the Cassandra database.
1032 : *
1033 : * Empty keys are always viewed as non-existent and this function
1034 : * returns false in that case.
1035 : *
1036 : * \warning
1037 : * If you can avoid calling this function, all the better. It may be
1038 : * very slow against a table that has many tombstones since it will
1039 : * have to go through those to know whether there is at least one valid
1040 : * row.
1041 : *
1042 : * \warning
1043 : * If you dropped the row recently, IT STILL EXISTS. This is a "bug" in
1044 : * Cassandra and there isn't really a way around it except by testing
1045 : * whether a specific cell exists in the row. We cannot really test for
1046 : * a cell here since we cannot know what cell exists here. So this test
1047 : * really only tells you that (1) the row was never created; or (2) the
1048 : * row was dropped "a while back" (the amount of time it takes for a row
1049 : * to completely disappear is not specified and it looks like it can take
1050 : * days.) [This warning may not apply since we now use CQL]
1051 : *
1052 : * \todo
1053 : * At this time there isn't a way to specify the consistency level of the
1054 : * calls used by this function. The libdbproxy default is used.
1055 : *
1056 : * \param[in] row_key The binary key of the row to check for.
1057 : *
1058 : * \return true if the row exists in memory or in Cassandra.
1059 : */
1060 0 : bool table::exists(const QByteArray& row_key) const
1061 : {
1062 : // an empty key cannot represent a valid row
1063 0 : if(row_key.size() == 0)
1064 : {
1065 0 : return false;
1066 : }
1067 :
1068 0 : auto ri(f_rows.find(row_key));
1069 0 : if(ri != f_rows.end())
1070 : {
1071 : // row exists in memory
1072 0 : return true;
1073 : }
1074 :
1075 0 : auto pred( std::make_shared<row_key_predicate>() );
1076 0 : pred->setRowKey(row_key);
1077 0 : pred->setCount(1); // read as little as possible (TBD verify that works even with many tombstones)
1078 :
1079 : class save_current_cursor_index_t // saves the cursor index, sets it to -1, and on destruction calls closeCursor() and restores the saved index
1080 : {
1081 : public:
1082 0 : save_current_cursor_index_t(table *table, int32_t& cursor_index)
1083 0 : : f_table(table)
1084 : , f_cursor_index_ref(cursor_index)
1085 0 : , f_saved_cursor_index(cursor_index)
1086 : {
1087 : // simulate the closure of the current cursor index if open
1088 : //
1089 0 : f_cursor_index_ref = -1;
1090 0 : }
1091 :
1092 : save_current_cursor_index_t(save_current_cursor_index_t const & rhs) = delete;
1093 :
1094 0 : ~save_current_cursor_index_t()
1095 0 : {
1096 : try
1097 : {
1098 0 : f_table->closeCursor();
1099 : }
1100 0 : catch(const exception&)
1101 : {
1102 : // not much we can do here (destructor and exceptions do not
1103 : // work together)
1104 : //
1105 : // exception can happen if we lose the network connection
1106 : // and try to close the cursor
1107 : }
1108 0 : f_cursor_index_ref = f_saved_cursor_index;
1109 0 : }
1110 :
1111 : save_current_cursor_index_t & operator = (save_current_cursor_index_t const & rhs) = delete;
1112 :
1113 : private:
1114 : table * f_table = nullptr;
1115 : int32_t & f_cursor_index_ref;
1116 : int32_t f_saved_cursor_index = 0;
1117 : };
1118 0 : save_current_cursor_index_t save_cursor_index(const_cast<table *>(this), const_cast<table *>(this)->f_cursor_index); // const_cast: the cursor index is modified from this const function
1119 :
1120 : // TODO: we should be able to do that without using the full fledge
1121 : // readRows() with a cursor + fetch etc. since we just want to
1122 : // know whether at least one entry exists we could just do one
1123 : // SELECT and save its result; then we would avoid the
1124 : // "save_current_cursor_index_t" problem
1125 : //
1126 : return const_cast<table *>(this)
1127 0 : ->readRows( std::static_pointer_cast<row_predicate>(pred) ) != 0;
1128 : }
1129 :
1130 : /** \brief Retrieve a table row.
1131 : *
1132 : * This function retrieves a table row. If the named row doesn't exist yet,
1133 : * then it is created first.
1134 : *
1135 : * The reference is writable so you may write to a cell in this row.
1136 : *
1137 : * This function accepts a UTF-8 name for this row reference.
1138 : *
1139 : * \param[in] row_name The name of the row to retrieve.
1140 : *
1141 : * \return A reference to a row.
1142 : */
1143 0 : row& table::operator [] (const char *row_name)
1144 : {
1145 : // in this case we may create the row and that's fine!
1146 0 : return *getRow(row_name);
1147 : }
1148 :
1149 : /** \brief Retrieve a table row.
1150 : *
1151 : * This function retrieves a table row. If the named row doesn't exist yet,
1152 : * then it is created first.
1153 : *
1154 : * The reference is writable so you may write to a cell in this row.
1155 : *
1156 : * This function accepts a UTF-8 name for this row reference.
1157 : *
1158 : * \param[in] row_name The name of the row to retrieve.
1159 : *
1160 : * \return A reference to a row.
1161 : */
1162 0 : row& table::operator [] (const QString& row_name)
1163 : {
1164 : // in this case we may create the row and that's fine!
1165 0 : return *getRow(row_name);
1166 : }
1167 :
1168 : /** \brief Retrieve a table row.
1169 : *
1170 : * This function retrieves a table row. If the keyed row doesn't exist yet,
1171 : * then it is created first.
1172 : *
1173 : * The reference is writable so you may write to a cell in this row.
1174 : *
1175 : * This function accepts a binary key for this row reference.
1176 : *
1177 : * \param[in] row_key The binary key of the row to retrieve.
1178 : *
1179 : * \return A reference to a row.
1180 : */
1181 0 : row& table::operator[] (const QByteArray& row_key)
1182 : {
1183 : // in this case we may create the row and that's fine!
1184 0 : return *getRow(row_key);
1185 : }
1186 :
1187 : /** \brief Retrieve a table row.
1188 : *
1189 : * This function retrieves a table row. If the named row doesn't exist in
1190 : * memory (it is searched with findRow()), then the function raises an error.
1191 : *
1192 : * The reference is read-only (constant) so you may retrieve a cell value
1193 : * from it, but not modify the cell.
1194 : *
1195 : * This function accepts a name as the row reference. The name is viewed as
1196 : * a UTF-8 string.
1197 : *
1198 : * \exception exception
1199 : * The function checks whether the named row exists. If not, then this error
1200 : * is raised because the function is constant and cannot create a new row.
1201 : *
1202 : * \param[in] row_name The name of the row to retrieve.
1203 : *
1204 : * \return A constant reference to a row.
1205 : */
1206 0 : const row& table::operator[] (const char *row_name) const
1207 : {
1208 0 : const row::pointer_t p_row(findRow(row_name)); // in-memory lookup only, never creates the row
1209 0 : if(!p_row) {
1210 0 : throw exception("row does not exist so it cannot be read from");
1211 : }
1212 0 : return *p_row;
1213 : }
1214 :
1215 : /** \brief Retrieve a table row.
1216 : *
1217 : * This function retrieves a table row. If the named row doesn't exist in
1218 : * memory (it is searched with findRow()), then the function raises an error.
1219 : *
1220 : * The reference is read-only (constant) so you may retrieve a cell value
1221 : * from it, but not modify the cell.
1222 : *
1223 : * This function accepts a name as the row reference.
1224 : *
1225 : * \exception exception
1226 : * The function checks whether the named row exists. If not, then this error
1227 : * is raised because the function is constant and cannot create a new row.
1228 : *
1229 : * \param[in] row_name The name of the row to retrieve.
1230 : *
1231 : * \return A constant reference to a row.
1232 : */
1233 0 : const row& table::operator[] (const QString& row_name) const
1234 : {
1235 0 : const row::pointer_t p_row(findRow(row_name)); // in-memory lookup only, never creates the row
1236 0 : if( !p_row ) {
1237 0 : throw exception("row does not exist so it cannot be read from");
1238 : }
1239 0 : return *p_row;
1240 : }
1241 :
1242 : /** \brief Retrieve a table row.
1243 : *
1244 : * This function retrieves a table row. If the keyed row doesn't exist in
1245 : * memory (it is searched with findRow()), then the function raises an error.
1246 : *
1247 : * The reference is read-only (constant) so you may retrieve a cell value
1248 : * from it, but not modify the cell.
1249 : *
1250 : * This function accepts a binary key as the row reference.
1251 : *
1252 : * \exception exception
1253 : * The function checks whether the keyed row exists. If not, then this error
1254 : * is raised because the function is constant and cannot create a new row.
1255 : *
1256 : * \param[in] row_key The binary key of the row to retrieve.
1257 : *
1258 : * \return A constant reference to a row.
1259 : */
1260 0 : const row& table::operator[] (const QByteArray& row_key) const
1261 : {
1262 0 : const row::pointer_t p_row(findRow(row_key)); // in-memory lookup only, never creates the row
1263 0 : if(!p_row) {
1264 0 : throw exception("row does not exist so it cannot be read from");
1265 : }
1266 0 : return *p_row;
1267 : }
1268 :
1269 :
1270 : /** \brief Drop the named row.
1271 : *
1272 : * This function is the same as the dropRow() that takes a row_key parameter.
1273 : * It simply copies the bytes of the row name into a row key and calls that other
1274 : * function.
1275 : *
1276 : * \param[in] row_name Specify the name of the row to drop (null terminated C string).
1277 : */
1278 0 : void table::dropRow(const char *row_name)
1279 : {
1280 0 : dropRow( QByteArray(row_name, qstrlen(row_name)) );
1281 0 : }
1282 :
1283 :
1284 : /** \brief Drop the named row.
1285 : *
1286 : * This function is the same as the dropRow() that takes a row_key parameter.
1287 : * It simply transforms the row name into a row key (UTF-8) and calls that other
1288 : * function.
1289 : *
1290 : * \param[in] row_name Specify the name of the row to drop.
1291 : */
1292 0 : void table::dropRow(const QString& row_name)
1293 : {
1294 0 : dropRow(row_name.toUtf8());
1295 0 : }
1296 :
1297 :
1298 : /** \brief Drop the row from the Cassandra database.
1299 : *
1300 : * This function deletes the specified row and its data from the Cassandra
1301 : * database and from memory.
1302 : *
1303 : * In regard to getting the row deleted from memory, you are expected to
1304 : * use a weak pointer as follows:
1305 : *
1306 : * \code
1307 : * ...
1308 : * {
1309 : * QWeakPointer<libdbproxy::row> weak_row(table.getRow(row_key));
1310 : * ...
1311 : * table.dropRow(row_key);
1312 : * }
1313 : * ...
1314 : * \endcode
1315 : *
1316 : * Note that Cassandra doesn't actually remove the row from its database until
1317 : * the next time it does a garbage collection. Still, if there is a row you do
1318 : * not need, drop it.
1319 : *
1320 : * This function takes no timestamp and no consistency level parameter: it
1321 : * simply calls remove() with the \p row_key and then removes the row from the
1322 : * in-memory map of rows.
1323 : *
1324 : * The remove() function stamps its DELETE order with the current time
1325 : * (libdbproxy::timeofday()) which means that we make use of
1326 : * the current time (i.e. only a row created after this call will exist.)
1327 : *
1328 : * The consistency level used by remove() is the default consistency level
1329 : * of the libdbproxy object reached through the parent context. It cannot
1330 : * be changed through this function.
1331 : *
1332 : * If remove() raises an exception, the row is left in the in-memory map.
1333 : *
1334 : * \warning
1335 : * Remember that a row doesn't actually get removed from the Cassandra database
1336 : * until the next Garbage Collection runs. This is important for all your data
1337 : * centers to be properly refreshed. This also means a readRows() will continue
1338 : * to return the deleted row unless you check for a specific column that has
1339 : * to exist. In that case, the row is not returned since all the columns ARE
1340 : * deleted (or at least hidden in some way.) This function could be called
1341 : * truncate(), however, all empty rows are really removed at some point.
1342 : *
1343 : * \warning
1344 : * After a row was dropped, you cannot use the row object anymore, even if you
1345 : * kept a shared pointer to it. Calling functions of a dropped row is likely
1346 : * to get you a run-time exception. Note that all the cells defined in the
1347 : * row are also dropped and are also likely to generate a run-time exception
1348 : * if you kept a shared pointer on any one of them.
1349 : *
1350 : * \param[in] row_key Specify the key of the row.
1351 : */
1352 0 : void table::dropRow(const QByteArray& row_key)
1353 : {
1354 0 : remove( row_key ); // database side first; may throw, in which case the cached row is kept
1355 0 : f_rows.remove( row_key ); // then forget the row in memory
1356 0 : }
1357 :
1358 :
1359 : /** \brief Get the pointer to the parent object.
1360 : *
1361 : * \return Shared pointer to the parent context object; std::runtime_error is raised when the context weak pointer cannot be locked anymore.
1362 : */
1363 0 : context::pointer_t table::parentContext() const
1364 : {
1365 0 : context::pointer_t context(f_context.lock()); // f_context is locked to get a usable pointer; null means the context is gone
1366 0 : if(context == nullptr)
1367 : {
1368 0 : throw std::runtime_error("this table was dropped and is not attached to a context anymore");
1369 : }
1370 :
1371 0 : return context;
1372 : }
1373 :
1374 :
1375 : /** \brief Save a cell value that changed.
1376 : *
1377 : * This function sends an INSERT order through the proxy to save the new value that
1378 : * was defined in a cell. The idea is so that when the user alters the value of a cell,
1379 : * it directly updates the database. Nothing is sent when f_from_cassandra is false. An exception is raised when the order fails.
1380 : *
1381 : * \param[in] row_key The key used to identify the row.
1382 : * \param[in] column_key The key used to identify the column.
1383 : * \param[in] value The new value of the cell.
1384 : */
1385 0 : void table::insertValue( const QByteArray& row_key, const QByteArray& column_key, const value& value )
1386 : {
1387 0 : if( !f_from_cassandra ) // NOTE(review): both remove() functions also test !f_proxy here; f_proxy is dereferenced below without a check -- confirm
1388 : {
1389 0 : return;
1390 : }
1391 :
1392 : // We expect all of our orders to be serialized within a session, to
1393 : // ensure such a serialization, we have to ourselves specify the
1394 : // TIMESTAMP parameter. This also means a DROP may have problems
1395 : // and it adds some slowness.
1396 : //
1397 0 : int64_t const timestamp(libdbproxy::timeofday());
1398 0 : QString query_string(QString("INSERT INTO %1.%2(key,column1,value)VALUES(?,?,?)USING TIMESTAMP %3")
1399 0 : .arg(f_context_name)
1400 0 : .arg(f_table_name)
1401 0 : .arg(timestamp)
1402 0 : );
1403 :
1404 : // setup the consistency level (the parent libdbproxy default replaces CONSISTENCY_LEVEL_DEFAULT)
1405 0 : consistency_level_t consistency_level( value.consistencyLevel() );
1406 0 : if( consistency_level == CONSISTENCY_LEVEL_DEFAULT )
1407 : {
1408 0 : consistency_level = parentContext()->parentCassandra()->defaultConsistencyLevel();
1409 : }
1410 :
1411 : // define TTL only if the user defined it (Cassandra uses a 'null' when
1412 : // undefined and that's probably better than having either a really large
1413 : // value or 0 if that would work as 'permanent' in Cassandra.)
1414 : //
1415 0 : if(value.ttl() != value::TTL_PERMANENT)
1416 : {
1417 0 : query_string += QString(" AND TTL %1").arg(value.ttl()); // appended after the "USING TIMESTAMP ..." clause built above
1418 : }
1419 :
1420 0 : order insert_value;
1421 0 : insert_value.setCql( query_string, (f_batch_index == -1) // a batch index of -1 selects the plain SUCCESS result, otherwise the order is added to the batch
1422 : ? order::type_of_result_t::TYPE_OF_RESULT_SUCCESS
1423 : : order::type_of_result_t::TYPE_OF_RESULT_BATCH_ADD
1424 : );
1425 0 : insert_value.setConsistencyLevel( consistency_level );
1426 0 : insert_value.setBatchIndex(f_batch_index);
1427 :
1428 0 : insert_value.addParameter( row_key ); // parameters are added in the order of the ?,?,? placeholders: key, column1, value
1429 0 : insert_value.addParameter( column_key );
1430 0 : insert_value.addParameter( value.binaryValue() );
1431 :
1432 0 : order_result const insert_value_result(f_proxy->sendOrder(insert_value));
1433 0 : if(!insert_value_result.succeeded())
1434 : {
1435 0 : SNAP_LOG_ERROR("unable to insert a value into the table for query: '")(query_string)("'");
1436 0 : throw exception("inserting a value failed");
1437 : }
1438 : }
1439 :
1440 :
1441 :
1442 :
1443 :
1444 : /** \brief Get a cell value from Cassandra.
1445 : *
1446 : * This function sends a SELECT order through the proxy to retrieve a value
1447 : * from Cassandra.
1448 : *
1449 : * The \p value parameter receives the data retrieved from Cassandra. When
1450 : * no data is found, it is set to the null value and false is returned.
1451 : *
1452 : * \param[in] row_key The key used to identify the row.
1453 : * \param[in] column_key The key used to identify the column.
1454 : * \param[in,out] value The new value of the cell; its consistency level is read to setup the order.
1455 : *
1456 : * \return false when the value was not found in the database, true otherwise; an exception is raised when the order fails
1457 : */
1458 0 : bool table::getValue(const QByteArray& row_key, const QByteArray& column_key, value& value)
1459 : {
1460 0 : const QString query_string( QString("SELECT value FROM %1.%2 WHERE key=? AND column1=?")
1461 0 : .arg(f_context_name)
1462 0 : .arg(f_table_name) );
1463 :
1464 : // setup the consistency level (the parent libdbproxy default replaces CONSISTENCY_LEVEL_DEFAULT)
1465 0 : consistency_level_t consistency_level( value.consistencyLevel() );
1466 0 : if( consistency_level == CONSISTENCY_LEVEL_DEFAULT )
1467 : {
1468 0 : consistency_level = parentContext()->parentCassandra()->defaultConsistencyLevel();
1469 : }
1470 :
1471 0 : order get_value;
1472 0 : get_value.setCql(query_string, order::type_of_result_t::TYPE_OF_RESULT_ROWS);
1473 0 : get_value.setConsistencyLevel( consistency_level );
1474 :
1475 0 : get_value.addParameter( row_key );
1476 0 : get_value.addParameter( column_key );
1477 :
1478 0 : order_result const get_value_result(f_proxy->sendOrder(get_value)); // NOTE(review): f_proxy is used unchecked here while remove() guards against a missing proxy -- confirm
1479 0 : if(!get_value_result.succeeded())
1480 : {
1481 0 : throw exception("retrieving a value failed");
1482 : }
1483 :
1484 0 : if(get_value_result.resultCount() == 0)
1485 : {
1486 0 : value.setNullValue(); // not found: the caller's value is reset to null
1487 0 : return false;
1488 : }
1489 :
1490 0 : value = get_value_result.result(0);
1491 :
1492 0 : return true;
1493 : }
1494 :
1495 :
1496 : /** \brief Count columns.
1497 : *
1498 : * This function counts columns. When the row is cached in memory, the size of
1499 : * its map of cells is returned; otherwise a COUNT(*) order is sent to the database.
1500 : *
1501 : * If you want to use a different consistency, also indicate such in
1502 : * the \p column_predicate parameter.
1503 : *
1504 : * \param[in] row_key The row for which this data is being counted.
1505 : * \param[in] column_predicate Supplies the consistency level and the paging size (count) of the query; may be null, in which case defaults are used.
1506 : *
1507 : * \return The number of columns in this row; an exception is raised when the query fails or does not return exactly one result.
1508 : */
1509 0 : int32_t table::getCellCount
1510 : ( const QByteArray& row_key
1511 : , cell_predicate::pointer_t column_predicate
1512 : )
1513 : {
1514 0 : if( f_rows.find( row_key ) == f_rows.end() )
1515 : {
1516 0 : const QString query_string ( QString("SELECT COUNT(*)AS count FROM %1.%2") // NOTE(review): no WHERE clause and row_key is never bound as a parameter, so this looks like it counts the whole table -- confirm
1517 0 : .arg(f_context_name)
1518 0 : .arg(f_table_name)
1519 0 : );
1520 :
1521 : // setup the consistency level (default when no predicate is given, then resolved against the parent libdbproxy default)
1522 : consistency_level_t consistency_level( column_predicate
1523 0 : ? column_predicate->consistencyLevel()
1524 0 : : CONSISTENCY_LEVEL_DEFAULT );
1525 0 : if( consistency_level == CONSISTENCY_LEVEL_DEFAULT )
1526 : {
1527 0 : consistency_level = parentContext()->parentCassandra()->defaultConsistencyLevel();
1528 : }
1529 :
1530 0 : order cell_count;
1531 0 : cell_count.setCql(query_string, order::type_of_result_t::TYPE_OF_RESULT_ROWS);
1532 0 : cell_count.setPagingSize(column_predicate ? column_predicate->count() : 100); // 100 is the paging size used when no predicate is given
1533 0 : cell_count.setConsistencyLevel(consistency_level);
1534 0 : order_result cell_count_result(f_proxy->sendOrder(cell_count));
1535 0 : if(!cell_count_result.succeeded()
1536 0 : || cell_count_result.resultCount() != 1)
1537 : {
1538 0 : throw exception("cell count failed");
1539 : }
1540 :
1541 0 : return int32Value(cell_count_result.result(0));
1542 : }
1543 :
1544 : // return the count from the memory cache (only the cells currently held in memory for that row)
1545 0 : return f_rows[row_key]->getCells().size();
1546 : }
1547 :
1547 : /** \brief Delete a Cell from a table row.
1548 : *
1549 : * This function removes a cell from the Cassandra database as specified
1550 : * by the parameters. It does nothing without a proxy or when f_from_cassandra is false; it throws if the order fails.
1551 : *
1552 : * \param[in] row_key The row in which the cell is to be removed.
1553 : * \param[in] column_key The cell to be removed.
1554 : * \param[in] consistency_level The consistency level to specify when dropping the cell with respect to other data centers.
1555 : */
1556 0 : void table::remove
1557 : ( const QByteArray& row_key
1558 : , const QByteArray& column_key
1559 : , consistency_level_t consistency_level
1560 : )
1561 : {
1562 0 : if( !f_from_cassandra || !f_proxy )
1563 : {
1564 0 : return;
1565 : }
1566 :
1567 : const QString query_string(
1568 0 : QString("DELETE FROM %1.%2 WHERE key=? AND column1=?")
1569 0 : .arg(f_context_name)
1570 0 : .arg(f_table_name)
1571 0 : );
1572 :
1573 0 : order drop_cell;
1574 0 : drop_cell.setCql( query_string, (f_batch_index == -1) // a batch index of -1 selects the plain SUCCESS result, otherwise the order is added to the batch
1575 : ? order::type_of_result_t::TYPE_OF_RESULT_SUCCESS
1576 : : order::type_of_result_t::TYPE_OF_RESULT_BATCH_ADD
1577 : );
1578 0 : drop_cell.setBatchIndex(f_batch_index);
1579 0 : drop_cell.setConsistencyLevel(consistency_level); // NOTE(review): passed through as is; CONSISTENCY_LEVEL_DEFAULT is not replaced by the parent default as insertValue() does -- confirm this is intended
1580 0 : drop_cell.setTimestamp(libdbproxy::timeofday()); // make sure it gets deleted no matter when it was created
1581 0 : drop_cell.addParameter(row_key);
1582 0 : drop_cell.addParameter(column_key);
1583 0 : order_result const drop_cell_result(f_proxy->sendOrder(drop_cell));
1584 0 : if(!drop_cell_result.succeeded())
1585 : {
1586 0 : throw exception("drop cell failed");
1587 : }
1588 : }
1590 :
1590 : /** \brief Delete a row from the table.
1591 : *
1592 : * This function sends a DELETE order restricted by the row key only, using the default
1593 : * consistency level. It does nothing without a proxy or when f_from_cassandra is false; it throws if the order fails.
1594 : *
1595 : * \param[in] row_key The key of the row to be removed.
1596 : */
1597 0 : void table::remove( const QByteArray& row_key )
1598 : {
1599 0 : if( !f_from_cassandra || !f_proxy )
1600 : {
1601 0 : return;
1602 : }
1603 :
1604 : const QString query_string(
1605 0 : QString("DELETE FROM %1.%2 WHERE key=?")
1606 0 : .arg(f_context_name)
1607 0 : .arg(f_table_name)
1608 0 : );
1609 :
1610 0 : order drop_cell;
1611 0 : drop_cell.setCql( query_string, (f_batch_index == -1) // a batch index of -1 selects the plain SUCCESS result, otherwise the order is added to the batch
1612 : ? order::type_of_result_t::TYPE_OF_RESULT_SUCCESS
1613 : : order::type_of_result_t::TYPE_OF_RESULT_BATCH_ADD
1614 : );
1615 0 : drop_cell.setBatchIndex(f_batch_index);
1616 0 : drop_cell.setConsistencyLevel(parentContext()->parentCassandra()->defaultConsistencyLevel());
1617 0 : drop_cell.setTimestamp(libdbproxy::timeofday()); // make sure it gets deleted no matter when it was created
1618 0 : drop_cell.addParameter(row_key);
1619 0 : order_result const drop_cell_result(f_proxy->sendOrder(drop_cell));
1620 0 : if(!drop_cell_result.succeeded())
1621 : {
1622 0 : throw exception("drop cell failed");
1623 : }
1624 : }
1626 :
1627 6 : } // namespace libdbproxy
1628 :
1629 : // vim: ts=4 sw=4 et
|