Line data Source code
1 : // Copyright (c) 2016-2024 Made to Order Software Corp. All Rights Reserved
2 : //
3 : // https://snapwebsites.org/project/cluck
4 : // contact@m2osw.com
5 : //
6 : // This program is free software: you can redistribute it and/or modify
7 : // it under the terms of the GNU General Public License as published by
8 : // the Free Software Foundation, either version 3 of the License, or
9 : // (at your option) any later version.
10 : //
11 : // This program is distributed in the hope that it will be useful,
12 : // but WITHOUT ANY WARRANTY; without even the implied warranty of
13 : // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 : // GNU General Public License for more details.
15 : //
16 : // You should have received a copy of the GNU General Public License
17 : // along with this program. If not, see <https://www.gnu.org/licenses/>.
18 :
19 :
20 : // self
21 : //
22 : #include "cluckd.h"
23 :
24 :
25 :
26 : // cluck
27 : //
28 : #include <cluck/exception.h>
29 : #include <cluck/names.h>
30 : #include <cluck/version.h>
31 :
32 :
33 : // communicatord
34 : //
35 : #include <communicatord/flags.h>
36 : #include <communicatord/names.h>
37 :
38 :
39 : // as2js
40 : //
41 : #include <as2js/json.h>
42 :
43 :
44 : // eventdispatcher
45 : //
46 : #include <eventdispatcher/names.h>
47 :
48 :
49 : // snapdev
50 : //
51 : #include <snapdev/gethostname.h>
52 : #include <snapdev/hexadecimal_string.h>
53 : #include <snapdev/stringize.h>
54 : #include <snapdev/tokenize_string.h>
55 : #include <snapdev/to_string_literal.h>
56 :
57 :
58 : // snaplogger
59 : //
60 : #include <snaplogger/logger.h>
61 : #include <snaplogger/options.h>
62 : #include <snaplogger/severity.h>
63 :
64 :
65 : // advgetopt
66 : //
67 : #include <advgetopt/advgetopt.h>
68 : #include <advgetopt/exception.h>
69 :
70 :
71 : // C++
72 : //
73 : #include <algorithm>
74 : #include <iostream>
75 : #include <sstream>
76 :
77 :
78 : // openssl
79 : //
80 : #include <openssl/rand.h>
81 :
82 :
83 : // last include
84 : //
85 : #include <snapdev/poison.h>
86 :
87 :
88 :
89 : /** \file
90 : * \brief Implementation of the inter-process lock mechanism.
91 : *
92 : * This file implements an inter-process lock that functions between
93 : * any number of machines. The basic algorithm used is the Bakery
94 : * Algorithm by Lamport. The concept is simple: you get a waiting
95 :  * ticket and loop through the tickets until yours is picked.
96 : *
97 : * Contrary to a multi-processor environment thread synchronization,
98 : * this lock system uses messages and arrays to know its current
99 : * state. A user interested in obtaining a lock sends a LOCK
100 : * message. The cluck daemon then waits until the lock is
101 :  * obtained and sends a LOCKED message as a reply. Once done with the lock,
102 : * the user sends UNLOCK to release the lock.
103 : *
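 :  * For example, a LOCK request can be built with the message commands
 :  * and parameter names used throughout this file (a hedged sketch; in
 :  * practice the cluck client library builds this message for you):
 :  *
 :  * \code
 :  * ed::message lock_message;
 :  * lock_message.set_command(cluck::g_name_cluck_cmd_lock);
 :  * lock_message.set_service(cluck::g_name_cluck_service_name);
 :  * lock_message.add_parameter(cluck::g_name_cluck_param_object_name, "my-resource");
 :  * lock_message.add_parameter(cluck::g_name_cluck_param_pid, getpid());
 :  * lock_message.add_parameter(cluck::g_name_cluck_param_timeout, snapdev::now() + cluck::timeout_t(5, 0));
 :  * messenger->send_message(lock_message);
 :  * \endcode
 :  *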
104 : * The implementation makes use of any number of cluck instances.
105 : * The locking mechanism makes use of the QUORUM voting system to
106 :  * know that enough of the other cluck daemons agree on a statement.
107 : * This allows the cluck daemon to obtain/release locks in an
108 : * unknown network environment (i.e. any one of the machines may
109 : * be up or down and the locking mechanism still functions as
110 : * expected).
111 : *
112 : * The QUORUM is computed using the following formula:
113 : *
114 :  * \f[
115 :  * \frac{\textrm{number of computers}}{2} + 1
116 :  * \f]
117 : *
118 : * Note that the lock mechanism itself only uses 3 computers (2 or 1 if
119 : * your cluster is that small). So in a cluster of, say, 100 computers,
120 : * you would still end up using just 3 for the locks. However, the
121 : * elections still require you to have a quorum of all the computers
122 : * (100 / 2 + 1 is at least 51 computers).
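 :  *
 :  * A minimal sketch of that computation (illustrative only; the code
 :  * below compares f_computers.size() against f_neighbors_quorum):
 :  *
 :  * \code
 :  * std::size_t quorum(std::size_t const computer_count)
 :  * {
 :  *     return computer_count / 2 + 1;
 :  * }
 :  * \endcode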
123 : *
124 : * \note
125 : * The cluck implementation checks parameters and throws away
126 : * messages that are definitely not going to be understood. However,
127 :  * it is, like most Snap! daemons, very trusting of other cluck
128 : * daemons and does not expect other daemons to mess around with its
129 : * sequence of lock messages used to ensure that everything worked as
130 : * expected.
131 : */
132 :
133 :
134 :
135 : namespace cluck_daemon
136 : {
137 :
138 :
139 : namespace
140 : {
141 :
142 :
143 :
144 : constexpr std::string_view g_default_candidate_priority =
145 : snapdev::integer_to_string_literal<computer::PRIORITY_DEFAULT>.data();
146 :
147 :
148 : advgetopt::option const g_options[] =
149 : {
150 : advgetopt::define_option(
151 : advgetopt::Name("candidate-priority")
152 : , advgetopt::ShortName('p')
153 : , advgetopt::Flags(advgetopt::all_flags<
154 : advgetopt::GETOPT_FLAG_REQUIRED
155 : , advgetopt::GETOPT_FLAG_GROUP_OPTIONS>())
156 : , advgetopt::Help("Define the priority of this candidate (1 to 14) to gain a leader position or \"off\".")
157 : , advgetopt::DefaultValue(g_default_candidate_priority.data())
158 : ),
159 : advgetopt::define_option(
160 : advgetopt::Name("server-name")
161 : , advgetopt::ShortName('n')
162 : , advgetopt::Flags(advgetopt::all_flags<
163 : advgetopt::GETOPT_FLAG_DYNAMIC_CONFIGURATION
164 : , advgetopt::GETOPT_FLAG_REQUIRED
165 : , advgetopt::GETOPT_FLAG_GROUP_OPTIONS>())
166 : , advgetopt::Help("Set the name of this server instance.")
167 : ),
168 : advgetopt::end_options()
169 : };
170 :
171 :
172 : advgetopt::group_description const g_group_descriptions[] =
173 : {
174 : advgetopt::define_group(
175 : advgetopt::GroupNumber(advgetopt::GETOPT_FLAG_GROUP_COMMANDS)
176 : , advgetopt::GroupName("command")
177 : , advgetopt::GroupDescription("Commands:")
178 : ),
179 : advgetopt::define_group(
180 : advgetopt::GroupNumber(advgetopt::GETOPT_FLAG_GROUP_OPTIONS)
181 : , advgetopt::GroupName("option")
182 : , advgetopt::GroupDescription("Options:")
183 : ),
184 : advgetopt::end_groups()
185 : };
186 :
187 :
188 : constexpr char const * const g_configuration_files[] =
189 : {
190 : "/etc/cluck/cluckd.conf",
191 : nullptr
192 : };
193 :
194 :
195 : advgetopt::options_environment const g_options_environment =
196 : {
197 : .f_project_name = "cluckd",
198 : .f_group_name = "cluck",
199 : .f_options = g_options,
200 : .f_environment_variable_name = "CLUCKD_OPTIONS",
201 : .f_configuration_files = g_configuration_files,
202 : .f_environment_flags = advgetopt::GETOPT_ENVIRONMENT_FLAG_SYSTEM_PARAMETERS
203 : | advgetopt::GETOPT_ENVIRONMENT_FLAG_PROCESS_SYSTEM_PARAMETERS,
204 : .f_help_header = "Usage: %p [-<opt>]\n"
205 : "where -<opt> is one or more of:",
206 : .f_help_footer = "%c",
207 : .f_version = CLUCK_VERSION_STRING,
208 : .f_license = "GNU GPL v3",
209 : .f_copyright = "Copyright (c) 2013-"
210 : SNAPDEV_STRINGIZE(UTC_BUILD_YEAR)
211 : " by Made to Order Software Corporation -- All Rights Reserved",
212 : .f_groups = g_group_descriptions,
213 : };
214 :
215 :
216 :
217 : }
218 : // no name namespace
219 :
220 :
221 :
222 :
223 :
224 :
225 :
226 :
227 :
228 :
229 :
230 :
231 :
232 : /** \class cluckd
233 : * \brief Class handling intercomputer locking.
234 : *
235 : * This class is used in order to create intercomputer locks on request.
236 : *
237 : * The class uses Snap! Communicator messages and implements
238 : * the LOCK and UNLOCK commands and sends the LOCKED and UNLOCKED
239 : * commands to its senders.
240 : *
241 : * The system makes use of the Lamport's Bakery Algorithm. This is
242 : * explained in the ticket class.
243 : *
244 : * \note
245 :  * At this time, there is one potential problem that can arise: the
246 :  * lock may fail to materialize because the computer to which you
247 :  * first sent the LOCK message goes down in some way. The other
248 :  * cluck computers will have no clue which computer was working on
249 :  * the lock and whether one of them should take over.
250 :  * One way to remedy this is to run one instance of cluck on each
251 : * computer on which a lock is likely to happen. Then the LOCK
252 : * message will be proxied to the proper destination (a leader).
253 : *
254 : * \warning
255 : * The LOCK mechanism uses the system clock of each computer to know when
256 : * a lock times out. You are responsible for making sure that all those
257 :  * computers have a synchronized clock (i.e. run a time daemon such
258 : * as ntpd). The difference in time should be as small as possible. The
259 : * precision required by cluck is around 1 second.
260 : *
261 :  * The following shows the messages used to promote 3 leaders; in other
262 :  * words, it shows how the election process happens. The election itself
263 :  * is done by the computer that is part of the cluster considered to be
264 :  * up and that has the smallest IP address. That is the one computer that
265 :  * sends the LOCKLEADERS message. As soon as that happens, all the other
266 :  * nodes on the cluster know the leaders and inform new nodes through the
267 :  * LOCKSTARTED message.
268 : *
269 : * \msc
270 : * Communicator,A,B,C,D,E,F;
271 : *
272 : * A->Communicator [label="REGISTER"];
273 : * Communicator->A [label="HELP"];
274 : * Communicator->A [label="READY"];
275 : *
276 : * A->Communicator [label="CLUSTERSTATUS"];
277 : * Communicator->A [label="CLUSTERUP"];
278 : *
279 : * # Broadcast to B to F, but we do not know who's up at this point
280 : * A->* [label="LOCKSTARTED"];
281 : *
282 : * # A answers each one of those because for it B, C, D, ... are new
283 : * B->A [label="LOCKSTARTED"];
284 : * A->B [label="LOCKSTARTED"];
285 : *
286 : * C->A [label="LOCKSTARTED"];
287 : * A->C [label="LOCKSTARTED"];
288 : *
289 : * D->A [label="LOCKSTARTED"];
290 : * A->D [label="LOCKSTARTED"];
291 : *
292 : * # When we reach here we have a CLUSTERUP in terms of cluck daemons
293 : * # Again here we broadcast, maybe we should send to known computers instead?
294 : * # IMPORTANT: A determines the leaders only if its IP is the smallest
295 : * A->* [label="LOCKLEADERS"];
296 : *
297 : * # Here the replies from A will include the leaders
298 : * # Of course, as shown above, E will have sent the message to all and
299 : * # so it will receive the leaders from multiple sources
300 : * E->A [label="LOCKSTARTED"];
301 : * A->E [label="LOCKSTARTED"];
302 : *
303 : * F->A [label="LOCKSTARTED"];
304 : * A->F [label="LOCKSTARTED"];
305 : * \endmsc
306 : *
307 : * \sa ticket
308 : */
309 :
310 :
311 :
312 : /** \brief Initializes a cluckd object.
313 : *
314 : * This function parses the command line arguments, reads configuration
315 :  * files, and sets up the logger.
316 : *
317 : * It also immediately executes a --help or a --version command line
318 : * option and exits the process if these are present.
319 : *
320 : * \param[in] argc The number of arguments in the argv array.
321 : * \param[in] argv The array of argument strings.
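 :  *
 :  * A minimal main() using this class could look as follows (a sketch
 :  * only; real code would also catch the advgetopt::getopt_exit and
 :  * other exceptions this constructor may throw):
 :  *
 :  * \code
 :  * int main(int argc, char * argv[])
 :  * {
 :  *     cluck_daemon::cluckd daemon(argc, argv);
 :  *     daemon.add_connections();
 :  *     daemon.run();
 :  *     return 0;
 :  * }
 :  * \endcode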
322 : */
323 34 : cluckd::cluckd(int argc, char * argv[])
324 34 : : f_opts(g_options_environment)
325 : {
326 34 : snaplogger::add_logger_options(f_opts);
327 :
328 : // before we can parse command line arguments, we must create the
329 : // fluid settings & communicator client objects which happen to
330 : // dynamically add command line options to f_opts
331 : //
332 34 : f_messenger = std::make_shared<messenger>(this, f_opts);
333 :
334 34 : f_opts.finish_parsing(argc, argv);
335 32 : if(!snaplogger::process_logger_options(f_opts, "/etc/cluck/logger"))
336 : {
337 1 : throw advgetopt::getopt_exit("logger options generated an error.", 0);
338 : }
339 :
340 : // determine this server name
341 : //
342 : // TODO: if the name of the server is changed, we should reboot, but
343 : // to the minimum we need to restart cluckd (among other daemons)
344 : // remember that snapmanager.cgi gives you that option;
345 : // we CANNOT use fluid-settings for this one since each computer
346 : // must have a different name
347 : //
348 31 : if(f_opts.is_defined("server-name"))
349 : {
350 2 : f_server_name = f_opts.get_string("server-name");
351 : }
352 31 : if(f_server_name.empty())
353 : {
354 29 : f_server_name = snapdev::gethostname();
355 : }
356 :
357 31 : f_start_time = snapdev::now();
358 70 : }
359 :
360 :
361 : /** \brief Do some clean ups.
362 : *
363 : * At this point, the destructor is present mainly because we have
364 : * some virtual functions.
365 : */
366 31 : cluckd::~cluckd()
367 : {
368 31 : }
369 :
370 :
371 : /** \brief Finish the cluck daemon initialization.
372 : *
373 : * This function creates all the connections used by the cluck daemon.
374 : *
375 : * \note
376 : * This is separate from the run() function so we can run unit tests
377 : * against the cluck daemon.
378 : *
379 : * \sa run()
380 : */
381 25 : void cluckd::add_connections()
382 : {
383 25 : f_communicator = ed::communicator::instance();
384 :
385 : // capture Ctrl-C (SIGINT) to get a clean exit by default
386 : //
387 25 : f_interrupt = std::make_shared<interrupt>(this);
388 25 : f_communicator->add_connection(f_interrupt);
389 :
390 : // timer so we can timeout locks
391 : //
392 25 : f_timer = std::make_shared<timer>(this);
393 25 : f_communicator->add_connection(f_timer);
394 :
395 : // add the messenger used to communicate with the communicator daemon
396 : // and other services as required
397 : //
398 25 : f_communicator->add_connection(f_messenger);
399 :
400 : // the following call actually connects the messenger to the
401 : // communicator daemon
402 : //
403 25 : f_messenger->finish_parsing();
404 25 : }
405 :
406 :
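 : /** \brief Save the IP address of this computer.
 :  *
 :  * This function saves the IP address that was determined to represent
 :  * this computer. Among other things, the address is used to decide
 :  * which computer conducts the leader elections (see election_status()).
 :  *
 :  * \param[in] a The IP address of this computer.
 :  */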
407 25 : void cluckd::set_my_ip_address(addr::addr const & a)
408 : {
409 25 : f_my_ip_address = a;
410 25 : }
411 :
412 :
413 : /** \brief Run the cluck daemon.
414 : *
415 : * This function is the core function of the daemon. It runs the loop
416 : * used to lock processes from any number of computers that have access
417 : * to the cluck daemon network.
418 : *
419 : * \sa add_connections()
420 : */
421 25 : void cluckd::run()
422 : {
423 25 : SNAP_LOG_INFO
424 : << "--------------------------------- cluckd started."
425 : << SNAP_LOG_SEND;
426 :
427 : // now run our listening loop
428 : //
429 25 : f_communicator->run();
430 24 : }
431 :
432 :
433 :
434 :
435 :
436 : /** \brief Return the number of known computers running cluckd.
437 : *
438 : * This function is used by the ticket objects to calculate
439 : * the quorum so as to know how many computers need to reply to
440 : * our messages before we can be sure we got the correct
441 : * results.
442 : *
443 : * \return The number of instances of cluckd running and connected.
444 : */
445 483 : int cluckd::get_computer_count() const
446 : {
447 483 : return f_computers.size();
448 : }
449 :
450 :
451 : /** \brief Get the name of the server we are running on.
452 : *
453 : * This function returns the name of the server this instance of
454 :  * cluck is running on. It is used by the ticket implementation
455 : * to know whether to send a reply to the cluck object (i.e.
456 : * at this time we can send messages to that object only from the
457 : * server it was sent from).
458 : *
459 : * \return The name of the server cluck is running on.
460 : */
461 261 : std::string const & cluckd::get_server_name() const
462 : {
463 261 : return f_server_name;
464 : }
465 :
466 :
467 : /** \brief Check whether the cluck daemon is ready to process lock requests.
468 : *
469 : * This function checks whether the cluck daemon is ready by looking
470 : * at whether it has leaders and if so, whether each leader is connected.
471 : *
472 :  * Once both tests succeed, this cluck daemon can forward the locks to
473 : * the leaders. If it is a leader itself, it can enter a ticket in
474 : * the selection and message both of the other leaders about it.
475 : *
476 : * \return true once locks can be processed.
477 : */
478 396 : bool cluckd::is_daemon_ready() const
479 : {
480 : // we are not yet properly registered
481 : //
482 396 : if(f_messenger == nullptr || !f_messenger->is_ready())
483 : {
484 9 : return false;
485 : }
486 :
487 : // without at least one leader we are definitely not ready
488 : //
489 387 : if(f_leaders.empty())
490 : {
491 78 : SNAP_LOG_TRACE
492 : << "not considered ready: no leaders."
493 : << SNAP_LOG_SEND;
494 78 : return false;
495 : }
496 :
497 : // enough leaders for that cluster?
498 : //
499 : // we consider that having at least 2 leaders is valid because locks
500 : // will still work, an election should be happening when we lose a
501 : // leader fixing that temporary state
502 : //
503 : // the test below allows for the case where we have a single computer
504 : // too (i.e. "one neighbor")
505 : //
506 :     // notice how not having received the CLUSTERUP would be taken into
507 : // account here since f_neighbors_count will still be 0 in that case
508 : // (however, the previous empty() test already takes that in account)
509 : //
510 309 : if(f_leaders.size() == 1
511 309 : && f_neighbors_count != 1)
512 : {
513 2 : SNAP_LOG_TRACE
514 : << "not considered ready: not enough leaders for this cluster."
515 : << SNAP_LOG_SEND;
516 2 : return false;
517 : }
518 :
519 : // the election_status() function verifies that the quorum is
520 : // attained, but it can change if the cluster grows or shrinks
521 : // so we have to check here again as the lock system becomes
522 : // "unready" when the quorum is lost; see that other function
523 : // for additional info
524 :
525 : // this one probably looks complicated...
526 : //
527 : // if our quorum is 1 or 2 then we need a number of computers
528 : // equal to the total number of computers (i.e. a CLUSTER_COMPLETE
529 : // status which we verify here)
530 : //
531 614 : if(f_neighbors_quorum < 3
532 307 : && f_computers.size() < f_neighbors_count)
533 : {
534 2 : SNAP_LOG_TRACE
535 1 : << "not considered ready: quorum changed, re-election expected soon (number of neighbors: "
536 1 : << f_neighbors_count
537 1 : << ", neighbors quorum: "
538 1 : << f_neighbors_quorum
539 1 : << ", number of computers: "
540 1 : << f_computers.size()
541 :             << ")."
542 : << SNAP_LOG_SEND;
543 1 : return false;
544 : }
545 :
546 : // the neighbors count & quorum can change over time so
547 : // we have to verify that the number of computers is
548 : // still acceptable here
549 : //
550 306 : if(f_computers.size() < f_neighbors_quorum)
551 : {
552 2 : SNAP_LOG_TRACE
553 : << "not considered ready: quorum lost, re-election expected soon."
554 : << SNAP_LOG_SEND;
555 2 : return false;
556 : }
557 :
558 : // are all leaders connected to us?
559 : //
560 304 : std::size_t ready(0);
561 304 : computer::pointer_t last_leader;
562 950 : for(auto const & l : f_leaders)
563 : {
564 646 : if(l->get_connected())
565 : {
566 644 : ++ready;
567 : }
568 : else
569 : {
570 : // attempt resending a LOCK_STARTED because it could be that it
571 : // did not work quite right and the cluck daemons are not
572 :             // ever going to talk with each other otherwise
573 : //
574 :             // we also make sure we do not send the message too many times;
575 : // it should be resolved in five seconds or less...
576 : //
577 2 : time_t const now(time(nullptr));
578 2 : if(now >= f_pace_lockstarted)
579 : {
580 : // pause for 5 to 6 seconds in case this happens a lot
581 : //
582 2 : f_pace_lockstarted = now + 5;
583 :
584 : // only send it to that specific cluck daemon
585 : //
586 2 : ed::message temporary_message;
587 2 : temporary_message.set_sent_from_server(l->get_name());
588 2 : temporary_message.set_sent_from_service(cluck::g_name_cluck_service_name);
589 2 : const_cast<cluckd *>(this)->send_lock_started(&temporary_message);
590 2 : }
591 :
592 2 : last_leader = l;
593 : }
594 : }
595 :
596 : // we only need to have 2 leaders to have a functional system with
597 : // 2 or 3 leaders total
598 : //
599 304 : if(ready >= 2
600 304 : || ready == f_leaders.size())
601 : {
602 304 : return true;
603 : }
604 :
605 : // we're still not ready
606 : //
607 0 : SNAP_LOG_TRACE
608 : << "not considered ready: no direct connection with leader: \""
609 0 : << last_leader->get_name()
610 : << "\"."
611 : << SNAP_LOG_SEND;
612 :
613 0 : return false;
614 304 : }
615 :
616 :
617 : /** \brief Search for a leader.
618 : *
619 : * This function goes through the list of leaders to determine whether
620 : * the specified identifier represents a leader. If so it returns a pointer
621 : * to that leader computer object.
622 : *
623 : * When the function is called with an empty string as the computer
624 : * identifier, this computer is checked to see whether it is a leader.
625 : *
626 : * \warning
627 : * This function is considered slow since it goes through the list of leaders
628 : * on each call. On the other hand, it's only 1 to 3 leaders. Yet, you should
629 : * cache the result within your function if you need the result multiple
630 : * times, as in:
631 : *
632 : * \code
633 : * computer::pointer_t leader(is_leader());
634 : * if(leader != nullptr)
635 : * {
636 : * // ... use `leader` any number of times ...
637 : * }
638 : * \endcode
639 : *
640 : * \par
641 : * This is done that way because the function may return a different result
642 : * over time (i.e. if a re-election happens). So you do not want to cache
643 : * the results between calls to your function.
644 : *
645 :  * \param[in] id The identifier of the leader to search; if empty, defaults
646 : * to f_my_id (i.e. whether this cluckd is a leader).
647 : *
648 : * \return The leader computer::pointer_t or a null pointer.
649 : */
650 1937 : computer::pointer_t cluckd::is_leader(std::string id) const
651 : {
652 1937 : if(id.empty())
653 : {
654 1937 : id = f_my_id;
655 : }
656 :
657 3874 : auto const l(std::find_if(
658 : f_leaders.begin()
659 : , f_leaders.end()
660 2055 : , [id](auto const & c){
661 2055 : return c->get_id() == id;
662 1937 : }));
663 1937 : if(l != f_leaders.end())
664 : {
665 1861 : return *l;
666 : }
667 :
668 76 : return computer::pointer_t();
669 : }
670 :
671 :
672 : /** \brief Get pointer to leader A.
673 : *
674 : * We have 1 to 3 leaders in each cluck daemon. There is "self", leader A,
675 : * and leader B. This function returns leader A or a null pointer if there
676 : * is only one leader.
677 : *
678 : * Leader A is either f_leaders[0] or f_leaders[1]. If "self" is f_leaders[0]
679 :  * then the function returns f_leaders[1]. If "self" is f_leaders[1], then
680 : * the function returns f_leaders[0].
681 : *
682 : * \note
683 : * In a setup where you have only 1 computer total, this function always
684 : * returns a null pointer.
685 : *
686 : * \note
687 : * If the elections have not yet happened, this function always returns
688 :  * a null pointer.
689 : *
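 :  * Typical usage mirrors what synchronize_leaders() does below (a
 :  * sketch; \c msg stands for some message destined to that leader):
 :  *
 :  * \code
 :  * computer::pointer_t const la(get_leader_a());
 :  * if(la != nullptr)
 :  * {
 :  *     msg.set_server(la->get_name());
 :  *     f_messenger->send_message(msg);
 :  * }
 :  * \endcode
 :  *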
690 : * \return A pointer to the leader A computer or nullptr if there is no
691 : * such computer.
692 : */
693 814 : computer::pointer_t cluckd::get_leader_a() const
694 : {
695 : #ifdef _DEBUG
696 814 : if(is_leader() == nullptr)
697 : {
698 9 : throw cluck::logic_error("cluckd::get_leader_a(): only a leader can call this function.");
699 : }
700 : #endif
701 :
702 805 : switch(f_leaders.size())
703 : {
704 : case 0: // LCOV_EXCL_LINE -- because of the debug above, this cannot happen here
705 : default:
706 : throw cluck::logic_error("cluckd::get_leader_a(): call this function only when leaders were elected."); // LCOV_EXCL_LINE
707 :
708 19 : case 1:
709 19 : return computer::pointer_t();
710 :
711 786 : case 2:
712 : case 3:
713 786 : return f_leaders[f_leaders[0]->is_self() ? 1 : 0];
714 :
715 : }
716 : }
717 :
718 :
719 : /** \brief Get pointer to leader B.
720 : *
721 : * We have 1 to 3 leaders in each cluck daemon. There is "self", leader A,
722 : * and leader B. This function returns leader B or a null pointer if there
723 : * is only one leader.
724 : *
725 : * Leader B is either f_leaders[1] or f_leaders[2]. If "self" is f_leaders[1]
726 :  * then the function returns f_leaders[2]. If "self" is f_leaders[2], then
727 : * the function returns f_leaders[1].
728 : *
729 : * \note
730 : * In a setup where you have only 1 or 2 computers total, this function
731 : * always returns a null pointer.
732 : *
733 : * \note
734 : * If the elections have not yet happened, this function always returns
735 :  * a null pointer.
736 : *
737 : * \return A pointer to the leader B computer or nullptr if there is no
738 : * such computer.
739 : */
740 795 : computer::pointer_t cluckd::get_leader_b() const
741 : {
742 : #ifdef _DEBUG
743 795 : if(is_leader() == nullptr)
744 : {
745 9 : throw cluck::logic_error("cluckd::get_leader_b(): only a leader can call this function.");
746 : }
747 : #endif
748 :
749 786 : switch(f_leaders.size())
750 : {
751 : case 0: // LCOV_EXCL_LINE -- because of the debug above, this cannot happen here
752 : default:
753 : throw cluck::unexpected_case("cluckd::get_leader_b(): call this function only when leaders were elected."); // LCOV_EXCL_LINE
754 :
755 714 : case 1:
756 : case 2: // we have a leader A but no leader B when we have only 2 leaders
757 714 : return computer::pointer_t();
758 :
759 72 : case 3:
760 72 : return f_leaders[f_leaders[2]->is_self() ? 1 : 2];
761 :
762 : }
763 : }
764 :
765 :
766 :
767 : /** \brief Return a JSON with the state of this cluckd object.
768 : *
769 : * This function generates a JSON string of the current state of this
770 : * cluck daemon and replies with that message back to the caller.
771 : *
772 : * This is primarily used to debug a cluckd instance and make sure that the
773 :  * state is what you expect it to be.
774 : *
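 :  * The reply parameter holds a JSON object similar to the following
 :  * (an illustrative example; the exact set of fields depends on the
 :  * current state and these values are made up):
 :  *
 :  * \code
 :  * {
 :  *   "daemon_ready": true,
 :  *   "id": "...",
 :  *   "ip": "10.0.0.1",
 :  *   "neighbors_count": 3,
 :  *   "neighbors_quorum": 2,
 :  *   "leaders_count": 3,
 :  *   "computers": [ ... ]
 :  * }
 :  * \endcode
 :  *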
775 : * \todo
776 : * The list of tickets uses the internal serialization mechanism, which
777 :  * creates a much smaller array of tickets. At some point, we should transform
778 : * that to output JSON instead.
779 : *
780 : * \param[in] msg The INFO message.
781 : */
782 20 : void cluckd::msg_info(ed::message & msg)
783 : {
784 20 : std::stringstream ss;
785 :
786 20 : as2js::position p;
787 20 : p.set_filename("cluckd.cpp");
788 20 : p.set_function("msg_info");
789 :
790 20 : as2js::json::json_value::object_t obj;
791 20 : as2js::json::json_value::pointer_t result(std::make_shared<as2js::json::json_value>(p, obj));
792 :
793 : {
794 20 : as2js::json::json_value::pointer_t value(std::make_shared<as2js::json::json_value>(p, is_daemon_ready()));
795 20 : result->set_member("daemon_ready", value);
796 20 : }
797 :
798 20 : if(!f_my_id.empty())
799 : {
800 19 : as2js::json::json_value::pointer_t value(std::make_shared<as2js::json::json_value>(p, f_my_id));
801 19 : result->set_member("id", value);
802 19 : }
803 :
804 : {
805 20 : addr::addr zero;
806 20 : as2js::json::json_value::pointer_t value(std::make_shared<as2js::json::json_value>(p,
807 20 : f_my_ip_address == zero
808 40 : ? "<not assigned>"
809 40 : : f_my_ip_address.to_ipv4or6_string(addr::STRING_IP_ADDRESS | addr::STRING_IP_BRACKET_ADDRESS)));
810 20 : result->set_member("ip", value);
811 20 : }
812 :
813 : {
814 20 : as2js::json::json_value::pointer_t value(std::make_shared<as2js::json::json_value>(p, as2js::integer(f_neighbors_count)));
815 20 : result->set_member("neighbors_count", value);
816 20 : }
817 :
818 : {
819 20 : as2js::json::json_value::pointer_t value(std::make_shared<as2js::json::json_value>(p, as2js::integer(f_neighbors_quorum)));
820 20 : result->set_member("neighbors_quorum", value);
821 20 : }
822 :
823 : {
824 20 : as2js::json::json_value::pointer_t value(std::make_shared<as2js::json::json_value>(p, as2js::integer(f_leaders.size())));
825 20 : result->set_member("leaders_count", value);
826 20 : }
827 :
828 20 : if(!f_message_cache.empty())
829 : {
830 1 : as2js::json::json_value::pointer_t value(std::make_shared<as2js::json::json_value>(p, as2js::integer(f_message_cache.size())));
831 1 : result->set_member("cache_size", value);
832 1 : }
833 :
834 : {
835 20 : as2js::json::json_value::array_t computers;
836 20 : as2js::json::json_value::pointer_t list(std::make_shared<as2js::json::json_value>(p, computers));
837 :
838 112 : for(auto const & c : f_computers)
839 : {
840 92 : as2js::json::json_value::object_t computer;
841 92 : as2js::json::json_value::pointer_t item(std::make_shared<as2js::json::json_value>(p, computer));
842 :
843 : {
844 92 : as2js::json::json_value::pointer_t value(std::make_shared<as2js::json::json_value>(p, c.second->get_name()));
845 92 : item->set_member("name", value);
846 92 : }
847 :
848 : {
849 92 : as2js::json::json_value::pointer_t value(std::make_shared<as2js::json::json_value>(p, c.second->get_id()));
850 92 : item->set_member("id", value);
851 92 : }
852 :
853 : {
854 184 : as2js::json::json_value::pointer_t value(std::make_shared<as2js::json::json_value>(p, c.second->get_ip_address().to_ipv4or6_string(addr::STRING_IP_ADDRESS | addr::STRING_IP_BRACKET_ADDRESS)));
855 92 : item->set_member("ip", value);
856 92 : }
857 :
858 : {
859 92 : as2js::json::json_value::pointer_t value(std::make_shared<as2js::json::json_value>(p, c.second->get_connected()));
860 92 : item->set_member("connected", value);
861 92 : }
862 :
863 : {
864 92 : auto const it(std::find_if(
865 : f_leaders.begin()
866 : , f_leaders.end()
867 179 : , [&c](auto const & l)
868 : {
869 179 : return c.second == l;
870 : }));
871 92 : std::string leader;
872 92 : if(it != f_leaders.end())
873 : {
874 31 : as2js::json::json_value::pointer_t value(std::make_shared<as2js::json::json_value>(p, as2js::integer(it - f_leaders.begin())));
875 31 : item->set_member("leader", value);
876 31 : }
877 92 : }
878 :
879 92 : list->set_item(list->get_array().size(), item);
880 92 : }
881 20 : result->set_member("computers", list);
882 20 : }
883 :
884 40 : if(msg.has_parameter(cluck::g_name_cluck_param_mode)
885 40 : && msg.get_parameter(cluck::g_name_cluck_param_mode) == cluck::g_name_cluck_value_debug)
886 : {
887 : // TODO: create a serialization to JSON instead of a specialized string
888 : //
889 4 : as2js::json::json_value::pointer_t value(std::make_shared<as2js::json::json_value>(p, serialized_tickets()));
890 2 : result->set_member("tickets", value);
891 2 : }
892 :
893 20 : ed::message reply_message;
894 20 : reply_message.set_command(cluck::g_name_cluck_cmd_cluckd_status);
895 20 : reply_message.reply_to(msg);
896 20 : reply_message.add_parameter(communicatord::g_name_communicatord_param_status, result->to_string());
897 20 : f_messenger->send_message(reply_message);
898 40 : }
899 :
900 :
901 :
902 : /** \brief Generate the output for "cluck-status --list"
903 : *
904 : * This function loops over the list of tickets and outputs a string that
905 : * it sends back to the `cluck-status --list` tool for printing to the administrator.
906 : *
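 :  * Each line of the output looks as follows (illustrative values; the
 :  * exact key and date formats come from the ticket implementation):
 :  *
 :  * \code
 :  * ticket_id: 1 object name: "my-resource" key: <entering key> obtention <date>
 :  * \endcode
 :  *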
907 : * \return A string with the list of all the tickets.
908 : */
909 4 : std::string cluckd::ticket_list() const
910 : {
911 4 : std::stringstream list;
912 11 : for(auto const & obj_ticket : f_tickets)
913 : {
914 18 : for(auto const & key_ticket : obj_ticket.second)
915 : {
916 : list
917 11 : << "ticket_id: "
918 11 : << key_ticket.second->get_ticket_number()
919 : << " object name: \""
920 11 : << key_ticket.second->get_object_name()
921 : << "\" key: "
922 22 : << key_ticket.second->get_entering_key();
923 :
924 11 : cluck::timeout_t const lock_timeout(key_ticket.second->get_lock_timeout_date());
925 11 : if(lock_timeout != cluck::timeout_t())
926 : {
927 : list
928 : << " timeout "
929 3 : << lock_timeout.to_string();
930 : }
931 : else
932 : {
933 8 : cluck::timeout_t const obtention_timeout(key_ticket.second->get_obtention_timeout());
934 : list
935 : << " obtention "
936 8 : << obtention_timeout.to_string();
937 : }
938 :
939 11 : list << '\n';
940 : }
941 : }
942 :
943 8 : return list.str();
944 4 : }
945 :
946 :
947 : /** \brief Check the status of the election.
948 : *
949 : * This function checks whether the leaders were already elected. If so,
950 : * then nothing happens.
951 : *
952 : * Otherwise, it checks the state of the election:
953 : *
954 :  * * leaders are not already elected
955 :  * * this cluckd received its own IP address
956 :  * * we know how many neighbors there are in our cluster
957 :  * * the cluster has a sufficient (quorum) number of computers to support
958 :  *   the lock mechanism properly (2 of the 3 leaders or a cluster quorum
959 :  *   when the cluster has 4 or more computers)
960 :  * * we have the lowest IP address (only the cluck daemon with the lowest
961 :  *   IP can decide which computers are the leaders)
962 :  * * not too many of the cluck daemons opted out of leadership
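 :  *
 :  * In short, the election proceeds only when all of the following hold;
 :  * this is a condensed sketch of the checks implemented below:
 :  *
 :  * \code
 :  * bool const can_elect(
 :  *        f_leaders.empty()
 :  *     && !f_my_ip_address.is_default()
 :  *     && f_neighbors_count != 0
 :  *     && (f_neighbors_quorum >= 3 || f_computers.size() >= f_neighbors_count)
 :  *     && f_computers.size() >= f_neighbors_quorum);
 :  * // plus: we have the smallest IP address and enough computers are
 :  * // candidates (i.e. their priority is not set to OFF)
 :  * \endcode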
963 : */
964 81 : void cluckd::election_status()
965 : {
966 : // we already have election results?
967 : //
968 81 : if(!f_leaders.empty())
969 : {
970 : // the results may have been "tempered" with (i.e. one of
971 : // the leaders was lost)
972 : //
973 19 : if(f_leaders.size() == 3
974 19 : || (f_neighbors_count < 3 && f_leaders.size() == f_neighbors_count))
975 : {
976 : // status is fine
977 : //
978 9 : return;
979 : }
980 : }
981 :
982 : // we do not yet know our IP address, we cannot support an election just yet
983 : //
984 72 : if(f_my_ip_address.is_default())
985 : {
986 : // this should not be possible since we expect to receive the
987 : // REGISTER's reply before any other message
988 : //
989 : return; // LCOV_EXCL_LINE
990 : }
991 :
992 : // neighbors count is 0 until we receive a very first CLUSTER_UP
993 : // (note that it does not go back to zero on CLUSTER_DOWN, however,
994 : // the quorum as checked in the next if() is never going to be
995 : // reached if the cluster is down)
996 : //
997 72 : if(f_neighbors_count == 0)
998 : {
999 0 : return;
1000 : }
1001 :
1002 : // this one probably looks complicated...
1003 : //
1004 : // if our quorum is 1 or 2 then we need a number of computers
1005 : // equal to the total number of computers (i.e. a CLUSTER_COMPLETE
1006 : // status which we compute here)
1007 : //
1008 144 : if(f_neighbors_quorum < 3
1009 72 : && f_computers.size() < f_neighbors_count)
1010 : {
1011 28 : return;
1012 : }
1013 :
1014 : // since the neighbors count & quorum never go back to zero (on a
1015 : // CLUSTER_DOWN) we have to verify that the number of computers is
1016 : // acceptable here
1017 : //
1018 : // Note: further we will not count computers marked disabled, which
1019 : // is done below when sorting by ID, however, that does not
1020 : // prevent the quorum to be attained, even with disabled
1021 : // computers
1022 : //
1023 44 : if(f_computers.size() < f_neighbors_quorum)
1024 : {
1025 12 : return;
1026 : }
1027 :
1028 : // to proceed with an election we must have the smallest IP address
1029 : // (it is not absolutely required, but that way we avoid many
1030 : // consensus problems, in effect we have one "temporary-leader" that ends
1031 : // up telling us who the final three leaders are)
1032 : //
1033 : // TODO: verify that this works properly in a non-complete cluster
1034 : //
1035 118 : for(auto & c : f_computers)
1036 : {
1037 : // Note: the test fails when we compare to ourselves so we do not
1038 : // need any special case
1039 : //
1040 93 : if(c.second->get_ip_address() < f_my_ip_address)
1041 : {
1042 7 : return;
1043 : }
1044 : }
1045 :
1046 : // to select the leaders sort them by identifier and take the first
1047 : // three (i.e. lower priority, random, IP, pid.)
1048 : //
1049 25 : int off(0);
1050 25 : computer::map_t sort_by_id;
1051 104 : for(auto const & c : f_computers)
1052 : {
1053 : // ignore nodes with a priority of 15 (i.e. OFF)
1054 : //
1055 79 : if(c.second->get_priority() != computer::PRIORITY_OFF)
1056 : {
1057 50 : sort_by_id[c.second->get_id()] = c.second;
1058 : }
1059 : else
1060 : {
1061 29 : ++off;
1062 : }
1063 : }
1064 :
1065 75 : for(auto const & s : sort_by_id)
1066 : {
1067 50 : SNAP_LOG_WARNING << "--- sort by ID: " << s.first << SNAP_LOG_SEND;
1068 : }
1069 :
1070 25 : bool too_many_computers_off(false);
1071 25 : if(f_computers.size() <= 3)
1072 : {
1073 18 : if(off != 0
1074 18 : && f_computers.size() >= f_neighbors_count)
1075 : {
1076 9 : SNAP_LOG_FATAL
1077 : << "you cannot have any cluck computer turned OFF when you"
1078 : " have three or less computers total in your cluster."
1079 : " The elections cannot be completed in these"
1080 : " conditions."
1081 : << SNAP_LOG_SEND;
1082 9 : too_many_computers_off = true;
1083 : }
1084 : }
1085 7 : else if(f_computers.size() - off < 3
1086 7 : && f_computers.size() >= f_neighbors_count)
1087 : {
1088 2 : SNAP_LOG_FATAL
1089 1 : << "you have a total of "
1090 1 : << f_computers.size()
1091 1 : << " computers in your cluster. You turned off "
1092 : << off
1093 : << " of them, which means less than three are left"
1094 : " as candidates for leadership which is not enough."
1095 1 : " You can have a maximum of "
1096 1 : << f_computers.size() - 3
1097 : << " that are turned off on this cluster."
1098 : << SNAP_LOG_SEND;
1099 1 : too_many_computers_off = true;
1100 : }
1101 25 : if(too_many_computers_off)
1102 : {
1103 : // only generate the flag once we reach the CLUSTER_COMPLETE status
1104 : // (we cannot be sure that the `off` variable is valid until then)
1105 : //
1106 10 : if(f_computers.size() >= f_neighbors_count)
1107 : {
1108 : // this is something that breaks the entire system so someone
1109 : // needs to fix it and thus it has a really high priority
1110 : //
1111 90 : communicatord::flag::pointer_t flag(COMMUNICATORD_FLAG_UP(
1112 : "cluckd"
1113 : , "election"
1114 : , "instances-off"
1115 : , "the cluck daemon detected that too many of the"
1116 : " daemons have their priority set to OFF;"
1117 : " you must turn some of these back ON."
1118 40 : ));
1119 10 : flag->set_priority(99);
1120 10 : flag->add_tag("settings");
1121 10 : flag->set_manual_down(true);
1122 10 : flag->save();
1123 10 : }
1124 10 : return;
1125 : }
1126 :
1127 : // an election works as soon as we have at least 2 leaders
1128 : // or we reached the total number of computers
1129 : //
1130 15 : if(sort_by_id.size() < 2
1131 15 : && sort_by_id.size() != f_computers.size())
1132 : {
1133 1 : return;
1134 : }
1135 :
1136 : //std::cerr << f_communicator_port << " is conducting an election:\n";
1137 : //for(auto s : sort_by_id)
1138 : //{
1139 : //std::cerr << " " << s.second->get_name() << " " << s.first << "\n";
1140 : //}
1141 :
1142 : // the first three are the new leaders
1143 : //
1144 14 : ed::message lock_leaders_message;
1145 14 : lock_leaders_message.set_command(cluck::g_name_cluck_cmd_lock_leaders);
1146 14 : lock_leaders_message.set_service(communicatord::g_name_communicatord_server_any);
1147 14 : f_leaders.clear();
1148 14 : f_election_date = snapdev::now();
1149 14 : lock_leaders_message.add_parameter(cluck::g_name_cluck_param_election_date, f_election_date);
1150 14 : auto leader(sort_by_id.begin());
1151 14 : std::size_t const max(std::min(static_cast<computer::map_t::size_type>(3), sort_by_id.size()));
1152 42 : for(std::size_t idx(0); idx < max; ++idx, ++leader)
1153 : {
1154 56 : lock_leaders_message.add_parameter(
1155 56 : cluck::g_name_cluck_param_leader + std::to_string(idx)
1156 28 : , leader->second->get_id());
1157 28 : f_leaders.push_back(leader->second);
1158 : }
1159 14 : f_messenger->send_message(lock_leaders_message);
1160 :
1161 : #if 1
1162 28 : SNAP_LOG_WARNING
1163 14 : << "election status = add leader(s)... "
1164 14 : << f_computers.size()
1165 14 : << " computers and "
1166 14 : << f_leaders.size()
1167 : << " leaders."
1168 : << SNAP_LOG_SEND;
1169 : #endif
1170 :
1171 :     // we need to synchronize from this cluck daemon if it is one of
1172 :     // the leaders; otherwise we won't get the LOCK_STARTED message
1173 :     // which would also call that function
1174 : //
1175 14 : synchronize_leaders();
1176 25 : }
1177 :
1178 :
1179 91 : void cluckd::check_lock_status()
1180 : {
1181 91 : bool const lock_status(is_daemon_ready());
1182 91 : if(f_lock_status == lock_status)
1183 : {
1184 72 : return;
1185 : }
1186 19 : f_lock_status = lock_status;
1187 :
1188 19 : ed::message status_message;
1189 19 : status_message.set_command(f_lock_status
1190 : ? cluck::g_name_cluck_cmd_lock_ready
1191 : : cluck::g_name_cluck_cmd_no_lock);
1192 19 : status_message.set_service(communicatord::g_name_communicatord_server_me);
1193 19 : status_message.add_parameter(communicatord::g_name_communicatord_param_cache, communicatord::g_name_communicatord_value_no);
1194 19 : f_messenger->send_message(status_message);
1195 :
1196 19 : if(lock_status
1197 19 : && !f_message_cache.empty())
1198 : {
1199 : // we still have a cache of locks that can now be processed
1200 : //
1201 : // note:
1202 : // although msg_lock() could re-add some of those messages
1203 : // in the f_message_cache vector, it should not since it
1204 : // calls the same is_daemon_ready() function which we know returns
1205 : // true and therefore no cache is required
1206 : //
1207 2 : message_cache::list_t cache;
1208 2 : cache.swap(f_message_cache);
1209 4 : for(auto & mc : cache)
1210 : {
1211 2 : msg_lock(mc.f_message);
1212 : }
1213 2 : }
1214 19 : }
1215 :
1216 :
1217 77 : void cluckd::send_lock_started(ed::message const * msg)
1218 : {
1219 : // tell other cluck daemon instances that are already listening that
1220 : // we are ready; this way we can calculate the number of computers
1221 : // available in our network and use that to calculate the QUORUM
1222 : //
1223 77 : ed::message lock_started_message;
1224 77 : lock_started_message.set_command(cluck::g_name_cluck_cmd_lock_started);
1225 77 : if(msg == nullptr)
1226 : {
1227 24 : lock_started_message.set_service(communicatord::g_name_communicatord_service_public_broadcast);
1228 :
1229 : // unfortunately, the following does NOT work as expected...
1230 : // (i.e. the following ends up sending the message to ourselves only
1231 : // and does not forward to any remote communicators).
1232 : //
1233 : //lock_started_message.set_server(communicatord::g_name_communicatord_server_any);
1234 : //lock_started_message.set_service(cluck::g_name_cluck_service_name);
1235 : }
1236 : else
1237 : {
1238 53 : lock_started_message.reply_to(*msg);
1239 : }
1240 :
1241 : // our info: server name and id
1242 : //
1243 77 : lock_started_message.add_parameter(communicatord::g_name_communicatord_param_server_name, f_server_name);
1244 77 : lock_started_message.add_parameter(cluck::g_name_cluck_param_lock_id, f_my_id);
1245 77 : lock_started_message.add_parameter(cluck::g_name_cluck_param_start_time, f_start_time);
1246 :
1247 : // include the leaders if present
1248 : //
1249 77 : if(!f_leaders.empty())
1250 : {
1251 26 : lock_started_message.add_parameter(cluck::g_name_cluck_param_election_date, f_election_date);
1252 88 : for(size_t idx(0); idx < f_leaders.size(); ++idx)
1253 : {
1254 124 : lock_started_message.add_parameter(
1255 124 : cluck::g_name_cluck_param_leader + std::to_string(idx)
1256 62 : , f_leaders[idx]->get_id());
1257 : }
1258 : }
1259 :
1260 77 : f_messenger->send_message(lock_started_message);
1261 154 : }
1262 :
1263 :
1264 : /** \brief Called whenever we receive the STOP command or equivalent.
1265 : *
1266 : * This function makes sure the cluck daemon exits as quickly as
1267 : * possible. This means unregistering all the daemon's connections
1268 : * from the communicator.
1269 : *
1270 : * If possible, the function sends an UNREGISTER message to the
1271 : * communicator daemon.
1272 : *
1273 : * \param[in] quitting Set to true if we received a QUITTING message (false
1274 : * usually means we received a STOP message).
1275 : */
1276 49 : void cluckd::stop(bool quitting)
1277 : {
1278 49 : if(f_messenger != nullptr)
1279 : {
1280 25 : f_messenger->unregister_fluid_settings(quitting);
1281 25 : f_communicator->remove_connection(f_messenger);
1282 25 : f_messenger.reset();
1283 : }
1284 :
1285 49 : if(f_communicator != nullptr)
1286 : {
1287 49 : f_communicator->remove_connection(f_interrupt);
1288 49 : f_interrupt.reset();
1289 :
1290 49 : f_communicator->remove_connection(f_timer);
1291 49 : f_timer.reset();
1292 : }
1293 49 : }
1294 :
1295 :
1296 : /** \brief Make sure the very first ticket is marked as LOCKED.
1297 : *
1298 : * This function is called whenever the f_tickets map changes
1299 : * (more specifically, one of its children) to make sure
1300 : * that the first ticket is clearly marked as being locked.
1301 : * Most of the time this happens when we add and when we remove
1302 : * tickets.
1303 : *
1304 : * Note that the function may be called many times even though the
1305 :  * first ticket does not actually change. Generally this is fine,
1306 :  * although each call sends an ACTIVATE_LOCK message, so we want
1307 : * to limit the number of calls to make sure we do not send too
1308 : * many possibly confusing messages.
1309 : *
1310 : * \note
1311 : * We need the ACTIVATE_LOCK and LOCK_ACTIVATED messages to make sure
1312 : * that we only activate the very first lock which we cannot be sure
1313 : * of on our own because all the previous messages are using the
1314 : * QUORUM as expected and thus our table of locks may not be complete
1315 : * at any one time.
1316 : *
1317 : * \param[in] object_name The name of the object which very
1318 : * first ticket may have changed.
1319 : */
1320 227 : void cluckd::activate_first_lock(std::string const & object_name)
1321 : {
1322 227 : auto ticket(find_first_lock(object_name));
1323 :
1324 227 : if(ticket != nullptr)
1325 : {
1326 : // there is what we think is the first ticket
1327 : // that should be actived now; we need to share
1328 : // with the other 2 leaders to make sure of that
1329 : //
1330 217 : ticket->activate_lock();
1331 : }
1332 454 : }
1333 :
1334 :
1335 341 : ticket::pointer_t cluckd::find_first_lock(std::string const & object_name)
1336 : {
1337 341 : ticket::pointer_t first_ticket;
1338 341 : auto const obj_ticket(f_tickets.find(object_name));
1339 :
1340 341 : if(obj_ticket != f_tickets.end())
1341 : {
1342 : // loop through making sure that we activate a ticket only
1343 : // if the obtention date was not already reached; if that
1344 : // date was reached before we had the time to activate the
1345 :         // lock, then the client should have abandoned the lock
1346 : // request anyway...
1347 : //
1348 : // (this is already done in the cleanup(), but a couple of
1349 : // other functions may call the activate_first_lock()
1350 : // function!)
1351 : //
1352 20375 : for(auto key_ticket(obj_ticket->second.begin()); key_ticket != obj_ticket->second.end(); )
1353 : {
1354 20044 : if(key_ticket->second->timed_out())
1355 : {
1356 : // that ticket timed out, send an UNLOCKING, UNLOCKED,
1357 : // or LOCK_FAILED message and get rid of it
1358 : //
1359 0 : key_ticket->second->lock_failed("timed out while searching for first lock");
1360 0 : if(key_ticket->second->timed_out())
1361 : {
1362 : // still timed out, remove it
1363 : //
1364 0 : key_ticket = obj_ticket->second.erase(key_ticket);
1365 : }
1366 : }
1367 : else
1368 : {
1369 20044 : if(first_ticket == nullptr)
1370 : {
1371 331 : first_ticket = key_ticket->second;
1372 : }
1373 20044 : ++key_ticket;
1374 : }
1375 : }
1376 :
1377 331 : if(obj_ticket->second.empty())
1378 : {
1379 : // it is empty now, get rid of that set of tickets
1380 : //
1381 0 : f_tickets.erase(obj_ticket);
1382 : }
1383 : }
1384 :
1385 682 : return first_ticket;
1386 0 : }
1387 :
1388 :
1389 : /** \brief Synchronize leaders.
1390 : *
1391 : * This function sends various events to the other two leaders in order
1392 : * to get them to synchronize the tickets this cluck daemon currently holds.
1393 : *
1394 : * Only leaders make use of this function.
1395 : *
1396 : * Synchronization is necessary whenever a leader dies and another gets
1397 : * elected as a replacement. The replacement would have no idea of the
1398 :  * old tickets. This function makes sure that does not happen.
1399 : *
1400 : * \note
1401 : * Our test checks the validity when ONE leader is lost. If TWO of the
1402 : * leaders are lost at once, the algorithm may not be 100% reliable.
1403 :  * In particular, the remaining leader may not have all the necessary
1404 : * information to restore all the tickets as they were expected to be.
1405 : *
1406 : * \warning
1407 : * A ticket that just arrived to a leader and was not yet forwarded to
1408 : * the others with the LOCK_ENTERING message is going to be lost no
1409 : * matter what.
1410 : */
1411 23 : void cluckd::synchronize_leaders()
1412 : {
1413 : // there is nothing to do if we are by ourselves because we cannot
1414 :     // gain any type of consensus unless we are expected to be the only
1415 :     // one, in which case there are no synchronization requirements anyway
1416 : //
1417 23 : if(f_leaders.size() <= 1)
1418 : {
1419 4 : return;
1420 : }
1421 :
1422 : // only leaders can synchronize each others
1423 : // (other cluck daemons do not have any tickets to synchronize)
1424 : //
1425 19 : if(is_leader() == nullptr)
1426 : {
1427 5 : return;
1428 : }
1429 :
1430 : // determine whether we are leader #0 or not, if zero, then we
1431 : // call msg_lock() directly, otherwise we do a f_messenger->send_message()
1432 : //
1433 : // TODO: review the logic here, I do not think that leader0 has anything
1434 : // to do with lock requests and tickets; instead, I think that the
1435 : // sharing that needs to happen is between the old and the new
1436 : // leaders (i.e. when we lose a leader and assign another computer
1437 : // as a new leader to compensate, that new leader needs to get
1438 : // all the info which is what the LOCK_TICKETS message is about)
1439 : //
1440 14 : bool const leader0(f_leaders[0]->get_id() == f_my_id);
1441 :
1442 : // a vector of messages for which we have to call msg_lock()
1443 : //
1444 14 : ed::message::vector_t local_locks;
1445 :
1446 :     // a ticket that is still entering is definitely not locked,
1447 :     // although it could be ready (one step away from being locked!);
1448 :     // we still restart the whole process with the new leaders if
1449 :     // such exist
1450 : //
1451 : // Note: of course we restart the process only if the owner
1452 : // was that one leader that disappeared, not if the
1453 : // ticket is owned by a remaining leader
1454 : //
1455 15 : for(auto obj_entering(f_entering_tickets.begin()); obj_entering != f_entering_tickets.end(); ++obj_entering)
1456 : {
1457 2 : for(auto key_entering(obj_entering->second.begin()); key_entering != obj_entering->second.end(); )
1458 : {
1459 1 : std::string const owner_name(key_entering->second->get_owner());
1460 1 : auto key_leader(std::find_if(
1461 : f_leaders.begin()
1462 : , f_leaders.end()
1463 2 : , [&owner_name](auto const & l)
1464 : {
1465 2 : return l->get_name() == owner_name;
1466 : }));
1467 1 : if(key_leader == f_leaders.end())
1468 : {
1469 : // give new ownership to leader[0]
1470 : //
1471 0 : ed::message lock_message;
1472 0 : lock_message.set_command(cluck::g_name_cluck_cmd_lock);
1473 0 : lock_message.set_server(f_leaders[0]->get_name());
1474 0 : lock_message.set_service(cluck::g_name_cluck_service_name);
1475 0 : lock_message.set_sent_from_server(key_entering->second->get_server_name());
1476 0 : lock_message.set_sent_from_service(key_entering->second->get_service_name());
1477 0 : lock_message.add_parameter(cluck::g_name_cluck_param_object_name, key_entering->second->get_object_name());
1478 0 : lock_message.add_parameter(cluck::g_name_cluck_param_tag, key_entering->second->get_tag());
1479 0 : lock_message.add_parameter(cluck::g_name_cluck_param_pid, key_entering->second->get_client_pid());
1480 0 : lock_message.add_parameter(cluck::g_name_cluck_param_timeout, key_entering->second->get_obtention_timeout());
1481 0 : lock_message.add_parameter(cluck::g_name_cluck_param_duration, key_entering->second->get_lock_duration());
1482 0 : lock_message.add_parameter(cluck::g_name_cluck_param_unlock_duration, key_entering->second->get_unlock_duration());
1483 0 : if(leader0)
1484 : {
1485 : // we are leader #0 so directly call msg_lock()
1486 : //
1487 : // first we remove the entry otherwise we get a duplicate
1488 : // error since we try to re-add the same ticket
1489 : //
1490 0 : key_entering = obj_entering->second.erase(key_entering);
1491 0 : local_locks.push_back(lock_message);
1492 : }
1493 : else
1494 : {
1495 : // we are not leader #0, so send the message to it
1496 : //
1497 0 : ++key_entering;
1498 0 : lock_message.add_parameter(cluck::g_name_cluck_param_serial, key_entering->second->get_serial());
1499 0 : f_messenger->send_message(lock_message);
1500 : }
1501 0 : }
1502 : else
1503 : {
1504 1 : ++key_entering;
1505 : }
1506 1 : }
1507 : }
1508 :
1509 : // a ticket may still be unlocked in which case we want to
1510 : // restart the lock process as if still entering
1511 : //
1512 : // if locked, a ticket is assigned leader0 as its new owner so
1513 : // further work on that ticket works as expected
1514 : //
1515 14 : std::string serialized;
1516 16 : for(auto obj_ticket(f_tickets.begin()); obj_ticket != f_tickets.end(); ++obj_ticket)
1517 : {
1518 4 : for(auto key_ticket(obj_ticket->second.begin()); key_ticket != obj_ticket->second.end(); )
1519 : {
1520 2 : std::string const owner_name(key_ticket->second->get_owner());
1521 2 : auto key_leader(std::find_if(
1522 : f_leaders.begin()
1523 : , f_leaders.end()
1524 3 : , [&owner_name](auto const & l)
1525 : {
1526 3 : return l->get_name() == owner_name;
1527 : }));
1528 2 : if(key_ticket->second->is_locked())
1529 : {
1530 : // if ticket was locked by the leader that disappeared, we
1531 : // transfer ownership to leader #0
1532 : //
1533 2 : if(key_leader == f_leaders.end())
1534 : {
1535 0 : key_ticket->second->set_owner(f_leaders[0]->get_name());
1536 : }
1537 :
1538 : // and send that ticket to the other leaders to make sure
1539 : // they all agree on its current state
1540 : //
1541 2 : serialized += key_ticket->second->serialize();
1542 2 : serialized += '\n';
1543 :
1544 2 : ++key_ticket;
1545 : }
1546 : else
1547 : {
1548 : // it was not locked yet, restart the LOCK process from
1549 : // the very beginning
1550 : //
1551 0 : if(key_leader == f_leaders.end())
1552 : {
1553 : // give new ownership to leader[0]
1554 : //
1555 0 : ed::message lock_message;
1556 0 : lock_message.set_command(cluck::g_name_cluck_cmd_lock);
1557 0 : lock_message.set_server(f_leaders[0]->get_name());
1558 0 : lock_message.set_service(cluck::g_name_cluck_service_name);
1559 0 : lock_message.set_sent_from_server(key_ticket->second->get_server_name());
1560 0 : lock_message.set_sent_from_service(key_ticket->second->get_service_name());
1561 0 : lock_message.add_parameter(cluck::g_name_cluck_param_object_name, key_ticket->second->get_object_name());
1562 0 : lock_message.add_parameter(cluck::g_name_cluck_param_tag, key_ticket->second->get_tag());
1563 0 : lock_message.add_parameter(cluck::g_name_cluck_param_pid, key_ticket->second->get_client_pid());
1564 0 : lock_message.add_parameter(cluck::g_name_cluck_param_timeout, key_ticket->second->get_obtention_timeout());
1565 0 : lock_message.add_parameter(cluck::g_name_cluck_param_duration, key_ticket->second->get_lock_duration());
1566 0 : lock_message.add_parameter(cluck::g_name_cluck_param_unlock_duration, key_ticket->second->get_unlock_duration());
1567 0 : if(leader0)
1568 : {
1569 : // we are leader #0 so directly call msg_lock()
1570 : //
1571 0 : key_ticket = obj_ticket->second.erase(key_ticket);
1572 0 : local_locks.push_back(lock_message);
1573 : }
1574 : else
1575 : {
1576 : // we are not leader #0, so send the message to it
1577 : //
1578 0 : ++key_ticket;
1579 0 : lock_message.add_parameter(cluck::g_name_cluck_param_serial, key_ticket->second->get_serial());
1580 0 : f_messenger->send_message(lock_message);
1581 : }
1582 0 : }
1583 : else
1584 : {
1585 0 : ++key_ticket;
1586 : }
1587 : }
1588 2 : }
1589 : }
1590 :
1591 : // we send those after the loops above because the msg_lock() is
1592 : // not unlikely to change the f_entering_tickets map and looping
1593 : // through it when another function is going to modify it is not
1594 : // wise
1595 : //
1596 14 : for(auto lm : local_locks)
1597 : {
1598 0 : msg_lock(lm);
1599 0 : }
1600 :
1601 : // send LOCK_TICkETS if there is serialized ticket data
1602 : //
1603 14 : if(!serialized.empty())
1604 : {
1605 2 : ed::message lock_tickets_message;
1606 2 : lock_tickets_message.set_command(cluck::g_name_cluck_cmd_lock_tickets);
1607 2 : lock_tickets_message.set_service(cluck::g_name_cluck_service_name);
1608 2 : lock_tickets_message.add_parameter(cluck::g_name_cluck_param_tickets, serialized);
1609 :
1610 2 : auto const la(get_leader_a());
1611 2 : if(la != nullptr)
1612 : {
1613 2 : lock_tickets_message.set_server(la->get_name());
1614 2 : f_messenger->send_message(lock_tickets_message);
1615 :
1616 2 : auto const lb(get_leader_b());
1617 2 : if(lb != nullptr)
1618 : {
1619 2 : lock_tickets_message.set_server(lb->get_name());
1620 2 : f_messenger->send_message(lock_tickets_message);
1621 : }
1622 2 : }
1623 2 : }
1624 14 : }
1625 :
1626 :
1627 : /** \brief Forward a user message to a leader.
1628 : *
1629 : * The user may send a LOCK or an UNLOCK command to the cluck daemon.
1630 : * Those messages need to be forwarded to a leader to work as expected.
1631 : * If we are not a leader, then we need to call this function to
1632 : * forward the message (this daemon acts as a proxy).
1633 : *
 1634             :  * Note that we do not make a copy of the message because we do not expect
 1635             :  * it to be used any further after this call, so we may as well update it
 1636             :  * in place. The changes are not destructive anyway.
1637 : *
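                  :  * For example, with three leaders the target of the forwarded
                  :  * message rotates in a round robin fashion (a sketch of the code
                  :  * found below):
                  :  *
                  :  * \code
                  :  *     // f_next_leader cycles through 0, 1, 2, 0, 1, ...
                  :  *     f_next_leader = (f_next_leader + 1) % f_leaders.size();
                  :  *     msg.set_server(f_leaders[f_next_leader]->get_name());
                  :  * \endcode
                  :  *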
1638 : * \param[in,out] msg The message being forwarded to a leader.
1639 : */
1640 3 : void cluckd::forward_message_to_leader(ed::message & msg)
1641 : {
1642 : // for safety, we do not call this function if the daemon is not
 1643             :     // considered ready, meaning it has at least one leader
1644 : //
1645 3 : if(f_leaders.empty())
1646 : {
1647 : return; // LCOV_EXCL_LINE
1648 : }
1649 :
1650 : // we are not a leader, we work as a proxy by forwarding the
1651 : // message to a leader, we add our trail so the LOCKED and
1652 : // other messages can be proxied back
1653 : //
1654 : // Note: using the get_sent_from_server() means that we may not
1655 : // even see the returned message, it may be proxied to another
1656 : // server directly or through another route
1657 : //
1658 3 : msg.set_service(cluck::g_name_cluck_service_name);
1659 3 : msg.add_parameter(cluck::g_name_cluck_param_lock_proxy_server_name, msg.get_sent_from_server());
1660 3 : msg.add_parameter(cluck::g_name_cluck_param_lock_proxy_service_name, msg.get_sent_from_service());
1661 :
1662 3 : f_next_leader = (f_next_leader + 1) % f_leaders.size();
1663 3 : msg.set_server(f_leaders[f_next_leader]->get_name());
1664 :
1665 3 : f_messenger->send_message(msg);
1666 : }
1667 :
1668 :
1669 : /** \brief Clean timed out entries if any.
1670 : *
1671 : * This function goes through the list of tickets and entering
1672 : * entries and removes any one of them that timed out. This is
1673 : * important if a process dies and does not properly remove
1674 : * its locks.
1675 : *
1676 : * When the timer gets its process_timeout() function called,
1677 : * it ends up calling this function to clean up any lock that
1678 : * has timed out.
1679 : */
1680 628 : void cluckd::cleanup()
1681 : {
1682 628 : cluck::timeout_t next_timeout(snapdev::timespec_ex::max());
1683 :
1684 : // when we receive LOCK requests before we have leaders elected, they
1685 : // get added to our cache, so do some cache clean up when not empty
1686 : //
1687 628 : cluck::timeout_t const now(snapdev::now());
1688 634 : for(auto c(f_message_cache.begin()); c != f_message_cache.end(); )
1689 : {
1690 6 : if(c->f_timeout <= now)
1691 : {
1692 3 : std::string object_name;
1693 3 : ed::dispatcher_match::tag_t tag(ed::dispatcher_match::DISPATCHER_MATCH_NO_TAG);
1694 3 : pid_t client_pid(0);
1695 3 : cluck::timeout_t timeout;
1696 3 : if(!get_parameters(c->f_message, &object_name, &tag, &client_pid, &timeout, nullptr, nullptr))
1697 : {
1698 : // we should never cache messages that are invalid
1699 : //
 1700             :                 throw cluck::logic_error("cluckd::cleanup() of LOCK message failed get_parameters()."); // LCOV_EXCL_LINE
1701 : }
1702 :
1703 6 : SNAP_LOG_WARNING
1704 : << "Lock on \""
1705 : << object_name
1706 3 : << "\" / \""
1707 : << client_pid
1708 3 : << "\" / \""
1709 3 : << tag
1710 : << "\" timed out before leaders were known."
1711 : << SNAP_LOG_SEND;
1712 :
1713 12 : std::string const server_name(c->f_message.has_parameter("lock_proxy_server_name")
1714 1 : ? c->f_message.get_parameter("lock_proxy_server_name")
1715 9 : : c->f_message.get_sent_from_server());
1716 12 : std::string const service_name(c->f_message.has_parameter("lock_proxy_service_name")
1717 1 : ? c->f_message.get_parameter("lock_proxy_service_name")
1718 9 : : c->f_message.get_sent_from_service());
1719 9 : std::string const entering_key(server_name + '/' + std::to_string(client_pid));
1720 :
1721 3 : ed::message lock_failed_message;
1722 3 : lock_failed_message.set_command(cluck::g_name_cluck_cmd_lock_failed);
1723 3 : lock_failed_message.set_service(service_name);
1724 3 : lock_failed_message.set_server(server_name);
1725 3 : lock_failed_message.add_parameter(cluck::g_name_cluck_param_object_name, object_name);
1726 3 : lock_failed_message.add_parameter(cluck::g_name_cluck_param_tag, tag);
1727 3 : lock_failed_message.add_parameter(cluck::g_name_cluck_param_key, entering_key);
1728 3 : lock_failed_message.add_parameter(cluck::g_name_cluck_param_error, cluck::g_name_cluck_value_timedout);
1729 : #ifndef CLUCKD_OPTIMIZATIONS
1730 3 : lock_failed_message.add_parameter(cluck::g_name_cluck_param_description, "cleanup() found a timed out lock");
1731 : #endif
1732 3 : f_messenger->send_message(lock_failed_message);
1733 :
1734 3 : c = f_message_cache.erase(c);
1735 3 : }
1736 : else
1737 : {
1738 3 : if(c->f_timeout < next_timeout)
1739 : {
1740 2 : next_timeout = c->f_timeout;
1741 : }
1742 3 : ++c;
1743 : }
1744 : }
1745 :
1746 : // remove any f_ticket that timed out
1747 : //
1748 1021 : for(auto obj_ticket(f_tickets.begin()); obj_ticket != f_tickets.end(); )
1749 : {
1750 393 : bool try_activate(false);
1751 20522 : for(auto key_ticket(obj_ticket->second.begin()); key_ticket != obj_ticket->second.end(); )
1752 : {
1753 20129 : bool move_next(true);
1754 20129 : if(key_ticket->second->timed_out())
1755 : {
1756 9 : key_ticket->second->lock_failed("ticket: timed out while cleaning up");
1757 9 : if(key_ticket->second->timed_out())
1758 : {
1759 : // still timed out, remove it
1760 : //
1761 4 : key_ticket = obj_ticket->second.erase(key_ticket);
1762 4 : try_activate = true;
1763 4 : move_next = false;
1764 : }
1765 : }
1766 20129 : if(move_next)
1767 : {
1768 20125 : if(key_ticket->second->get_current_timeout_date() < next_timeout)
1769 : {
1770 356 : next_timeout = key_ticket->second->get_current_timeout_date();
1771 : }
1772 20125 : ++key_ticket;
1773 : }
1774 : }
1775 :
1776 393 : if(obj_ticket->second.empty())
1777 : {
1778 4 : obj_ticket = f_tickets.erase(obj_ticket);
1779 : }
1780 : else
1781 : {
1782 389 : if(try_activate)
1783 : {
1784 : // something was erased, a new ticket may be first
1785 : //
1786 0 : activate_first_lock(obj_ticket->first);
1787 : }
1788 :
1789 389 : ++obj_ticket;
1790 : }
1791 : }
1792 :
1793 : // remove any f_entering_tickets that timed out
1794 : //
1795 1055 : for(auto obj_entering(f_entering_tickets.begin()); obj_entering != f_entering_tickets.end(); )
1796 : {
1797 16525 : for(auto key_entering(obj_entering->second.begin()); key_entering != obj_entering->second.end(); )
1798 : {
1799 16098 : if(key_entering->second->timed_out())
1800 : {
1801 1 : key_entering->second->lock_failed("entering ticket: timed out while cleanup");
1802 1 : if(key_entering->second->timed_out())
1803 : {
1804 : // still timed out, remove it
1805 : //
1806 1 : key_entering = obj_entering->second.erase(key_entering);
1807 : }
1808 : }
1809 : else
1810 : {
1811 16097 : if(key_entering->second->get_current_timeout_date() < next_timeout)
1812 : {
1813 277 : next_timeout = key_entering->second->get_current_timeout_date();
1814 : }
1815 16097 : ++key_entering;
1816 : }
1817 : }
1818 :
1819 427 : if(obj_entering->second.empty())
1820 : {
1821 1 : obj_entering = f_entering_tickets.erase(obj_entering);
1822 : }
1823 : else
1824 : {
1825 426 : ++obj_entering;
1826 : }
1827 : }
1828 :
1829 : // got a new timeout?
1830 : //
1831 628 : if(next_timeout != snapdev::timespec_ex::max())
1832 : {
1833 : // we add one second to avoid looping like crazy
 1834             :         // if we time out just around the "wrong" time
1835 : //
1836 596 : f_timer->set_timeout_date(next_timeout + cluck::timeout_t(1, 0));
1837 : }
1838 : else
1839 : {
1840 32 : f_timer->set_timeout_date(-1);
1841 : }
1842 628 : }
1843 :
1844 :
1845 : /** \brief Determine the last ticket defined in this cluck daemon.
1846 : *
1847 : * This function loops through the existing tickets and returns the
1848 : * largest ticket number it finds.
1849 : *
1850 : * Note that the number returned is the last ticket. At some point,
1851 : * the caller needs to add one to this number before assigning the
1852 : * result to a new ticket.
1853 : *
 1854             :  * If no ticket was defined for \p object_name, or we are dealing with
 1855             :  * that object's very first ticket, then the function returns NO_TICKET
 1856             :  * (which is 0).
1857 : *
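                  :  * A minimal usage sketch (hypothetical caller code):
                  :  *
                  :  * \code
                  :  *     // compute the next available ticket number for this object
                  :  *     ticket::ticket_id_t const id(get_last_ticket(object_name) + 1);
                  :  * \endcode
                  :  *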
1858 : * \param[in] object_name The name of the object for which the last ticket
1859 : * number is being sought.
1860 : *
1861 : * \return The last ticket number or NO_TICKET.
1862 : */
1863 118 : ticket::ticket_id_t cluckd::get_last_ticket(std::string const & object_name)
1864 : {
1865 118 : ticket::ticket_id_t last_ticket(ticket::NO_TICKET);
1866 :
1867 : // Note: There is no need to check the f_entering_tickets list
1868 : // since that one does not yet have any ticket number assigned
1869 : // and thus the maximum there would return 0 every time
1870 : //
1871 118 : auto obj_ticket(f_tickets.find(object_name));
1872 118 : if(obj_ticket != f_tickets.end())
1873 : {
1874 : // note:
 1875             :         // the std::max_element() algorithm would require many more
 1876             :         // get_ticket_number() calls, whereas our loop uses one per ticket
1877 : //
1878 2 : for(auto key_ticket : obj_ticket->second)
1879 : {
1880 1 : ticket::ticket_id_t const ticket_number(key_ticket.second->get_ticket_number());
1881 1 : if(ticket_number > last_ticket)
1882 : {
1883 1 : last_ticket = ticket_number;
1884 : }
1885 1 : }
1886 : }
1887 :
1888 118 : return last_ticket;
1889 : }
1890 :
1891 :
1892 : /** \brief Set the ticket.
1893 : *
 1894             :  * Once a ticket has been assigned a valid identifier (see
 1895             :  * get_last_ticket()), it can be saved as an official ticket. This
 1896             :  * function does that.
1897 : *
1898 : * \param[in] object_name The name of the object being locked.
1899 : * \param[in] key The ticket key (3 segments).
1900 : * \param[in] ticket The ticket object being added.
1901 : */
1902 117 : void cluckd::set_ticket(
1903 : std::string const & object_name
1904 : , std::string const & key
1905 : , ticket::pointer_t ticket)
1906 : {
1907 117 : f_tickets[object_name][key] = ticket;
1908 117 : }
1909 :
1910 :
1911 : /** \brief Get a reference to the list of entering tickets.
1912 : *
 1913             :  * This function returns a constant copy of the list of entering
1914 : * tickets. This is used by the ticket::add_ticket() function
1915 : * in order to know once all entering tickets are done so the algorithm
1916 : * can move forward.
1917 : *
1918 : * \param[in] object_name The name of the object being locked.
1919 : *
1920 : * \return A constant copy of the list of entering tickets.
1921 : */
1922 3 : ticket::key_map_t const cluckd::get_entering_tickets(std::string const & object_name)
1923 : {
1924 3 : auto const it(f_entering_tickets.find(object_name));
1925 3 : if(it == f_entering_tickets.end())
1926 : {
1927 : // LCOV_EXCL_START
1928 : //
1929 : // I could not get this covered and I do not think it can happen,
1930 : // also the caller function does not verify that the returned map
1931 : // is valid so it is safer this way
1932 : //
1933 : throw std::logic_error(
1934 : "could not find entering ticket with object name: \""
1935 : + object_name
1936 : + "\".");
1937 : // LCOV_EXCL_STOP
1938 : }
1939 :
1940 6 : return it->second;
1941 : }
1942 :
1943 :
1944 : /** \brief Used to simulate a LOCK_EXITING message.
1945 : *
1946 : * This function is called to simulate sending a LOCK_EXITING to the
1947 : * cluckd object from the ticket object.
1948 : *
1949 : * \param[in] msg The LOCK_EXITING message with proper object name, tag,
1950 : * and key.
1951 : */
1952 114 : void cluckd::lock_exiting(ed::message & msg)
1953 : {
1954 114 : msg_lock_exiting(msg);
1955 114 : }
1956 :
1957 :
1958 :
1959 :
1960 :
1961 :
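                  : /** \brief Serialize the current tickets.
                  :  *
                  :  * This function serializes all the tickets currently defined in this
                  :  * daemon, one ticket per line, in the same format as the data sent
                  :  * along the LOCK_TICKETS message (see above).
                  :  *
                  :  * \return The serialized tickets, each followed by a '\n'.
                  :  */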
1962 2 : std::string cluckd::serialized_tickets()
1963 : {
1964 2 : std::stringstream result;
1965 :
1966 4 : for(auto const & obj_ticket : f_tickets)
1967 : {
1968 4 : for(auto const & key_ticket : obj_ticket.second)
1969 : {
1970 : result
1971 4 : << key_ticket.second->serialize()
1972 4 : << '\n';
1973 : }
1974 : }
1975 :
1976 4 : return result.str();
1977 2 : }
1978 :
1979 :
1980 : /** \brief Try to get a set of parameters.
1981 : *
1982 : * This function attempts to get the specified set of parameters from the
1983 : * specified message.
1984 : *
1985 : * The function throws if a parameter is missing or invalid (i.e. passed
1986 : * a string when an integer was expected).
1987 : *
1988 : * When defined, the \p client_pid parameter is expected to be a positive
1989 : * integer. Any other number makes the function emit an error and return
1990 : * false.
1991 : *
1992 : * \note
1993 : * The timeout parameter is always viewed as optional. It is set to
1994 : * "now + cluck::CLUCK_LOCK_DEFAULT_TIMEOUT" if undefined in the message.
1995 : * If specified in the message, there is no minimum or maximum
1996 : * (i.e. it may already have timed out).
1997 : *
1998 : * \param[in] message The message from which we get parameters.
1999 : * \param[out] object_name A pointer to an std::string that receives the object name.
2000 : * \param[out] tag A pointer to a tag_t that receives the tag number.
2001 : * \param[out] client_pid A pointer to a pid_t that receives the client pid.
 2002             :  * \param[out] timeout A pointer to a cluck::timeout_t that receives the timeout date.
2003 : * \param[out] key A pointer to an std::string that receives the key parameter.
2004 : * \param[out] source A pointer to an std::string that receives the source parameter.
2005 : *
2006 : * \return true if all specified parameters could be retrieved as expected,
2007 : * false if parameters are either missing or invalid.
2008 : */
2009 1005 : bool cluckd::get_parameters(
2010 : ed::message const & msg
2011 : , std::string * object_name
2012 : , ed::dispatcher_match::tag_t * tag
2013 : , pid_t * client_pid
2014 : , cluck::timeout_t * timeout
2015 : , std::string * key
2016 : , std::string * source)
2017 : {
2018 : // get the "object name" (what we are locking)
2019 : // in Snap, the object name is often a URI plus the action we are performing
2020 : //
2021 1005 : if(object_name != nullptr)
2022 : {
2023 1005 : *object_name = msg.get_parameter(cluck::g_name_cluck_param_object_name);
2024 : }
2025 :
2026 : // the same application may want to hold multiple locks simultaneously
 2027             :     // and this is made possible by using a tag (a 16 bit number)
2028 : //
2029 1005 : if(tag != nullptr)
2030 : {
2031 1005 : *tag = msg.get_integer_parameter(cluck::g_name_cluck_param_tag);
2032 : }
2033 :
2034 : // get the pid (process identifier) of the process that is
2035 : // requesting the lock; this is important to be able to distinguish
2036 : // multiple processes on the same computer requesting a lock
2037 : //
2038 1005 : if(client_pid != nullptr)
2039 : {
2040 262 : *client_pid = msg.get_integer_parameter(cluck::g_name_cluck_param_pid);
2041 262 : if(*client_pid < 1)
2042 : {
2043 : // invalid pid
2044 : //
2045 6 : SNAP_LOG_NOISY_ERROR
2046 : << "cluckd::get_parameters(): invalid pid specified for a lock ("
2047 4 : << std::to_string(*client_pid)
2048 : << "); it must be a positive decimal number."
2049 : << SNAP_LOG_SEND;
2050 2 : return false;
2051 : }
2052 : }
2053 :
2054 : // get the time limit we will wait up to before we decide we
2055 : // cannot obtain that lock
2056 : //
2057 1003 : if(timeout != nullptr)
2058 : {
2059 163 : if(msg.has_parameter(cluck::g_name_cluck_param_timeout))
2060 : {
2061 : // this timeout may already be out of date in which case
2062 : // the lock immediately fails
2063 : //
2064 53 : *timeout = msg.get_timespec_parameter(cluck::g_name_cluck_param_timeout);
2065 : }
2066 : else
2067 : {
2068 110 : *timeout = snapdev::now() + cluck::get_lock_obtention_timeout();
2069 : }
2070 : }
2071 :
2072 : // get the key of a ticket or entering object
2073 : //
2074 1003 : if(key != nullptr)
2075 : {
2076 743 : *key = msg.get_parameter(cluck::g_name_cluck_param_key);
2077 : }
2078 :
2079 : // get the source of a ticket (i.e. <server> '/' <service>)
2080 : //
2081 1003 : if(source != nullptr)
2082 : {
2083 8 : *source = msg.get_parameter(cluck::g_name_cluck_param_source);
2084 : }
2085 :
2086 1003 : return true;
2087 : }
2088 :
2089 :
2090 : /** \brief Lock the resource after confirmation that client is alive.
2091 : *
2092 : * This message is expected just after we sent an ALIVE message to
2093 : * the client.
2094 : *
2095 : * Whenever a leader dies, we suspect that the client may have died
2096 : * with it so we send it an ALIVE message to know whether it is worth
2097 : * the trouble of entering that lock.
2098 : *
2099 : * \param[in] msg The ABSOLUTELY message to handle.
2100 : */
2101 4 : void cluckd::msg_absolutely(ed::message & msg)
2102 : {
2103 : // we may receive the ABSOLUTELY message from anywhere so don't expect
2104 : // the "serial" parameter to be defined
2105 : //
2106 4 : if(!msg.has_parameter(ed::g_name_ed_param_serial))
2107 : {
2108 1 : return;
2109 : }
2110 :
2111 9 : std::string const serial(msg.get_parameter(ed::g_name_ed_param_serial));
2112 3 : std::vector<std::string> segments;
2113 3 : snapdev::tokenize_string(segments, serial, "/");
2114 :
2115 3 : if(segments[0] == "cluckd")
2116 : {
2117 : // check serial as defined in msg_lock()
2118 : //
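                  :         // (i.e. "cluckd/<object_name>/<server_name>/<client_pid>" as
                  :         // built by msg_lock() when it sends the ALIVE message)
                  :         //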
2119 2 : if(segments.size() != 4)
2120 : {
2121 1 : SNAP_LOG_WARNING
 2122             :                 << "ABSOLUTELY reply has an invalid cluckd serial parameter \""
 2123             :                 << serial
 2124             :                 << "\"; it was expected to have exactly 4 segments."
2125 : << SNAP_LOG_SEND;
2126 :
2127 1 : ed::message invalid;
2128 1 : invalid.set_command(ed::g_name_ed_cmd_invalid);
2129 1 : invalid.reply_to(msg);
2130 1 : invalid.add_parameter(
2131 : ed::g_name_ed_param_command
2132 : , msg.get_command());
2133 2 : invalid.add_parameter(
2134 : ed::g_name_ed_param_message
2135 : , "invalid number of segments in \""
2136 2 : + serial
2137 3 : + "\".");
2138 1 : f_messenger->send_message(invalid);
2139 :
2140 1 : return;
2141 1 : }
2142 :
 2143             :         // notice how the tokenize_string() call re-splits the entering key
2144 : //
2145 1 : std::string const object_name(segments[1]);
2146 1 : std::string const server_name(segments[2]);
2147 1 : std::string const client_pid(segments[3]);
2148 :
2149 1 : auto entering_ticket(f_entering_tickets.find(object_name));
2150 1 : if(entering_ticket != f_entering_tickets.end())
2151 : {
2152 2 : std::string const entering_key(server_name + '/' + client_pid);
2153 1 : auto key_ticket(entering_ticket->second.find(entering_key));
2154 1 : if(key_ticket != entering_ticket->second.end())
2155 : {
2156 : // remove the alive timeout
2157 : //
2158 1 : key_ticket->second->set_alive_timeout(cluck::timeout_t());
2159 :
2160 : // got it! start the bakery algorithm
2161 : //
2162 1 : key_ticket->second->entering();
2163 : }
2164 1 : }
2165 1 : }
2166 :
2167 : // ignore other messages
2168 4 : }
2169 :
2170 :
2171 : /** \brief Acknowledge the ACTIVATE_LOCK with what we think is our first lock.
2172 : *
2173 : * This function replies to an ACTIVATE_LOCK request with what we think is
2174 : * the first lock for the specified object.
2175 : *
2176 : * Right now, we disregard the specified key. There is nothing we can really
2177 : * do with it here.
2178 : *
2179 : * If we do not have a ticket for the specified object (something that could
2180 : * happen if the ticket just timed out) then we still have to reply, only
 2181             :  * we let the other leader know that we have no clue what it is talking about.
2182 : *
2183 : * \param[in] msg The ACTIVATE_LOCK message.
2184 : */
2185 114 : void cluckd::msg_activate_lock(ed::message & msg)
2186 : {
2187 114 : std::string object_name;
2188 114 : ed::dispatcher_match::tag_t tag(ed::dispatcher_match::DISPATCHER_MATCH_NO_TAG);
2189 114 : std::string key;
2190 114 : if(!get_parameters(msg, &object_name, &tag, nullptr, nullptr, &key, nullptr))
2191 : {
2192 : return; // LCOV_EXCL_LINE
2193 : }
2194 :
2195 228 : std::string first_key("no-key");
2196 :
2197 114 : auto ticket(find_first_lock(object_name));
2198 114 : if(ticket != nullptr)
2199 : {
2200 : // found a lock
2201 : //
2202 114 : first_key = ticket->get_ticket_key();
2203 :
2204 114 : if(key == first_key)
2205 : {
2206 : // we can mark this ticket as activated
2207 : //
2208 114 : ticket->lock_activated();
2209 : }
2210 : }
2211 :
2212 : // always reply, if we could not find the key, then we returned 'no-key'
2213 : // as the key parameter
2214 : //
2215 114 : ed::message lock_activated_message;
2216 114 : lock_activated_message.set_command(cluck::g_name_cluck_cmd_lock_activated);
2217 114 : lock_activated_message.reply_to(msg);
2218 114 : lock_activated_message.add_parameter(cluck::g_name_cluck_param_object_name, object_name);
2219 114 : lock_activated_message.add_parameter(cluck::g_name_cluck_param_tag, tag);
2220 114 : lock_activated_message.add_parameter(cluck::g_name_cluck_param_key, key);
2221 114 : lock_activated_message.add_parameter(cluck::g_name_cluck_param_other_key, first_key);
2222 114 : f_messenger->send_message(lock_activated_message);
2223 :
 2224             :     // the list of tickets has likely changed so we call cleanup()
 2225             :     // to make sure the timer is reset appropriately
2226 : //
2227 114 : cleanup();
2228 114 : }
2229 :
2230 :
2231 : /** \brief Add a ticket from another cluckd.
2232 : *
2233 : * Tickets get duplicated on the cluckd leaders.
2234 : *
2235 : * \note
2236 : * Although we only need a QUORUM number of nodes to receive a copy of
2237 : * the data, the data still get broadcast to all the cluck leaders.
 2238             :  * After this message arrives, any one of the cluck processes can
 2239             :  * handle the unlock, even if the UNLOCK message gets sent to a process
 2240             :  * other than the one which first created the ticket. This is the point
2241 : * of the implementation since we want to be fault tolerant (as in if one
2242 : * of the leaders goes down, the locking mechanism still works).
2243 : *
2244 : * \param[in] msg The ADD_TICKET message being handled.
2245 : */
2246 8 : void cluckd::msg_add_ticket(ed::message & msg)
2247 : {
2248 8 : std::string object_name;
2249 8 : ed::dispatcher_match::tag_t tag(ed::dispatcher_match::DISPATCHER_MATCH_NO_TAG);
2250 8 : std::string key;
2251 8 : cluck::timeout_t timeout;
2252 8 : if(!get_parameters(msg, &object_name, &tag, nullptr, &timeout, &key, nullptr))
2253 : {
2254 : return; // LCOV_EXCL_LINE
2255 : }
2256 :
2257 : // make sure the ticket is unique
2258 : //
2259 8 : auto const obj_ticket(f_tickets.find(object_name));
2260 8 : if(obj_ticket != f_tickets.end())
2261 : {
2262 1 : auto const key_ticket(obj_ticket->second.find(key));
2263 1 : if(key_ticket != obj_ticket->second.end())
2264 : {
2265 2 : SNAP_LOG_ERROR
2266 : << "an existing ticket has the same object name \""
2267 : << object_name
2268 1 : << "\" ("
2269 1 : << tag
2270 : << ") and key \""
2271 : << key
2272 : << "\"."
2273 : << SNAP_LOG_SEND;
2274 :
2275 1 : ed::message lock_failed_message;
2276 1 : lock_failed_message.set_command(cluck::g_name_cluck_cmd_lock_failed);
2277 1 : lock_failed_message.reply_to(msg);
2278 1 : lock_failed_message.add_parameter(cluck::g_name_cluck_param_object_name, object_name);
2279 1 : lock_failed_message.add_parameter(cluck::g_name_cluck_param_tag, tag);
2280 1 : lock_failed_message.add_parameter(cluck::g_name_cluck_param_key, key);
2281 1 : lock_failed_message.add_parameter(cluck::g_name_cluck_param_error, cluck::g_name_cluck_value_duplicate);
2282 : #ifndef CLUCKD_OPTIMIZATIONS
2283 1 : lock_failed_message.add_parameter(cluck::g_name_cluck_param_description, "ADD_TICKET called with an existing ticket name and key");
2284 : #endif
2285 1 : f_messenger->send_message(lock_failed_message);
2286 :
2287 9 : communicatord::flag::pointer_t flag(COMMUNICATORD_FLAG_UP(
2288 : "cluckd"
2289 : , "ticket"
2290 : , "invalid-algorithm"
2291 : , "msg_add_ticket() received a second call to add the"
2292 : " same ticket. This either means there is a bug in our"
2293 : " algorithm or there is a hacker sending us messages"
2294 : " trying to create invalid tickets."
2295 4 : ));
2296 1 : flag->set_priority(25);
2297 1 : flag->add_tag("bug");
2298 1 : flag->set_manual_down(true);
2299 1 : flag->save();
2300 :
2301 1 : return;
2302 1 : }
2303 : }
2304 :
2305 : // the client_pid parameter is part of the key (3rd segment)
2306 : //
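                  :     // e.g. key == "000000f3/server3/12345" with the hexadecimal
                  :     // ticket number first, then the server name and the client
                  :     // pid (the values shown are illustrative)
                  :     //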
2307 7 : std::vector<std::string> segments;
2308 7 : snapdev::tokenize_string(segments, key, "/");
2309 7 : if(segments.size() != 3)
2310 : {
2311 1 : SNAP_LOG_ERROR
2312 : << "Expected exactly 3 segments in \""
2313 : << key
2314 : << "\" to add a ticket."
2315 : << SNAP_LOG_SEND;
2316 :
2317 1 : ed::message lock_failed_message;
2318 1 : lock_failed_message.set_command(cluck::g_name_cluck_cmd_lock_failed);
2319 1 : lock_failed_message.reply_to(msg);
2320 1 : lock_failed_message.add_parameter(cluck::g_name_cluck_param_object_name, object_name);
2321 1 : lock_failed_message.add_parameter(cluck::g_name_cluck_param_tag, tag);
2322 1 : lock_failed_message.add_parameter(cluck::g_name_cluck_param_key, key);
2323 1 : lock_failed_message.add_parameter(cluck::g_name_cluck_param_error, cluck::g_name_cluck_value_invalid);
2324 : #ifndef CLUCKD_OPTIMIZATIONS
2325 1 : lock_failed_message.add_parameter(cluck::g_name_cluck_param_description, "ADD_TICKET called with an invalid key (expected three segments)");
2326 : #endif
2327 1 : f_messenger->send_message(lock_failed_message);
2328 :
2329 1 : return;
2330 1 : }
2331 :
2332 : // TODO: we probably want to look at using a function which returns false
2333 : // instead of having to do a try/catch
2334 : //
2335 6 : bool ok(true);
2336 6 : std::uint32_t number(0);
2337 : try
2338 : {
2339 6 : number = snapdev::hex_to_int<std::uint32_t>(segments[0]);
2340 : }
2341 2 : catch(snapdev::hexadecimal_string_exception const &)
2342 : {
2343 1 : ok = false;
2344 1 : }
2345 1 : catch(snapdev::hexadecimal_string_out_of_range const &)
2346 : {
2347 1 : ok = false;
2348 1 : }
2349 6 : if(!ok)
2350 : {
2351 4 : SNAP_LOG_ERROR
2352 : << "somehow ticket number \""
2353 2 : << segments[0]
2354 : << "\" is not a valid hexadecimal number."
2355 : << SNAP_LOG_SEND;
2356 :
2357 2 : ed::message lock_failed_message;
2358 2 : lock_failed_message.set_command(cluck::g_name_cluck_cmd_lock_failed);
2359 2 : lock_failed_message.reply_to(msg);
2360 2 : lock_failed_message.add_parameter(cluck::g_name_cluck_param_object_name, object_name);
2361 2 : lock_failed_message.add_parameter(cluck::g_name_cluck_param_tag, tag);
2362 2 : lock_failed_message.add_parameter(cluck::g_name_cluck_param_key, key);
2363 2 : lock_failed_message.add_parameter(cluck::g_name_cluck_param_error, cluck::g_name_cluck_value_invalid);
2364 : #ifndef CLUCKD_OPTIMIZATIONS
2365 2 : lock_failed_message.add_parameter(cluck::g_name_cluck_param_description, "ADD_TICKET called with an invalid key (first segment is not a valid hexadecimal number)");
2366 : #endif
2367 2 : f_messenger->send_message(lock_failed_message);
2368 :
2369 2 : return;
2370 2 : }
2371 :
2372 : // by now all the leaders should already have
2373 : // an entering ticket for that one ticket
2374 : //
2375 4 : auto const obj_entering_ticket(f_entering_tickets.find(object_name));
2376 4 : if(obj_entering_ticket == f_entering_tickets.end())
2377 : {
2378 1 : SNAP_LOG_ERROR
2379 : << "Expected entering ticket object for \""
2380 : << object_name
2381 : << "\" not found when adding a ticket."
2382 : << SNAP_LOG_SEND;
2383 :
2384 1 : ed::message lock_failed_message;
2385 1 : lock_failed_message.set_command(cluck::g_name_cluck_cmd_lock_failed);
2386 1 : lock_failed_message.reply_to(msg);
2387 1 : lock_failed_message.add_parameter(cluck::g_name_cluck_param_object_name, object_name);
2388 1 : lock_failed_message.add_parameter(cluck::g_name_cluck_param_tag, tag);
2389 1 : lock_failed_message.add_parameter(cluck::g_name_cluck_param_key, key);
2390 1 : lock_failed_message.add_parameter(cluck::g_name_cluck_param_error, cluck::g_name_cluck_value_invalid);
2391 : #ifndef CLUCKD_OPTIMIZATIONS
2392 1 : lock_failed_message.add_parameter(cluck::g_name_cluck_param_description, "ADD_TICKET could not find an entering ticket with the specified object_name");
2393 : #endif
2394 1 : f_messenger->send_message(lock_failed_message);
2395 :
2396 1 : return;
2397 1 : }
2398 :
2399 : // the key we need to search is not the new ticket key but the
2400 : // entering key, build it from the segments
2401 : //
2402 6 : std::string const entering_key(segments[1] + '/' + segments[2]);
2403 3 : auto const key_entering_ticket(obj_entering_ticket->second.find(entering_key));
2404 3 : if(key_entering_ticket == obj_entering_ticket->second.end())
2405 : {
2406 1 : SNAP_LOG_ERROR
2407 : << "Expected entering ticket key for \""
2408 : << object_name
2409 : << "\" not found when adding a ticket."
2410 : << SNAP_LOG_SEND;
2411 :
2412 1 : ed::message lock_failed_message;
2413 1 : lock_failed_message.set_command(cluck::g_name_cluck_cmd_lock_failed);
2414 1 : lock_failed_message.reply_to(msg);
2415 1 : lock_failed_message.add_parameter(cluck::g_name_cluck_param_object_name, object_name);
2416 1 : lock_failed_message.add_parameter(cluck::g_name_cluck_param_tag, tag);
2417 1 : lock_failed_message.add_parameter(cluck::g_name_cluck_param_key, key);
2418 1 : lock_failed_message.add_parameter(cluck::g_name_cluck_param_error, cluck::g_name_cluck_value_invalid);
2419 : #ifndef CLUCKD_OPTIMIZATIONS
 2420           1 :         lock_failed_message.add_parameter(cluck::g_name_cluck_param_description, "ADD_TICKET could not find the entering ticket with the specified key");
2421 : #endif
2422 1 : f_messenger->send_message(lock_failed_message);
2423 :
2424 1 : return;
2425 1 : }
2426 :
2427 : // make it an official ticket now
2428 : //
2429 : // this should happen on all cluck daemon other than the one that
2430 : // first received the LOCK message
2431 : //
2432 2 : set_ticket(object_name, key, key_entering_ticket->second);
2433 :
2434 : // WARNING: the set_ticket_number() function has the same side
2435 : // effects as the add_ticket() function without the
2436 : // f_messenger->send_message() call
2437 : //
2438 2 : f_tickets[object_name][key]->set_ticket_number(number);
2439 :
2440 2 : ed::message ticket_added_message;
2441 2 : ticket_added_message.set_command(cluck::g_name_cluck_cmd_ticket_added);
2442 2 : ticket_added_message.reply_to(msg);
2443 2 : ticket_added_message.add_parameter(cluck::g_name_cluck_param_object_name, object_name);
2444 2 : ticket_added_message.add_parameter(cluck::g_name_cluck_param_key, key);
2445 2 : ticket_added_message.add_parameter(cluck::g_name_cluck_param_tag, tag);
2446 2 : f_messenger->send_message(ticket_added_message);
2447 20 : }
2448 :
2449 :
2450 : /** \brief The communicatord lost too many connections.
2451 : *
2452 : * When the cluster is not complete, the CLUSTER_DOWN message gets sent
2453 : * by the communicatord. We need to stop offering locks at that point.
2454 : * Locks that are up are fine, but new locks are not possible.
2455 : *
2456 : * \param[in] msg The CLUSTER_DOWN message.
2457 : */
2458 1 : void cluckd::msg_cluster_down(ed::message & msg)
2459 : {
2460 1 : snapdev::NOT_USED(msg);
2461 :
2462 1 : SNAP_LOG_INFO
 2463             :         << "cluster is down, canceling existing locks; we have to"
 2464             :            " refuse any further lock requests for a while."
2465 : << SNAP_LOG_SEND;
2466 :
2467 : // in this case, we cannot safely keep the leaders
2468 : //
2469 1 : f_leaders.clear();
2470 :
2471 : // in case services listen to the NO_LOCK, let them know it's gone
2472 : //
2473 1 : check_lock_status();
2474 :
 2475             :     // we do not call lock_gone() because the HANGUP will be sent
 2476             :     // if required, so we do not have to do that twice
2477 1 : }
2478 :
2479 :
2480 : /** \brief Cluster is ready, send the LOCK_STARTED message.
2481 : *
2482 : * Our cluster is finally ready, so we can send the LOCK_STARTED and work
2483 : * on a leader election if still required.
2484 : *
2485 : * \param[in] msg CLUSTER_UP message we are dealing with.
2486 : */
2487 24 : void cluckd::msg_cluster_up(ed::message & msg)
2488 : {
2489 24 : f_neighbors_count = msg.get_integer_parameter("neighbors_count");
2490 24 : f_neighbors_quorum = f_neighbors_count / 2 + 1;
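                  :     // e.g. 5 neighbors yield a quorum of 3; 100 yield a quorum of 51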
2491 :
2492 24 : computer::priority_t priority(computer::PRIORITY_OFF);
2493 72 : std::string candidate_priority(f_opts.get_string("candidate-priority"));
2494 24 : if(candidate_priority != "off")
2495 : {
2496 23 : priority = f_opts.get_long("candidate-priority"
2497 : , 0
2498 : , computer::PRIORITY_USER_MIN
2499 : , computer::PRIORITY_MAX);
2500 : }
2501 :
2502 : #ifdef _DEBUG
2503 : // the READY message is expected to happen first and setup this parameter
2504 : //
2505 24 : if(f_my_ip_address.is_default())
2506 : {
2507 : throw cluck::logic_error("cluckd::msg_cluster_up(): somehow f_my_ip_address is still the default in msg_cluster_up()."); // LCOV_EXCL_LINE
2508 : }
2509 : #endif
2510 :
2511 : // add ourselves to the list of computers; mark us connected; get our ID
2512 : //
2513 24 : f_computers[f_server_name] = std::make_shared<computer>(f_server_name, priority, f_my_ip_address);
2514 24 : f_computers[f_server_name]->set_start_time(f_start_time);
2515 24 : f_computers[f_server_name]->set_connected(true);
2516 24 : f_my_id = f_computers[f_server_name]->get_id();
2517 :
2518 48 : SNAP_LOG_INFO
2519 24 : << "cluster is up with "
2520 24 : << f_neighbors_count
2521 : << " neighbors, attempt an election"
2522 : " then check for leaders by sending a LOCK_STARTED message."
2523 : << SNAP_LOG_SEND;
2524 :
2525 24 : election_status();
2526 24 : send_lock_started(nullptr);
2527 24 : check_lock_status();
2528 48 : }
2529 :
2530 :
2531 : /** \brief One of the cluckd processes asked for a ticket to be dropped.
2532 : *
2533 : * This function searches for the specified ticket and removes it from
2534 : * this cluckd.
2535 : *
2536 : * If the specified ticket does not exist, nothing happens.
2537 : *
2538 : * \warning
2539 : * The DROP_TICKET event includes either the ticket key (if available)
2540 : * or the entering key (when the ticket key was not yet available).
 2541             :  * Note that the ticket key should always exist by the time a DROP_TICKET
2542 : * happens, but just in case this allows the dropping of a ticket at any
2543 : * time.
2544 : *
2545 : * \param[in] msg The DROP_TICKET message.
2546 : */
2547 2 : void cluckd::msg_drop_ticket(ed::message & msg)
2548 : {
2549 2 : std::string object_name;
2550 2 : ed::dispatcher_match::tag_t tag(ed::dispatcher_match::DISPATCHER_MATCH_NO_TAG);
2551 2 : std::string key;
2552 2 : if(!get_parameters(msg, &object_name, &tag, nullptr, nullptr, &key, nullptr))
2553 : {
2554 : return; // LCOV_EXCL_LINE
2555 : }
2556 :
2557 2 : std::vector<std::string> segments;
2558 2 : snapdev::tokenize_string(segments, key, "/");
2559 :
2560 : // drop the regular ticket
2561 : //
2562 : // if we have only 2 segments, then there is no corresponding ticket
2563 : // since tickets are added only once we have a ticket_id
2564 : //
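                  :     // e.g. "00000012/server3/12345" is a full ticket key (3 segments)
                  :     // while "server3/12345" is an entering key (2 segments); the
                  :     // values shown are illustrative
                  :     //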
2565 2 : std::string entering_key;
2566 2 : if(segments.size() == 3)
2567 : {
2568 1 : auto obj_ticket(f_tickets.find(object_name));
2569 1 : if(obj_ticket != f_tickets.end())
2570 : {
2571 1 : auto key_ticket(obj_ticket->second.find(key));
2572 1 : if(key_ticket != obj_ticket->second.end())
2573 : {
2574 1 : obj_ticket->second.erase(key_ticket);
2575 : }
2576 :
2577 1 : if(obj_ticket->second.empty())
2578 : {
2579 1 : f_tickets.erase(obj_ticket);
2580 : }
2581 :
2582 : // one ticket was erased, another may be first now
2583 : //
2584 1 : activate_first_lock(object_name);
2585 : }
2586 :
2587 : // we received the ticket_id in the message, so
2588 : // we have to regenerate the entering_key without
2589 : // the ticket_id (which is the first element)
2590 : //
2591 1 : entering_key = segments[1] + '/' + segments[2];
2592 : }
2593 : else
2594 : {
2595 : // we received the entering_key in the message, use as is
2596 : //
2597 1 : entering_key = key;
2598 : }
2599 :
2600 : // drop the entering ticket
2601 : //
2602 2 : auto obj_entering_ticket(f_entering_tickets.find(object_name));
2603 2 : if(obj_entering_ticket != f_entering_tickets.end())
2604 : {
2605 1 : auto key_entering_ticket(obj_entering_ticket->second.find(entering_key));
2606 1 : if(key_entering_ticket != obj_entering_ticket->second.end())
2607 : {
2608 1 : obj_entering_ticket->second.erase(key_entering_ticket);
2609 : }
2610 :
2611 1 : if(obj_entering_ticket->second.empty())
2612 : {
2613 1 : f_entering_tickets.erase(obj_entering_ticket);
2614 : }
2615 : }
2616 :
 2617             :     // the list of tickets has likely changed so we call cleanup()
 2618             :     // to make sure the timer is reset appropriately
2619 : //
2620 2 : cleanup();
2621 2 : }
2622 :
2623 :
2624 : /** \brief Search for the largest ticket.
2625 : *
2626 : * This function searches the list of tickets for the largest one
2627 : * and returns that number.
2628 : *
2629 : * Note that this is specific to a given lock as defined by the object
2630 : * name of that lock. It is not the largest of all tickets across the
2631 : * entire set.
2632 : *
2633 : * The function replies with the MAX_TICKET message.
2634 : *
2635 : * \param[in] msg The message just received.
2639 : */
2640 2 : void cluckd::msg_get_max_ticket(ed::message & msg)
2641 : {
2642 2 : std::string object_name;
2643 2 : ed::dispatcher_match::tag_t tag(ed::dispatcher_match::DISPATCHER_MATCH_NO_TAG);
2644 2 : std::string key;
2645 2 : if(!get_parameters(msg, &object_name, &tag, nullptr, nullptr, &key, nullptr))
2646 : {
2647 : return; // LCOV_EXCL_LINE
2648 : }
2649 :
2650 : // remove any f_tickets that timed out by now because these should
2651 : // not be taken in account in the max. computation
2652 : //
2653 2 : cleanup();
2654 :
2655 2 : ticket::ticket_id_t const last_ticket(get_last_ticket(object_name));
2656 :
2657 2 : ed::message reply;
2658 2 : reply.set_command(cluck::g_name_cluck_cmd_max_ticket);
2659 2 : reply.reply_to(msg);
2660 2 : reply.add_parameter(cluck::g_name_cluck_param_object_name, object_name);
2661 2 : reply.add_parameter(cluck::g_name_cluck_param_key, key);
2662 2 : reply.add_parameter(cluck::g_name_cluck_param_tag, tag);
2663 2 : reply.add_parameter(cluck::g_name_cluck_param_ticket_id, last_ticket);
2664 2 : f_messenger->send_message(reply);
2665 2 : }
2666 :
2667 :
2668 : /** \brief Reply to the LIST_TICKETS message with the TICKET_LIST.
2669 : *
2670 : * This function gets called whenever the command line tool (`cluckd`)
2671 : * is run with the `--list` command line option. It generates a list of
2672 : * tickets and sends that back to the tool as a TICKET_LIST message.
2673 : *
2674 : * \param[in] msg The LIST_TICKETS message.
2675 : */
2676 4 : void cluckd::msg_list_tickets(ed::message & msg)
2677 : {
2678 4 : ed::message list_message;
2679 4 : list_message.set_command(cluck::g_name_cluck_cmd_ticket_list);
2680 4 : list_message.reply_to(msg);
2681 4 : list_message.add_parameter(cluck::g_name_cluck_param_list, ticket_list());
2682 4 : f_messenger->send_message(list_message);
2683 8 : }
2684 :
2685 :
2686 : /** \brief Lock the named resource.
2687 : *
2688 : * This function locks the specified resource \p object_name. It returns
2689 : * when the resource is locked or when the lock timeout is reached.
2690 : *
2691 : * See the ticket class for more details about the locking
2692 : * mechanisms (algorithm and MSC implementation).
2693 : *
2694 : * Note that if lock() is called with an empty string then the function
2695 : * unlocks the lock and returns immediately with false. This is equivalent
2696 : * to calling unlock().
2697 : *
2698 : * \note
2699 : * The function reloads all the parameters (outside of the table) because
2700 : * we need to support a certain amount of dynamism. For example, an
 2701             :  * administrator may want to add a new host to the system. In that case,
 2702             :  * the list of hosts changes and that has to be detected here.
2703 : *
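                  :  * A LOCK message is expected to carry at least the following
                  :  * parameters (an illustrative sketch; the parameter names come
                  :  * from get_parameters(), the values are made up):
                  :  *
                  :  * \code
                  :  *     ed::message lock_message;
                  :  *     lock_message.set_command(cluck::g_name_cluck_cmd_lock);
                  :  *     lock_message.add_parameter(cluck::g_name_cluck_param_object_name, "my_lock");
                  :  *     lock_message.add_parameter(cluck::g_name_cluck_param_tag, 1);
                  :  *     lock_message.add_parameter(cluck::g_name_cluck_param_pid, getpid());
                  :  *     lock_message.add_parameter(cluck::g_name_cluck_param_duration, cluck::timeout_t(5, 0));
                  :  * \endcode
                  :  *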
2704 : * \attention
2705 : * The function accepts a "serial" parameter in the message. This is only
2706 : * used internally when a leader is lost and a new one is assigned a lock
2707 : * which would otherwise fail.
2708 : *
2709 : * \warning
2710 : * The object name is left available in the lock table. Do not use any
2711 : * secure/secret name/word, etc. as the object name.
2712 : *
2713 : * \bug
2714 : * At this point there is no proper protection to recover from errors
2715 : * that would happen while working on locking this entry. This means
2716 : * failures may result in a lock that never ends.
2717 : *
2718 : * \param[in] msg The lock message.
2719 : *
2722 : * \sa unlock()
2723 : */
2724 145 : void cluckd::msg_lock(ed::message & msg)
2725 : {
2726 145 : std::string object_name;
2727 145 : ed::dispatcher_match::tag_t tag(ed::dispatcher_match::DISPATCHER_MATCH_NO_TAG);
2728 145 : pid_t client_pid(0);
2729 145 : cluck::timeout_t timeout;
2730 145 : if(!get_parameters(msg, &object_name, &tag, &client_pid, &timeout, nullptr, nullptr))
2731 : {
2732 1 : return;
2733 : }
2734 :
2735 : // do some cleanup as well
2736 : //
2737 144 : cleanup();
2738 :
2739 : // if we are a leader, create an entering key
2740 : //
2741 576 : std::string const server_name(msg.has_parameter(cluck::g_name_cluck_param_lock_proxy_server_name)
2742 : ? msg.get_parameter(cluck::g_name_cluck_param_lock_proxy_server_name)
2743 290 : : msg.get_sent_from_server());
2744 :
2745 576 : std::string const service_name(msg.has_parameter(cluck::g_name_cluck_param_lock_proxy_service_name)
2746 : ? msg.get_parameter(cluck::g_name_cluck_param_lock_proxy_service_name)
2747 290 : : msg.get_sent_from_service());
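                  :     // the entering key is "<server_name>/<client_pid>",
                  :     // e.g. "server3/12345" (illustrative values)
                  :     //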
2748 :
2749 432 : std::string const entering_key(server_name + '/' + std::to_string(client_pid));
2750 :
2751 144 : if(timeout <= snapdev::now())
2752 : {
2753 4 : SNAP_LOG_WARNING
2754 : << "Lock on \""
2755 : << object_name
2756 2 : << "\" ("
2757 2 : << tag
2758 2 : << ")/ \""
2759 : << client_pid
2760 : << "\" timed out before we could start the locking process."
2761 : << SNAP_LOG_SEND;
2762 :
2763 2 : ed::message lock_failed_message;
2764 2 : lock_failed_message.set_command(cluck::g_name_cluck_cmd_lock_failed);
2765 2 : lock_failed_message.reply_to(msg);
2766 2 : lock_failed_message.add_parameter(cluck::g_name_cluck_param_object_name, object_name);
2767 2 : lock_failed_message.add_parameter(cluck::g_name_cluck_param_tag, tag);
2768 2 : lock_failed_message.add_parameter(cluck::g_name_cluck_param_key, entering_key);
2769 2 : lock_failed_message.add_parameter(cluck::g_name_cluck_param_error, cluck::g_name_cluck_value_timedout);
2770 : #ifndef CLUCKD_OPTIMIZATIONS
2771 2 : lock_failed_message.add_parameter(cluck::g_name_cluck_param_description, "LOCK timeout date is already in the past");
2772 : #endif
2773 2 : f_messenger->send_message(lock_failed_message);
2774 :
2775 2 : return;
2776 2 : }
2777 :
2778 142 : cluck::timeout_t const duration(msg.get_timespec_parameter(cluck::g_name_cluck_param_duration));
2779 142 : if(duration < cluck::CLUCK_MINIMUM_TIMEOUT)
2780 : {
2781 : // duration too small
2782 : //
2783 1 : SNAP_LOG_ERROR
2784 : << duration
2785 : << " is an invalid duration, the minimum accepted is "
2786 : << cluck::CLUCK_MINIMUM_TIMEOUT
2787 : << '.'
2788 : << SNAP_LOG_SEND;
2789 :
2790 1 : ed::message lock_failed_message;
2791 1 : lock_failed_message.set_command(cluck::g_name_cluck_cmd_lock_failed);
2792 1 : lock_failed_message.reply_to(msg);
2793 1 : lock_failed_message.add_parameter(cluck::g_name_cluck_param_object_name, object_name);
2794 1 : lock_failed_message.add_parameter(cluck::g_name_cluck_param_tag, tag);
2795 1 : lock_failed_message.add_parameter(cluck::g_name_cluck_param_key, entering_key);
2796 1 : lock_failed_message.add_parameter(cluck::g_name_cluck_param_error, cluck::g_name_cluck_value_invalid);
2797 : #ifndef CLUCKD_OPTIMIZATIONS
2798 1 : lock_failed_message.add_parameter(cluck::g_name_cluck_param_description, "LOCK called with a duration that is too small");
2799 : #endif
2800 1 : f_messenger->send_message(lock_failed_message);
2801 :
2802 1 : return;
2803 1 : }
2804 :
2805 141 : cluck::timeout_t unlock_duration(cluck::CLUCK_DEFAULT_TIMEOUT);
2806 141 : if(msg.has_parameter(cluck::g_name_cluck_param_unlock_duration))
2807 : {
2808 5 : unlock_duration = msg.get_timespec_parameter(cluck::g_name_cluck_param_unlock_duration);
2809 5 : if(unlock_duration < cluck::CLUCK_UNLOCK_MINIMUM_TIMEOUT)
2810 : {
2811 : // invalid duration, minimum is cluck::CLUCK_UNLOCK_MINIMUM_TIMEOUT
2812 : //
2813 1 : SNAP_LOG_ERROR
2814 : << unlock_duration
2815 : << " is an invalid unlock duration, the minimum accepted is "
2816 : << cluck::CLUCK_UNLOCK_MINIMUM_TIMEOUT
2817 : << '.'
2818 : << SNAP_LOG_SEND;
2819 :
2820 1 : ed::message lock_failed_message;
2821 1 : lock_failed_message.set_command(cluck::g_name_cluck_cmd_lock_failed);
2822 1 : lock_failed_message.reply_to(msg);
2823 1 : lock_failed_message.add_parameter(cluck::g_name_cluck_param_object_name, object_name);
2824 1 : lock_failed_message.add_parameter(cluck::g_name_cluck_param_tag, tag);
2825 1 : lock_failed_message.add_parameter(cluck::g_name_cluck_param_key, entering_key);
2826 1 : lock_failed_message.add_parameter(cluck::g_name_cluck_param_error, cluck::g_name_cluck_value_invalid);
2827 : #ifndef CLUCKD_OPTIMIZATIONS
2828 1 : lock_failed_message.add_parameter(cluck::g_name_cluck_param_description, "LOCK called with an unlock duration that is too small");
2829 : #endif
2830 1 : f_messenger->send_message(lock_failed_message);
2831 :
2832 1 : return;
2833 1 : }
2834 : }
2835 :
2836 140 : if(!is_daemon_ready())
2837 : {
2838 12 : SNAP_LOG_TRACE
2839 : << "caching LOCK message for \""
2840 : << object_name
2841 6 : << "\" ("
2842 6 : << tag
2843 : << ") as the cluck system is not yet considered ready."
2844 : << SNAP_LOG_SEND;
2845 :
2846 6 : f_message_cache.emplace_back(timeout, msg);
2847 :
2848 : // make sure the cache gets cleaned up if the message times out
2849 : //
2850 6 : std::int64_t const timeout_date(f_timer->get_timeout_date());
2851 8 : if(timeout_date == -1
2852 8 : || cluck::timeout_t(timeout_date / 1'000'000, timeout_date % 1'000'000) > timeout)
2853 : {
2854 6 : f_timer->set_timeout_date(timeout);
2855 : }
2856 6 : return;
2857 : }
2858 :
2859 134 : if(is_leader() == nullptr)
2860 : {
2861 : // we are not a leader, we need to forward the message to one
2862 : // of the leaders instead
2863 : //
2864 2 : forward_message_to_leader(msg);
2865 2 : return;
2866 : }
2867 :
2868 : // make sure this is a new ticket
2869 : //
2870 132 : auto entering_ticket(f_entering_tickets.find(object_name));
2871 132 : if(entering_ticket != f_entering_tickets.end())
2872 : {
2873 114 : auto key_ticket(entering_ticket->second.find(entering_key));
2874 114 : if(key_ticket != entering_ticket->second.end())
2875 : {
2876 : // if this is a re-LOCK, then it may be a legitimate duplicate
2877 : // in which case we do not want to generate a LOCK_FAILED error
2878 : //
2879 2 : if(msg.has_parameter(cluck::g_name_cluck_param_serial))
2880 : {
2881 1 : ticket::serial_t const serial(msg.get_integer_parameter(cluck::g_name_cluck_param_serial));
2882 1 : if(key_ticket->second->get_serial() == serial)
2883 : {
2884 : // legitimate double request from leaders
2885 : // (this happens when a leader dies and we have to restart
2886 : // a lock negotiation)
2887 : //
2888 1 : return;
2889 : }
2890 : }
2891 :
2892 : // the object already exists... do not allow duplicates
2893 : //
2894 2 : SNAP_LOG_ERROR
2895 : << "an entering ticket has the same object name \""
2896 : << object_name
2897 1 : << "\" ("
2898 1 : << tag
2899 : << ") and entering key \""
2900 : << entering_key
2901 : << "\"."
2902 : << SNAP_LOG_SEND;
2903 :
2904 1 : ed::message lock_failed_message;
2905 1 : lock_failed_message.set_command(cluck::g_name_cluck_cmd_lock_failed);
2906 1 : lock_failed_message.reply_to(msg);
2907 1 : lock_failed_message.add_parameter(cluck::g_name_cluck_param_object_name, object_name);
2908 1 : lock_failed_message.add_parameter(cluck::g_name_cluck_param_tag, tag);
2909 1 : lock_failed_message.add_parameter(cluck::g_name_cluck_param_key, entering_key);
2910 1 : lock_failed_message.add_parameter(cluck::g_name_cluck_param_error, cluck::g_name_cluck_value_duplicate);
2911 : #ifndef CLUCKD_OPTIMIZATIONS
2912 1 : lock_failed_message.add_parameter(cluck::g_name_cluck_param_description, "LOCK called with the same entering ticket object_name and entering_key");
2913 : #endif
2914 1 : f_messenger->send_message(lock_failed_message);
2915 :
2916 1 : return;
2917 1 : }
2918 112 : if(entering_ticket->second.size() >= cluck::CLUCK_MAXIMUM_ENTERING_LOCKS)
2919 : {
 2920             :             // this is a failure in the algorithm (unfortunately): if you
 2921             :             // send LOCK commands without much pause, the number of entering
 2922             :             // tickets can grow forever; the following is a way to avoid
 2923             :             // that situation by preventing such unchecked growth.
2924 : //
2925 10 : SNAP_LOG_ERROR
2926 : << "too many entering tickets for object name \""
2927 : << object_name
2928 : << "\"."
2929 : << SNAP_LOG_SEND;
2930 :
2931 10 : ed::message lock_failed_message;
2932 10 : lock_failed_message.set_command(cluck::g_name_cluck_cmd_lock_failed);
2933 10 : lock_failed_message.reply_to(msg);
2934 10 : lock_failed_message.add_parameter(cluck::g_name_cluck_param_object_name, object_name);
2935 10 : lock_failed_message.add_parameter(cluck::g_name_cluck_param_tag, tag);
2936 10 : lock_failed_message.add_parameter(cluck::g_name_cluck_param_key, entering_key);
2937 10 : lock_failed_message.add_parameter(cluck::g_name_cluck_param_error, cluck::g_name_cluck_value_overflow);
2938 : #ifndef CLUCKD_OPTIMIZATIONS
 2939          10 :             lock_failed_message.add_parameter(cluck::g_name_cluck_param_description, "LOCK called so quickly that the number of entering tickets overflowed");
2940 : #endif
2941 10 : f_messenger->send_message(lock_failed_message);
2942 :
2943 10 : return;
2944 10 : }
2945 : }
2946 :
2947 : // make sure there is not a ticket with the same name already defined
2948 : //
 2949             :     // (this is really important so we can properly UNLOCK an
2950 : // existing lock since we use the same search and if two entries were
2951 : // to be the same we could not know which to unlock; there are a few
2952 : // other places where such a search is used actually...)
2953 : //
2954 120 : auto obj_ticket(f_tickets.find(object_name));
2955 120 : if(obj_ticket != f_tickets.end())
2956 : {
2957 6 : auto key_ticket(std::find_if(
2958 2 : obj_ticket->second.begin()
2959 2 : , obj_ticket->second.end()
2960 2 : , [&entering_key](auto const & t)
2961 : {
2962 2 : return t.second->get_entering_key() == entering_key;
2963 : }));
2964 2 : if(key_ticket != obj_ticket->second.end())
2965 : {
2966 : // there is already a ticket with this object name/entering key
2967 : //
2968 2 : SNAP_LOG_ERROR
2969 : << "a ticket has the same object name \""
2970 : << object_name
2971 1 : << "\" ("
2972 1 : << tag
2973 : << ") and entering key \""
2974 : << entering_key
2975 : << "\"."
2976 : << SNAP_LOG_SEND;
2977 :
2978 1 : ed::message lock_failed_message;
2979 1 : lock_failed_message.set_command(cluck::g_name_cluck_cmd_lock_failed);
2980 1 : lock_failed_message.reply_to(msg);
2981 1 : lock_failed_message.add_parameter(cluck::g_name_cluck_param_object_name, object_name);
2982 1 : lock_failed_message.add_parameter(cluck::g_name_cluck_param_tag, tag);
2983 1 : lock_failed_message.add_parameter(cluck::g_name_cluck_param_key, entering_key);
2984 1 : lock_failed_message.add_parameter(cluck::g_name_cluck_param_error, cluck::g_name_cluck_value_duplicate);
2985 : #ifndef CLUCKD_OPTIMIZATIONS
2986 1 : lock_failed_message.add_parameter(cluck::g_name_cluck_param_description, "LOCK called with the same ticket object_name and entering_key");
2987 : #endif
2988 1 : f_messenger->send_message(lock_failed_message);
2989 :
2990 1 : return;
2991 1 : }
2992 : }
2993 :
2994 119 : ticket::pointer_t ticket(std::make_shared<ticket>(
2995 119 : this
2996 119 : , f_messenger
2997 : , object_name
2998 : , tag
2999 : , entering_key
3000 : , timeout
3001 : , duration
3002 : , server_name
3003 119 : , service_name));
3004 :
3005 119 : f_entering_tickets[object_name][entering_key] = ticket;
3006 :
3007 : // finish up ticket initialization
3008 : //
3009 119 : ticket->set_unlock_duration(unlock_duration);
3010 :
3011 : // generate a serial number for that ticket
3012 : //
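                  :     // sketch of the resulting layout: the low 24 bits are a wrapping
                  :     // counter and the top byte encodes which leader allocated the
                  :     // serial (e.g. 0x02000005 would be counter 5 from leader #2)
                  :     //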
3013 119 : f_ticket_serial = (f_ticket_serial + 1) & 0x00FFFFFF; // 0 is a valid serial number (-1 is not)
3014 119 : if(f_leaders[0]->get_id() != f_my_id)
3015 : {
3016 12 : if(f_leaders.size() >= 2
3017 12 : && f_leaders[1]->get_id() != f_my_id)
3018 : {
3019 0 : f_ticket_serial |= 1 << 24;
3020 : }
3021 12 : else if(f_leaders.size() >= 3
3022 12 : && f_leaders[2]->get_id() != f_my_id)
3023 : {
3024 11 : f_ticket_serial |= 2 << 24;
3025 : }
3026 : }
3027 119 : ticket->set_serial(f_ticket_serial);
3028 :
3029 119 : if(msg.has_parameter(cluck::g_name_cluck_param_serial))
3030 : {
 3031             :         // and when that happens we may well have lost the
 3032             :         // client that requested the LOCK; send an ALIVE message to make
3033 : // client that requested the LOCK, send an ALIVE message to make
3034 : // sure that the client still exists before entering the ticket
3035 : //
3036 : // TODO: we may want to make this 5s a parameter that we can change
3037 : //
3038 1 : ticket->set_alive_timeout(snapdev::now() + cluck::timeout_t(5, 0));
3039 :
3040 1 : ed::message alive_message;
3041 1 : alive_message.set_command(ed::g_name_ed_cmd_alive);
3042 1 : alive_message.set_server(server_name);
3043 1 : alive_message.set_service(service_name);
3044 1 : alive_message.add_parameter(ed::g_name_ed_param_serial, "cluckd/" + object_name + '/' + entering_key);
3045 1 : alive_message.add_parameter(ed::g_name_ed_param_timestamp, snapdev::now());
3046 1 : f_messenger->send_message(alive_message);
3047 1 : }
3048 : else
3049 : {
3050 : // act on the new ticket
3051 : //
3052 118 : ticket->entering();
3053 : }
3054 :
3055 : // the list of tickets changed, make sure we update the timeout timer
3056 : //
3057 119 : cleanup();
3058 220 : }
3059 :
3060 :
3061 : /** \brief Acknowledgement of the lock to activate.
3062 : *
3063 : * This function is an acknowledgement that the lock can now be
3064 : * activated. This is true only if the 'key' and 'other_key'
3065 : * are a match, though.
3066 : *
3067 : * \param[in] msg The LOCK_ACTIVATED message.
3068 : */
3069 17 : void cluckd::msg_lock_activated(ed::message & msg)
3070 : {
3071 17 : std::string object_name;
3072 17 : ed::dispatcher_match::tag_t tag(ed::dispatcher_match::DISPATCHER_MATCH_NO_TAG);
3073 17 : std::string key;
3074 17 : if(!get_parameters(msg, &object_name, &tag, nullptr, nullptr, &key, nullptr))
3075 : {
3076 : return; // LCOV_EXCL_LINE
3077 : }
3078 :
3079 17 : std::string const & other_key(msg.get_parameter(cluck::g_name_cluck_param_other_key));
3080 17 : if(other_key == key)
3081 : {
3082 17 : auto obj_ticket(f_tickets.find(object_name));
3083 17 : if(obj_ticket != f_tickets.end())
3084 : {
3085 17 : auto key_ticket(obj_ticket->second.find(key));
3086 17 : if(key_ticket != obj_ticket->second.end())
3087 : {
3088 : // that key is still here!
3089 : // time to activate
3090 : //
3091 17 : key_ticket->second->lock_activated();
3092 : }
3093 : }
3094 : }
3095 17 : }
3096 :
3097 :
3098 : /** \brief Tell the specified ticket LOCK_ENTERED was received.
3099 : *
3100 : * This function calls the specified ticket entered() function, which
3101 : * processes the LOCK_ENTERED message and sends the GET_MAX_TICKET
3102 : * message to the other leaders.
3103 : *
3104 : * \param[in] msg The LOCK_ENTERED message.
3105 : */
3106 123 : void cluckd::msg_lock_entered(ed::message & msg)
3107 : {
3108 123 : std::string object_name;
3109 123 : ed::dispatcher_match::tag_t tag(ed::dispatcher_match::DISPATCHER_MATCH_NO_TAG);
3110 123 : std::string key;
3111 123 : if(!get_parameters(msg, &object_name, &tag, nullptr, nullptr, &key, nullptr))
3112 : {
3113 : return; // LCOV_EXCL_LINE
3114 : }
3115 :
3116 123 : auto const obj_entering_ticket(f_entering_tickets.find(object_name));
3117 123 : if(obj_entering_ticket != f_entering_tickets.end())
3118 : {
3119 123 : auto const key_entering_ticket(obj_entering_ticket->second.find(key));
3120 123 : if(key_entering_ticket != obj_entering_ticket->second.end())
3121 : {
3122 123 : key_entering_ticket->second->entered();
3123 : }
3124 : }
3125 123 : }
3126 :
3127 :
3128 : /** \brief Create an entering ticket.
3129 : *
3130 : * When the cluck daemon receives a LOCK message, it sends a LOCK_ENTERING
3131 : * to the other leaders so that all of them have a complete list of
3132 : * the tickets.
3133 : *
3134 : * This function creates an "entering ticket." This is a "real" ticket,
3135 : * but it is still in an "entering" state.
3136 : *
3137 : * The function sends a LOCK_ENTERED as a reply to the LOCK_ENTERING
3138 : * message.
3139 : *
3140 : * \note
3141 : * Since a cluck daemon may receive this message multiple times, if it
3142 : * finds an existing entering ticket with the same parameters (object
3143 : * name and key), then it does not re-create yet another instance.
3144 : *
3145 : * \param[in] msg The LOCK_ENTERING message.
3146 : */
3147 8 : void cluckd::msg_lock_entering(ed::message & msg)
3148 : {
3149 8 : std::string object_name;
3150 8 : ed::dispatcher_match::tag_t tag(ed::dispatcher_match::DISPATCHER_MATCH_NO_TAG);
3151 8 : cluck::timeout_t timeout;
3152 8 : std::string key;
3153 8 : std::string source;
3154 8 : if(!get_parameters(msg, &object_name, &tag, nullptr, &timeout, &key, &source))
3155 : {
3156 : return; // LCOV_EXCL_LINE
3157 : }
3158 :
3159 : // lock still in the future?
3160 : //
3161 8 : if(timeout <= snapdev::now())
3162 : {
3163 1 : SNAP_LOG_DEBUG
3164 : << "received LOCK_ENTERING for \""
3165 : << object_name
3166 : << "\" that already timed out."
3167 : << SNAP_LOG_SEND;
3168 1 : return;
3169 : }
3170 :
3171 : // do we have enough leaders?
3172 : //
3173 7 : if(!is_daemon_ready())
3174 : {
3175 1 : SNAP_LOG_DEBUG
3176 : << "received LOCK_ENTERING while we are thinking we are not ready."
3177 : << SNAP_LOG_SEND;
3178 1 : return;
3179 : }
3180 :
3181 : // in the Bakery Algorithm, entering is just a flag (i.e. entering[i] = true);
3182 : // in our case the existence of a ticket is enough to know
3183 : // that we entered
3184 : //
3185 6 : bool allocate(true);
3186 6 : auto const obj_ticket(f_entering_tickets.find(object_name));
3187 6 : if(obj_ticket != f_entering_tickets.end())
3188 : {
3189 1 : auto const key_ticket(obj_ticket->second.find(key));
3190 1 : allocate = key_ticket == obj_ticket->second.end();
3191 : }
3192 6 : if(allocate)
3193 : {
3194 : // ticket does not exist, so create it now
3195 : // (note: ticket should only exist on originator)
3196 : //
3197 5 : cluck::timeout_t const duration(msg.get_timespec_parameter(cluck::g_name_cluck_param_duration));
3198 5 : if(duration < cluck::CLUCK_MINIMUM_TIMEOUT)
3199 : {
3200 : // invalid duration
3201 : //
3202 1 : SNAP_LOG_ERROR
3203 : << duration
3204 : << " is an invalid duration, the minimum accepted is "
3205 : << cluck::CLUCK_MINIMUM_TIMEOUT
3206 : << "."
3207 : << SNAP_LOG_SEND;
3208 :
3209 1 : ed::message lock_failed_message;
3210 1 : lock_failed_message.set_command(cluck::g_name_cluck_cmd_lock_failed);
3211 1 : lock_failed_message.reply_to(msg);
3212 1 : lock_failed_message.add_parameter(cluck::g_name_cluck_param_object_name, object_name);
3213 1 : lock_failed_message.add_parameter(cluck::g_name_cluck_param_tag, tag);
3214 1 : lock_failed_message.add_parameter(cluck::g_name_cluck_param_key, key);
3215 1 : lock_failed_message.add_parameter(cluck::g_name_cluck_param_error, cluck::g_name_cluck_value_invalid);
3216 : #ifndef CLUCKD_OPTIMIZATIONS
3217 1 : lock_failed_message.add_parameter(cluck::g_name_cluck_param_description, "LOCK_ENTERING called with a duration which is too small");
3218 : #endif
3219 1 : f_messenger->send_message(lock_failed_message);
3220 :
3221 1 : return;
3222 1 : }
3223 :
3224 4 : cluck::timeout_t unlock_duration(cluck::CLUCK_DEFAULT_TIMEOUT);
3225 4 : if(msg.has_parameter(cluck::g_name_cluck_param_unlock_duration))
3226 : {
3227 4 : unlock_duration = msg.get_timespec_parameter(cluck::g_name_cluck_param_unlock_duration);
3228 4 : if(unlock_duration != cluck::CLUCK_DEFAULT_TIMEOUT
3229 4 : && unlock_duration < cluck::CLUCK_UNLOCK_MINIMUM_TIMEOUT)
3230 : {
3231 : // invalid unlock duration, it is below the allowed minimum
3232 : //
3233 1 : SNAP_LOG_ERROR
3234 : << unlock_duration
3235 : << " is an invalid unlock duration, the minimum accepted is "
3236 : << cluck::CLUCK_UNLOCK_MINIMUM_TIMEOUT
3237 : << "."
3238 : << SNAP_LOG_SEND;
3239 :
3240 1 : ed::message lock_failed_message;
3241 1 : lock_failed_message.set_command(cluck::g_name_cluck_cmd_lock_failed);
3242 1 : lock_failed_message.reply_to(msg);
3243 1 : lock_failed_message.add_parameter(cluck::g_name_cluck_param_object_name, object_name);
3244 1 : lock_failed_message.add_parameter(cluck::g_name_cluck_param_tag, tag);
3245 1 : lock_failed_message.add_parameter(cluck::g_name_cluck_param_key, key);
3246 1 : lock_failed_message.add_parameter(cluck::g_name_cluck_param_error, cluck::g_name_cluck_value_invalid);
3247 : #ifndef CLUCKD_OPTIMIZATIONS
3248 1 : lock_failed_message.add_parameter(cluck::g_name_cluck_param_description, "LOCK_ENTERING called with an unlock duration which is too small or set to the default value");
3249 : #endif
3250 1 : f_messenger->send_message(lock_failed_message);
3251 :
3252 1 : return;
3253 1 : }
3254 : }
3255 :
3256 : // we have to know where this message comes from
3257 : //
3258 3 : std::vector<std::string> source_segments;
3259 3 : if(snapdev::tokenize_string(source_segments, source, "/") != 2)
3260 : {
3261 2 : SNAP_LOG_ERROR
3262 : << "Invalid number of parameters in source parameter \""
3263 : << source
3264 1 : << "\" (found "
3265 1 : << source_segments.size()
3266 : << ", expected 2)."
3267 : << SNAP_LOG_SEND;
3268 :
3269 1 : ed::message lock_failed_message;
3270 1 : lock_failed_message.set_command(cluck::g_name_cluck_cmd_lock_failed);
3271 1 : lock_failed_message.reply_to(msg);
3272 1 : lock_failed_message.add_parameter(cluck::g_name_cluck_param_object_name, object_name);
3273 1 : lock_failed_message.add_parameter(cluck::g_name_cluck_param_tag, tag);
3274 1 : lock_failed_message.add_parameter(cluck::g_name_cluck_param_key, key);
3275 1 : lock_failed_message.add_parameter(cluck::g_name_cluck_param_error, cluck::g_name_cluck_value_invalid);
3276 : #ifndef CLUCKD_OPTIMIZATIONS
3277 1 : lock_failed_message.add_parameter(cluck::g_name_cluck_param_description, "LOCK_ENTERING called with an invalid source parameter");
3278 : #endif
3279 1 : f_messenger->send_message(lock_failed_message);
3280 :
3281 1 : return;
3282 1 : }
3283 :
3284 2 : ticket::pointer_t ticket(std::make_shared<ticket>(
3285 2 : this
3286 2 : , f_messenger
3287 : , object_name
3288 : , tag
3289 : , key
3290 : , timeout
3291 : , duration
3292 2 : , source_segments[0]
3293 4 : , source_segments[1]));
3294 :
3295 2 : f_entering_tickets[object_name][key] = ticket;
3296 :
3297 : // finish up on ticket initialization
3298 : //
3299 2 : ticket->set_owner(msg.get_sent_from_server());
3300 2 : ticket->set_unlock_duration(unlock_duration);
3301 2 : ticket->set_serial(msg.get_integer_parameter(cluck::g_name_cluck_param_serial));
3302 3 : }
3303 :
3304 3 : ed::message reply;
3305 3 : reply.set_command(cluck::g_name_cluck_cmd_lock_entered);
3306 3 : reply.reply_to(msg);
3307 3 : reply.add_parameter(cluck::g_name_cluck_param_object_name, object_name);
3308 3 : reply.add_parameter(cluck::g_name_cluck_param_tag, tag);
3309 3 : reply.add_parameter(cluck::g_name_cluck_param_key, key);
3310 3 : f_messenger->send_message(reply);
3311 :
3312 3 : cleanup();
3313 18 : }
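 :
 : // Sketch: a LOCK_ENTERING message carrying every parameter the handler
 : // above consumes. The g_name_cluck_cmd_lock_entering, ..._param_timeout
 : // and ..._param_source names are assumptions based on the naming pattern;
 : // the values are made up. Note that source must be "<server>/<service>"
 : // (exactly two segments) or the handler replies with LOCK_FAILED.
 : //
 : //     ed::message lock_entering;
 : //     lock_entering.set_command(cluck::g_name_cluck_cmd_lock_entering); // assumed name
 : //     lock_entering.add_parameter(cluck::g_name_cluck_param_object_name, "cargo-list");
 : //     lock_entering.add_parameter(cluck::g_name_cluck_param_tag, tag);
 : //     lock_entering.add_parameter(cluck::g_name_cluck_param_key, "rc1/5551");
 : //     lock_entering.add_parameter(cluck::g_name_cluck_param_timeout, snapdev::now() + cluck::timeout_t(5, 0)); // assumed name
 : //     lock_entering.add_parameter(cluck::g_name_cluck_param_duration, cluck::CLUCK_DEFAULT_TIMEOUT);
 : //     lock_entering.add_parameter(cluck::g_name_cluck_param_unlock_duration, cluck::CLUCK_DEFAULT_TIMEOUT);
 : //     lock_entering.add_parameter(cluck::g_name_cluck_param_source, "rc1/my-service"); // assumed name
 : //     lock_entering.add_parameter(cluck::g_name_cluck_param_serial, 5551);
 : //     f_messenger->send_message(lock_entering);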
3314 :
3315 :
3316 : /** \brief Exit a ticket.
3317 : *
3318 : * This command exits a ticket, which means we got the GET_MAX_TICKET
3319 : * result and thus can be sure that the ticket is properly sorted in
3320 : * the list of tickets.
3321 : *
3322 : * \param[in] msg The LOCK_EXITING message.
3323 : */
3324 117 : void cluckd::msg_lock_exiting(ed::message & msg)
3325 : {
3326 117 : std::string object_name;
3327 117 : ed::dispatcher_match::tag_t tag(ed::dispatcher_match::DISPATCHER_MATCH_NO_TAG);
3328 117 : std::string key;
3329 117 : if(!get_parameters(msg, &object_name, &tag, nullptr, nullptr, &key, nullptr))
3330 : {
3331 : return; // LCOV_EXCL_LINE
3332 : }
3333 :
3334 : // when exiting we just remove the entry with that key
3335 : //
3336 117 : auto const obj_entering(f_entering_tickets.find(object_name));
3337 117 : if(obj_entering != f_entering_tickets.end())
3338 : {
3339 115 : auto const key_entering(obj_entering->second.find(key));
3340 115 : if(key_entering != obj_entering->second.end())
3341 : {
3342 114 : obj_entering->second.erase(key_entering);
3343 :
3344 : // we also want to remove it from the ticket f_entering
3345 : // map if it is there (older ones are there!)
3346 : //
3347 114 : bool run_activation(false);
3348 114 : auto const obj_ticket(f_tickets.find(object_name));
3349 114 : if(obj_ticket != f_tickets.end())
3350 : {
3351 10135 : for(auto const & key_ticket : obj_ticket->second)
3352 : {
3353 10021 : key_ticket.second->remove_entering(key);
3354 10021 : run_activation = true;
3355 : }
3356 : }
3357 114 : if(run_activation)
3358 : {
3359 : // hmm... looking at the code more closely, it seems that this
3360 : // is still very much necessary
3361 : //
3362 : // --------------------------------------------------
3363 : //
3364 : // try to activate the lock right now since it could
3365 : // very well be the only ticket and that is exactly
3366 : // when it is viewed as active!
3367 : //
3368 : // Note: this is from my old version, if I am correct
3369 : // it cannot happen anymore because (1) this is
3370 : // not the owner so the activation would not
3371 : // take anyway and (2) the ticket is not going
3372 : // to be marked as being ready at this point
3373 : // (that happens later)
3374 : //
3375 : // XXX we probably should remove this statement
3376 : // and the run_activation flag which would
3377 : // then be useless
3378 : //
3379 114 : activate_first_lock(object_name);
3380 : }
3381 :
3382 114 : if(obj_entering->second.empty())
3383 : {
3384 13 : f_entering_tickets.erase(obj_entering);
3385 : }
3386 : }
3387 : else
3388 : {
3389 1 : SNAP_LOG_WARNING
3390 : << "entering lock \""
3391 : << object_name
3392 : << "\" with key \""
3393 : << key
3394 : << "\" in LOCK_EXITING specified lock not found."
3395 : << SNAP_LOG_SEND;
3396 : }
3397 : }
3398 : else
3399 : {
3400 2 : SNAP_LOG_WARNING
3401 : << "LOCK_EXITING specified lock \""
3402 : << object_name
3403 : << "\" not found."
3404 : << SNAP_LOG_SEND;
3405 : }
3406 :
3407 : // the list of tickets has likely changed so we need to call
3408 : // cleanup() to make sure the timer is reset appropriately
3409 : //
3410 117 : cleanup();
3411 117 : }
3412 :
3413 :
3414 : /** \brief Acknowledge a lock failure.
3415 : *
3416 : * This function handles the LOCK_FAILED event that another leader may send
3417 : * to us. In that case we have to stop the process.
3418 : *
3419 : * LOCK_FAILED can happen mainly because of tainted data so we should never
3420 : * get here within a leader. However, with time we may add a few errors
3421 : * which could happen for other reasons than just tainted data.
3422 : *
3423 : * When this function finds an entering ticket or a plain ticket to remove
3424 : * according to the object name and key found in the LOCK_FAILED message,
3425 : * it forwards the LOCK_FAILED message to the server and service found in
3426 : * the ticket.
3427 : *
3428 : * \todo
3429 : * This function destroys a ticket even if it is already considered locked.
3430 : * Make double sure that this is okay with a LOCK_FAILED sent to the client.
3431 : *
3432 : * \warning
3433 : * Although this event should not occur, it is problematic since anyone
3434 : * can send a LOCK_FAILED message here and as a side effect destroy a
3435 : * perfectly valid ticket.
3436 : *
3437 : * \param[in] msg The LOCK_FAILED message.
3438 : */
3439 3 : void cluckd::msg_lock_failed(ed::message & msg)
3440 : {
3441 3 : std::string object_name;
3442 3 : ed::dispatcher_match::tag_t tag(ed::dispatcher_match::DISPATCHER_MATCH_NO_TAG);
3443 3 : std::string key;
3444 3 : if(!get_parameters(msg, &object_name, &tag, nullptr, nullptr, &key, nullptr))
3445 : {
3446 : return; // LCOV_EXCL_LINE
3447 : }
3448 :
3449 6 : std::string errmsg("get LOCK_FAILED: ");
3450 3 : errmsg += msg.get_parameter("error");
3451 :
3452 3 : std::string forward_server;
3453 3 : std::string forward_service;
3454 :
3455 : // remove f_entering_tickets entries if we find matches there
3456 : //
3457 3 : auto obj_entering(f_entering_tickets.find(object_name));
3458 3 : if(obj_entering != f_entering_tickets.end())
3459 : {
3460 3 : auto key_entering(obj_entering->second.find(key));
3461 3 : if(key_entering != obj_entering->second.end())
3462 : {
3463 3 : forward_server = key_entering->second->get_server_name();
3464 3 : forward_service = key_entering->second->get_service_name();
3465 :
3466 3 : obj_entering->second.erase(key_entering);
3467 :
3468 3 : errmsg += " -- happened while entering";
3469 : }
3470 :
3471 3 : if(obj_entering->second.empty())
3472 : {
3473 2 : f_entering_tickets.erase(obj_entering);
3474 : }
3475 : }
3476 :
3477 : // remove any f_tickets entries if we find matches there
3478 : //
3479 3 : auto obj_ticket(f_tickets.find(object_name));
3480 3 : if(obj_ticket != f_tickets.end())
3481 : {
3482 2 : bool try_activate(false);
3483 2 : auto key_ticket(obj_ticket->second.find(key));
3484 2 : if(key_ticket == obj_ticket->second.end())
3485 : {
3486 4 : key_ticket = std::find_if(
3487 2 : obj_ticket->second.begin()
3488 2 : , obj_ticket->second.end()
3489 2 : , [&key](auto const & t)
3490 : {
3491 2 : return t.second->get_entering_key() == key;
3492 : });
3493 : }
3494 2 : if(key_ticket != obj_ticket->second.end())
3495 : {
3496 : // Note: if we already found it in the f_entering_tickets then
3497 : // the server and service names are going to be exactly
3498 : // the same so there is no need to test that here
3499 : //
3500 2 : forward_server = key_ticket->second->get_server_name();
3501 2 : forward_service = key_ticket->second->get_service_name();
3502 :
3503 2 : obj_ticket->second.erase(key_ticket);
3504 2 : try_activate = true;
3505 :
3506 2 : errmsg += " -- happened when locked"; // TBD: are we really always locked in this case?
3507 : }
3508 :
3509 2 : if(obj_ticket->second.empty())
3510 : {
3511 1 : f_tickets.erase(obj_ticket);
3512 : }
3513 1 : else if(try_activate)
3514 : {
3515 : // something was erased, a new ticket may be first
3516 : //
3517 1 : activate_first_lock(obj_ticket->first);
3518 : }
3519 : }
3520 :
3521 3 : if(!forward_server.empty()
3522 3 : && !forward_service.empty())
3523 : {
3524 : // we deleted an entry, forward the message to the service
3525 : // that requested that lock
3526 : //
3527 3 : msg.set_server(forward_server);
3528 3 : msg.set_service(forward_service);
3529 3 : f_messenger->send_message(msg);
3530 :
3531 3 : errmsg += " -> ";
3532 3 : errmsg += forward_server;
3533 3 : errmsg += ":";
3534 3 : errmsg += forward_service;
3535 : }
3536 :
3537 3 : SNAP_LOG_IMPORTANT
3538 : << errmsg
3539 : << '.'
3540 : << SNAP_LOG_SEND;
3541 :
3542 : // the list of tickets has likely changed so we need to call
3543 : // cleanup() to make sure the timer is reset appropriately
3544 : //
3545 3 : cleanup();
3546 3 : }
3547 :
3548 :
3549 : /** \brief The list of leaders.
3550 : *
3551 : * This function receives the list of leaders after an election.
3552 : *
3553 : * \param[in] msg The LOCK_LEADERS message.
3554 : */
3555 9 : void cluckd::msg_lock_leaders(ed::message & msg)
3556 : {
3557 9 : f_election_date = msg.get_timespec_parameter(cluck::g_name_cluck_param_election_date);
3558 :
3559 : // save the new leaders in our own list
3560 : //
3561 9 : f_leaders.clear();
3562 36 : for(int idx(0); idx < 3; ++idx)
3563 : {
3564 54 : std::string const param_name(cluck::g_name_cluck_param_leader + std::to_string(idx));
3565 27 : if(msg.has_parameter(param_name))
3566 : {
3567 24 : computer::pointer_t leader(std::make_shared<computer>());
3568 24 : std::string const lockid(msg.get_parameter(param_name));
3569 24 : if(leader->set_id(lockid))
3570 : {
3571 24 : computer::map_t::iterator exists(f_computers.find(leader->get_name()));
3572 24 : if(exists != f_computers.end())
3573 : {
3574 : // it already exists, use our existing instance
3575 : //
3576 23 : f_leaders.push_back(exists->second);
3577 : }
3578 : else
3579 : {
3580 : // we do not yet know of that computer, even though
3581 : // it is a leader! (i.e. we are not yet aware that
3582 : // somehow we are connected to it)
3583 : //
3584 1 : leader->set_connected(false);
3585 1 : f_computers[leader->get_name()] = leader;
3586 :
3587 1 : f_leaders.push_back(leader);
3588 : }
3589 : }
3590 24 : }
3591 27 : }
3592 :
3593 9 : if(!f_leaders.empty())
3594 : {
3595 9 : synchronize_leaders();
3596 :
3597 : // set the round-robin position to a random value
3598 : //
3599 : // note: the result is slightly skewed; c is set to a number
3600 : // between 0 and 255 and modulo 3 yields one extra zero
3601 : // (256 = 85 * 3 + 1), but with 86 zeroes against 85 ones and
3602 : // twos the bias is not noticeable (an unbiased alternative is
3603 : // sketched after this function)
3603 : //
3604 9 : std::uint8_t c;
3605 9 : RAND_bytes(reinterpret_cast<unsigned char *>(&c), sizeof(c));
3606 9 : f_next_leader = c % f_leaders.size();
3607 : }
3608 :
3609 : // the is_daemon_ready() function depends on having f_leaders defined
3610 : // and when that happens we may need to empty our cache
3611 : //
3612 9 : check_lock_status();
3613 9 : }
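 :
 : // Sketch: the modulo above slightly favors index 0. If that bias ever
 : // mattered, rejection sampling removes it entirely. This helper only
 : // relies on <openssl/rand.h> (already included) and assumes <cstdint>
 : // is available through the existing headers.
 : //
 : //     std::size_t unbiased_index(std::size_t count) // count in [1, 256]
 : //     {
 : //         // reject bytes falling in the final, partial group so that
 : //         // every residue is equally likely
 : //         std::size_t const limit(256 / count * count);
 : //         std::uint8_t c(0);
 : //         do
 : //         {
 : //             RAND_bytes(reinterpret_cast<unsigned char *>(&c), sizeof(c));
 : //         }
 : //         while(c >= limit);
 : //         return c % count;
 : //     }
 : //
 : // ...which would then be used as: f_next_leader = unbiased_index(f_leaders.size());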
3614 :
3615 :
3616 : /** \brief Called whenever a cluck computer is acknowledging itself.
3617 : *
3618 : * This function gets called on a LOCK_STARTED event which is sent whenever
3619 : * a cluck process is initialized on a computer.
3620 : *
3621 : * The message is expected to include the computer name. At this time
3622 : * we cannot handle having more than one instance on the same computer.
3623 : *
3624 : * \param[in] message The LOCK_STARTED message.
3625 : */
3626 53 : void cluckd::msg_lock_started(ed::message & msg)
3627 : {
3628 : // I do not think we would ever message ourselves, but in case it
3629 : // happens, the rest of the function does not support that case
3630 : //
3631 159 : std::string const server_name(msg.get_parameter(communicatord::g_name_communicatord_param_server_name));
3632 53 : if(server_name == f_server_name)
3633 : {
3634 1 : return;
3635 : }
3636 :
3637 52 : cluck::timeout_t const start_time(msg.get_timespec_parameter(cluck::g_name_cluck_param_start_time));
3638 :
3639 52 : computer::map_t::iterator it(f_computers.find(server_name));
3640 52 : bool new_computer(it == f_computers.end());
3641 52 : if(new_computer)
3642 : {
3643 : // create a computer instance so we know it exists
3644 : //
3645 48 : computer::pointer_t computer(std::make_shared<computer>());
3646 :
3647 : // fill the fields from the "lock_id" parameter
3648 : //
3649 48 : if(!computer->set_id(msg.get_parameter(cluck::g_name_cluck_param_lock_id)))
3650 : {
3651 : // this is not a valid identifier, ignore altogether
3652 : //
3653 1 : return;
3654 : }
3655 47 : computer->set_start_time(start_time);
3656 :
3657 : #ifdef _DEBUG
3658 : // LCOV_EXCL_START
3659 : if(computer->get_name() != server_name)
3660 : {
3661 : throw cluck::logic_error("cluckd::msg_lock_started(): server_name ("
3662 : + server_name
3663 : + ") does not match the new computer name ("
3664 : + computer->get_name()
3665 : + ").");
3666 : }
3667 : // LCOV_EXCL_STOP
3668 : #endif
3669 47 : f_computers[computer->get_name()] = computer;
3670 48 : }
3671 : else
3672 : {
3673 4 : if(!it->second->get_connected())
3674 : {
3675 : // we heard of this computer (because it is/was a leader) but
3676 : // we had not yet received a LOCK_STARTED message from it; so here
3677 : // we consider it a new computer and will reply to the LOCK_STARTED
3678 : //
3679 2 : new_computer = true;
3680 2 : it->second->set_connected(true);
3681 : }
3682 :
3683 4 : if(it->second->get_start_time() != start_time)
3684 : {
3685 : // when the start time changes, it means cluckd
3686 : // restarted, which can happen without communicatord
3687 : // restarting, so we would not know about that fact
3688 : // without this parameter; in this case it is very
3689 : // much the same as a new computer, so send it a
3690 : // LOCK_STARTED message back
3691 : //
3692 4 : new_computer = true;
3693 4 : it->second->set_start_time(start_time);
3694 : }
3695 : }
3696 :
3697 : // keep the newest election results
3698 : //
3699 102 : computer::pointer_t old_leader(is_leader());
3700 51 : if(msg.has_parameter(cluck::g_name_cluck_param_election_date))
3701 : {
3702 6 : snapdev::timespec_ex const election_date(msg.get_timespec_parameter(cluck::g_name_cluck_param_election_date));
3703 6 : if(election_date > f_election_date)
3704 : {
3705 3 : f_election_date = election_date;
3706 3 : f_leaders.clear();
3707 : }
3708 : }
3709 :
3710 51 : if(f_leaders.empty())
3711 : {
3712 164 : for(int idx(0); idx < 3; ++idx)
3713 : {
3714 246 : std::string const param_name(cluck::g_name_cluck_param_leader + std::to_string(idx));
3715 123 : if(msg.has_parameter(param_name))
3716 : {
3717 9 : computer::pointer_t leader(std::make_shared<computer>());
3718 9 : std::string const lockid(msg.get_parameter(param_name));
3719 9 : if(leader->set_id(lockid))
3720 : {
3721 9 : computer::map_t::iterator exists(f_computers.find(leader->get_name()));
3722 9 : if(exists != f_computers.end())
3723 : {
3724 : // it already exists, use our existing instance
3725 : //
3726 8 : f_leaders.push_back(exists->second);
3727 : }
3728 : else
3729 : {
3730 : // we do not yet know of that computer, even though
3731 : // it is a leader! (i.e. we are not yet aware that
3732 : // somehow we are connected to it)
3733 : //
3734 1 : leader->set_connected(false);
3735 1 : f_computers[leader->get_name()] = leader;
3736 :
3737 1 : f_leaders.push_back(leader);
3738 : }
3739 : }
3740 9 : }
3741 123 : }
3742 : }
3743 :
3744 51 : election_status();
3745 :
3746 : // this can have an effect on the lock statuses
3747 : //
3748 51 : check_lock_status();
3749 :
3750 51 : if(new_computer
3751 51 : || old_leader != is_leader())
3752 : {
3753 : // send a reply if that was a new computer
3754 : //
3755 51 : send_lock_started(&msg);
3756 : }
3757 53 : }
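 :
 : // Sketch: a minimal LOCK_STARTED message as this handler reads it. The
 : // command name g_name_cluck_cmd_lock_started is assumed from the naming
 : // pattern; lock_id stands for this daemon's identifier string (the
 : // value that computer::set_id() parses) and the election_date/leaderN
 : // parameters are optional.
 : //
 : //     ed::message lock_started;
 : //     lock_started.set_command(cluck::g_name_cluck_cmd_lock_started); // assumed name
 : //     lock_started.add_parameter(communicatord::g_name_communicatord_param_server_name, f_server_name);
 : //     lock_started.add_parameter(cluck::g_name_cluck_param_start_time, snapdev::now());
 : //     lock_started.add_parameter(cluck::g_name_cluck_param_lock_id, lock_id);
 : //     f_messenger->send_message(lock_started);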
3758 :
3759 :
3760 : /** \brief A service asked about the lock status.
3761 : *
3762 : * The lock status is whether the cluck service is ready to receive
3763 : * LOCK messages (LOCK_READY) or is still waiting on a CLUSTER_UP and
3764 : * LOCK_LEADERS to happen (NO_LOCK).
3765 : *
3766 : * Note that LOCK messages are accepted while the lock service is not
3767 : * yet ready; however, those are cached and are likely to time out
3768 : * before the system is ready to process the request.
3769 : *
3770 : * \param[in] msg The message to reply to.
3771 : */
3772 13 : void cluckd::msg_lock_status(ed::message & msg)
3773 : {
3774 13 : ed::message status_message;
3775 13 : status_message.set_command(is_daemon_ready()
3776 : ? cluck::g_name_cluck_cmd_lock_ready
3777 : : cluck::g_name_cluck_cmd_no_lock);
3778 :
3779 13 : status_message.reply_to(msg);
3780 13 : status_message.add_parameter(communicatord::g_name_communicatord_param_cache, communicatord::g_name_communicatord_value_no);
3781 13 : f_messenger->send_message(status_message);
3782 26 : }
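 :
 : // Sketch: how a client service could poll the daemon state. The command
 : // name g_name_cluck_cmd_lock_status is assumed from the naming pattern;
 : // the reply arrives as either LOCK_READY or NO_LOCK and, because of the
 : // "cache=no" parameter set above, is never cached by communicatord.
 : //
 : //     ed::message lock_status;
 : //     lock_status.set_command(cluck::g_name_cluck_cmd_lock_status); // assumed name
 : //     lock_status.set_service(cluck::g_name_cluck_service_name);
 : //     f_messenger->send_message(lock_status);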
3783 :
3784 :
3785 : /** \brief Another cluckd is sending us its list of tickets.
3786 : *
3787 : * Whenever a cluckd dies, a new one is quickly promoted as a leader
3788 : * and that new leader would have no idea about the existing tickets
3789 : * (locks) so the other two send it a LOCK_TICKETS message.
3790 : *
3791 : * The tickets are defined in the parameter of the same name using
3792 : * the serialization function to transform the objects into a string.
3793 : * Here we unserialize that string accordingly.
3794 : *
3795 : * First we extract the object name and entering key to see whether
3796 : * we have that ticket already defined. If so, then we unserialize
3797 : * into that existing object. The extraction is additive so we can do
3798 : * it any number of times.
3799 : *
3800 : * \param[in] msg The LOCK_TICKETS message.
3801 : */
3802 2 : void cluckd::msg_lock_tickets(ed::message & msg)
3803 : {
3804 6 : std::string const tickets(msg.get_parameter(cluck::g_name_cluck_param_tickets));
3805 :
3806 : // we have one ticket per line, so we first split per line and then
3807 : // work on lines one at a time
3808 : //
3809 2 : bool added_tickets(false);
3810 2 : std::list<std::string> lines;
3811 2 : snapdev::tokenize_string(lines, tickets, "\n", true);
3812 4 : for(auto const & l : lines)
3813 : {
3814 2 : ticket::pointer_t t;
3815 2 : std::list<std::string> vars;
3816 2 : snapdev::tokenize_string(vars, l, "|", true); // split the current line, not the whole "tickets" string
3817 2 : auto object_name_value(std::find_if(
3818 : vars.begin()
3819 : , vars.end()
3820 18 : , [](std::string const & vv)
3821 : {
3822 18 : return vv.starts_with("object_name=");
3823 : }));
3824 2 : if(object_name_value != vars.end())
3825 : {
3826 2 : auto entering_key_value(std::find_if(
3827 : vars.begin()
3828 : , vars.end()
3829 6 : , [](std::string const & vv)
3830 : {
3831 6 : return vv.starts_with("entering_key=");
3832 : }));
3833 2 : if(entering_key_value != vars.end())
3834 : {
3835 : // extract the values which start after the '=' sign
3836 : //
3837 2 : std::string const object_name(object_name_value->substr(12));
3838 2 : std::string const entering_key(entering_key_value->substr(13));
3839 :
3840 2 : auto entering_ticket(f_entering_tickets.find(object_name));
3841 2 : if(entering_ticket != f_entering_tickets.end())
3842 : {
3843 0 : auto key_ticket(entering_ticket->second.find(entering_key));
3844 0 : if(key_ticket != entering_ticket->second.end())
3845 : {
3846 0 : t = key_ticket->second;
3847 : }
3848 : }
3849 2 : if(t == nullptr)
3850 : {
3851 2 : auto obj_ticket(f_tickets.find(object_name));
3852 2 : if(obj_ticket != f_tickets.end())
3853 : {
3854 3 : auto key_ticket(std::find_if(
3855 1 : obj_ticket->second.begin()
3856 1 : , obj_ticket->second.end()
3857 1 : , [&entering_key](auto const & o)
3858 : {
3859 1 : return o.second->get_entering_key() == entering_key;
3860 : }));
3861 1 : if(key_ticket != obj_ticket->second.end())
3862 : {
3863 1 : t = key_ticket->second;
3864 : }
3865 : }
3866 : }
3867 :
3868 : // ticket exists? if not create a new one
3869 : //
3870 2 : bool const new_ticket(t == nullptr);
3871 2 : if(new_ticket)
3872 : {
3873 : // create a new ticket, some of the parameters are there just
3874 : // because they are required; they will be replaced by the
3875 : // unserialize call below...
3876 : //
3877 2 : t = std::make_shared<ticket>(
3878 2 : this
3879 1 : , f_messenger
3880 : , object_name
3881 : , ed::dispatcher_match::DISPATCHER_MATCH_NO_TAG
3882 : , entering_key
3883 2 : , cluck::CLUCK_DEFAULT_TIMEOUT + snapdev::now()
3884 : , cluck::CLUCK_DEFAULT_TIMEOUT
3885 1 : , f_server_name
3886 1 : , cluck::g_name_cluck_service_name);
3887 : }
3888 :
3889 2 : t->unserialize(l);
3890 2 : added_tickets = true;
3891 :
3892 : // do a couple of additional sanity tests to
3893 : // make sure that we want to keep new tickets
3894 : //
3895 : // first make sure it is marked as "locked"
3896 : //
3897 : // second check that the owner is a leader that
3898 : // exists (the sender uses a LOCK message for
3899 : // locks that are not yet locked or require
3900 : // a new owner)
3901 : //
3902 2 : if(new_ticket
3903 2 : && t->is_locked())
3904 : {
3905 1 : auto li(std::find_if(
3906 : f_leaders.begin()
3907 : , f_leaders.end()
3908 6 : , [&t](auto const & c)
3909 : {
3910 3 : return t->get_owner() == c->get_name();
3911 : }));
3912 1 : if(li != f_leaders.end())
3913 : {
3914 1 : f_tickets[object_name][t->get_ticket_key()] = t;
3915 : }
3916 : }
3917 2 : }
3918 : }
3919 2 : }
3920 :
3921 : // if we updated some tickets, we need to make sure our timer is setup
3922 : // appropriately
3923 : //
3924 2 : if(added_tickets)
3925 : {
3926 2 : cleanup();
3927 : }
3928 4 : }
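 :
 : // Sketch: the loop above expects one serialized ticket per line, each
 : // line being '|'-separated "name=value" fields; only object_name and
 : // entering_key are read in this handler, any other field shown below
 : // is illustrative:
 : //
 : //     object_name=cargo-list|entering_key=rc1/5551|ticket_id=3
 : //
 : // A generic field extractor would avoid the hard-coded substr() offsets
 : // used above (12 and 13 are the lengths of "object_name=" and
 : // "entering_key="):
 : //
 : //     std::string ticket_field(std::list<std::string> const & vars, std::string const & name)
 : //     {
 : //         std::string const prefix(name + '=');
 : //         auto const it(std::find_if(
 : //               vars.begin()
 : //             , vars.end()
 : //             , [&prefix](std::string const & v) { return v.starts_with(prefix); }));
 : //         return it == vars.end() ? std::string() : it->substr(prefix.length());
 : //     }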
3929 :
3930 :
3931 : /** \brief Got the largest ticket from another leader.
3932 : *
3933 : * This function searches the list of entering tickets for the specified
3934 : * object name and key parameters found in the MAX_TICKET message.
3935 : *
3936 : * On the first MAX_TICKET received for an entering ticket, that maximum + 1
3937 : * gets saved in the ticket as its identifier. In other words, we place that
3938 : * ticket at the end of the list.
3939 : *
3940 : * \note
3941 : * Whenever a list of locks goes empty, its maximum ticket number returns to
3942 : * the default: NO_TICKET (0). In the unlikely case where the locks happen
3943 : * back to back and thus the list never becomes empty, the maximum ticket
3944 : * number could end up wrapping around. This is an error we can detect;
3945 : * it kills the cluck daemon (a detection sketch follows this function).
3946 : *
3947 : * \param[in] msg The MAX_TICKET message being handled.
3948 : */
3949 124 : void cluckd::msg_max_ticket(ed::message & msg)
3950 : {
3951 124 : std::string object_name;
3952 124 : ed::dispatcher_match::tag_t tag(ed::dispatcher_match::DISPATCHER_MATCH_NO_TAG);
3953 124 : std::string key;
3954 124 : if(!get_parameters(msg, &object_name, &tag, nullptr, nullptr, &key, nullptr))
3955 : {
3956 : return; // LCOV_EXCL_LINE
3957 : }
3958 :
3959 : // the MAX_TICKET is an answer that goes to a ticket still entering (not yet added)
3960 : //
3961 124 : auto const obj_entering_ticket(f_entering_tickets.find(object_name));
3962 124 : if(obj_entering_ticket != f_entering_tickets.end())
3963 : {
3964 124 : auto const key_entering_ticket(obj_entering_ticket->second.find(key));
3965 124 : if(key_entering_ticket != obj_entering_ticket->second.end())
3966 : {
3967 127 : key_entering_ticket->second->max_ticket(msg.get_integer_parameter(cluck::g_name_cluck_param_ticket_id));
3968 : }
3969 : }
3970 125 : }
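 :
 : // Sketch: detecting the wrap around mentioned in the note above before
 : // it happens. The concrete ticket identifier type lives in the ticket
 : // class; an unsigned integer is assumed here, as is the availability of
 : // <limits> through the existing headers.
 : //
 : //     std::uint32_t next_ticket_id(std::uint32_t max_ticket_id)
 : //     {
 : //         if(max_ticket_id == std::numeric_limits<std::uint32_t>::max())
 : //         {
 : //             // unrecoverable: the daemon treats this as fatal
 : //             throw cluck::logic_error("ticket number wrapped around");
 : //         }
 : //         return max_ticket_id + 1;
 : //     }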
3971 :
3972 :
3973 : /** \brief Called whenever a remote connection is disconnected.
3974 : *
3975 : * This function is used to know that a remote connection was
3976 : * disconnected.
3977 : *
3978 : * This function handles the HANGUP and DISCONNECTED messages as required.
3979 : *
3980 : * This allows us to manage the f_computers list of computers running
3981 : * cluckd services. If a cluckd was affected, we verify we still have
3982 : * enough leaders.
3983 : *
3984 : * \note
3985 : * The STATUS message does not directly call this function. Instead, it
3986 : * calls msg_status(), which checks that (1) the status concerns a
3987 : * remote communicator daemon connection and (2) the new service state
3988 : * is "down"; only then does this function get called.
3989 : *
3990 : * \param[in] msg The DISCONNECTED, HANGUP, or STATUS message.
3991 : */
3992 22 : void cluckd::msg_server_gone(ed::message & msg)
3993 : {
3994 : // if the server is not defined, ignore that message
3995 : // (already tested by the dispatcher)
3996 : //
3997 22 : if(!msg.has_parameter(communicatord::g_name_communicatord_param_server_name))
3998 : {
3999 : return; // LCOV_EXCL_LINE
4000 : }
4001 :
4002 : // is it us?
4003 : //
4004 66 : std::string const server_name(msg.get_parameter(communicatord::g_name_communicatord_param_server_name));
4005 22 : if(server_name.empty()
4006 22 : || server_name == f_server_name)
4007 : {
4008 : // we never want to remove ourselves
4009 : //
4010 1 : return;
4011 : }
4012 :
4013 : // is "server_name" known?
4014 : //
4015 21 : auto it(f_computers.find(server_name));
4016 21 : if(it == f_computers.end())
4017 : {
4018 : // no computer found, nothing else to do here
4019 : //
4020 1 : return;
4021 : }
4022 20 : computer::pointer_t c(it->second);
4023 :
4024 : // got it, remove it
4025 : //
4026 20 : f_computers.erase(it);
4027 20 : SNAP_LOG_WARNING << "removed \"" << server_name << "\"" << SNAP_LOG_SEND;
4028 :
4029 : // is that computer a leader?
4030 : //
4031 20 : auto li(std::find(
4032 : f_leaders.begin()
4033 : , f_leaders.end()
4034 : , c));
4035 20 : if(li != f_leaders.end())
4036 : {
4037 6 : f_leaders.erase(li);
4038 :
4039 : // elect another computer in case the one we just erased was a leader
4040 : //
4041 : // (of course, no elections occur unless we are the computer with the
4042 : // smallest IP address)
4043 : //
4044 6 : election_status();
4045 :
4046 : // if too many leaders were dropped, we may go back to the NO_LOCK status
4047 : //
4048 : // we only send a NO_LOCK if the election could not re-assign another
4049 : // computer to replace the missing leader(s)
4050 : //
4051 6 : check_lock_status();
4052 : }
4053 22 : }
4054 :
4055 :
4056 : /** \brief With the STATUS message we know of new communicatord services.
4057 : *
4058 : * This function captures the STATUS message and checks whether the
4059 : * named service represents a remote communicator daemon connection.
4060 : * If so and the new state is "down", the corresponding computer is
4061 : * removed from our list of hosts (see msg_server_gone()).
4062 : *
4063 : * \param[in] msg The STATUS message.
4064 : */
4065 39 : void cluckd::msg_status(ed::message & msg)
4066 : {
4067 : // check the service name, it has to be one that means it is a remote
4068 : // connection with another communicator daemon
4069 : //
4070 117 : std::string const service(msg.get_parameter(communicatord::g_name_communicatord_param_service));
4071 :
4072 39 : if(service.starts_with(communicatord::g_name_communicatord_connection_remote_communicator_in) // remote host connected to us
4073 39 : || service.starts_with(communicatord::g_name_communicatord_connection_remote_communicator_out)) // we connected to remote host
4074 : {
4075 : // check what the status is now: "up" or "down"
4076 : //
4077 42 : std::string const status(msg.get_parameter(communicatord::g_name_communicatord_param_status));
4078 14 : if(status == communicatord::g_name_communicatord_value_up)
4079 : {
4080 : // we already broadcast a LOCK_STARTED from CLUSTER_UP
4081 : // and that's enough
4082 : //
4083 : ;
4084 : }
4085 : else
4086 : {
4087 : // host is down, remove from our list of hosts
4088 : //
4089 10 : msg_server_gone(msg);
4090 : }
4091 14 : }
4092 78 : }
4093 :
4094 :
4095 : /** \brief Acknowledgement that the ticket was properly added.
4096 : *
4097 : * This function gets called whenever the ticket was added on another
4098 : * leader.
4099 : *
4100 : * \param[in] msg The TICKET_ADDED message being handled.
4101 : */
4102 122 : void cluckd::msg_ticket_added(ed::message & msg)
4103 : {
4104 122 : std::string object_name;
4105 122 : ed::dispatcher_match::tag_t tag(ed::dispatcher_match::DISPATCHER_MATCH_NO_TAG);
4106 122 : std::string key;
4107 122 : if(!get_parameters(msg, &object_name, &tag, nullptr, nullptr, &key, nullptr))
4108 : {
4109 : return; // LCOV_EXCL_LINE
4110 : }
4111 :
4112 122 : auto const obj_ticket(f_tickets.find(object_name));
4113 122 : if(obj_ticket != f_tickets.end())
4114 : {
4115 121 : auto const key_ticket(obj_ticket->second.find(key));
4116 121 : if(key_ticket != obj_ticket->second.end())
4117 : {
4118 : // this ticket exists on this system
4119 : //
4120 120 : auto const obj_entering_ticket(f_entering_tickets.find(object_name));
4121 120 : if(obj_entering_ticket == f_entering_tickets.end())
4122 : {
4123 : // this happens all the time because the entering ticket
4124 : // gets removed on the first TICKET_ADDED we receive so
4125 : // on the second one we get here...
4126 : //
4127 7 : SNAP_LOG_TRACE
4128 : << "called with object \""
4129 : << object_name
4130 : << "\" not present in f_entering_ticket (key: \""
4131 : << key
4132 : << "\")."
4133 : << SNAP_LOG_SEND;
4134 7 : return;
4135 : }
4136 113 : key_ticket->second->ticket_added(obj_entering_ticket->second);
4137 : }
4138 : else
4139 : {
4140 1 : SNAP_LOG_WARNING
4141 : << "found object \""
4142 : << object_name
4143 : << "\" but could not find a corresponding ticket with key \""
4144 : << key
4145 : << "\"..."
4146 : << SNAP_LOG_SEND;
4147 : }
4148 : }
4149 : else
4150 : {
4151 1 : SNAP_LOG_WARNING
4152 : << "object \""
4153 : << object_name
4154 : << "\" not found."
4155 : << SNAP_LOG_SEND;
4156 : }
4157 129 : }
4158 :
4159 :
4160 : /** \brief Let other leaders know that the ticket is ready.
4161 : *
4162 : * This message is received when the owner of a ticket marks a
4163 : * ticket as ready. This means the ticket is available for locking.
4164 : *
4165 : * \param[in] msg The TICKET_READY message.
4166 : */
4167 103 : void cluckd::msg_ticket_ready(ed::message & msg)
4168 : {
4169 103 : std::string object_name;
4170 103 : ed::dispatcher_match::tag_t tag(ed::dispatcher_match::DISPATCHER_MATCH_NO_TAG);
4171 103 : std::string key;
4172 103 : if(!get_parameters(msg, &object_name, &tag, nullptr, nullptr, &key, nullptr))
4173 : {
4174 : return; // LCOV_EXCL_LINE
4175 : }
4176 :
4177 103 : auto obj_ticket(f_tickets.find(object_name));
4178 103 : if(obj_ticket != f_tickets.end())
4179 : {
4180 103 : auto key_ticket(obj_ticket->second.find(key));
4181 103 : if(key_ticket != obj_ticket->second.end())
4182 : {
4183 : // we can mark this ticket as activated
4184 : //
4185 103 : key_ticket->second->set_ready();
4186 : }
4187 : }
4188 103 : }
4189 :
4190 :
4191 : /** \brief Unlock the resource.
4192 : *
4193 : * This function unlocks the resource specified in the call to lock().
4194 : *
4195 : * \param[in] msg The unlock message.
4196 : *
4197 : * \sa msg_lock()
4198 : */
4199 116 : void cluckd::msg_unlock(ed::message & msg)
4200 : {
4201 116 : if(!is_daemon_ready())
4202 : {
4203 1 : SNAP_LOG_ERROR
4204 : << "received an UNLOCK when cluckd is not ready to receive lock related messages."
4205 : << SNAP_LOG_SEND;
4206 1 : return;
4207 : }
4208 :
4209 115 : if(is_leader() == nullptr)
4210 : {
4211 : // we are not a leader, we need to forward to a leader to handle
4212 : // the message properly
4213 : //
4214 1 : forward_message_to_leader(msg);
4215 1 : return;
4216 : }
4217 :
4218 114 : std::string object_name;
4219 114 : ed::dispatcher_match::tag_t tag(ed::dispatcher_match::DISPATCHER_MATCH_NO_TAG);
4220 114 : pid_t client_pid(0);
4221 114 : if(!get_parameters(msg, &object_name, &tag, &client_pid, nullptr, nullptr, nullptr))
4222 : {
4223 1 : return;
4224 : }
4225 :
4226 : // if the ticket still exists, send the UNLOCKED and then erase it
4227 : //
4228 113 : auto obj_ticket(f_tickets.find(object_name));
4229 113 : if(obj_ticket != f_tickets.end())
4230 : {
4231 448 : std::string const server_name(msg.has_parameter("lock_proxy_server_name")
4232 : ? msg.get_parameter("lock_proxy_server_name")
4233 224 : : msg.get_sent_from_server());
4234 :
4235 : //std::string const service_name(msg.has_parameter("lock_proxy_service_name")
4236 : // ? msg.get_parameter("lock_proxy_service_name")
4237 : // : msg.get_sent_from_service());
4238 :
4239 336 : std::string const entering_key(server_name + '/' + std::to_string(client_pid));
4240 336 : auto key_ticket(std::find_if(
4241 112 : obj_ticket->second.begin()
4242 112 : , obj_ticket->second.end()
4243 112 : , [&entering_key](auto const & t)
4244 : {
4245 112 : return t.second->get_entering_key() == entering_key;
4246 : }));
4247 112 : if(key_ticket != obj_ticket->second.end())
4248 : {
4249 : // this function will send a DROPTICKET to the other leaders
4250 : // and the UNLOCKED to the source (unless we already sent the
4251 : // UNLOCKED which gets sent at most once.)
4252 : //
4253 111 : key_ticket->second->drop_ticket();
4254 :
4255 111 : obj_ticket->second.erase(key_ticket);
4256 111 : if(obj_ticket->second.empty())
4257 : {
4258 : // we are done with this one!
4259 : //
4260 9 : f_tickets.erase(obj_ticket);
4261 : }
4262 :
4263 : // TBD: the clean up calls this function if it removes a lock
4264 : // that timed out but not otherwise; maybe we could consider
4265 : // doing it slightly differently where we pass a flag to
4266 : // the clean up function to force the call either way
4267 : //
4268 111 : activate_first_lock(object_name);
4269 : }
4270 : else
4271 : {
4272 1 : SNAP_LOG_MAJOR
4273 : << "UNLOCK could not find key \""
4274 : << entering_key
4275 : << "\" in object \""
4276 : << object_name
4277 : << "\"."
4278 : << SNAP_LOG_SEND;
4279 : }
4280 112 : }
4281 : else
4282 : {
4283 1 : SNAP_LOG_WARNING
4284 : << "UNLOCK could not find object \""
4285 : << object_name
4286 : << "\"."
4287 : << SNAP_LOG_SEND;
4288 : }
4289 :
4290 : // reset the timeout with the other locks
4291 : //
4292 113 : cleanup();
4293 114 : }
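 :
 : // Sketch: the entering key computed above is simply
 : // "<server name>/<client pid>", so an UNLOCK sent by process 5551 on
 : // server "rc1" matches the ticket whose entering key is "rc1/5551":
 : //
 : //     std::string make_entering_key(std::string const & server_name, pid_t client_pid)
 : //     {
 : //         // same composition as in msg_unlock() above
 : //         return server_name + '/' + std::to_string(client_pid);
 : //     }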
4294 :
4295 :
4296 :
4297 : } // namespace cluck_daemon
4298 : // vim: ts=4 sw=4 et
|