Line data Source code
1 : // Copyright (c) 2016-2025 Made to Order Software Corp. All Rights Reserved
2 : //
3 : // https://snapwebsites.org/project/cluck
4 : // contact@m2osw.com
5 : //
6 : // This program is free software: you can redistribute it and/or modify
7 : // it under the terms of the GNU General Public License as published by
8 : // the Free Software Foundation, either version 3 of the License, or
9 : // (at your option) any later version.
10 : //
11 : // This program is distributed in the hope that it will be useful,
12 : // but WITHOUT ANY WARRANTY; without even the implied warranty of
13 : // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 : // GNU General Public License for more details.
15 : //
16 : // You should have received a copy of the GNU General Public License
17 : // along with this program. If not, see <https://www.gnu.org/licenses/>.
18 :
19 :
20 : // self
21 : //
22 : #include "cluckd.h"
23 :
24 :
25 :
26 : // cluck
27 : //
28 : #include <cluck/exception.h>
29 : #include <cluck/names.h>
30 : #include <cluck/version.h>
31 :
32 :
33 : // communicatord
34 : //
35 : #include <communicatord/flags.h>
36 : #include <communicatord/names.h>
37 :
38 :
39 : // as2js
40 : //
41 : #include <as2js/json.h>
42 :
43 :
44 : // eventdispatcher
45 : //
46 : #include <eventdispatcher/names.h>
47 :
48 :
49 : // snapdev
50 : //
51 : #include <snapdev/gethostname.h>
52 : #include <snapdev/hexadecimal_string.h>
53 : #include <snapdev/stringize.h>
54 : #include <snapdev/tokenize_string.h>
55 : #include <snapdev/to_string_literal.h>
56 :
57 :
58 : // snaplogger
59 : //
60 : #include <snaplogger/logger.h>
61 : #include <snaplogger/options.h>
62 : #include <snaplogger/severity.h>
63 :
64 :
65 : // advgetopt
66 : //
67 : #include <advgetopt/advgetopt.h>
68 : #include <advgetopt/exception.h>
69 :
70 :
71 : // C++
72 : //
73 : #include <algorithm>
74 : #include <iostream>
75 : #include <sstream>
76 :
77 :
78 : // openssl
79 : //
80 : #include <openssl/rand.h>
81 :
82 :
83 : // last include
84 : //
85 : #include <snapdev/poison.h>
86 :
87 :
88 :
89 : /** \file
90 : * \brief Implementation of the inter-process lock mechanism.
91 : *
92 : * This file implements an inter-process lock that functions between
93 : * any number of machines. The basic algorithm used is the Bakery
94 : * Algorithm by Lamport. The concept is simple: you get a waiting
95 : * ticket and loop through the tickets until yours is picked.
96 : *
97 : * Contrary to a multi-processor environment thread synchronization,
98 : * this lock system uses messages and arrays to know its current
99 : * state. A user interested in obtaining a lock sends a LOCK
100 : * message. The cluck daemon then waits until the lock is
101 : * obtained and sends a LOCKED as a reply. Once done with the lock,
102 : * the user sends UNLOCK to release the lock.
103 : *
104 : * The implementation makes use of any number of cluck instances.
105 : * The locking mechanism makes use of the QUORUM voting system to
106 : * know that enough of the other cluck agree on a statement.
107 : * This allows the cluck daemon to obtain/release locks in an
108 : * unknown network environment (i.e. any one of the machines may
109 : * be up or down and the locking mechanism still functions as
110 : * expected).
111 : *
112 : * The QUORUM is computed using the following formula:
113 : *
114 : * \f[
115 : * \frac{\textrm{number of computers}}{2} + 1
116 : * \f]
117 : *
118 : * Note that the lock mechanism itself only uses 3 computers (2 or 1 if
119 : * your cluster is that small). So in a cluster of, say, 100 computers,
120 : * you would still end up using just 3 for the locks. However, the
121 : * elections still require you to have a quorum of all the computers
122 : * (100 / 2 + 1 is at least 51 computers).
123 : *
124 : * \note
125 : * The cluck implementation checks parameters and throws away
126 : * messages that are definitely not going to be understood. However,
127 : * it is, like most Snap! daemons, very trustworthy of other cluck
128 : * daemons and does not expect other daemons to mess around with its
129 : * sequence of lock messages used to ensure that everything worked as
130 : * expected.
131 : */
132 :
133 :
134 :
135 : namespace cluck_daemon
136 : {
137 :
138 :
139 : namespace
140 : {
141 :
142 :
143 :
// default value of the --candidate-priority option; generated at compile
// time from computer::PRIORITY_DEFAULT so the two can never drift apart
//
constexpr std::string_view g_default_candidate_priority =
    snapdev::integer_to_string_literal<computer::PRIORITY_DEFAULT>.data();


// command line / configuration options specific to the cluck daemon
// (the messenger and logger add their own options dynamically)
//
advgetopt::option const g_options[] =
{
    advgetopt::define_option(
          advgetopt::Name("candidate-priority")
        , advgetopt::ShortName('p')
        , advgetopt::Flags(advgetopt::all_flags<
                      advgetopt::GETOPT_FLAG_REQUIRED
                    , advgetopt::GETOPT_FLAG_GROUP_OPTIONS>())
        , advgetopt::Help("Define the priority of this candidate (1 to 14) to gain a leader position or \"off\".")
        , advgetopt::DefaultValue(g_default_candidate_priority.data())
    ),
    advgetopt::define_option(
          advgetopt::Name("server-name")
        , advgetopt::ShortName('n')
        , advgetopt::Flags(advgetopt::all_flags<
                      advgetopt::GETOPT_FLAG_DYNAMIC_CONFIGURATION
                    , advgetopt::GETOPT_FLAG_REQUIRED
                    , advgetopt::GETOPT_FLAG_GROUP_OPTIONS>())
        , advgetopt::Help("Set the name of this server instance.")
    ),
    advgetopt::end_options()
};


// groups used to organize the --help output
//
advgetopt::group_description const g_group_descriptions[] =
{
    advgetopt::define_group(
          advgetopt::GroupNumber(advgetopt::GETOPT_FLAG_GROUP_COMMANDS)
        , advgetopt::GroupName("command")
        , advgetopt::GroupDescription("Commands:")
    ),
    advgetopt::define_group(
          advgetopt::GroupNumber(advgetopt::GETOPT_FLAG_GROUP_OPTIONS)
        , advgetopt::GroupName("option")
        , advgetopt::GroupDescription("Options:")
    ),
    advgetopt::end_groups()
};


// list of configuration files read on startup; the list is nullptr
// terminated as required by advgetopt
//
constexpr char const * const g_configuration_files[] =
{
    "/etc/cluck/cluckd.conf",
    nullptr
};


// the advgetopt environment tying the options, configuration files,
// environment variable, and help/version/license strings together;
// passed to the f_opts constructor in cluckd::cluckd()
//
advgetopt::options_environment const g_options_environment =
{
    .f_project_name = "cluckd",
    .f_group_name = "cluck",
    .f_options = g_options,
    .f_environment_variable_name = "CLUCKD_OPTIONS",
    .f_configuration_files = g_configuration_files,
    .f_environment_flags = advgetopt::GETOPT_ENVIRONMENT_FLAG_SYSTEM_PARAMETERS
                         | advgetopt::GETOPT_ENVIRONMENT_FLAG_PROCESS_SYSTEM_PARAMETERS,
    .f_help_header = "Usage: %p [-<opt>]\n"
                     "where -<opt> is one or more of:",
    .f_help_footer = "%c",
    .f_version = CLUCK_VERSION_STRING,
    .f_license = "GNU GPL v3",
    .f_copyright = "Copyright (c) 2013-"
                   SNAPDEV_STRINGIZE(UTC_BUILD_YEAR)
                   " by Made to Order Software Corporation -- All Rights Reserved",
    .f_groups = g_group_descriptions,
};
214 :
215 :
216 :
217 : }
218 : // no name namespace
219 :
220 :
221 :
222 :
223 :
224 :
225 :
226 :
227 :
228 :
229 :
230 :
231 :
232 : /** \class cluckd
233 : * \brief Class handling intercomputer locking.
234 : *
235 : * This class is used in order to create intercomputer locks on request.
236 : *
237 : * The class uses Snap! Communicator messages and implements
238 : * the LOCK and UNLOCK commands and sends the LOCKED and UNLOCKED
239 : * commands to its senders.
240 : *
241 : * The system makes use of the Lamport's Bakery Algorithm. This is
242 : * explained in the ticket class.
243 : *
244 : * \note
245 : * At this time, there is one potential problem that can arise: the
246 : * lock may fail to concretize because the computer to which you
247 : * first sent the LOCK message goes down in some way. The other
248 : * cluck computers will have no clue by which computer the lock
249 : * was being worked on and whether one of them should take over.
250 : * One way to remediate is to run one instance of cluck on each
251 : * computer on which a lock is likely to happen. Then the LOCK
252 : * message will be proxied to the proper destination (a leader).
253 : *
254 : * \warning
255 : * The LOCK mechanism uses the system clock of each computer to know when
256 : * a lock times out. You are responsible for making sure that all those
257 : * computers have a synchronized clock (i.e. run a time daemon such
258 : * as ntpd). The difference in time should be as small as possible. The
259 : * precision required by cluck is around 1 second.
260 : *
261 : * The following shows the messages used to promote 3 leaders, in other
262 : * words it shows how the election process happens. The election itself
263 : * is done on the computer that is part of the cluster considered to be up
264 : * and which has the smallest IP address. That's the one computer that will
265 : * send the LOCKLEADERS. As soon as that happens all the other nodes on the
266 : * cluster know the leaders and inform new nodes through the LOCKSTARTED
267 : * message.
268 : *
269 : * \msc
270 : * Communicator,A,B,C,D,E,F;
271 : *
272 : * A->Communicator [label="REGISTER"];
273 : * Communicator->A [label="HELP"];
274 : * Communicator->A [label="READY"];
275 : *
276 : * A->Communicator [label="CLUSTERSTATUS"];
277 : * Communicator->A [label="CLUSTERUP"];
278 : *
279 : * # Broadcast to B to F, but we do not know who's up at this point
280 : * A->* [label="LOCKSTARTED"];
281 : *
282 : * # A answers each one of those because for it B, C, D, ... are new
283 : * B->A [label="LOCKSTARTED"];
284 : * A->B [label="LOCKSTARTED"];
285 : *
286 : * C->A [label="LOCKSTARTED"];
287 : * A->C [label="LOCKSTARTED"];
288 : *
289 : * D->A [label="LOCKSTARTED"];
290 : * A->D [label="LOCKSTARTED"];
291 : *
292 : * # When we reach here we have a CLUSTERUP in terms of cluck daemons
293 : * # Again here we broadcast, maybe we should send to known computers instead?
294 : * # IMPORTANT: A determines the leaders only if its IP is the smallest
295 : * A->* [label="LOCKLEADERS"];
296 : *
297 : * # Here the replies from A will include the leaders
298 : * # Of course, as shown above, E will have sent the message to all and
299 : * # so it will receive the leaders from multiple sources
300 : * E->A [label="LOCKSTARTED"];
301 : * A->E [label="LOCKSTARTED"];
302 : *
303 : * F->A [label="LOCKSTARTED"];
304 : * A->F [label="LOCKSTARTED"];
305 : * \endmsc
306 : *
307 : * \sa ticket
308 : */
309 :
310 :
311 :
312 : /** \brief Initializes a cluckd object.
313 : *
314 : * This function parses the command line arguments, reads configuration
315 : * files, setups the logger.
316 : *
317 : * It also immediately executes a --help or a --version command line
318 : * option and exits the process if these are present.
319 : *
320 : * \param[in] argc The number of arguments in the argv array.
321 : * \param[in] argv The array of argument strings.
322 : */
cluckd::cluckd(int argc, char * argv[])
    : f_opts(g_options_environment)
{
    // let the logger add its own --log-... command line options
    //
    snaplogger::add_logger_options(f_opts);

    // before we can parse command line arguments, we must create the
    // fluid settings & communicator client objects which happen to
    // dynamically add command line options to f_opts
    //
    // NOTE: this ordering is required; do not move the messenger
    //       creation below finish_parsing()
    //
    f_messenger = std::make_shared<messenger>(this, f_opts);

    f_opts.finish_parsing(argc, argv);
    if(!snaplogger::process_logger_options(f_opts, "/etc/cluck/logger"))
    {
        // exit code 0: --help/--version style early exit, not an error
        //
        throw advgetopt::getopt_exit("logger options generated an error.", 0);
    }

    // determine this server name
    //
    // TODO: if the name of the server is changed, we should reboot, but
    //       to the minimum we need to restart cluckd (among other daemons)
    //       remember that snapmanager.cgi gives you that option;
    //       we CANNOT use fluid-settings for this one since each computer
    //       must have a different name
    //
    if(f_opts.is_defined("server-name"))
    {
        f_server_name = f_opts.get_string("server-name");
    }
    if(f_server_name.empty())
    {
        // no explicit name; fall back to the host name
        //
        f_server_name = snapdev::gethostname();
    }

    // remember when this daemon was started
    //
    f_start_time = snapdev::now();
}
359 :
360 :
361 : /** \brief Do some clean ups.
362 : *
363 : * At this point, the destructor is present mainly because we have
364 : * some virtual functions.
365 : */
366 31 : cluckd::~cluckd()
367 : {
368 31 : }
369 :
370 :
371 : /** \brief Finish the cluck daemon initialization.
372 : *
373 : * This function creates all the connections used by the cluck daemon.
374 : *
375 : * \note
376 : * This is separate from the run() function so we can run unit tests
377 : * against the cluck daemon.
378 : *
379 : * \sa run()
380 : */
void cluckd::add_connections()
{
    // the communicator is a singleton shared by all the connections
    //
    f_communicator = ed::communicator::instance();

    // capture Ctrl-C (SIGINT) to get a clean exit by default
    //
    f_interrupt = std::make_shared<interrupt>(this);
    f_communicator->add_connection(f_interrupt);

    // timer so we can timeout locks
    //
    f_timer = std::make_shared<timer>(this);
    f_communicator->add_connection(f_timer);

    // add the messenger used to communicate with the communicator daemon
    // and other services as required
    //
    // (the messenger itself was created in the constructor because it
    // adds command line options)
    //
    f_communicator->add_connection(f_messenger);

    // the following call actually connects the messenger to the
    // communicator daemon
    //
    f_messenger->finish_parsing();
}
405 :
406 :
/** \brief Save the IP address used by this cluck daemon.
 *
 * This function saves the IP address of this computer in f_my_ip_address.
 * That address is later used by election_status() and msg_info().
 *
 * \note
 * Presumably the address comes from the messenger once it knows its own
 * address -- TODO: confirm against the caller.
 *
 * \param[in] a  The IP address of this computer.
 */
void cluckd::set_my_ip_address(addr::addr const & a)
{
    f_my_ip_address = a;
}
411 :
412 :
413 : /** \brief Run the cluck daemon.
414 : *
415 : * This function is the core function of the daemon. It runs the loop
416 : * used to lock processes from any number of computers that have access
417 : * to the cluck daemon network.
418 : *
419 : * \sa add_connections()
420 : */
void cluckd::run()
{
    // mark the start of this run in the logs
    //
    SNAP_LOG_INFO
        << "--------------------------------- cluckd started."
        << SNAP_LOG_SEND;

    // now run our listening loop; this call blocks, dispatching events
    // to the connections added by add_connections(), until the event
    // loop is asked to exit (NOTE(review): presumably via the interrupt
    // or messenger connections -- confirm in eventdispatcher)
    //
    f_communicator->run();
}
431 :
432 :
433 :
434 :
435 :
436 : /** \brief Return the number of known computers running cluckd.
437 : *
438 : * This function is used by the ticket objects to calculate
439 : * the quorum so as to know how many computers need to reply to
440 : * our messages before we can be sure we got the correct
441 : * results.
442 : *
443 : * \return The number of instances of cluckd running and connected.
444 : */
445 483 : int cluckd::get_computer_count() const
446 : {
447 483 : return f_computers.size();
448 : }
449 :
450 :
451 : /** \brief Get the name of the server we are running on.
452 : *
453 : * This function returns the name of the server this instance of
454 : * cluck is running. It is used by the ticket implementation
455 : * to know whether to send a reply to the cluck object (i.e.
456 : * at this time we can send messages to that object only from the
457 : * server it was sent from).
458 : *
459 : * \return The name of the server cluck is running on.
460 : */
std::string const & cluckd::get_server_name() const
{
    // set once in the constructor (option or hostname); safe to return
    // by const reference
    //
    return f_server_name;
}
465 :
466 :
467 : /** \brief Check whether the cluck daemon is ready to process lock requests.
468 : *
469 : * This function checks whether the cluck daemon is ready by looking
470 : * at whether it has leaders and if so, whether each leader is connected.
471 : *
472 : * Once both tests succeeds, this cluck daemon can forward the locks to
473 : * the leaders. If it is a leader itself, it can enter a ticket in
474 : * the selection and message both of the other leaders about it.
475 : *
476 : * \return true once locks can be processed.
477 : */
bool cluckd::is_daemon_ready() const
{
    // we are not yet properly registered
    //
    if(f_messenger == nullptr || !f_messenger->is_ready())
    {
        return false;
    }

    // we need a stable clock otherwise we cannot guarantee that the time
    // between servers is going to be sufficiently close
    //
    if(!f_stable_clock)
    {
        return false;
    }

    // without at least one leader we are definitely not ready
    //
    if(f_leaders.empty())
    {
        SNAP_LOG_TRACE
            << "not considered ready: no leaders."
            << SNAP_LOG_SEND;
        return false;
    }

    // enough leaders for that cluster?
    //
    // we consider that having at least 2 leaders is valid because locks
    // will still work, an election should be happening when we lose a
    // leader fixing that temporary state
    //
    // the test below allows for the case where we have a single computer
    // too (i.e. "one neighbor")
    //
    // notice how not having received the CLUSTERUP would be taken in
    // account here since f_neighbors_count will still be 0 in that case
    // (however, the previous empty() test already takes that in account)
    //
    if(f_leaders.size() == 1
    && f_neighbors_count != 1)
    {
        SNAP_LOG_TRACE
            << "not considered ready: not enough leaders for this cluster."
            << SNAP_LOG_SEND;
        return false;
    }

    // the election_status() function verifies that the quorum is
    // attained, but it can change if the cluster grows or shrinks
    // so we have to check here again as the lock system becomes
    // "unready" when the quorum is lost; see that other function
    // for additional info

    // this one probably looks complicated...
    //
    // if our quorum is 1 or 2 then we need a number of computers
    // equal to the total number of computers (i.e. a CLUSTER_COMPLETE
    // status which we verify here)
    //
    if(f_neighbors_quorum < 3
    && f_computers.size() < f_neighbors_count)
    {
        SNAP_LOG_TRACE
            << "not considered ready: quorum changed, re-election expected soon (number of neighbors: "
            << f_neighbors_count
            << ", neighbors quorum: "
            << f_neighbors_quorum
            << ", number of computers: "
            << f_computers.size()
            << "."
            << SNAP_LOG_SEND;
        return false;
    }

    // the neighbors count & quorum can change over time so
    // we have to verify that the number of computers is
    // still acceptable here
    //
    if(f_computers.size() < f_neighbors_quorum)
    {
        SNAP_LOG_TRACE
            << "not considered ready: quorum lost, re-election expected soon."
            << SNAP_LOG_SEND;
        return false;
    }

    // are all leaders connected to us?
    //
    // count the connected leaders; remember the last disconnected one
    // so we can name it in the trace log below
    //
    std::size_t ready(0);
    computer::pointer_t last_leader;
    for(auto const & l : f_leaders)
    {
        if(l->get_connected())
        {
            ++ready;
        }
        else
        {
            // attempt resending a LOCK_STARTED because it could be that it
            // did not work quite right and the cluck daemons are not
            // going to ever talk with each others otherwise
            //
            // we also make sure we do not send the message too many times,
            // it should be resolved in five seconds or less...
            //
            time_t const now(time(nullptr));
            if(now >= f_pace_lockstarted)
            {
                // pause for 5 to 6 seconds in case this happens a lot
                //
                f_pace_lockstarted = now + 5;

                // only send it to that specific cluck daemon
                //
                ed::message temporary_message;
                temporary_message.set_sent_from_server(l->get_name());
                temporary_message.set_sent_from_service(cluck::g_name_cluck_service_name);

                // const_cast: send_lock_started() is non-const but this
                // retry is logically an observation-side repair; the
                // function is otherwise const for its callers
                //
                const_cast<cluckd *>(this)->send_lock_started(&temporary_message);
            }

            last_leader = l;
        }
    }

    // we only need to have 2 leaders to have a functional system with
    // 2 or 3 leaders total
    //
    if(ready >= 2
    || ready == f_leaders.size())
    {
        return true;
    }

    // we're still not ready
    //
    // last_leader is guaranteed non-null here: ready < f_leaders.size()
    // means at least one leader was disconnected in the loop above
    //
    SNAP_LOG_TRACE
        << "not considered ready: no direct connection with leader: \""
        << last_leader->get_name()
        << "\"."
        << SNAP_LOG_SEND;

    return false;
}
623 :
624 :
625 : /** \brief Search for a leader.
626 : *
627 : * This function goes through the list of leaders to determine whether
628 : * the specified identifier represents a leader. If so it returns a pointer
629 : * to that leader computer object.
630 : *
631 : * When the function is called with an empty string as the computer
632 : * identifier, this computer is checked to see whether it is a leader.
633 : *
634 : * \warning
635 : * This function is considered slow since it goes through the list of leaders
636 : * on each call. On the other hand, it's only 1 to 3 leaders. Yet, you should
637 : * cache the result within your function if you need the result multiple
638 : * times, as in:
639 : *
640 : * \code
641 : * computer::pointer_t leader(is_leader());
642 : * if(leader != nullptr)
643 : * {
644 : * // ... use `leader` any number of times ...
645 : * }
646 : * \endcode
647 : *
648 : * \par
649 : * This is done that way because the function may return a different result
650 : * over time (i.e. if a re-election happens). So you do not want to cache
651 : * the results between calls to your function.
652 : *
653 : * \param[in] id The identifier of the leader to search, if empty, default
654 : * to f_my_id (i.e. whether this cluckd is a leader).
655 : *
656 : * \return The leader computer::pointer_t or a null pointer.
657 : */
658 1937 : computer::pointer_t cluckd::is_leader(std::string id) const
659 : {
660 1937 : if(id.empty())
661 : {
662 1937 : id = f_my_id;
663 : }
664 :
665 3874 : auto const l(std::find_if(
666 : f_leaders.begin()
667 : , f_leaders.end()
668 3874 : , [id](auto const & c){
669 2055 : return c->get_id() == id;
670 1937 : }));
671 1937 : if(l != f_leaders.end())
672 : {
673 1861 : return *l;
674 : }
675 :
676 76 : return computer::pointer_t();
677 : }
678 :
679 :
680 : /** \brief Get pointer to leader A.
681 : *
682 : * We have 1 to 3 leaders in each cluck daemon. There is "self", leader A,
683 : * and leader B. This function returns leader A or a null pointer if there
684 : * is only one leader.
685 : *
686 : * Leader A is either f_leaders[0] or f_leaders[1]. If "self" is f_leaders[0]
687 : * then the function returns f_leaders[1], If "self" is f_leaders[1], then
688 : * the function returns f_leaders[0].
689 : *
690 : * \note
691 : * In a setup where you have only 1 computer total, this function always
692 : * returns a null pointer.
693 : *
694 : * \note
695 : * If the elections have not yet happened, this function always returns
696 : * a null pointer.
697 : *
698 : * \return A pointer to the leader A computer or nullptr if there is no
699 : * such computer.
700 : */
701 814 : computer::pointer_t cluckd::get_leader_a() const
702 : {
703 : #ifdef _DEBUG
704 814 : if(is_leader() == nullptr)
705 : {
706 27 : throw cluck::logic_error("cluckd::get_leader_a(): only a leader can call this function.");
707 : }
708 : #endif
709 :
710 805 : switch(f_leaders.size())
711 : {
712 : case 0: // LCOV_EXCL_LINE -- because of the debug above, this cannot happen here
713 : default:
714 : throw cluck::logic_error("cluckd::get_leader_a(): call this function only when leaders were elected."); // LCOV_EXCL_LINE
715 :
716 19 : case 1:
717 19 : return computer::pointer_t();
718 :
719 786 : case 2:
720 : case 3:
721 786 : return f_leaders[f_leaders[0]->is_self() ? 1 : 0];
722 :
723 : }
724 : }
725 :
726 :
727 : /** \brief Get pointer to leader B.
728 : *
729 : * We have 1 to 3 leaders in each cluck daemon. There is "self", leader A,
730 : * and leader B. This function returns leader B or a null pointer if there
731 : * is only one leader.
732 : *
733 : * Leader B is either f_leaders[1] or f_leaders[2]. If "self" is f_leaders[1]
734 : * then the function returns f_leaders[2], If "self" is f_leaders[2], then
735 : * the function returns f_leaders[1].
736 : *
737 : * \note
738 : * In a setup where you have only 1 or 2 computers total, this function
739 : * always returns a null pointer.
740 : *
741 : * \note
742 : * If the elections have not yet happened, this function always returns
743 : * a null pointer.
744 : *
745 : * \return A pointer to the leader B computer or nullptr if there is no
746 : * such computer.
747 : */
748 795 : computer::pointer_t cluckd::get_leader_b() const
749 : {
750 : #ifdef _DEBUG
751 795 : if(is_leader() == nullptr)
752 : {
753 27 : throw cluck::logic_error("cluckd::get_leader_b(): only a leader can call this function.");
754 : }
755 : #endif
756 :
757 786 : switch(f_leaders.size())
758 : {
759 : case 0: // LCOV_EXCL_LINE -- because of the debug above, this cannot happen here
760 : default:
761 : throw cluck::unexpected_case("cluckd::get_leader_b(): call this function only when leaders were elected."); // LCOV_EXCL_LINE
762 :
763 714 : case 1:
764 : case 2: // we have a leader A but no leader B when we have only 2 leaders
765 714 : return computer::pointer_t();
766 :
767 72 : case 3:
768 72 : return f_leaders[f_leaders[2]->is_self() ? 1 : 2];
769 :
770 : }
771 : }
772 :
773 :
774 :
775 : /** \brief Return a JSON with the state of this cluckd object.
776 : *
777 : * This function generates a JSON string of the current state of this
778 : * cluck daemon and replies with that message back to the caller.
779 : *
780 : * This is primarily used to debug a cluckd instance and make sure that the
781 : * state is how you would otherwise expect it to be.
782 : *
783 : * \todo
784 : * The list of tickets uses the internal serialization mechanism, which
785 : * creates a much smaller array of tickets. At some point, we should transform
786 : * that to output JSON instead.
787 : *
788 : * \param[in] msg The INFO message.
789 : */
790 20 : void cluckd::msg_info(ed::message & msg)
791 : {
792 20 : std::stringstream ss;
793 :
794 20 : as2js::position p;
795 60 : p.set_filename("cluckd.cpp");
796 60 : p.set_function("msg_info");
797 :
798 20 : as2js::json::json_value::object_t obj;
799 20 : as2js::json::json_value::pointer_t result(std::make_shared<as2js::json::json_value>(p, obj));
800 :
801 : {
802 20 : as2js::json::json_value::pointer_t value(std::make_shared<as2js::json::json_value>(p, is_daemon_ready()));
803 60 : result->set_member("daemon_ready", value);
804 20 : }
805 :
806 20 : if(!f_my_id.empty())
807 : {
808 19 : as2js::json::json_value::pointer_t value(std::make_shared<as2js::json::json_value>(p, f_my_id));
809 57 : result->set_member("id", value);
810 19 : }
811 :
812 : {
813 20 : addr::addr zero;
814 20 : as2js::json::json_value::pointer_t value(std::make_shared<as2js::json::json_value>(p,
815 20 : f_my_ip_address == zero
816 40 : ? "<not assigned>"
817 40 : : f_my_ip_address.to_ipv4or6_string(addr::STRING_IP_ADDRESS | addr::STRING_IP_BRACKET_ADDRESS)));
818 60 : result->set_member("ip", value);
819 20 : }
820 :
821 : {
822 20 : as2js::json::json_value::pointer_t value(std::make_shared<as2js::json::json_value>(p, as2js::integer(f_neighbors_count)));
823 60 : result->set_member("neighbors_count", value);
824 20 : }
825 :
826 : {
827 20 : as2js::json::json_value::pointer_t value(std::make_shared<as2js::json::json_value>(p, as2js::integer(f_neighbors_quorum)));
828 60 : result->set_member("neighbors_quorum", value);
829 20 : }
830 :
831 : {
832 20 : as2js::json::json_value::pointer_t value(std::make_shared<as2js::json::json_value>(p, as2js::integer(f_leaders.size())));
833 60 : result->set_member("leaders_count", value);
834 20 : }
835 :
836 20 : if(!f_message_cache.empty())
837 : {
838 1 : as2js::json::json_value::pointer_t value(std::make_shared<as2js::json::json_value>(p, as2js::integer(f_message_cache.size())));
839 3 : result->set_member("cache_size", value);
840 1 : }
841 :
842 : {
843 20 : as2js::json::json_value::array_t computers;
844 20 : as2js::json::json_value::pointer_t list(std::make_shared<as2js::json::json_value>(p, computers));
845 :
846 112 : for(auto const & c : f_computers)
847 : {
848 92 : as2js::json::json_value::object_t computer;
849 92 : as2js::json::json_value::pointer_t item(std::make_shared<as2js::json::json_value>(p, computer));
850 :
851 : {
852 92 : as2js::json::json_value::pointer_t value(std::make_shared<as2js::json::json_value>(p, c.second->get_name()));
853 276 : item->set_member("name", value);
854 92 : }
855 :
856 : {
857 92 : as2js::json::json_value::pointer_t value(std::make_shared<as2js::json::json_value>(p, c.second->get_id()));
858 276 : item->set_member("id", value);
859 92 : }
860 :
861 : {
862 92 : as2js::json::json_value::pointer_t value(std::make_shared<as2js::json::json_value>(p, c.second->get_ip_address().to_ipv4or6_string(addr::STRING_IP_ADDRESS | addr::STRING_IP_BRACKET_ADDRESS)));
863 276 : item->set_member("ip", value);
864 92 : }
865 :
866 : {
867 92 : as2js::json::json_value::pointer_t value(std::make_shared<as2js::json::json_value>(p, c.second->get_connected()));
868 276 : item->set_member("connected", value);
869 92 : }
870 :
871 : {
872 92 : auto const it(std::find_if(
873 : f_leaders.begin()
874 : , f_leaders.end()
875 179 : , [&c](auto const & l)
876 : {
877 179 : return c.second == l;
878 : }));
879 92 : std::string leader;
880 92 : if(it != f_leaders.end())
881 : {
882 31 : as2js::json::json_value::pointer_t value(std::make_shared<as2js::json::json_value>(p, as2js::integer(it - f_leaders.begin())));
883 93 : item->set_member("leader", value);
884 31 : }
885 92 : }
886 :
887 92 : list->set_item(list->get_array().size(), item);
888 92 : }
889 60 : result->set_member("computers", list);
890 20 : }
891 :
892 80 : if(msg.has_parameter(cluck::g_name_cluck_param_mode)
893 78 : && msg.get_parameter(cluck::g_name_cluck_param_mode) == cluck::g_name_cluck_value_debug)
894 : {
895 : // TODO: create a serialization to JSON instead of a specialized string
896 : //
897 2 : as2js::json::json_value::pointer_t value(std::make_shared<as2js::json::json_value>(p, serialized_tickets()));
898 6 : result->set_member("tickets", value);
899 2 : }
900 :
901 20 : ed::message reply_message;
902 60 : reply_message.set_command(cluck::g_name_cluck_cmd_cluckd_status);
903 20 : reply_message.reply_to(msg);
904 60 : reply_message.add_parameter(communicatord::g_name_communicatord_param_status, result->to_string());
905 20 : f_messenger->send_message(reply_message);
906 40 : }
907 :
908 :
909 :
910 : /** \brief Generate the output for "cluck-status --list"
911 : *
912 : * This function loops over the list of tickets and outputs a string that
913 : * it sends back to the `cluck-status --list` tool for printing to the administrator.
914 : *
915 : * \return A string with the list of all the tickets.
916 : */
917 4 : std::string cluckd::ticket_list() const
918 : {
919 4 : std::stringstream list;
920 11 : for(auto const & obj_ticket : f_tickets)
921 : {
922 18 : for(auto const & key_ticket : obj_ticket.second)
923 : {
924 : list
925 11 : << "ticket_id: "
926 11 : << key_ticket.second->get_ticket_number()
927 : << " object name: \""
928 11 : << key_ticket.second->get_object_name()
929 : << "\" key: "
930 22 : << key_ticket.second->get_entering_key();
931 :
932 11 : cluck::timeout_t const lock_timeout(key_ticket.second->get_lock_timeout_date());
933 11 : if(lock_timeout != cluck::timeout_t())
934 : {
935 : list
936 : << " timeout "
937 3 : << lock_timeout.to_string();
938 : }
939 : else
940 : {
941 8 : cluck::timeout_t const obtention_timeout(key_ticket.second->get_obtention_timeout());
942 : list
943 : << " obtention "
944 8 : << obtention_timeout.to_string();
945 : }
946 :
947 11 : list << '\n';
948 : }
949 : }
950 :
951 8 : return list.str();
952 4 : }
953 :
954 :
955 : /** \brief Check the status of the election.
956 : *
957 : * This function checks whether the leaders were already elected. If so,
958 : * then nothing happens.
959 : *
960 : * Otherwise, it checks the state of the election:
961 : *
962 : * * leaders are not already elected
963 : * * this cluckd received its own IP address
964 : * * we do not yet know how many neighbors there are in our cluster
 * * the cluster has a sufficient (quorum) number of computers to support
 *   the lock mechanism properly (2 of the 3 leaders or a cluster quorum
 *   when the cluster has 4 or more computers)
 * * we have the lowest IP address (only the cluck daemon with the lowest
 *   IP address can decide which computers are the leaders)
 * * not too many of the cluck daemons are opted out of leadership
971 : */
void cluckd::election_status()
{
    // we already have election results?
    //
    if(!f_leaders.empty())
    {
        // the results may have been "tempered" with (i.e. tampered --
        // one of the leaders was lost)
        //
        // a full set of 3 leaders, or all computers of a tiny (< 3)
        // cluster being leaders, means nothing needs to be redone
        //
        if(f_leaders.size() == 3
        || (f_neighbors_count < 3 && f_leaders.size() == f_neighbors_count))
        {
            // status is fine
            //
            return;
        }
    }

    // we do not yet know our IP address, we cannot support an election just yet
    //
    if(f_my_ip_address.is_default())
    {
        // this should not be possible since we expect to receive the
        // REGISTER's reply before any other message
        //
        return; // LCOV_EXCL_LINE
    }

    // neighbors count is 0 until we receive a very first CLUSTER_UP
    // (note that it does not go back to zero on CLUSTER_DOWN, however,
    // the quorum as checked in the next if() is never going to be
    // reached if the cluster is down)
    //
    if(f_neighbors_count == 0)
    {
        return;
    }

    // this one probably looks complicated...
    //
    // if our quorum is 1 or 2 then we need a number of computers
    // equal to the total number of computers (i.e. a CLUSTER_COMPLETE
    // status which we compute here)
    //
    if(f_neighbors_quorum < 3
    && f_computers.size() < f_neighbors_count)
    {
        return;
    }

    // since the neighbors count & quorum never go back to zero (on a
    // CLUSTER_DOWN) we have to verify that the number of computers is
    // acceptable here
    //
    // Note: further we will not count computers marked disabled, which
    //       is done below when sorting by ID, however, that does not
    //       prevent the quorum to be attained, even with disabled
    //       computers
    //
    if(f_computers.size() < f_neighbors_quorum)
    {
        return;
    }

    // to proceed with an election we must have the smallest IP address
    // (it is not absolutely required, but that way we avoid many
    // consensus problems, in effect we have one "temporary-leader" that ends
    // up telling us who the final three leaders are)
    //
    // TODO: verify that this works properly in a non-complete cluster
    //
    for(auto & c : f_computers)
    {
        // Note: the test fails when we compare to ourselves so we do not
        //       need any special case
        //
        if(c.second->get_ip_address() < f_my_ip_address)
        {
            return;
        }
    }

    // to select the leaders sort them by identifier and take the first
    // three (i.e. lower priority, random, IP, pid.)
    //
    // `off` counts the computers excluded from candidacy because their
    // priority is PRIORITY_OFF
    //
    int off(0);
    computer::map_t sort_by_id;
    for(auto const & c : f_computers)
    {
        // ignore nodes with a priority of 15 (i.e. OFF)
        //
        if(c.second->get_priority() != computer::PRIORITY_OFF)
        {
            sort_by_id[c.second->get_id()] = c.second;
        }
        else
        {
            ++off;
        }
    }

    //for(auto const & s : sort_by_id)
    //{
    //SNAP_LOG_WARNING << "--- sort by ID: " << s.first << SNAP_LOG_SEND;
    //}

    // verify that enough computers remain as leadership candidates;
    // both branches only complain once the cluster is complete
    // (f_computers.size() >= f_neighbors_count) since `off` is not
    // trustworthy before that
    //
    bool too_many_computers_off(false);
    if(f_computers.size() <= 3)
    {
        if(off != 0
        && f_computers.size() >= f_neighbors_count)
        {
            SNAP_LOG_FATAL
                << "you cannot have any cluck computer turned OFF when you"
                   " have three or less computers total in your cluster."
                   " The elections cannot be completed in these"
                   " conditions."
                << SNAP_LOG_SEND;
            too_many_computers_off = true;
        }
    }
    else if(f_computers.size() - off < 3
         && f_computers.size() >= f_neighbors_count)
    {
        SNAP_LOG_FATAL
            << "you have a total of "
            << f_computers.size()
            << " computers in your cluster. You turned off "
            << off
            << " of them, which means less than three are left"
               " as candidates for leadership which is not enough."
               " You can have a maximum of "
            << f_computers.size() - 3
            << " that are turned off on this cluster."
            << SNAP_LOG_SEND;
        too_many_computers_off = true;
    }
    if(too_many_computers_off)
    {
        // only generate the flag once we reach the CLUSTER_COMPLETE status
        // (we cannot be sure that the `off` variable is valid until then)
        //
        if(f_computers.size() >= f_neighbors_count)
        {
            // this is something that breaks the entire system so someone
            // needs to fix it and thus it has a really high priority
            //
            communicatord::flag::pointer_t flag(COMMUNICATORD_FLAG_UP(
                      "cluckd"
                    , "election"
                    , "instances-off"
                    , "the cluck daemon detected that too many of the"
                      " daemons have their priority set to OFF;"
                      " you must turn some of these back ON."
                ));
            flag->set_priority(99);
            flag->add_tag("settings");
            flag->set_manual_down(true);
            flag->save();
        }
        return;
    }

    // an election works as soon as we have at least 2 leaders
    // or we reached the total number of computers
    //
    if(sort_by_id.size() < 2
    && sort_by_id.size() != f_computers.size())
    {
        return;
    }

    //std::cerr << f_communicator_port << " is conducting an election:\n";
    //for(auto s : sort_by_id)
    //{
    //std::cerr << "  " << s.second->get_name() << " " << s.first << "\n";
    //}

    // the first three are the new leaders; broadcast the LOCK_LEADERS
    // message so all cluck daemons agree on the same set
    //
    ed::message lock_leaders_message;
    lock_leaders_message.set_command(cluck::g_name_cluck_cmd_lock_leaders);
    lock_leaders_message.set_service(communicatord::g_name_communicatord_server_any);
    f_leaders.clear();
    f_election_date = snapdev::now();
    lock_leaders_message.add_parameter(cluck::g_name_cluck_param_election_date, f_election_date);
    auto leader(sort_by_id.begin());
    std::size_t const max(std::min(static_cast<computer::map_t::size_type>(3), sort_by_id.size()));
    for(std::size_t idx(0); idx < max; ++idx, ++leader)
    {
        lock_leaders_message.add_parameter(
              cluck::g_name_cluck_param_leader + std::to_string(idx)
            , leader->second->get_id());
        f_leaders.push_back(leader->second);
    }
    f_messenger->send_message(lock_leaders_message);

#if 1
    SNAP_LOG_WARNING
        << "election status = add leader(s)... "
        << f_computers.size()
        << " computers and "
        << f_leaders.size()
        << " leaders."
        << SNAP_LOG_SEND;
#endif

    // we need to synchronize from this cluckd daemon if it is one of the
    // leaders otherwise we won't get the LOCK_STARTED which would also
    // call that function
    //
    synchronize_leaders();
}
1185 :
1186 :
1187 91 : void cluckd::check_lock_status()
1188 : {
1189 91 : bool const lock_status(is_daemon_ready());
1190 91 : if(f_lock_status == lock_status)
1191 : {
1192 72 : return;
1193 : }
1194 19 : f_lock_status = lock_status;
1195 :
1196 19 : ed::message status_message;
1197 57 : status_message.set_command(f_lock_status
1198 : ? cluck::g_name_cluck_cmd_lock_ready
1199 : : cluck::g_name_cluck_cmd_no_lock);
1200 57 : status_message.set_service(communicatord::g_name_communicatord_server_me);
1201 95 : status_message.add_parameter(communicatord::g_name_communicatord_param_cache, communicatord::g_name_communicatord_value_no);
1202 19 : f_messenger->send_message(status_message);
1203 :
1204 19 : if(lock_status
1205 19 : && !f_message_cache.empty())
1206 : {
1207 : // we still have a cache of locks that can now be processed
1208 : //
1209 : // note:
1210 : // although msg_lock() could re-add some of those messages
1211 : // in the f_message_cache vector, it should not since it
1212 : // calls the same is_daemon_ready() function which we know returns
1213 : // true and therefore no cache is required
1214 : //
1215 2 : message_cache::list_t cache;
1216 2 : cache.swap(f_message_cache);
1217 4 : for(auto & mc : cache)
1218 : {
1219 2 : msg_lock(mc.f_message);
1220 : }
1221 2 : }
1222 19 : }
1223 :
1224 :
1225 77 : void cluckd::send_lock_started(ed::message const * msg)
1226 : {
1227 : // tell other cluck daemon instances that are already listening that
1228 : // we are ready; this way we can calculate the number of computers
1229 : // available in our network and use that to calculate the QUORUM
1230 : //
1231 77 : ed::message lock_started_message;
1232 231 : lock_started_message.set_command(cluck::g_name_cluck_cmd_lock_started);
1233 77 : if(msg == nullptr)
1234 : {
1235 72 : lock_started_message.set_service(communicatord::g_name_communicatord_service_public_broadcast);
1236 :
1237 : // unfortunately, the following does NOT work as expected...
1238 : // (i.e. the following ends up sending the message to ourselves only
1239 : // and does not forward to any remote communicators).
1240 : //
1241 : //lock_started_message.set_server(communicatord::g_name_communicatord_server_any);
1242 : //lock_started_message.set_service(cluck::g_name_cluck_service_name);
1243 : }
1244 : else
1245 : {
1246 53 : lock_started_message.reply_to(*msg);
1247 : }
1248 :
1249 : // our info: server name and id
1250 : //
1251 231 : lock_started_message.add_parameter(communicatord::g_name_communicatord_param_server_name, f_server_name);
1252 231 : lock_started_message.add_parameter(cluck::g_name_cluck_param_lock_id, f_my_id);
1253 231 : lock_started_message.add_parameter(cluck::g_name_cluck_param_start_time, f_start_time);
1254 :
1255 : // include the leaders if present
1256 : //
1257 77 : if(!f_leaders.empty())
1258 : {
1259 78 : lock_started_message.add_parameter(cluck::g_name_cluck_param_election_date, f_election_date);
1260 88 : for(size_t idx(0); idx < f_leaders.size(); ++idx)
1261 : {
1262 124 : lock_started_message.add_parameter(
1263 124 : cluck::g_name_cluck_param_leader + std::to_string(idx)
1264 62 : , f_leaders[idx]->get_id());
1265 : }
1266 : }
1267 :
1268 77 : f_messenger->send_message(lock_started_message);
1269 154 : }
1270 :
1271 :
1272 : /** \brief Called whenever we receive the STOP command or equivalent.
1273 : *
1274 : * This function makes sure the cluck daemon exits as quickly as
1275 : * possible. This means unregistering all the daemon's connections
1276 : * from the communicator.
1277 : *
1278 : * If possible, the function sends an UNREGISTER message to the
1279 : * communicator daemon.
1280 : *
1281 : * \param[in] quitting Set to true if we received a QUITTING message (false
1282 : * usually means we received a STOP message).
1283 : */
1284 49 : void cluckd::stop(bool quitting)
1285 : {
1286 49 : if(f_messenger != nullptr)
1287 : {
1288 25 : f_messenger->unregister_fluid_settings(quitting);
1289 25 : f_communicator->remove_connection(f_messenger);
1290 25 : f_messenger.reset();
1291 : }
1292 :
1293 49 : if(f_communicator != nullptr)
1294 : {
1295 49 : f_communicator->remove_connection(f_interrupt);
1296 49 : f_interrupt.reset();
1297 :
1298 49 : f_communicator->remove_connection(f_timer);
1299 49 : f_timer.reset();
1300 : }
1301 49 : }
1302 :
1303 :
1304 : /** \brief Make sure the very first ticket is marked as LOCKED.
1305 : *
1306 : * This function is called whenever the f_tickets map changes
1307 : * (more specifically, one of its children) to make sure
1308 : * that the first ticket is clearly marked as being locked.
1309 : * Most of the time this happens when we add and when we remove
1310 : * tickets.
1311 : *
1312 : * Note that the function may be called many times even though the
1313 : * first ticket does not actually change. Generally this is fine
1314 : * although each time it sends an ACTIVATE_LOCK message so we want
1315 : * to limit the number of calls to make sure we do not send too
1316 : * many possibly confusing messages.
1317 : *
1318 : * \note
1319 : * We need the ACTIVATE_LOCK and LOCK_ACTIVATED messages to make sure
1320 : * that we only activate the very first lock which we cannot be sure
1321 : * of on our own because all the previous messages are using the
1322 : * QUORUM as expected and thus our table of locks may not be complete
1323 : * at any one time.
1324 : *
1325 : * \param[in] object_name The name of the object which very
1326 : * first ticket may have changed.
1327 : */
1328 227 : void cluckd::activate_first_lock(std::string const & object_name)
1329 : {
1330 227 : auto ticket(find_first_lock(object_name));
1331 :
1332 227 : if(ticket != nullptr)
1333 : {
1334 : // there is what we think is the first ticket
1335 : // that should be actived now; we need to share
1336 : // with the other 2 leaders to make sure of that
1337 : //
1338 217 : ticket->activate_lock();
1339 : }
1340 454 : }
1341 :
1342 :
/** \brief Find the first non-timed-out ticket for \p object_name.
 *
 * Looks up the set of tickets attached to \p object_name and returns
 * the first one (in key order) that did not time out. As a side
 * effect, tickets found to be timed out are failed and removed, and
 * the whole object entry is dropped from f_tickets once empty.
 *
 * \param[in] object_name The name of the object whose tickets are searched.
 *
 * \return The first valid ticket or a null pointer when none exist.
 */
ticket::pointer_t cluckd::find_first_lock(std::string const & object_name)
{
    ticket::pointer_t first_ticket;
    auto const obj_ticket(f_tickets.find(object_name));

    if(obj_ticket != f_tickets.end())
    {
        // loop through making sure that we activate a ticket only
        // if the obtention date was not already reached; if that
        // date was reached before we had the time to activate the
        // lock, then the client should have abandoned the lock
        // request anyway...
        //
        // (this is already done in the cleanup(), but a couple of
        // other functions may call the activate_first_lock()
        // function!)
        //
        for(auto key_ticket(obj_ticket->second.begin()); key_ticket != obj_ticket->second.end(); )
        {
            if(key_ticket->second->timed_out())
            {
                // that ticket timed out, send an UNLOCKING, UNLOCKED,
                // or LOCK_FAILED message and get rid of it
                //
                key_ticket->second->lock_failed("timed out while searching for first lock");
                if(key_ticket->second->timed_out())
                {
                    // still timed out, remove it
                    //
                    key_ticket = obj_ticket->second.erase(key_ticket);
                }
                // note: when lock_failed() cleared the timeout, the
                // iterator is deliberately NOT advanced; the next pass
                // re-examines the same ticket and takes the else branch
            }
            else
            {
                // remember the very first surviving ticket; later ones
                // only get their timeout checked
                //
                if(first_ticket == nullptr)
                {
                    first_ticket = key_ticket->second;
                }
                ++key_ticket;
            }
        }

        if(obj_ticket->second.empty())
        {
            // it is empty now, get rid of that set of tickets
            //
            f_tickets.erase(obj_ticket);
        }
    }

    return first_ticket;
}
1395 :
1396 :
1397 : /** \brief Synchronize leaders.
1398 : *
1399 : * This function sends various events to the other two leaders in order
1400 : * to get them to synchronize the tickets this cluck daemon currently holds.
1401 : *
1402 : * Only leaders make use of this function.
1403 : *
1404 : * Synchronization is necessary whenever a leader dies and another gets
1405 : * elected as a replacement. The replacement would have no idea of the
1406 : * old tickets. This function makes sure that such doesn't occur.
1407 : *
1408 : * \note
1409 : * Our test checks the validity when ONE leader is lost. If TWO of the
1410 : * leaders are lost at once, the algorithm may not be 100% reliable.
1411 : * Especially, the remaining leader may not have all the necessary
1412 : * information to restore all the tickets as they were expected to be.
1413 : *
1414 : * \warning
1415 : * A ticket that just arrived to a leader and was not yet forwarded to
1416 : * the others with the LOCK_ENTERING message is going to be lost no
1417 : * matter what.
1418 : */
1419 23 : void cluckd::synchronize_leaders()
1420 : {
1421 : // there is nothing to do if we are by ourselves because we cannot
1422 : // gain any type of concensus unless we are expected to be the only
1423 : // one in which case there is no synchronization requirements anyway
1424 : //
1425 23 : if(f_leaders.size() <= 1)
1426 : {
1427 4 : return;
1428 : }
1429 :
1430 : // only leaders can synchronize each others
1431 : // (other cluck daemons do not have any tickets to synchronize)
1432 : //
1433 19 : if(is_leader() == nullptr)
1434 : {
1435 5 : return;
1436 : }
1437 :
1438 : // determine whether we are leader #0 or not, if zero, then we
1439 : // call msg_lock() directly, otherwise we do a f_messenger->send_message()
1440 : //
1441 : // TODO: review the logic here, I do not think that leader0 has anything
1442 : // to do with lock requests and tickets; instead, I think that the
1443 : // sharing that needs to happen is between the old and the new
1444 : // leaders (i.e. when we lose a leader and assign another computer
1445 : // as a new leader to compensate, that new leader needs to get
1446 : // all the info which is what the LOCK_TICKETS message is about)
1447 : //
1448 14 : bool const leader0(f_leaders[0]->get_id() == f_my_id);
1449 :
1450 : // a vector of messages for which we have to call msg_lock()
1451 : //
1452 14 : ed::message::vector_t local_locks;
1453 :
1454 : // if entering a ticket is definitely not locked, although it
1455 : // could be ready (one step away from being locked!) we still
1456 : // restart the whole process with the new leaders if such
1457 : // exist
1458 : //
1459 : // Note: of course we restart the process only if the owner
1460 : // was that one leader that disappeared, not if the
1461 : // ticket is owned by a remaining leader
1462 : //
1463 15 : for(auto obj_entering(f_entering_tickets.begin()); obj_entering != f_entering_tickets.end(); ++obj_entering)
1464 : {
1465 2 : for(auto key_entering(obj_entering->second.begin()); key_entering != obj_entering->second.end(); )
1466 : {
1467 1 : std::string const owner_name(key_entering->second->get_owner());
1468 1 : auto key_leader(std::find_if(
1469 : f_leaders.begin()
1470 : , f_leaders.end()
1471 2 : , [&owner_name](auto const & l)
1472 : {
1473 2 : return l->get_name() == owner_name;
1474 : }));
1475 1 : if(key_leader == f_leaders.end())
1476 : {
1477 : // give new ownership to leader[0]
1478 : //
1479 0 : ed::message lock_message;
1480 0 : lock_message.set_command(cluck::g_name_cluck_cmd_lock);
1481 0 : lock_message.set_server(f_leaders[0]->get_name());
1482 0 : lock_message.set_service(cluck::g_name_cluck_service_name);
1483 0 : lock_message.set_sent_from_server(key_entering->second->get_server_name());
1484 0 : lock_message.set_sent_from_service(key_entering->second->get_service_name());
1485 0 : lock_message.add_parameter(cluck::g_name_cluck_param_object_name, key_entering->second->get_object_name());
1486 0 : lock_message.add_parameter(cluck::g_name_cluck_param_tag, key_entering->second->get_tag());
1487 0 : lock_message.add_parameter(cluck::g_name_cluck_param_pid, key_entering->second->get_client_pid());
1488 0 : lock_message.add_parameter(cluck::g_name_cluck_param_timeout, key_entering->second->get_obtention_timeout());
1489 0 : lock_message.add_parameter(cluck::g_name_cluck_param_duration, key_entering->second->get_lock_duration());
1490 0 : lock_message.add_parameter(cluck::g_name_cluck_param_unlock_duration, key_entering->second->get_unlock_duration());
1491 0 : if(leader0)
1492 : {
1493 : // we are leader #0 so directly call msg_lock()
1494 : //
1495 : // first we remove the entry otherwise we get a duplicate
1496 : // error since we try to re-add the same ticket
1497 : //
1498 0 : key_entering = obj_entering->second.erase(key_entering);
1499 0 : local_locks.push_back(lock_message);
1500 : }
1501 : else
1502 : {
1503 : // we are not leader #0, so send the message to it
1504 : //
1505 0 : ++key_entering;
1506 0 : lock_message.add_parameter(cluck::g_name_cluck_param_serial, key_entering->second->get_serial());
1507 0 : f_messenger->send_message(lock_message);
1508 : }
1509 0 : }
1510 : else
1511 : {
1512 1 : ++key_entering;
1513 : }
1514 1 : }
1515 : }
1516 :
1517 : // a ticket may still be unlocked in which case we want to
1518 : // restart the lock process as if still entering
1519 : //
1520 : // if locked, a ticket is assigned leader0 as its new owner so
1521 : // further work on that ticket works as expected
1522 : //
1523 14 : std::string serialized;
1524 16 : for(auto obj_ticket(f_tickets.begin()); obj_ticket != f_tickets.end(); ++obj_ticket)
1525 : {
1526 4 : for(auto key_ticket(obj_ticket->second.begin()); key_ticket != obj_ticket->second.end(); )
1527 : {
1528 2 : std::string const owner_name(key_ticket->second->get_owner());
1529 2 : auto key_leader(std::find_if(
1530 : f_leaders.begin()
1531 : , f_leaders.end()
1532 3 : , [&owner_name](auto const & l)
1533 : {
1534 3 : return l->get_name() == owner_name;
1535 : }));
1536 2 : if(key_ticket->second->is_locked())
1537 : {
1538 : // if ticket was locked by the leader that disappeared, we
1539 : // transfer ownership to leader #0
1540 : //
1541 2 : if(key_leader == f_leaders.end())
1542 : {
1543 0 : key_ticket->second->set_owner(f_leaders[0]->get_name());
1544 : }
1545 :
1546 : // and send that ticket to the other leaders to make sure
1547 : // they all agree on its current state
1548 : //
1549 2 : serialized += key_ticket->second->serialize();
1550 2 : serialized += '\n';
1551 :
1552 2 : ++key_ticket;
1553 : }
1554 : else
1555 : {
1556 : // it was not locked yet, restart the LOCK process from
1557 : // the very beginning
1558 : //
1559 0 : if(key_leader == f_leaders.end())
1560 : {
1561 : // give new ownership to leader[0]
1562 : //
1563 0 : ed::message lock_message;
1564 0 : lock_message.set_command(cluck::g_name_cluck_cmd_lock);
1565 0 : lock_message.set_server(f_leaders[0]->get_name());
1566 0 : lock_message.set_service(cluck::g_name_cluck_service_name);
1567 0 : lock_message.set_sent_from_server(key_ticket->second->get_server_name());
1568 0 : lock_message.set_sent_from_service(key_ticket->second->get_service_name());
1569 0 : lock_message.add_parameter(cluck::g_name_cluck_param_object_name, key_ticket->second->get_object_name());
1570 0 : lock_message.add_parameter(cluck::g_name_cluck_param_tag, key_ticket->second->get_tag());
1571 0 : lock_message.add_parameter(cluck::g_name_cluck_param_pid, key_ticket->second->get_client_pid());
1572 0 : lock_message.add_parameter(cluck::g_name_cluck_param_timeout, key_ticket->second->get_obtention_timeout());
1573 0 : lock_message.add_parameter(cluck::g_name_cluck_param_duration, key_ticket->second->get_lock_duration());
1574 0 : lock_message.add_parameter(cluck::g_name_cluck_param_unlock_duration, key_ticket->second->get_unlock_duration());
1575 0 : if(leader0)
1576 : {
1577 : // we are leader #0 so directly call msg_lock()
1578 : //
1579 0 : key_ticket = obj_ticket->second.erase(key_ticket);
1580 0 : local_locks.push_back(lock_message);
1581 : }
1582 : else
1583 : {
1584 : // we are not leader #0, so send the message to it
1585 : //
1586 0 : ++key_ticket;
1587 0 : lock_message.add_parameter(cluck::g_name_cluck_param_serial, key_ticket->second->get_serial());
1588 0 : f_messenger->send_message(lock_message);
1589 : }
1590 0 : }
1591 : else
1592 : {
1593 0 : ++key_ticket;
1594 : }
1595 : }
1596 2 : }
1597 : }
1598 :
1599 : // we send those after the loops above because the msg_lock() is
1600 : // not unlikely to change the f_entering_tickets map and looping
1601 : // through it when another function is going to modify it is not
1602 : // wise
1603 : //
1604 14 : for(auto lm : local_locks)
1605 : {
1606 0 : msg_lock(lm);
1607 0 : }
1608 :
1609 : // send LOCK_TICkETS if there is serialized ticket data
1610 : //
1611 14 : if(!serialized.empty())
1612 : {
1613 2 : ed::message lock_tickets_message;
1614 6 : lock_tickets_message.set_command(cluck::g_name_cluck_cmd_lock_tickets);
1615 6 : lock_tickets_message.set_service(cluck::g_name_cluck_service_name);
1616 6 : lock_tickets_message.add_parameter(cluck::g_name_cluck_param_tickets, serialized);
1617 :
1618 2 : auto const la(get_leader_a());
1619 2 : if(la != nullptr)
1620 : {
1621 2 : lock_tickets_message.set_server(la->get_name());
1622 2 : f_messenger->send_message(lock_tickets_message);
1623 :
1624 2 : auto const lb(get_leader_b());
1625 2 : if(lb != nullptr)
1626 : {
1627 2 : lock_tickets_message.set_server(lb->get_name());
1628 2 : f_messenger->send_message(lock_tickets_message);
1629 : }
1630 2 : }
1631 2 : }
1632 14 : }
1633 :
1634 :
1635 : /** \brief Forward a user message to a leader.
1636 : *
1637 : * The user may send a LOCK or an UNLOCK command to the cluck daemon.
1638 : * Those messages need to be forwarded to a leader to work as expected.
1639 : * If we are not a leader, then we need to call this function to
1640 : * forward the message (this daemon acts as a proxy).
1641 : *
1642 : * Note that we do not make a copy of the message because we do not expect
1643 : * it to be used any further after this call so we may as well update that
1644 : * message. It should not be destructive at all anyway.
1645 : *
1646 : * \param[in,out] msg The message being forwarded to a leader.
1647 : */
1648 3 : void cluckd::forward_message_to_leader(ed::message & msg)
1649 : {
1650 : // for safety, we do not call this function if the daemon is not
1651 : // considered ready meaning it has at least one leader
1652 : //
1653 3 : if(f_leaders.empty())
1654 : {
1655 : return; // LCOV_EXCL_LINE
1656 : }
1657 :
1658 : // we are not a leader, we work as a proxy by forwarding the
1659 : // message to a leader, we add our trail so the LOCKED and
1660 : // other messages can be proxied back
1661 : //
1662 : // Note: using the get_sent_from_server() means that we may not
1663 : // even see the returned message, it may be proxied to another
1664 : // server directly or through another route
1665 : //
1666 9 : msg.set_service(cluck::g_name_cluck_service_name);
1667 9 : msg.add_parameter(cluck::g_name_cluck_param_lock_proxy_server_name, msg.get_sent_from_server());
1668 9 : msg.add_parameter(cluck::g_name_cluck_param_lock_proxy_service_name, msg.get_sent_from_service());
1669 :
1670 3 : f_next_leader = (f_next_leader + 1) % f_leaders.size();
1671 3 : msg.set_server(f_leaders[f_next_leader]->get_name());
1672 :
1673 3 : f_messenger->send_message(msg);
1674 : }
1675 :
1676 :
1677 : /** \brief Clean timed out entries if any.
1678 : *
1679 : * This function goes through the list of tickets and entering
1680 : * entries and removes any one of them that timed out. This is
1681 : * important if a process dies and does not properly remove
1682 : * its locks.
1683 : *
1684 : * When the timer gets its process_timeout() function called,
1685 : * it ends up calling this function to clean up any lock that
1686 : * has timed out.
1687 : */
void cluckd::cleanup()
{
    // earliest timeout found among surviving entries; used at the end
    // to reprogram f_timer (max() means "nothing pending")
    //
    cluck::timeout_t next_timeout(snapdev::timespec_ex::max());

    // when we receive LOCK requests before we have leaders elected, they
    // get added to our cache, so do some cache clean up when not empty
    //
    cluck::timeout_t const now(snapdev::now());
    for(auto c(f_message_cache.begin()); c != f_message_cache.end(); )
    {
        if(c->f_timeout <= now)
        {
            // this cached LOCK expired before leaders were elected;
            // extract its parameters to build the LOCK_FAILED reply
            //
            std::string object_name;
            ed::dispatcher_match::tag_t tag(ed::dispatcher_match::DISPATCHER_MATCH_NO_TAG);
            pid_t client_pid(0);
            cluck::timeout_t timeout;
            if(!get_parameters(c->f_message, &object_name, &tag, &client_pid, &timeout, nullptr, nullptr))
            {
                // we should never cache messages that are invalid
                //
                throw cluck::logic_error("cluck::cleanup() of LOCK message failed get_parameters()."); // LCOV_EXCL_LINE
            }

            SNAP_LOG_WARNING
                << "Lock on \""
                << object_name
                << "\" / \""
                << client_pid
                << "\" / \""
                << tag
                << "\" timed out before leaders were known."
                << SNAP_LOG_SEND;

            // if the message was proxied, reply to the original sender
            // recorded in the lock_proxy_... parameters
            //
            std::string const server_name(c->f_message.has_parameter("lock_proxy_server_name")
                        ? c->f_message.get_parameter("lock_proxy_server_name")
                        : c->f_message.get_sent_from_server());
            std::string const service_name(c->f_message.has_parameter("lock_proxy_service_name")
                        ? c->f_message.get_parameter("lock_proxy_service_name")
                        : c->f_message.get_sent_from_service());
            std::string const entering_key(server_name + '/' + std::to_string(client_pid));

            ed::message lock_failed_message;
            lock_failed_message.set_command(cluck::g_name_cluck_cmd_lock_failed);
            lock_failed_message.set_service(service_name);
            lock_failed_message.set_server(server_name);
            lock_failed_message.add_parameter(cluck::g_name_cluck_param_object_name, object_name);
            lock_failed_message.add_parameter(cluck::g_name_cluck_param_tag, tag);
            lock_failed_message.add_parameter(cluck::g_name_cluck_param_key, entering_key);
            lock_failed_message.add_parameter(cluck::g_name_cluck_param_error, cluck::g_name_cluck_value_timedout);
#ifndef CLUCKD_OPTIMIZATIONS
            lock_failed_message.add_parameter(cluck::g_name_cluck_param_description, "cleanup() found a timed out lock");
#endif
            f_messenger->send_message(lock_failed_message);

            c = f_message_cache.erase(c);
        }
        else
        {
            if(c->f_timeout < next_timeout)
            {
                next_timeout = c->f_timeout;
            }
            ++c;
        }
    }

    // remove any f_ticket that timed out
    //
    for(auto obj_ticket(f_tickets.begin()); obj_ticket != f_tickets.end(); )
    {
        bool try_activate(false);
        for(auto key_ticket(obj_ticket->second.begin()); key_ticket != obj_ticket->second.end(); )
        {
            // move_next stays true when the entry survived (possibly
            // because lock_failed() cleared the timeout state)
            //
            bool move_next(true);
            if(key_ticket->second->timed_out())
            {
                key_ticket->second->lock_failed("ticket: timed out while cleaning up");
                if(key_ticket->second->timed_out())
                {
                    // still timed out, remove it
                    //
                    key_ticket = obj_ticket->second.erase(key_ticket);
                    try_activate = true;
                    move_next = false;
                }
            }
            if(move_next)
            {
                if(key_ticket->second->get_current_timeout_date() < next_timeout)
                {
                    next_timeout = key_ticket->second->get_current_timeout_date();
                }
                ++key_ticket;
            }
        }

        if(obj_ticket->second.empty())
        {
            obj_ticket = f_tickets.erase(obj_ticket);
        }
        else
        {
            if(try_activate)
            {
                // something was erased, a new ticket may be first
                //
                activate_first_lock(obj_ticket->first);
            }

            ++obj_ticket;
        }
    }

    // remove any f_entering_tickets that timed out
    //
    for(auto obj_entering(f_entering_tickets.begin()); obj_entering != f_entering_tickets.end(); )
    {
        for(auto key_entering(obj_entering->second.begin()); key_entering != obj_entering->second.end(); )
        {
            if(key_entering->second->timed_out())
            {
                key_entering->second->lock_failed("entering ticket: timed out while cleanup");
                if(key_entering->second->timed_out())
                {
                    // still timed out, remove it
                    //
                    key_entering = obj_entering->second.erase(key_entering);
                }
                // note: when lock_failed() cleared the timeout the
                // iterator is not advanced; the next pass re-examines
                // the same entry and takes the else branch below
            }
            else
            {
                if(key_entering->second->get_current_timeout_date() < next_timeout)
                {
                    next_timeout = key_entering->second->get_current_timeout_date();
                }
                ++key_entering;
            }
        }

        if(obj_entering->second.empty())
        {
            obj_entering = f_entering_tickets.erase(obj_entering);
        }
        else
        {
            ++obj_entering;
        }
    }

    // got a new timeout?
    //
    if(next_timeout != snapdev::timespec_ex::max())
    {
        // we add one second to avoid looping like crazy
        // if we timeout just around the "wrong" time
        //
        f_timer->set_timeout_date(next_timeout + cluck::timeout_t(1, 0));
    }
    else
    {
        // nothing pending; disable the timer
        //
        f_timer->set_timeout_date(-1);
    }
}
1851 :
1852 :
1853 : /** \brief Determine the last ticket defined in this cluck daemon.
1854 : *
1855 : * This function loops through the existing tickets and returns the
1856 : * largest ticket number it finds.
1857 : *
1858 : * Note that the number returned is the last ticket. At some point,
1859 : * the caller needs to add one to this number before assigning the
1860 : * result to a new ticket.
1861 : *
 * If no tickets were defined for \p object_name or we are dealing with
 * that object's very first ticket, then the function returns NO_TICKET
 * (which is 0).
1865 : *
1866 : * \param[in] object_name The name of the object for which the last ticket
1867 : * number is being sought.
1868 : *
1869 : * \return The last ticket number or NO_TICKET.
1870 : */
1871 118 : ticket::ticket_id_t cluckd::get_last_ticket(std::string const & object_name)
1872 : {
1873 118 : ticket::ticket_id_t last_ticket(ticket::NO_TICKET);
1874 :
1875 : // Note: There is no need to check the f_entering_tickets list
1876 : // since that one does not yet have any ticket number assigned
1877 : // and thus the maximum there would return 0 every time
1878 : //
1879 118 : auto obj_ticket(f_tickets.find(object_name));
1880 118 : if(obj_ticket != f_tickets.end())
1881 : {
1882 : // note:
1883 : // the std::max_element() algorithm would require many more
1884 : // get_ticket_number() when our loop uses one per ticket
1885 : //
1886 2 : for(auto key_ticket : obj_ticket->second)
1887 : {
1888 1 : ticket::ticket_id_t const ticket_number(key_ticket.second->get_ticket_number());
1889 1 : if(ticket_number > last_ticket)
1890 : {
1891 1 : last_ticket = ticket_number;
1892 : }
1893 1 : }
1894 : }
1895 :
1896 118 : return last_ticket;
1897 : }
1898 :
1899 :
1900 : /** \brief Set the ticket.
1901 : *
1902 : * Once a ticket was assigned a valid identifier (see get_last_ticket())
1903 : * it can be saved as a ticket. This function does that. Now this is
1904 : * an official ticket.
1905 : *
1906 : * \param[in] object_name The name of the object being locked.
1907 : * \param[in] key The ticket key (3 segments).
1908 : * \param[in] ticket The ticket object being added.
1909 : */
1910 117 : void cluckd::set_ticket(
1911 : std::string const & object_name
1912 : , std::string const & key
1913 : , ticket::pointer_t ticket)
1914 : {
1915 117 : f_tickets[object_name][key] = ticket;
1916 117 : }
1917 :
1918 :
1919 : /** \brief Get a reference to the list of entering tickets.
1920 : *
1921 : * This function returns a constant reference to the list of entering
1922 : * tickets. This is used by the ticket::add_ticket() function
1923 : * in order to know once all entering tickets are done so the algorithm
1924 : * can move forward.
1925 : *
1926 : * \param[in] object_name The name of the object being locked.
1927 : *
1928 : * \return A constant copy of the list of entering tickets.
1929 : */
1930 3 : ticket::key_map_t const cluckd::get_entering_tickets(std::string const & object_name)
1931 : {
1932 3 : auto const it(f_entering_tickets.find(object_name));
1933 3 : if(it == f_entering_tickets.end())
1934 : {
1935 : // LCOV_EXCL_START
1936 : //
1937 : // I could not get this covered and I do not think it can happen,
1938 : // also the caller function does not verify that the returned map
1939 : // is valid so it is safer this way
1940 : //
1941 : throw std::logic_error(
1942 : "could not find entering ticket with object name: \""
1943 : + object_name
1944 : + "\".");
1945 : // LCOV_EXCL_STOP
1946 : }
1947 :
1948 6 : return it->second;
1949 : }
1950 :
1951 :
1952 : /** \brief Used to simulate a LOCK_EXITING message.
1953 : *
1954 : * This function is called to simulate sending a LOCK_EXITING to the
1955 : * cluckd object from the ticket object.
1956 : *
1957 : * \param[in] msg The LOCK_EXITING message with proper object name, tag,
1958 : * and key.
1959 : */
void cluckd::lock_exiting(ed::message & msg)
{
    // simply forward to the LOCK_EXITING message handler as if the
    // message had arrived over the network
    //
    msg_lock_exiting(msg);
}
1964 :
1965 :
1966 :
1967 :
1968 :
1969 :
1970 2 : std::string cluckd::serialized_tickets()
1971 : {
1972 2 : std::stringstream result;
1973 :
1974 4 : for(auto const & obj_ticket : f_tickets)
1975 : {
1976 4 : for(auto const & key_ticket : obj_ticket.second)
1977 : {
1978 : result
1979 4 : << key_ticket.second->serialize()
1980 4 : << '\n';
1981 : }
1982 : }
1983 :
1984 4 : return result.str();
1985 2 : }
1986 :
1987 :
1988 : /** \brief Try to get a set of parameters.
1989 : *
1990 : * This function attempts to get the specified set of parameters from the
1991 : * specified message.
1992 : *
1993 : * The function throws if a parameter is missing or invalid (i.e. passed
1994 : * a string when an integer was expected).
1995 : *
1996 : * When defined, the \p client_pid parameter is expected to be a positive
1997 : * integer. Any other number makes the function emit an error and return
1998 : * false.
1999 : *
2000 : * \note
2001 : * The timeout parameter is always viewed as optional. It is set to
2002 : * "now + cluck::CLUCK_LOCK_DEFAULT_TIMEOUT" if undefined in the message.
2003 : * If specified in the message, there is no minimum or maximum
2004 : * (i.e. it may already have timed out).
2005 : *
2006 : * \param[in] msg The message from which we get parameters.
2007 : * \param[out] object_name A pointer to an std::string that receives the object name.
2008 : * \param[out] tag A pointer to a tag_t that receives the tag number.
2009 : * \param[out] client_pid A pointer to a pid_t that receives the client pid.
2010 : * \param[out] timeout A pointer to an cluck::timeout_t that receives the timeout date.
2011 : * \param[out] key A pointer to an std::string that receives the key parameter.
2012 : * \param[out] source A pointer to an std::string that receives the source parameter.
2013 : *
2014 : * \return true if all specified parameters could be retrieved as expected,
2015 : * false if parameters are either missing or invalid.
2016 : */
2017 1005 : bool cluckd::get_parameters(
2018 : ed::message const & msg
2019 : , std::string * object_name
2020 : , ed::dispatcher_match::tag_t * tag
2021 : , pid_t * client_pid
2022 : , cluck::timeout_t * timeout
2023 : , std::string * key
2024 : , std::string * source)
2025 : {
2026 : // get the "object name" (what we are locking)
2027 : // in Snap, the object name is often a URI plus the action we are performing
2028 : //
2029 1005 : if(object_name != nullptr)
2030 : {
2031 3015 : *object_name = msg.get_parameter(cluck::g_name_cluck_param_object_name);
2032 : }
2033 :
2034 : // the same application may want to hold multiple locks simultaneously
2035 : // and this is made possible by using a tag (a 16 bits number)
2036 : //
2037 1005 : if(tag != nullptr)
2038 : {
2039 3015 : *tag = msg.get_integer_parameter(cluck::g_name_cluck_param_tag);
2040 : }
2041 :
2042 : // get the pid (process identifier) of the process that is
2043 : // requesting the lock; this is important to be able to distinguish
2044 : // multiple processes on the same computer requesting a lock
2045 : //
2046 1005 : if(client_pid != nullptr)
2047 : {
2048 786 : *client_pid = msg.get_integer_parameter(cluck::g_name_cluck_param_pid);
2049 262 : if(*client_pid < 1)
2050 : {
2051 : // invalid pid
2052 : //
2053 8 : SNAP_LOG_NOISY_ERROR
2054 : << "cluckd::get_parameters(): invalid pid specified for a lock ("
2055 4 : << std::to_string(*client_pid)
2056 : << "); it must be a positive decimal number."
2057 : << SNAP_LOG_SEND;
2058 2 : return false;
2059 : }
2060 : }
2061 :
2062 : // get the time limit we will wait up to before we decide we
2063 : // cannot obtain that lock
2064 : //
2065 1003 : if(timeout != nullptr)
2066 : {
2067 489 : if(msg.has_parameter(cluck::g_name_cluck_param_timeout))
2068 : {
2069 : // this timeout may already be out of date in which case
2070 : // the lock immediately fails
2071 : //
2072 159 : *timeout = msg.get_timespec_parameter(cluck::g_name_cluck_param_timeout);
2073 : }
2074 : else
2075 : {
2076 110 : *timeout = snapdev::now() + cluck::get_lock_obtention_timeout();
2077 : }
2078 : }
2079 :
2080 : // get the key of a ticket or entering object
2081 : //
2082 1003 : if(key != nullptr)
2083 : {
2084 2229 : *key = msg.get_parameter(cluck::g_name_cluck_param_key);
2085 : }
2086 :
2087 : // get the source of a ticket (i.e. <server> '/' <service>)
2088 : //
2089 1003 : if(source != nullptr)
2090 : {
2091 24 : *source = msg.get_parameter(cluck::g_name_cluck_param_source);
2092 : }
2093 :
2094 1003 : return true;
2095 : }
2096 :
2097 :
2098 : /** \brief Lock the resource after confirmation that client is alive.
2099 : *
2100 : * This message is expected just after we sent an ALIVE message to
2101 : * the client.
2102 : *
2103 : * Whenever a leader dies, we suspect that the client may have died
2104 : * with it so we send it an ALIVE message to know whether it is worth
2105 : * the trouble of entering that lock.
2106 : *
2107 : * \param[in] msg The ABSOLUTELY message to handle.
2108 : */
2109 4 : void cluckd::msg_absolutely(ed::message & msg)
2110 : {
2111 : // we may receive the ABSOLUTELY message from anywhere so don't expect
2112 : // the "serial" parameter to be defined
2113 : //
2114 12 : if(!msg.has_parameter(ed::g_name_ed_param_serial))
2115 : {
2116 1 : return;
2117 : }
2118 :
2119 9 : std::string const serial(msg.get_parameter(ed::g_name_ed_param_serial));
2120 3 : std::vector<std::string> segments;
2121 9 : snapdev::tokenize_string(segments, serial, "/");
2122 :
2123 3 : if(segments[0] == "cluckd")
2124 : {
2125 : // check serial as defined in msg_lock()
2126 : //
2127 2 : if(segments.size() != 4)
2128 : {
2129 2 : SNAP_LOG_WARNING
2130 : << "ABSOLUTELY reply has an invalid cluckd serial parameters \""
2131 : << serial
2132 : << "\" was expected to have exactly 4 segments."
2133 : << SNAP_LOG_SEND;
2134 :
2135 1 : ed::message invalid;
2136 3 : invalid.set_command(ed::g_name_ed_cmd_invalid);
2137 1 : invalid.reply_to(msg);
2138 3 : invalid.add_parameter(
2139 : ed::g_name_ed_param_command
2140 : , msg.get_command());
2141 4 : invalid.add_parameter(
2142 : ed::g_name_ed_param_message
2143 : , "invalid number of segments in \""
2144 2 : + serial
2145 4 : + "\".");
2146 1 : f_messenger->send_message(invalid);
2147 :
2148 1 : return;
2149 1 : }
2150 :
2151 : // notice how the split() re-split the entering key
2152 : //
2153 1 : std::string const object_name(segments[1]);
2154 1 : std::string const server_name(segments[2]);
2155 1 : std::string const client_pid(segments[3]);
2156 :
2157 1 : auto entering_ticket(f_entering_tickets.find(object_name));
2158 1 : if(entering_ticket != f_entering_tickets.end())
2159 : {
2160 1 : std::string const entering_key(server_name + '/' + client_pid);
2161 1 : auto key_ticket(entering_ticket->second.find(entering_key));
2162 1 : if(key_ticket != entering_ticket->second.end())
2163 : {
2164 : // remove the alive timeout
2165 : //
2166 1 : key_ticket->second->set_alive_timeout(cluck::timeout_t());
2167 :
2168 : // got it! start the bakery algorithm
2169 : //
2170 1 : key_ticket->second->entering();
2171 : }
2172 1 : }
2173 1 : }
2174 :
2175 : // ignore other messages
2176 4 : }
2177 :
2178 :
2179 : /** \brief Acknowledge the ACTIVATE_LOCK with what we think is our first lock.
2180 : *
2181 : * This function replies to an ACTIVATE_LOCK request with what we think is
2182 : * the first lock for the specified object.
2183 : *
2184 : * Right now, we disregard the specified key. There is nothing we can really
2185 : * do with it here.
2186 : *
2187 : * If we do not have a ticket for the specified object (something that could
2188 : * happen if the ticket just timed out) then we still have to reply, only
2189 : * we let the other leader know that we have no clue what he is talking about.
2190 : *
2191 : * \param[in] msg The ACTIVATE_LOCK message.
2192 : */
2193 114 : void cluckd::msg_activate_lock(ed::message & msg)
2194 : {
2195 114 : std::string object_name;
2196 114 : ed::dispatcher_match::tag_t tag(ed::dispatcher_match::DISPATCHER_MATCH_NO_TAG);
2197 114 : std::string key;
2198 114 : if(!get_parameters(msg, &object_name, &tag, nullptr, nullptr, &key, nullptr))
2199 : {
2200 : return; // LCOV_EXCL_LINE
2201 : }
2202 :
2203 342 : std::string first_key("no-key");
2204 :
2205 114 : auto ticket(find_first_lock(object_name));
2206 114 : if(ticket != nullptr)
2207 : {
2208 : // found a lock
2209 : //
2210 114 : first_key = ticket->get_ticket_key();
2211 :
2212 114 : if(key == first_key)
2213 : {
2214 : // we can mark this ticket as activated
2215 : //
2216 114 : ticket->lock_activated();
2217 : }
2218 : }
2219 :
2220 : // always reply, if we could not find the key, then we returned 'no-key'
2221 : // as the key parameter
2222 : //
2223 114 : ed::message lock_activated_message;
2224 342 : lock_activated_message.set_command(cluck::g_name_cluck_cmd_lock_activated);
2225 114 : lock_activated_message.reply_to(msg);
2226 342 : lock_activated_message.add_parameter(cluck::g_name_cluck_param_object_name, object_name);
2227 342 : lock_activated_message.add_parameter(cluck::g_name_cluck_param_tag, tag);
2228 342 : lock_activated_message.add_parameter(cluck::g_name_cluck_param_key, key);
2229 342 : lock_activated_message.add_parameter(cluck::g_name_cluck_param_other_key, first_key);
2230 114 : f_messenger->send_message(lock_activated_message);
2231 :
2232 : // the list of tickets is not unlikely changed so we need to make
2233 : // a call to cleanup to make sure the timer is reset appropriately
2234 : //
2235 114 : cleanup();
2236 114 : }
2237 :
2238 :
2239 : /** \brief Add a ticket from another cluckd.
2240 : *
2241 : * Tickets get duplicated on the cluckd leaders.
2242 : *
2243 : * \note
2244 : * Although we only need a QUORUM number of nodes to receive a copy of
2245 : * the data, the data still get broadcast to all the cluck leaders.
2246 : * After this message arrives any one of the cluck process can
2247 : * handle the unlock if the UNLOCK message gets sent to another process
2248 : * instead of the one which first created the ticket. This is the point
2249 : * of the implementation since we want to be fault tolerant (as in if one
2250 : * of the leaders goes down, the locking mechanism still works).
2251 : *
2252 : * \param[in] msg The ADD_TICKET message being handled.
2253 : */
2254 8 : void cluckd::msg_add_ticket(ed::message & msg)
2255 : {
2256 8 : std::string object_name;
2257 8 : ed::dispatcher_match::tag_t tag(ed::dispatcher_match::DISPATCHER_MATCH_NO_TAG);
2258 8 : std::string key;
2259 8 : cluck::timeout_t timeout;
2260 8 : if(!get_parameters(msg, &object_name, &tag, nullptr, &timeout, &key, nullptr))
2261 : {
2262 : return; // LCOV_EXCL_LINE
2263 : }
2264 :
2265 : // make sure the ticket is unique
2266 : //
2267 8 : auto const obj_ticket(f_tickets.find(object_name));
2268 8 : if(obj_ticket != f_tickets.end())
2269 : {
2270 1 : auto const key_ticket(obj_ticket->second.find(key));
2271 1 : if(key_ticket != obj_ticket->second.end())
2272 : {
2273 3 : SNAP_LOG_ERROR
2274 : << "an existing ticket has the same object name \""
2275 : << object_name
2276 1 : << "\" ("
2277 1 : << tag
2278 : << ") and key \""
2279 : << key
2280 : << "\"."
2281 : << SNAP_LOG_SEND;
2282 :
2283 1 : ed::message lock_failed_message;
2284 3 : lock_failed_message.set_command(cluck::g_name_cluck_cmd_lock_failed);
2285 1 : lock_failed_message.reply_to(msg);
2286 3 : lock_failed_message.add_parameter(cluck::g_name_cluck_param_object_name, object_name);
2287 3 : lock_failed_message.add_parameter(cluck::g_name_cluck_param_tag, tag);
2288 3 : lock_failed_message.add_parameter(cluck::g_name_cluck_param_key, key);
2289 5 : lock_failed_message.add_parameter(cluck::g_name_cluck_param_error, cluck::g_name_cluck_value_duplicate);
2290 : #ifndef CLUCKD_OPTIMIZATIONS
2291 5 : lock_failed_message.add_parameter(cluck::g_name_cluck_param_description, "ADD_TICKET called with an existing ticket name and key");
2292 : #endif
2293 1 : f_messenger->send_message(lock_failed_message);
2294 :
2295 13 : communicatord::flag::pointer_t flag(COMMUNICATORD_FLAG_UP(
2296 : "cluckd"
2297 : , "ticket"
2298 : , "invalid-algorithm"
2299 : , "msg_add_ticket() received a second call to add the"
2300 : " same ticket. This either means there is a bug in our"
2301 : " algorithm or there is a hacker sending us messages"
2302 : " trying to create invalid tickets."
2303 4 : ));
2304 1 : flag->set_priority(25);
2305 3 : flag->add_tag("bug");
2306 1 : flag->set_manual_down(true);
2307 1 : flag->save();
2308 :
2309 1 : return;
2310 1 : }
2311 : }
2312 :
2313 : // the client_pid parameter is part of the key (3rd segment)
2314 : //
2315 7 : std::vector<std::string> segments;
2316 21 : snapdev::tokenize_string(segments, key, "/");
2317 7 : if(segments.size() != 3)
2318 : {
2319 2 : SNAP_LOG_ERROR
2320 : << "Expected exactly 3 segments in \""
2321 : << key
2322 : << "\" to add a ticket."
2323 : << SNAP_LOG_SEND;
2324 :
2325 1 : ed::message lock_failed_message;
2326 3 : lock_failed_message.set_command(cluck::g_name_cluck_cmd_lock_failed);
2327 1 : lock_failed_message.reply_to(msg);
2328 3 : lock_failed_message.add_parameter(cluck::g_name_cluck_param_object_name, object_name);
2329 3 : lock_failed_message.add_parameter(cluck::g_name_cluck_param_tag, tag);
2330 3 : lock_failed_message.add_parameter(cluck::g_name_cluck_param_key, key);
2331 5 : lock_failed_message.add_parameter(cluck::g_name_cluck_param_error, cluck::g_name_cluck_value_invalid);
2332 : #ifndef CLUCKD_OPTIMIZATIONS
2333 5 : lock_failed_message.add_parameter(cluck::g_name_cluck_param_description, "ADD_TICKET called with an invalid key (expected three segments)");
2334 : #endif
2335 1 : f_messenger->send_message(lock_failed_message);
2336 :
2337 1 : return;
2338 1 : }
2339 :
2340 : // TODO: we probably want to look at using a function which returns false
2341 : // instead of having to do a try/catch
2342 : //
2343 6 : bool ok(true);
2344 6 : std::uint32_t number(0);
2345 : try
2346 : {
2347 6 : number = snapdev::hex_to_int<std::uint32_t>(segments[0]);
2348 : }
2349 2 : catch(snapdev::hexadecimal_string_exception const &)
2350 : {
2351 1 : ok = false;
2352 1 : }
2353 1 : catch(snapdev::hexadecimal_string_out_of_range const &)
2354 : {
2355 1 : ok = false;
2356 1 : }
2357 6 : if(!ok)
2358 : {
2359 6 : SNAP_LOG_ERROR
2360 : << "somehow ticket number \""
2361 2 : << segments[0]
2362 : << "\" is not a valid hexadecimal number."
2363 : << SNAP_LOG_SEND;
2364 :
2365 2 : ed::message lock_failed_message;
2366 6 : lock_failed_message.set_command(cluck::g_name_cluck_cmd_lock_failed);
2367 2 : lock_failed_message.reply_to(msg);
2368 6 : lock_failed_message.add_parameter(cluck::g_name_cluck_param_object_name, object_name);
2369 6 : lock_failed_message.add_parameter(cluck::g_name_cluck_param_tag, tag);
2370 6 : lock_failed_message.add_parameter(cluck::g_name_cluck_param_key, key);
2371 10 : lock_failed_message.add_parameter(cluck::g_name_cluck_param_error, cluck::g_name_cluck_value_invalid);
2372 : #ifndef CLUCKD_OPTIMIZATIONS
2373 10 : lock_failed_message.add_parameter(cluck::g_name_cluck_param_description, "ADD_TICKET called with an invalid key (first segment is not a valid hexadecimal number)");
2374 : #endif
2375 2 : f_messenger->send_message(lock_failed_message);
2376 :
2377 2 : return;
2378 2 : }
2379 :
2380 : // by now all the leaders should already have
2381 : // an entering ticket for that one ticket
2382 : //
2383 4 : auto const obj_entering_ticket(f_entering_tickets.find(object_name));
2384 4 : if(obj_entering_ticket == f_entering_tickets.end())
2385 : {
2386 2 : SNAP_LOG_ERROR
2387 : << "Expected entering ticket object for \""
2388 : << object_name
2389 : << "\" not found when adding a ticket."
2390 : << SNAP_LOG_SEND;
2391 :
2392 1 : ed::message lock_failed_message;
2393 3 : lock_failed_message.set_command(cluck::g_name_cluck_cmd_lock_failed);
2394 1 : lock_failed_message.reply_to(msg);
2395 3 : lock_failed_message.add_parameter(cluck::g_name_cluck_param_object_name, object_name);
2396 3 : lock_failed_message.add_parameter(cluck::g_name_cluck_param_tag, tag);
2397 3 : lock_failed_message.add_parameter(cluck::g_name_cluck_param_key, key);
2398 5 : lock_failed_message.add_parameter(cluck::g_name_cluck_param_error, cluck::g_name_cluck_value_invalid);
2399 : #ifndef CLUCKD_OPTIMIZATIONS
2400 5 : lock_failed_message.add_parameter(cluck::g_name_cluck_param_description, "ADD_TICKET could not find an entering ticket with the specified object_name");
2401 : #endif
2402 1 : f_messenger->send_message(lock_failed_message);
2403 :
2404 1 : return;
2405 1 : }
2406 :
2407 : // the key we need to search is not the new ticket key but the
2408 : // entering key, build it from the segments
2409 : //
2410 3 : std::string const entering_key(segments[1] + '/' + segments[2]);
2411 3 : auto const key_entering_ticket(obj_entering_ticket->second.find(entering_key));
2412 3 : if(key_entering_ticket == obj_entering_ticket->second.end())
2413 : {
2414 2 : SNAP_LOG_ERROR
2415 : << "Expected entering ticket key for \""
2416 : << object_name
2417 : << "\" not found when adding a ticket."
2418 : << SNAP_LOG_SEND;
2419 :
2420 1 : ed::message lock_failed_message;
2421 3 : lock_failed_message.set_command(cluck::g_name_cluck_cmd_lock_failed);
2422 1 : lock_failed_message.reply_to(msg);
2423 3 : lock_failed_message.add_parameter(cluck::g_name_cluck_param_object_name, object_name);
2424 3 : lock_failed_message.add_parameter(cluck::g_name_cluck_param_tag, tag);
2425 3 : lock_failed_message.add_parameter(cluck::g_name_cluck_param_key, key);
2426 5 : lock_failed_message.add_parameter(cluck::g_name_cluck_param_error, cluck::g_name_cluck_value_invalid);
2427 : #ifndef CLUCKD_OPTIMIZATIONS
2428 5 : lock_failed_message.add_parameter(cluck::g_name_cluck_param_description, "ADD_TICKET could not find the very entering ticket with the specified key");
2429 : #endif
2430 1 : f_messenger->send_message(lock_failed_message);
2431 :
2432 1 : return;
2433 1 : }
2434 :
2435 : // make it an official ticket now
2436 : //
2437 : // this should happen on all cluck daemon other than the one that
2438 : // first received the LOCK message
2439 : //
2440 2 : set_ticket(object_name, key, key_entering_ticket->second);
2441 :
2442 : // WARNING: the set_ticket_number() function has the same side
2443 : // effects as the add_ticket() function without the
2444 : // f_messenger->send_message() call
2445 : //
2446 2 : f_tickets[object_name][key]->set_ticket_number(number);
2447 :
2448 2 : ed::message ticket_added_message;
2449 6 : ticket_added_message.set_command(cluck::g_name_cluck_cmd_ticket_added);
2450 2 : ticket_added_message.reply_to(msg);
2451 6 : ticket_added_message.add_parameter(cluck::g_name_cluck_param_object_name, object_name);
2452 6 : ticket_added_message.add_parameter(cluck::g_name_cluck_param_key, key);
2453 6 : ticket_added_message.add_parameter(cluck::g_name_cluck_param_tag, tag);
2454 2 : f_messenger->send_message(ticket_added_message);
2455 20 : }
2456 :
2457 :
2458 : /** \brief Message telling us whether the clock is stable.
2459 : *
2460 : * When rebooting, the NTP system takes a little time to get started. The
2461 : * communicator daemon checks the state of that NTP and sends messages
2462 : * to other services about the current state.
2463 : *
2464 : * Once the clock is considered stable, the function sets the f_stable_clock
2465 : * flag to true and it remains true forever. This is done that way because
2466 : * at the moment I really don't have the time to consider changing the
2467 : * implementation to support a drop of the clock when it pretty much never
2468 : * happens.
2469 : *
2470 : * \param[in,out] msg The CLOCK_STABLE message.
2471 : */
2472 25 : void cluckd::msg_clock_stable(ed::message & msg)
2473 : {
2474 25 : if(!f_stable_clock)
2475 : {
2476 100 : f_stable_clock = msg.get_parameter(communicatord::g_name_communicatord_param_clock_resolution)
2477 50 : == communicatord::g_name_communicatord_value_verified;
2478 : }
2479 25 : }
2480 :
2481 :
2482 : /** \brief The communicatord lost too many connections.
2483 : *
2484 : * When the cluster is not complete, the CLUSTER_DOWN message gets sent
2485 : * by the communicatord. We need to stop offering locks at that point.
2486 : * Locks that are up are fine, but new locks are not possible.
2487 : *
2488 : * \param[in] msg The CLUSTER_DOWN message.
2489 : */
void cluckd::msg_cluster_down(ed::message & msg)
{
    // the message carries no parameter we care about
    //
    snapdev::NOT_USED(msg);

    SNAP_LOG_INFO
        << "cluster is down, canceling existing locks and we have to"
           " refuse any further lock requests for a while."
        << SNAP_LOG_SEND;

    // in this case, we cannot safely keep the leaders
    //
    f_leaders.clear();

    // in case services listen to the NO_LOCK, let them know it's gone
    //
    check_lock_status();

    // we do not call the lock_gone() because the HANGUP will be sent
    // if required so we do not have to do that twice
}
2510 :
2511 :
2512 : /** \brief Cluster is ready, send the LOCK_STARTED message.
2513 : *
2514 : * Our cluster is finally ready, so we can send the LOCK_STARTED and work
2515 : * on a leader election if still required.
2516 : *
2517 : * \param[in] msg CLUSTER_UP message we are dealing with.
2518 : */
2519 24 : void cluckd::msg_cluster_up(ed::message & msg)
2520 : {
2521 72 : f_neighbors_count = msg.get_integer_parameter("neighbors_count");
2522 24 : f_neighbors_quorum = f_neighbors_count / 2 + 1;
2523 :
2524 24 : computer::priority_t priority(computer::PRIORITY_OFF);
2525 72 : std::string candidate_priority(f_opts.get_string("candidate-priority"));
2526 24 : if(candidate_priority != "off")
2527 : {
2528 69 : priority = f_opts.get_long("candidate-priority"
2529 : , 0
2530 : , computer::PRIORITY_USER_MIN
2531 : , computer::PRIORITY_MAX);
2532 : }
2533 :
2534 : #ifdef _DEBUG
2535 : // the READY message is expected to happen first and setup this parameter
2536 : //
2537 24 : if(f_my_ip_address.is_default())
2538 : {
2539 : throw cluck::logic_error("cluckd::msg_cluster_up(): somehow f_my_ip_address is still the default in msg_cluster_up()."); // LCOV_EXCL_LINE
2540 : }
2541 : #endif
2542 :
2543 : // add ourselves to the list of computers; mark us connected; get our ID
2544 : //
2545 24 : f_computers[f_server_name] = std::make_shared<computer>(f_server_name, priority, f_my_ip_address);
2546 24 : f_computers[f_server_name]->set_start_time(f_start_time);
2547 24 : f_computers[f_server_name]->set_connected(true);
2548 24 : f_my_id = f_computers[f_server_name]->get_id();
2549 :
2550 72 : SNAP_LOG_INFO
2551 24 : << "cluster is up with "
2552 24 : << f_neighbors_count
2553 : << " neighbors, attempt an election"
2554 : " then check for leaders by sending a LOCK_STARTED message."
2555 : << SNAP_LOG_SEND;
2556 :
2557 24 : election_status();
2558 24 : send_lock_started(nullptr);
2559 24 : check_lock_status();
2560 48 : }
2561 :
2562 :
2563 : /** \brief One of the cluckd processes asked for a ticket to be dropped.
2564 : *
2565 : * This function searches for the specified ticket and removes it from
2566 : * this cluckd.
2567 : *
2568 : * If the specified ticket does not exist, nothing happens.
2569 : *
2570 : * \warning
2571 : * The DROP_TICKET event includes either the ticket key (if available)
2572 : * or the entering key (when the ticket key was not yet available).
2573 : * Note that the ticket key should always exists by the time a DROP_TICKET
2574 : * happens, but just in case this allows the dropping of a ticket at any
2575 : * time.
2576 : *
2577 : * \param[in] msg The DROP_TICKET message.
2578 : */
2579 2 : void cluckd::msg_drop_ticket(ed::message & msg)
2580 : {
2581 2 : std::string object_name;
2582 2 : ed::dispatcher_match::tag_t tag(ed::dispatcher_match::DISPATCHER_MATCH_NO_TAG);
2583 2 : std::string key;
2584 2 : if(!get_parameters(msg, &object_name, &tag, nullptr, nullptr, &key, nullptr))
2585 : {
2586 : return; // LCOV_EXCL_LINE
2587 : }
2588 :
2589 2 : std::vector<std::string> segments;
2590 6 : snapdev::tokenize_string(segments, key, "/");
2591 :
2592 : // drop the regular ticket
2593 : //
2594 : // if we have only 2 segments, then there is no corresponding ticket
2595 : // since tickets are added only once we have a ticket_id
2596 : //
2597 2 : std::string entering_key;
2598 2 : if(segments.size() == 3)
2599 : {
2600 1 : auto obj_ticket(f_tickets.find(object_name));
2601 1 : if(obj_ticket != f_tickets.end())
2602 : {
2603 1 : auto key_ticket(obj_ticket->second.find(key));
2604 1 : if(key_ticket != obj_ticket->second.end())
2605 : {
2606 1 : obj_ticket->second.erase(key_ticket);
2607 : }
2608 :
2609 1 : if(obj_ticket->second.empty())
2610 : {
2611 1 : f_tickets.erase(obj_ticket);
2612 : }
2613 :
2614 : // one ticket was erased, another may be first now
2615 : //
2616 1 : activate_first_lock(object_name);
2617 : }
2618 :
2619 : // we received the ticket_id in the message, so
2620 : // we have to regenerate the entering_key without
2621 : // the ticket_id (which is the first element)
2622 : //
2623 1 : entering_key = segments[1] + '/' + segments[2];
2624 : }
2625 : else
2626 : {
2627 : // we received the entering_key in the message, use as is
2628 : //
2629 1 : entering_key = key;
2630 : }
2631 :
2632 : // drop the entering ticket
2633 : //
2634 2 : auto obj_entering_ticket(f_entering_tickets.find(object_name));
2635 2 : if(obj_entering_ticket != f_entering_tickets.end())
2636 : {
2637 1 : auto key_entering_ticket(obj_entering_ticket->second.find(entering_key));
2638 1 : if(key_entering_ticket != obj_entering_ticket->second.end())
2639 : {
2640 1 : obj_entering_ticket->second.erase(key_entering_ticket);
2641 : }
2642 :
2643 1 : if(obj_entering_ticket->second.empty())
2644 : {
2645 1 : f_entering_tickets.erase(obj_entering_ticket);
2646 : }
2647 : }
2648 :
2649 : // the list of tickets is not unlikely changed so we need to make
2650 : // a call to cleanup to make sure the timer is reset appropriately
2651 : //
2652 2 : cleanup();
2653 2 : }
2654 :
2655 :
2656 : /** \brief Search for the largest ticket.
2657 : *
2658 : * This function searches the list of tickets for the largest one
2659 : * and returns that number.
2660 : *
2661 : * Note that this is specific to a given lock as defined by the object
2662 : * name of that lock. It is not the largest of all tickets across the
2663 : * entire set.
2664 : *
2665 : * The function replies with the MAX_TICKET message.
2666 : *
2667 : * \param[in] msg The message just received.
2668 : */
2669 2 : void cluckd::msg_get_max_ticket(ed::message & msg)
2670 : {
2671 2 : std::string object_name;
2672 2 : ed::dispatcher_match::tag_t tag(ed::dispatcher_match::DISPATCHER_MATCH_NO_TAG);
2673 2 : std::string key;
2674 2 : if(!get_parameters(msg, &object_name, &tag, nullptr, nullptr, &key, nullptr))
2675 : {
2676 : return; // LCOV_EXCL_LINE
2677 : }
2678 :
2679 : // remove any f_tickets that timed out by now because these should
2680 : // not be taken in account in the max. computation
2681 : //
2682 2 : cleanup();
2683 :
2684 2 : ticket::ticket_id_t const last_ticket(get_last_ticket(object_name));
2685 :
2686 2 : ed::message reply;
2687 6 : reply.set_command(cluck::g_name_cluck_cmd_max_ticket);
2688 2 : reply.reply_to(msg);
2689 6 : reply.add_parameter(cluck::g_name_cluck_param_object_name, object_name);
2690 6 : reply.add_parameter(cluck::g_name_cluck_param_key, key);
2691 6 : reply.add_parameter(cluck::g_name_cluck_param_tag, tag);
2692 6 : reply.add_parameter(cluck::g_name_cluck_param_ticket_id, last_ticket);
2693 2 : f_messenger->send_message(reply);
2694 2 : }
2695 :
2696 :
2697 : /** \brief Reply to the LIST_TICKETS message with the TICKET_LIST.
2698 : *
2699 : * This function gets called whenever the command line tool (`cluckd`)
2700 : * is run with the `--list` command line option. It generates a list of
2701 : * tickets and sends that back to the tool as a TICKET_LIST message.
2702 : *
2703 : * \param[in] msg The LIST_TICKETS message.
2704 : */
2705 4 : void cluckd::msg_list_tickets(ed::message & msg)
2706 : {
2707 4 : ed::message list_message;
2708 12 : list_message.set_command(cluck::g_name_cluck_cmd_ticket_list);
2709 4 : list_message.reply_to(msg);
2710 12 : list_message.add_parameter(cluck::g_name_cluck_param_list, ticket_list());
2711 4 : f_messenger->send_message(list_message);
2712 8 : }
2713 :
2714 :
2715 : /** \brief Lock the named resource.
2716 : *
2717 : * This function locks the specified resource \p object_name. It returns
2718 : * when the resource is locked or when the lock timeout is reached.
2719 : *
2720 : * See the ticket class for more details about the locking
2721 : * mechanisms (algorithm and MSC implementation).
2722 : *
2723 : * Note that if lock() is called with an empty string then the function
2724 : * unlocks the lock and returns immediately with false. This is equivalent
2725 : * to calling unlock().
2726 : *
2727 : * \note
2728 : * The function reloads all the parameters (outside of the table) because
2729 : * we need to support a certain amount of dynamism. For example, an
2730 : * administrator may want to add a new host on the system. In that case,
2731 : * the list of host changes and it has to be detected here.
2732 : *
2733 : * \attention
2734 : * The function accepts a "serial" parameter in the message. This is only
2735 : * used internally when a leader is lost and a new one is assigned a lock
2736 : * which would otherwise fail.
2737 : *
2738 : * \warning
2739 : * The object name is left available in the lock table. Do not use any
2740 : * secure/secret name/word, etc. as the object name.
2741 : *
2742 : * \bug
2743 : * At this point there is no proper protection to recover from errors
2744 : * that would happen while working on locking this entry. This means
2745 : * failures may result in a lock that never ends.
2746 : *
2747 : * \param[in] msg The lock message.
2748 : *
2749 : * \sa unlock()
2750 : */
void cluckd::msg_lock(ed::message & msg)
{
    // retrieve the basic lock parameters: object name, tag, the
    // requester's PID and the lock obtention timeout
    //
    std::string object_name;
    ed::dispatcher_match::tag_t tag(ed::dispatcher_match::DISPATCHER_MATCH_NO_TAG);
    pid_t client_pid(0);
    cluck::timeout_t timeout;
    if(!get_parameters(msg, &object_name, &tag, &client_pid, &timeout, nullptr, nullptr))
    {
        return;
    }

    // do some cleanup as well
    //
    cleanup();

    // if we are a leader, create an entering key
    //
    // when the LOCK message was proxied through another cluckd, the
    // original server/service names travel in the lock_proxy_...
    // parameters; otherwise the sender of the message is the client
    //
    std::string const server_name(msg.has_parameter(cluck::g_name_cluck_param_lock_proxy_server_name)
                    ? msg.get_parameter(cluck::g_name_cluck_param_lock_proxy_server_name)
                    : msg.get_sent_from_server());

    std::string const service_name(msg.has_parameter(cluck::g_name_cluck_param_lock_proxy_service_name)
                    ? msg.get_parameter(cluck::g_name_cluck_param_lock_proxy_service_name)
                    : msg.get_sent_from_service());

    // "<server>/<pid>" uniquely identifies the requesting process
    // across the cluster
    //
    std::string const entering_key(server_name + '/' + std::to_string(client_pid));

    // a lock obtention timeout already in the past cannot succeed;
    // fail immediately with a LOCK_FAILED/timedout reply
    //
    if(timeout <= snapdev::now())
    {
        SNAP_LOG_WARNING
            << "lock on \""
            << object_name
            << "\" ("
            << tag
            << ")/ \""
            << client_pid
            << "\" timed out before we could start the locking process."
            << SNAP_LOG_SEND;

        ed::message lock_failed_message;
        lock_failed_message.set_command(cluck::g_name_cluck_cmd_lock_failed);
        lock_failed_message.reply_to(msg);
        lock_failed_message.add_parameter(cluck::g_name_cluck_param_object_name, object_name);
        lock_failed_message.add_parameter(cluck::g_name_cluck_param_tag, tag);
        lock_failed_message.add_parameter(cluck::g_name_cluck_param_key, entering_key);
        lock_failed_message.add_parameter(cluck::g_name_cluck_param_error, cluck::g_name_cluck_value_timedout);
#ifndef CLUCKD_OPTIMIZATIONS
        lock_failed_message.add_parameter(cluck::g_name_cluck_param_description, "LOCK timeout date is already in the past");
#endif
        f_messenger->send_message(lock_failed_message);

        return;
    }

    // validate the lock duration (how long the lock is held once
    // obtained); reject durations below the minimum
    //
    cluck::timeout_t const duration(msg.get_timespec_parameter(cluck::g_name_cluck_param_duration));
    if(duration < cluck::CLUCK_MINIMUM_TIMEOUT)
    {
        // duration too small
        //
        SNAP_LOG_ERROR
            << duration
            << " is an invalid duration, the minimum accepted is "
            << cluck::CLUCK_MINIMUM_TIMEOUT
            << '.'
            << SNAP_LOG_SEND;

        ed::message lock_failed_message;
        lock_failed_message.set_command(cluck::g_name_cluck_cmd_lock_failed);
        lock_failed_message.reply_to(msg);
        lock_failed_message.add_parameter(cluck::g_name_cluck_param_object_name, object_name);
        lock_failed_message.add_parameter(cluck::g_name_cluck_param_tag, tag);
        lock_failed_message.add_parameter(cluck::g_name_cluck_param_key, entering_key);
        lock_failed_message.add_parameter(cluck::g_name_cluck_param_error, cluck::g_name_cluck_value_invalid);
#ifndef CLUCKD_OPTIMIZATIONS
        lock_failed_message.add_parameter(cluck::g_name_cluck_param_description, "LOCK called with a duration that is too small");
#endif
        f_messenger->send_message(lock_failed_message);

        return;
    }

    // the unlock duration (extra time granted after an UNLOCK request)
    // is optional; when present it must be at least the minimum
    //
    cluck::timeout_t unlock_duration(cluck::CLUCK_DEFAULT_TIMEOUT);
    if(msg.has_parameter(cluck::g_name_cluck_param_unlock_duration))
    {
        unlock_duration = msg.get_timespec_parameter(cluck::g_name_cluck_param_unlock_duration);
        if(unlock_duration < cluck::CLUCK_UNLOCK_MINIMUM_TIMEOUT)
        {
            // invalid duration, minimum is cluck::CLUCK_UNLOCK_MINIMUM_TIMEOUT
            //
            SNAP_LOG_ERROR
                << unlock_duration
                << " is an invalid unlock duration, the minimum accepted is "
                << cluck::CLUCK_UNLOCK_MINIMUM_TIMEOUT
                << '.'
                << SNAP_LOG_SEND;

            ed::message lock_failed_message;
            lock_failed_message.set_command(cluck::g_name_cluck_cmd_lock_failed);
            lock_failed_message.reply_to(msg);
            lock_failed_message.add_parameter(cluck::g_name_cluck_param_object_name, object_name);
            lock_failed_message.add_parameter(cluck::g_name_cluck_param_tag, tag);
            lock_failed_message.add_parameter(cluck::g_name_cluck_param_key, entering_key);
            lock_failed_message.add_parameter(cluck::g_name_cluck_param_error, cluck::g_name_cluck_value_invalid);
#ifndef CLUCKD_OPTIMIZATIONS
            lock_failed_message.add_parameter(cluck::g_name_cluck_param_description, "LOCK called with an unlock duration that is too small");
#endif
            f_messenger->send_message(lock_failed_message);

            return;
        }
    }

    // if the daemon is not yet ready (e.g. not enough leaders were
    // elected), cache the message and retry later; the cache itself
    // times out via f_timer
    //
    if(!is_daemon_ready())
    {
        SNAP_LOG_TRACE
            << "caching LOCK message for \""
            << object_name
            << "\" ("
            << tag
            << ") as the cluck system is not yet considered ready."
            << SNAP_LOG_SEND;

        f_message_cache.emplace_back(timeout, msg);

        // make sure the cache gets cleaned up if the message times out
        //
        std::int64_t const timeout_date(f_timer->get_timeout_date());
        if(timeout_date == -1
        || cluck::timeout_t(timeout_date / 1'000'000, timeout_date % 1'000'000) > timeout)
        {
            f_timer->set_timeout_date(timeout);
        }
        return;
    }

    if(is_leader() == nullptr)
    {
        // we are not a leader, we need to forward the message to one
        // of the leaders instead
        //
        forward_message_to_leader(msg);
        return;
    }

    // make sure this is a new ticket
    //
    auto entering_ticket(f_entering_tickets.find(object_name));
    if(entering_ticket != f_entering_tickets.end())
    {
        auto key_ticket(entering_ticket->second.find(entering_key));
        if(key_ticket != entering_ticket->second.end())
        {
            // if this is a re-LOCK, then it may be a legitimate duplicate
            // in which case we do not want to generate a LOCK_FAILED error
            //
            if(msg.has_parameter(cluck::g_name_cluck_param_serial))
            {
                ticket::serial_t const serial(msg.get_integer_parameter(cluck::g_name_cluck_param_serial));
                if(key_ticket->second->get_serial() == serial)
                {
                    // legitimate double request from leaders
                    // (this happens when a leader dies and we have to restart
                    // a lock negotiation)
                    //
                    return;
                }
            }

            // the object already exists... do not allow duplicates
            //
            SNAP_LOG_ERROR
                << "an entering ticket has the same object name \""
                << object_name
                << "\" ("
                << tag
                << ") and entering key \""
                << entering_key
                << "\"."
                << SNAP_LOG_SEND;

            ed::message lock_failed_message;
            lock_failed_message.set_command(cluck::g_name_cluck_cmd_lock_failed);
            lock_failed_message.reply_to(msg);
            lock_failed_message.add_parameter(cluck::g_name_cluck_param_object_name, object_name);
            lock_failed_message.add_parameter(cluck::g_name_cluck_param_tag, tag);
            lock_failed_message.add_parameter(cluck::g_name_cluck_param_key, entering_key);
            lock_failed_message.add_parameter(cluck::g_name_cluck_param_error, cluck::g_name_cluck_value_duplicate);
#ifndef CLUCKD_OPTIMIZATIONS
            lock_failed_message.add_parameter(cluck::g_name_cluck_param_description, "LOCK called with the same entering ticket object_name and entering_key");
#endif
            f_messenger->send_message(lock_failed_message);

            return;
        }
        if(entering_ticket->second.size() >= cluck::CLUCK_MAXIMUM_ENTERING_LOCKS)
        {
            // this is a failure in the algorithm (unfortunately), if you
            // send LOCK commands without much pause, the number of entering
            // ticket can grow forever; the following is a way to avoid
            // that situation by preventing such inconsiderate growth.
            //
            SNAP_LOG_ERROR
                << "too many entering tickets for object name \""
                << object_name
                << "\"."
                << SNAP_LOG_SEND;

            ed::message lock_failed_message;
            lock_failed_message.set_command(cluck::g_name_cluck_cmd_lock_failed);
            lock_failed_message.reply_to(msg);
            lock_failed_message.add_parameter(cluck::g_name_cluck_param_object_name, object_name);
            lock_failed_message.add_parameter(cluck::g_name_cluck_param_tag, tag);
            lock_failed_message.add_parameter(cluck::g_name_cluck_param_key, entering_key);
            lock_failed_message.add_parameter(cluck::g_name_cluck_param_error, cluck::g_name_cluck_value_overflow);
#ifndef CLUCKD_OPTIMIZATIONS
            lock_failed_message.add_parameter(cluck::g_name_cluck_param_description, "LOCK called too quickly that the number of entering tickets overflowed");
#endif
            f_messenger->send_message(lock_failed_message);

            return;
        }
    }

    // make sure there is not a ticket with the same name already defined
    //
    // (this is really important so we can actually properly UNLOCK an
    // existing lock since we use the same search and if two entries were
    // to be the same we could not know which to unlock; there are a few
    // other places where such a search is used actually...)
    //
    auto obj_ticket(f_tickets.find(object_name));
    if(obj_ticket != f_tickets.end())
    {
        auto key_ticket(std::find_if(
              obj_ticket->second.begin()
            , obj_ticket->second.end()
            , [&entering_key](auto const & t)
            {
                return t.second->get_entering_key() == entering_key;
            }));
        if(key_ticket != obj_ticket->second.end())
        {
            // there is already a ticket with this object name/entering key
            //
            SNAP_LOG_ERROR
                << "a ticket has the same object name \""
                << object_name
                << "\" ("
                << tag
                << ") and entering key \""
                << entering_key
                << "\"."
                << SNAP_LOG_SEND;

            ed::message lock_failed_message;
            lock_failed_message.set_command(cluck::g_name_cluck_cmd_lock_failed);
            lock_failed_message.reply_to(msg);
            lock_failed_message.add_parameter(cluck::g_name_cluck_param_object_name, object_name);
            lock_failed_message.add_parameter(cluck::g_name_cluck_param_tag, tag);
            lock_failed_message.add_parameter(cluck::g_name_cluck_param_key, entering_key);
            lock_failed_message.add_parameter(cluck::g_name_cluck_param_error, cluck::g_name_cluck_value_duplicate);
#ifndef CLUCKD_OPTIMIZATIONS
            lock_failed_message.add_parameter(cluck::g_name_cluck_param_description, "LOCK called with the same ticket object_name and entering_key");
#endif
            f_messenger->send_message(lock_failed_message);

            return;
        }
    }

    // create the entering ticket for this lock request
    //
    ticket::pointer_t ticket(std::make_shared<ticket>(
              this
            , f_messenger
            , object_name
            , tag
            , entering_key
            , timeout
            , duration
            , server_name
            , service_name));

    f_entering_tickets[object_name][entering_key] = ticket;

    // finish up ticket initialization
    //
    ticket->set_unlock_duration(unlock_duration);

    // generate a serial number for that ticket; the low 24 bits are a
    // wrapping counter and the top bits tag which leader generated it
    //
    // NOTE(review): the `!=` comparisons below look suspicious -- if
    // this daemon is f_leaders[1], the else-if branch tags the serial
    // with `2 << 24` and if it is f_leaders[2] the first branch tags it
    // with `1 << 24`, i.e. the two codes appear swapped (and the
    // `1 << 24` branch shows zero coverage); presumably the intent is
    // to record OUR index among the leaders using `==` -- confirm
    // against the leaders' serial decoding before changing
    //
    f_ticket_serial = (f_ticket_serial + 1) & 0x00FFFFFF; // 0 is a valid serial number (-1 is not)
    if(f_leaders[0]->get_id() != f_my_id)
    {
        if(f_leaders.size() >= 2
        && f_leaders[1]->get_id() != f_my_id)
        {
            f_ticket_serial |= 1 << 24;
        }
        else if(f_leaders.size() >= 3
             && f_leaders[2]->get_id() != f_my_id)
        {
            f_ticket_serial |= 2 << 24;
        }
    }
    ticket->set_serial(f_ticket_serial);

    if(msg.has_parameter(cluck::g_name_cluck_param_serial))
    {
        // if we have a "serial" number in that message, we lost a leader
        // and when that happens we are not unlikely to have lost the
        // client that requested the LOCK, send an ALIVE message to make
        // sure that the client still exists before entering the ticket
        //
        // TODO: we may want to make this 5s a parameter that we can change
        //
        ticket->set_alive_timeout(snapdev::now() + cluck::timeout_t(5, 0));

        ed::message alive_message;
        alive_message.set_command(ed::g_name_ed_cmd_alive);
        alive_message.set_server(server_name);
        alive_message.set_service(service_name);
        alive_message.add_parameter(ed::g_name_ed_param_serial, "cluckd/" + object_name + '/' + entering_key);
        alive_message.add_parameter(ed::g_name_ed_param_timestamp, snapdev::now());
        f_messenger->send_message(alive_message);
    }
    else
    {
        // act on the new ticket
        //
        ticket->entering();
    }

    // the list of tickets changed, make sure we update the timeout timer
    //
    cleanup();
}
3086 :
3087 :
3088 : /** \brief Acknowledgement of the lock to activate.
3089 : *
3090 : * This function is an acknowledgement that the lock can now be
3091 : * activated. This is true only if the 'key' and 'other_key'
3092 : * are a match, though.
3093 : *
3094 : * \param[in] msg The LOCK_ACTIVATED message.
3095 : */
3096 17 : void cluckd::msg_lock_activated(ed::message & msg)
3097 : {
3098 17 : std::string object_name;
3099 17 : ed::dispatcher_match::tag_t tag(ed::dispatcher_match::DISPATCHER_MATCH_NO_TAG);
3100 17 : std::string key;
3101 17 : if(!get_parameters(msg, &object_name, &tag, nullptr, nullptr, &key, nullptr))
3102 : {
3103 : return; // LCOV_EXCL_LINE
3104 : }
3105 :
3106 51 : std::string const & other_key(msg.get_parameter(cluck::g_name_cluck_param_other_key));
3107 17 : if(other_key == key)
3108 : {
3109 17 : auto obj_ticket(f_tickets.find(object_name));
3110 17 : if(obj_ticket != f_tickets.end())
3111 : {
3112 17 : auto key_ticket(obj_ticket->second.find(key));
3113 17 : if(key_ticket != obj_ticket->second.end())
3114 : {
3115 : // that key is still here!
3116 : // time to activate
3117 : //
3118 17 : key_ticket->second->lock_activated();
3119 : }
3120 : }
3121 : }
3122 17 : }
3123 :
3124 :
3125 : /** \brief Tell the specified ticket LOCK_ENTERED was received.
3126 : *
3127 : * This function calls the specified ticket entered() function, which
3128 : * processes the LOCK_ENTERED message and sends the GET_MAX_TICKET
3129 : * message to the other leaders.
3130 : *
3131 : * \param[in] msg The LOCK_ENTERED message.
3132 : */
3133 123 : void cluckd::msg_lock_entered(ed::message & msg)
3134 : {
3135 123 : std::string object_name;
3136 123 : ed::dispatcher_match::tag_t tag(ed::dispatcher_match::DISPATCHER_MATCH_NO_TAG);
3137 123 : std::string key;
3138 123 : if(!get_parameters(msg, &object_name, &tag, nullptr, nullptr, &key, nullptr))
3139 : {
3140 : return; // LCOV_EXCL_LINE
3141 : }
3142 :
3143 123 : auto const obj_entering_ticket(f_entering_tickets.find(object_name));
3144 123 : if(obj_entering_ticket != f_entering_tickets.end())
3145 : {
3146 123 : auto const key_entering_ticket(obj_entering_ticket->second.find(key));
3147 123 : if(key_entering_ticket != obj_entering_ticket->second.end())
3148 : {
3149 123 : key_entering_ticket->second->entered();
3150 : }
3151 : }
3152 123 : }
3153 :
3154 :
3155 : /** \brief Create an entering ticket.
3156 : *
3157 : * When the cluck daemon receives a LOCK message, it sends a LOCK_ENTERING
3158 : * to the other leaders so that way of all them have a complete list of all
3159 : * the tickets.
3160 : *
3161 : * This function creates an "entering ticket." This is a "real" ticket,
3162 : * but it is still in an "entering" state.
3163 : *
3164 : * The function sends a LOCK_ENTERED as a reply to the LOCK_ENTERING
3165 : * message.
3166 : *
3167 : * \note
3168 : * Since a cluck daemon may receive this message multiple times, if it
3169 : * finds an existing entering ticket with the same parameters (object
3170 : * name and key), then it does not re-create yet another instance.
3171 : *
3172 : * \param[in] msg The LOCK_ENTERING message.
3173 : */
void cluckd::msg_lock_entering(ed::message & msg)
{
    // retrieve the object name, tag, obtention timeout, entering key
    // and the "source" ("<server>/<service>") of the original client
    //
    std::string object_name;
    ed::dispatcher_match::tag_t tag(ed::dispatcher_match::DISPATCHER_MATCH_NO_TAG);
    cluck::timeout_t timeout;
    std::string key;
    std::string source;
    if(!get_parameters(msg, &object_name, &tag, nullptr, &timeout, &key, &source))
    {
        return; // LCOV_EXCL_LINE
    }

    // lock still in the future?
    //
    if(timeout <= snapdev::now())
    {
        SNAP_LOG_DEBUG
            << "received LOCK_ENTERING for \""
            << object_name
            << "\" that already timed out."
            << SNAP_LOG_SEND;
        return;
    }

    // do we have enough leaders?
    //
    if(!is_daemon_ready())
    {
        SNAP_LOG_DEBUG
            << "received LOCK_ENTERING while we are thinking we are not ready."
            << SNAP_LOG_SEND;
        return;
    }

    // the entering is just a flag (i.e. entering[i] = true)
    // in our case the existance of a ticket is enough to know
    // that we entered
    //
    // (this message may be received multiple times; only allocate a
    // new entering ticket when none exists yet for this name/key)
    //
    bool allocate(true);
    auto const obj_ticket(f_entering_tickets.find(object_name));
    if(obj_ticket != f_entering_tickets.end())
    {
        auto const key_ticket(obj_ticket->second.find(key));
        allocate = key_ticket == obj_ticket->second.end();
    }
    if(allocate)
    {
        // ticket does not exist, so create it now
        // (note: ticket should only exist on originator)
        //
        // the lock duration must be at least the accepted minimum
        //
        cluck::timeout_t const duration(msg.get_timespec_parameter(cluck::g_name_cluck_param_duration));
        if(duration < cluck::CLUCK_MINIMUM_TIMEOUT)
        {
            // invalid duration
            //
            SNAP_LOG_ERROR
                << duration
                << " is an invalid duration, the minimum accepted is "
                << cluck::CLUCK_MINIMUM_TIMEOUT
                << "."
                << SNAP_LOG_SEND;

            ed::message lock_failed_message;
            lock_failed_message.set_command(cluck::g_name_cluck_cmd_lock_failed);
            lock_failed_message.reply_to(msg);
            lock_failed_message.add_parameter(cluck::g_name_cluck_param_object_name, object_name);
            lock_failed_message.add_parameter(cluck::g_name_cluck_param_tag, tag);
            lock_failed_message.add_parameter(cluck::g_name_cluck_param_key, key);
            lock_failed_message.add_parameter(cluck::g_name_cluck_param_error, cluck::g_name_cluck_value_invalid);
#ifndef CLUCKD_OPTIMIZATIONS
            lock_failed_message.add_parameter(cluck::g_name_cluck_param_description, "LOCK_ENTERING called with a duration which is too small");
#endif
            f_messenger->send_message(lock_failed_message);

            return;
        }

        // the unlock duration is optional; when present, anything other
        // than the default must be at least the unlock minimum
        //
        // NOTE(review): this check differs from msg_lock() which
        // rejects any value below the minimum including the default --
        // presumably intentional since the originator already
        // validated; confirm
        //
        cluck::timeout_t unlock_duration(cluck::CLUCK_DEFAULT_TIMEOUT);
        if(msg.has_parameter(cluck::g_name_cluck_param_unlock_duration))
        {
            unlock_duration = msg.get_timespec_parameter(cluck::g_name_cluck_param_unlock_duration);
            if(unlock_duration != cluck::CLUCK_DEFAULT_TIMEOUT
            && unlock_duration < cluck::CLUCK_UNLOCK_MINIMUM_TIMEOUT)
            {
                // invalid duration, minimum is 60
                //
                SNAP_LOG_ERROR
                    << unlock_duration
                    << " is an invalid unlock duration, the minimum accepted is "
                    << cluck::CLUCK_UNLOCK_MINIMUM_TIMEOUT
                    << "."
                    << SNAP_LOG_SEND;

                ed::message lock_failed_message;
                lock_failed_message.set_command(cluck::g_name_cluck_cmd_lock_failed);
                lock_failed_message.reply_to(msg);
                lock_failed_message.add_parameter(cluck::g_name_cluck_param_object_name, object_name);
                lock_failed_message.add_parameter(cluck::g_name_cluck_param_tag, tag);
                lock_failed_message.add_parameter(cluck::g_name_cluck_param_key, key);
                lock_failed_message.add_parameter(cluck::g_name_cluck_param_error, cluck::g_name_cluck_value_invalid);
#ifndef CLUCKD_OPTIMIZATIONS
                lock_failed_message.add_parameter(cluck::g_name_cluck_param_description, "LOCK_ENTERING called with an unlock duration which is too small or set to the default value");
#endif
                f_messenger->send_message(lock_failed_message);

                return;
            }
        }

        // we have to know where this message comes from
        // ("source" must split in exactly "<server>/<service>")
        //
        std::vector<std::string> source_segments;
        if(snapdev::tokenize_string(source_segments, source, "/") != 2)
        {
            SNAP_LOG_ERROR
                << "Invalid number of parameters in source parameter \""
                << source
                << "\" (found "
                << source_segments.size()
                << ", expected 2)."
                << SNAP_LOG_SEND;

            ed::message lock_failed_message;
            lock_failed_message.set_command(cluck::g_name_cluck_cmd_lock_failed);
            lock_failed_message.reply_to(msg);
            lock_failed_message.add_parameter(cluck::g_name_cluck_param_object_name, object_name);
            lock_failed_message.add_parameter(cluck::g_name_cluck_param_tag, tag);
            lock_failed_message.add_parameter(cluck::g_name_cluck_param_key, key);
            lock_failed_message.add_parameter(cluck::g_name_cluck_param_error, cluck::g_name_cluck_value_invalid);
#ifndef CLUCKD_OPTIMIZATIONS
            lock_failed_message.add_parameter(cluck::g_name_cluck_param_description, "LOCK_ENTERING called with an invalid source parameter");
#endif
            f_messenger->send_message(lock_failed_message);

            return;
        }

        // create the entering ticket mirroring the originator's
        //
        ticket::pointer_t ticket(std::make_shared<ticket>(
                  this
                , f_messenger
                , object_name
                , tag
                , key
                , timeout
                , duration
                , source_segments[0]
                , source_segments[1]));

        f_entering_tickets[object_name][key] = ticket;

        // finish up on ticket initialization
        //
        ticket->set_owner(msg.get_sent_from_server());
        ticket->set_unlock_duration(unlock_duration);
        ticket->set_serial(msg.get_integer_parameter(cluck::g_name_cluck_param_serial));
    }

    // acknowledge with LOCK_ENTERED whether we just created the ticket
    // or it already existed
    //
    ed::message reply;
    reply.set_command(cluck::g_name_cluck_cmd_lock_entered);
    reply.reply_to(msg);
    reply.add_parameter(cluck::g_name_cluck_param_object_name, object_name);
    reply.add_parameter(cluck::g_name_cluck_param_tag, tag);
    reply.add_parameter(cluck::g_name_cluck_param_key, key);
    f_messenger->send_message(reply);

    cleanup();
}
3341 :
3342 :
3343 : /** \brief Exit a ticket.
3344 : *
3345 : * This command exits a ticket, which means we got the GET_MAX_TICKET
3346 : * result and thus can be sure that the ticket is properly sorted in
3347 : * the list of tickets.
3348 : *
3349 : * \param[in] msg The LOCK_EXITING message.
3350 : */
void cluckd::msg_lock_exiting(ed::message & msg)
{
    std::string object_name;
    ed::dispatcher_match::tag_t tag(ed::dispatcher_match::DISPATCHER_MATCH_NO_TAG);
    std::string key;
    if(!get_parameters(msg, &object_name, &tag, nullptr, nullptr, &key, nullptr))
    {
        return; // LCOV_EXCL_LINE
    }

    // when exiting we just remove the entry with that key
    //
    auto const obj_entering(f_entering_tickets.find(object_name));
    if(obj_entering != f_entering_tickets.end())
    {
        auto const key_entering(obj_entering->second.find(key));
        if(key_entering != obj_entering->second.end())
        {
            obj_entering->second.erase(key_entering);

            // we also want to remove it from the ticket f_entering
            // map if it is there (older ones are there!)
            //
            bool run_activation(false);
            auto const obj_ticket(f_tickets.find(object_name));
            if(obj_ticket != f_tickets.end())
            {
                for(auto const & key_ticket : obj_ticket->second)
                {
                    key_ticket.second->remove_entering(key);
                    run_activation = true;
                }
            }
            if(run_activation)
            {
                // hmm... looking at the code closer, it seems that this is
                // still very much necessary
                //
                // --------------------------------------------------
                //
                // try to activate the lock right now since it could
                // very well be the only ticket and that is exactly
                // when it is viewed as active!
                //
                // Note: this is from my old version, if I am correct
                //       it cannot happen anymore because (1) this is
                //       not the owner so the activation would not
                //       take anyway and (2) the ticket is not going
                //       to be marked as being ready at this point
                //       (that happens later)
                //
                //       XXX we probably should remove this statement
                //           and the run_activation flag which would
                //           then be useless
                //
                activate_first_lock(object_name);
            }

            // drop the whole per-object map once its last entering
            // ticket is gone
            //
            if(obj_entering->second.empty())
            {
                f_entering_tickets.erase(obj_entering);
            }
        }
        else
        {
            SNAP_LOG_WARNING
                << "entering lock \""
                << object_name
                << "\" with key \""
                << key
                << "\" in LOCK_EXITING specified lock not found."
                << SNAP_LOG_SEND;
        }
    }
    else
    {
        SNAP_LOG_WARNING
            << "LOCK_EXITING specified lock \""
            << object_name
            << "\" not found."
            << SNAP_LOG_SEND;
    }

    // the list of tickets has likely changed so we need to make
    // a call to cleanup() to make sure the timer is reset appropriately
    //
    cleanup();
}
3439 :
3440 :
3441 : /** \brief Acknowledge a lock failure.
3442 : *
3443 : * This function handles the LOCK_FAILED event that another leader may send
3444 : * to us. In that case we have to stop the process.
3445 : *
3446 : * LOCK_FAILED can happen mainly because of tainted data so we should never
3447 : * get here within a leader. However, with time we may add a few errors
3448 : * which could happen for other reasons than just tainted data.
3449 : *
3450 : * When this function finds an entering ticket or a plain ticket to remove
3451 : * according to the object name and key found in the LOCK_FAILED message,
3452 : * it forwards the LOCK_FAILED message to the server and service found in
3453 : * the ticket.
3454 : *
3455 : * \todo
3456 : * This function destroys a ticket even if it is already considered locked.
3457 : * Make double sure that this is okay with a LOCK_FAILED sent to the client.
3458 : *
3459 : * \warning
3460 : * Although this event should not occur, it is problematic since anyone
3461 : * can send a LOCK_FAILED message here and as a side effect destroy a
3462 : * perfectly valid ticket.
3463 : *
3464 : * \param[in] msg The LOCK_FAILED message.
3465 : */
void cluckd::msg_lock_failed(ed::message & msg)
{
    // extract the standard lock parameters; get_parameters() returns false
    // on invalid input in which case there is nothing we can do here
    //
    std::string object_name;
    ed::dispatcher_match::tag_t tag(ed::dispatcher_match::DISPATCHER_MATCH_NO_TAG);
    std::string key;
    if(!get_parameters(msg, &object_name, &tag, nullptr, nullptr, &key, nullptr))
    {
        return; // LCOV_EXCL_LINE
    }

    // build a log line incrementally; suffixes appended below record which
    // structures were affected and where the message was forwarded
    //
    std::string errmsg("get LOCK_FAILED: ");
    errmsg += msg.get_parameter("error");

    // when a matching ticket is found, these capture the server & service
    // that originally requested the lock so we can forward the failure
    //
    std::string forward_server;
    std::string forward_service;

    // remove f_entering_tickets entries if we find matches there
    //
    auto obj_entering(f_entering_tickets.find(object_name));
    if(obj_entering != f_entering_tickets.end())
    {
        auto key_entering(obj_entering->second.find(key));
        if(key_entering != obj_entering->second.end())
        {
            forward_server = key_entering->second->get_server_name();
            forward_service = key_entering->second->get_service_name();

            obj_entering->second.erase(key_entering);

            errmsg += " -- happened while entering";
        }

        // drop the whole object entry once its last key is gone
        //
        if(obj_entering->second.empty())
        {
            f_entering_tickets.erase(obj_entering);
        }
    }

    // remove any f_tickets entries if we find matches there
    //
    auto obj_ticket(f_tickets.find(object_name));
    if(obj_ticket != f_tickets.end())
    {
        bool try_activate(false);
        auto key_ticket(obj_ticket->second.find(key));
        if(key_ticket == obj_ticket->second.end())
        {
            // the key may be an entering key rather than a ticket key;
            // search the tickets for a matching entering key instead
            //
            key_ticket = std::find_if(
                  obj_ticket->second.begin()
                , obj_ticket->second.end()
                , [&key](auto const & t)
                {
                    return t.second->get_entering_key() == key;
                });
        }
        if(key_ticket != obj_ticket->second.end())
        {
            // Note: if we already found it in the f_entering_tickets then
            //       the server and service names are going to be exactly
            //       the same so there is no need to test that here
            //
            forward_server = key_ticket->second->get_server_name();
            forward_service = key_ticket->second->get_service_name();

            obj_ticket->second.erase(key_ticket);
            try_activate = true;

            errmsg += " -- happened when locked"; // TBD: are we really always locked in this case?
        }

        if(obj_ticket->second.empty())
        {
            f_tickets.erase(obj_ticket);
        }
        else if(try_activate)
        {
            // something was erased, a new ticket may be first
            //
            activate_first_lock(obj_ticket->first);
        }
    }

    if(!forward_server.empty()
    && !forward_service.empty())
    {
        // we deleted an entry, forward the message to the service
        // that requested that lock
        //
        msg.set_server(forward_server);
        msg.set_service(forward_service);
        f_messenger->send_message(msg);

        errmsg += " -> ";
        errmsg += forward_server;
        errmsg += ":";
        errmsg += forward_service;
    }

    SNAP_LOG_IMPORTANT
        << errmsg
        << '.'
        << SNAP_LOG_SEND;

    // the list of tickets is not unlikely changed so we need to make
    // a call to cleanup to make sure the timer is reset appropriately
    //
    cleanup();
}
3574 :
3575 :
3576 : /** \brief The list of leaders.
3577 : *
3578 : * This function receives the list of leaders after an election.
3579 : *
3580 : * \param[in] msg The LOCK_LEADERS message.
3581 : */
3582 9 : void cluckd::msg_lock_leaders(ed::message & msg)
3583 : {
3584 27 : f_election_date = msg.get_timespec_parameter(cluck::g_name_cluck_param_election_date);
3585 :
3586 : // save the new leaders in our own list
3587 : //
3588 9 : f_leaders.clear();
3589 36 : for(int idx(0); idx < 3; ++idx)
3590 : {
3591 27 : std::string const param_name(cluck::g_name_cluck_param_leader + std::to_string(idx));
3592 27 : if(msg.has_parameter(param_name))
3593 : {
3594 24 : computer::pointer_t leader(std::make_shared<computer>());
3595 24 : std::string const lockid(msg.get_parameter(param_name));
3596 24 : if(leader->set_id(lockid))
3597 : {
3598 24 : computer::map_t::iterator exists(f_computers.find(leader->get_name()));
3599 24 : if(exists != f_computers.end())
3600 : {
3601 : // it already exists, use our existing instance
3602 : //
3603 23 : f_leaders.push_back(exists->second);
3604 : }
3605 : else
3606 : {
3607 : // we do not yet know of that computer, even though
3608 : // it is a leader! (i.e. we are not yet aware that
3609 : // somehow we are connected to it)
3610 : //
3611 1 : leader->set_connected(false);
3612 1 : f_computers[leader->get_name()] = leader;
3613 :
3614 1 : f_leaders.push_back(leader);
3615 : }
3616 : }
3617 24 : }
3618 27 : }
3619 :
3620 9 : if(!f_leaders.empty())
3621 : {
3622 9 : synchronize_leaders();
3623 :
3624 : // set the round-robin position to a random value
3625 : //
3626 : // note: I know the result is likely skewed, c will be set to
3627 : // a number between 0 and 255 and modulo 3 means that you get
3628 : // one extra zero (255 % 3 == 0); however, there are 85 times
3629 : // 3 in 255 so it probably won't be noticeable.
3630 : //
3631 9 : std::uint8_t c;
3632 9 : RAND_bytes(reinterpret_cast<unsigned char *>(&c), sizeof(c));
3633 9 : f_next_leader = c % f_leaders.size();
3634 : }
3635 :
3636 : // the is_daemon_ready() function depends on having f_leaders defined
3637 : // and when that happens we may need to empty our cache
3638 : //
3639 9 : check_lock_status();
3640 9 : }
3641 :
3642 :
3643 : /** \brief Called whenever a cluck computer is acknowledging itself.
3644 : *
3645 : * This function gets called on a LOCK_STARTED event which is sent whenever
3646 : * a cluck process is initialized on a computer.
3647 : *
3648 : * The message is expected to include the computer name. At this time
3649 : * we cannot handle having more than one instance on the same computer.
3650 : *
3651 : * \param[in] msg The LOCK_STARTED message.
3652 : */
void cluckd::msg_lock_started(ed::message & msg)
{
    // I do not think we would ever message ourselves, but in case it happens
    // the rest of the function does not support that case
    //
    std::string const server_name(msg.get_parameter(communicatord::g_name_communicatord_param_server_name));
    if(server_name == f_server_name)
    {
        return;
    }

    cluck::timeout_t const start_time(msg.get_timespec_parameter(cluck::g_name_cluck_param_start_time));

    // check whether we already know about the sending computer
    //
    computer::map_t::iterator it(f_computers.find(server_name));
    bool new_computer(it == f_computers.end());
    if(new_computer)
    {
        // create a computer instance so we know it exists
        //
        computer::pointer_t computer(std::make_shared<computer>());

        // fill the fields from the "lock_id" parameter
        //
        if(!computer->set_id(msg.get_parameter(cluck::g_name_cluck_param_lock_id)))
        {
            // this is not a valid identifier, ignore altogether
            //
            return;
        }
        computer->set_start_time(start_time);

#ifdef _DEBUG
        // sanity check: the name encoded in the lock identifier must match
        // the "server_name" parameter of the message
        //
        // LCOV_EXCL_START
        if(computer->get_name() != server_name)
        {
            throw cluck::logic_error("cluckd::msg_lock_started(): server_name ("
                + server_name
                + ") does not match the new computer name ("
                + computer->get_name()
                + ").");
        }
        // LCOV_EXCL_STOP
#endif
        f_computers[computer->get_name()] = computer;
    }
    else
    {
        if(!it->second->get_connected())
        {
            // we heard of this computer (because it is/was a leader) but
            // we had not yet received a LOCK_STARTED message from it; so here
            // we consider it a new computer and will reply to the LOCK_STARTED
            //
            new_computer = true;
            it->second->set_connected(true);
        }

        if(it->second->get_start_time() != start_time)
        {
            // when the start time changes that means cluckd
            // restarted which can happen without communicatord
            // restarting so here we would not know about the feat
            // without this parameter and in this case it is very
            // much the same as a new computer so send it a
            // LOCK_STARTED message back
            //
            new_computer = true;
            it->second->set_start_time(start_time);
        }
    }

    // keep the newest election results
    //
    computer::pointer_t old_leader(is_leader());
    if(msg.has_parameter(cluck::g_name_cluck_param_election_date))
    {
        snapdev::timespec_ex const election_date(msg.get_timespec_parameter(cluck::g_name_cluck_param_election_date));
        if(election_date > f_election_date)
        {
            // the sender knows of a more recent election; drop our older
            // list of leaders so we adopt the sender's list below
            //
            f_election_date = election_date;
            f_leaders.clear();
        }
    }

    if(f_leaders.empty())
    {
        // no leaders known (or the list was just cleared above); copy
        // the sender's "leader0".."leader2" parameters to our own list
        //
        for(int idx(0); idx < 3; ++idx)
        {
            std::string const param_name(cluck::g_name_cluck_param_leader + std::to_string(idx));
            if(msg.has_parameter(param_name))
            {
                computer::pointer_t leader(std::make_shared<computer>());
                std::string const lockid(msg.get_parameter(param_name));
                if(leader->set_id(lockid))
                {
                    computer::map_t::iterator exists(f_computers.find(leader->get_name()));
                    if(exists != f_computers.end())
                    {
                        // it already exists, use our existing instance
                        //
                        f_leaders.push_back(exists->second);
                    }
                    else
                    {
                        // we do not yet know of that computer, even though
                        // it is a leader! (i.e. we are not yet aware that
                        // somehow we are connected to it)
                        //
                        leader->set_connected(false);
                        f_computers[leader->get_name()] = leader;

                        f_leaders.push_back(leader);
                    }
                }
            }
        }
    }

    election_status();

    // this can have an effect on the lock statuses
    //
    check_lock_status();

    if(new_computer
    || old_leader != is_leader())
    {
        // send a reply if that was a new computer
        //
        send_lock_started(&msg);
    }
}
3785 :
3786 :
3787 : /** \brief A service asked about the lock status.
3788 : *
3789 : * The lock status is whether the cluck service is ready to receive
3790 : * LOCK messages (LOCK_READY) or is still waiting on a CLUSTER_UP and
3791 : * LOCK_LEADERS to happen (NO_LOCK).
3792 : *
3793 : * Note that LOCK messages are accepted while the lock service is not
3794 : * yet ready, however, those are cached and it is more likely that they
3795 : * timeout before the system is ready to process the request.
3796 : *
3797 : * \param[in] msg The message to reply to.
3798 : */
3799 13 : void cluckd::msg_lock_status(ed::message & msg)
3800 : {
3801 13 : ed::message status_message;
3802 39 : status_message.set_command(is_daemon_ready()
3803 : ? cluck::g_name_cluck_cmd_lock_ready
3804 : : cluck::g_name_cluck_cmd_no_lock);
3805 :
3806 13 : status_message.reply_to(msg);
3807 65 : status_message.add_parameter(communicatord::g_name_communicatord_param_cache, communicatord::g_name_communicatord_value_no);
3808 13 : f_messenger->send_message(status_message);
3809 26 : }
3810 :
3811 :
3812 : /** \brief Another cluckd is sending us its list of tickets.
3813 : *
3814 : * Whenever a cluckd dies, a new one is quickly promoted as a leader
3815 : * and that new leader would have no idea about the existing tickets
3816 : * (locks) so the other two send it a LOCK_TICKETS message.
3817 : *
3818 : * The tickets are defined in the parameter of the same name using
3819 : * the serialization function to transform the objects in a string.
3820 : * Here we can unserialize that string accordingly.
3821 : *
3822 : * First we extract the object name and entering key to see whether
3823 : * we have that ticket already defined. If so, then we unserialize
3824 : * in that existing object. The extraction is additive so we can do
3825 : * it any number of times.
3826 : *
3827 : * \param[in] msg The LOCK_TICKETS message.
3828 : */
3829 2 : void cluckd::msg_lock_tickets(ed::message & msg)
3830 : {
3831 6 : std::string const tickets(msg.get_parameter(cluck::g_name_cluck_param_tickets));
3832 :
3833 : // we have one ticket per line, so we first split per line and then
3834 : // work on lines one at a time
3835 : //
3836 2 : bool added_tickets(false);
3837 2 : std::list<std::string> lines;
3838 6 : snapdev::tokenize_string(lines, tickets, "\n", true);
3839 4 : for(auto const & l : lines)
3840 : {
3841 2 : ticket::pointer_t t;
3842 2 : std::list<std::string> vars;
3843 6 : snapdev::tokenize_string(vars, tickets, "|", true);
3844 2 : auto object_name_value(std::find_if(
3845 : vars.begin()
3846 : , vars.end()
3847 18 : , [](std::string const & vv)
3848 : {
3849 18 : return vv.starts_with("object_name=");
3850 : }));
3851 2 : if(object_name_value != vars.end())
3852 : {
3853 2 : auto entering_key_value(std::find_if(
3854 : vars.begin()
3855 : , vars.end()
3856 6 : , [](std::string const & vv)
3857 : {
3858 6 : return vv.starts_with("entering_key=");
3859 : }));
3860 2 : if(entering_key_value != vars.end())
3861 : {
3862 : // extract the values which start after the '=' sign
3863 : //
3864 2 : std::string const object_name(object_name_value->substr(12));
3865 2 : std::string const entering_key(entering_key_value->substr(13));
3866 :
3867 2 : auto entering_ticket(f_entering_tickets.find(object_name));
3868 2 : if(entering_ticket != f_entering_tickets.end())
3869 : {
3870 0 : auto key_ticket(entering_ticket->second.find(entering_key));
3871 0 : if(key_ticket != entering_ticket->second.end())
3872 : {
3873 0 : t = key_ticket->second;
3874 : }
3875 : }
3876 2 : if(t == nullptr)
3877 : {
3878 2 : auto obj_ticket(f_tickets.find(object_name));
3879 2 : if(obj_ticket != f_tickets.end())
3880 : {
3881 3 : auto key_ticket(std::find_if(
3882 1 : obj_ticket->second.begin()
3883 1 : , obj_ticket->second.end()
3884 1 : , [&entering_key](auto const & o)
3885 : {
3886 1 : return o.second->get_entering_key() == entering_key;
3887 : }));
3888 1 : if(key_ticket != obj_ticket->second.end())
3889 : {
3890 1 : t = key_ticket->second;
3891 : }
3892 : }
3893 : }
3894 :
3895 : // ticket exists? if not create a new one
3896 : //
3897 2 : bool const new_ticket(t == nullptr);
3898 2 : if(new_ticket)
3899 : {
3900 : // create a new ticket, some of the parameters are there just
3901 : // because they are required; they will be replaced by the
3902 : // unserialize call below...
3903 : //
3904 3 : t = std::make_shared<ticket>(
3905 2 : this
3906 1 : , f_messenger
3907 : , object_name
3908 : , ed::dispatcher_match::DISPATCHER_MATCH_NO_TAG
3909 : , entering_key
3910 2 : , cluck::CLUCK_DEFAULT_TIMEOUT + snapdev::now()
3911 : , cluck::CLUCK_DEFAULT_TIMEOUT
3912 1 : , f_server_name
3913 1 : , cluck::g_name_cluck_service_name);
3914 : }
3915 :
3916 2 : t->unserialize(l);
3917 2 : added_tickets = true;
3918 :
3919 : // do a couple of additional sanity tests to
3920 : // make sure that we want to keep new tickets
3921 : //
3922 : // first make sure it is marked as "locked"
3923 : //
3924 : // second check that the owner is a leader that
3925 : // exists (the sender uses a LOCK message for
3926 : // locks that are not yet locked or require
3927 : // a new owner)
3928 : //
3929 2 : if(new_ticket
3930 2 : && t->is_locked())
3931 : {
3932 1 : auto li(std::find_if(
3933 : f_leaders.begin()
3934 : , f_leaders.end()
3935 3 : , [&t](auto const & c)
3936 : {
3937 3 : return t->get_owner() == c->get_name();
3938 : }));
3939 1 : if(li != f_leaders.end())
3940 : {
3941 1 : f_tickets[object_name][t->get_ticket_key()] = t;
3942 : }
3943 : }
3944 2 : }
3945 : }
3946 2 : }
3947 :
3948 : // if we updated some tickets, we need to make sure our timer is setup
3949 : // appropriately
3950 : //
3951 2 : if(added_tickets)
3952 : {
3953 2 : cleanup();
3954 : }
3955 4 : }
3956 :
3957 :
3958 : /** \brief Got the largest ticket from another leader.
3959 : *
3960 : * This function searches the list of entering tickets for the specified
3961 : * object name and key parameters found in the MAX_TICKET message.
3962 : *
3963 : * On the first MAX_TICKET received for an entering ticket, that maximum + 1
3964 : * gets saved in the ticket as its identifier. In other words, we place that
3965 : * ticket at the end of the list.
3966 : *
3967 : * \note
3968 : * Whenever a list of locks goes empty, its maximum ticket number returns to
3969 : * the default: NO_TICKET (0). In the unlikely case where the locks happen
3970 : * back to back and thus the list never becomes empty, the maximum ticket
3971 : * number could end up wrapping around. This is an error we can detect and it
3972 : * has the side effect of killing the cluck daemon.
3973 : *
3974 : * \param[in] msg The MAX_TICKET message being handled.
3975 : */
3976 124 : void cluckd::msg_max_ticket(ed::message & msg)
3977 : {
3978 124 : std::string object_name;
3979 124 : ed::dispatcher_match::tag_t tag(ed::dispatcher_match::DISPATCHER_MATCH_NO_TAG);
3980 124 : std::string key;
3981 124 : if(!get_parameters(msg, &object_name, &tag, nullptr, nullptr, &key, nullptr))
3982 : {
3983 : return; // LCOV_EXCL_LINE
3984 : }
3985 :
3986 : // the MAX_TICKET is an answer that has to go in a still un-added ticket
3987 : //
3988 124 : auto const obj_entering_ticket(f_entering_tickets.find(object_name));
3989 124 : if(obj_entering_ticket != f_entering_tickets.end())
3990 : {
3991 124 : auto const key_entering_ticket(obj_entering_ticket->second.find(key));
3992 124 : if(key_entering_ticket != obj_entering_ticket->second.end())
3993 : {
3994 373 : key_entering_ticket->second->max_ticket(msg.get_integer_parameter(cluck::g_name_cluck_param_ticket_id));
3995 : }
3996 : }
3997 125 : }
3998 :
3999 :
4000 : /** \brief Called whenever a remote connection is disconnected.
4001 : *
4002 : * This function is used to know that a remote connection was
4003 : * disconnected.
4004 : *
4005 : * This function handles the HANGUP and DISCONNECTED messages as required.
4006 : *
4007 : * This allows us to manage the f_computers list of computers running
4008 : * cluckd services. If a cluckd was affected, we verify we still have
4009 : * enough leaders.
4010 : *
4011 : * \note
4012 : * The STATUS message does not directly call this function. Instead, it
4013 : * calls msg_status() which checks that (1) the status is in link with
4014 : * a communicator service and (2) the new service state is "down" and only
4015 : * if so, this function gets called.
4016 : *
4017 : * \param[in] msg The DISCONNECTED, HANGUP, or STATUS message.
4018 : */
4019 22 : void cluckd::msg_server_gone(ed::message & msg)
4020 : {
4021 : // if the server is not defined, ignore that message
4022 : // (already tested by the dispatcher)
4023 : //
4024 66 : if(!msg.has_parameter(communicatord::g_name_communicatord_param_server_name))
4025 : {
4026 : return; // LCOV_EXCL_LINE
4027 : }
4028 :
4029 : // is it us?
4030 : //
4031 66 : std::string const server_name(msg.get_parameter(communicatord::g_name_communicatord_param_server_name));
4032 22 : if(server_name.empty()
4033 22 : || server_name == f_server_name)
4034 : {
4035 : // we never want to remove ourselves
4036 : //
4037 1 : return;
4038 : }
4039 :
4040 : // is "server_name" known?
4041 : //
4042 21 : auto it(f_computers.find(server_name));
4043 21 : if(it == f_computers.end())
4044 : {
4045 : // no computer found, nothing else to do here
4046 : //
4047 1 : return;
4048 : }
4049 20 : computer::pointer_t c(it->second);
4050 :
4051 : // got it, remove it
4052 : //
4053 20 : f_computers.erase(it);
4054 40 : SNAP_LOG_WARNING << "removed \"" << server_name << "\"" << SNAP_LOG_SEND;
4055 :
4056 : // is that computer a leader?
4057 : //
4058 20 : auto li(std::find(
4059 : f_leaders.begin()
4060 : , f_leaders.end()
4061 : , c));
4062 20 : if(li != f_leaders.end())
4063 : {
4064 6 : f_leaders.erase(li);
4065 :
4066 6 : if(f_messenger != nullptr)
4067 : {
4068 : // elect another computer in case the one we just erased was a leader
4069 : //
4070 : // (of course, no elections occur unless we are the computer with the
4071 : // smallest IP address)
4072 : //
4073 6 : election_status();
4074 :
4075 : // if too many leaders were dropped, we may go back to the NO_LOCK status
4076 : //
4077 : // we only send a NO_LOCK if the election could not re-assign another
4078 : // computer to replace the missing leader(s)
4079 : //
4080 6 : check_lock_status();
4081 : }
4082 : }
4083 22 : }
4084 :
4085 :
4086 : /** \brief With the STATUS message we know of new communicatord services.
4087 : *
4088 : * This function captures the STATUS message and if it sees that the
4089 : * name of the service is a remote communicator daemon then it
4090 : * sends a new LOCK_STARTED message to make sure that all cluck daemons
4091 : * are aware of us.
4092 : *
4093 : * \param[in] msg The STATUS message.
4094 : */
4095 39 : void cluckd::msg_status(ed::message & msg)
4096 : {
4097 : // check the service name, it has to be one that means it is a remote
4098 : // connection with another communicator daemon
4099 : //
4100 117 : std::string const service(msg.get_parameter(communicatord::g_name_communicatord_param_service));
4101 :
4102 39 : if(service.starts_with(communicatord::g_name_communicatord_connection_remote_communicator_in) // remote host connected to us
4103 39 : || service.starts_with(communicatord::g_name_communicatord_connection_remote_communicator_out)) // we connected to remote host
4104 : {
4105 : // check what the status is now: "up" or "down"
4106 : //
4107 42 : std::string const status(msg.get_parameter(communicatord::g_name_communicatord_param_status));
4108 14 : if(status == communicatord::g_name_communicatord_value_up)
4109 : {
4110 : // we already broadcast a LOCK_STARTED from CLUSTER_UP
4111 : // and that's enough
4112 : //
4113 : ;
4114 : }
4115 : else
4116 : {
4117 : // host is down, remove from our list of hosts
4118 : //
4119 10 : msg_server_gone(msg);
4120 : }
4121 14 : }
4122 78 : }
4123 :
4124 :
4125 : /** \brief Acknowledgement that the ticket was properly added.
4126 : *
4127 : * This function gets called whenever the ticket was added on another
4128 : * leader.
4129 : *
4130 : * \param[in] msg The TICKET_ADDED message being handled.
4131 : */
4132 122 : void cluckd::msg_ticket_added(ed::message & msg)
4133 : {
4134 122 : std::string object_name;
4135 122 : ed::dispatcher_match::tag_t tag(ed::dispatcher_match::DISPATCHER_MATCH_NO_TAG);
4136 122 : std::string key;
4137 122 : if(!get_parameters(msg, &object_name, &tag, nullptr, nullptr, &key, nullptr))
4138 : {
4139 : return; // LCOV_EXCL_LINE
4140 : }
4141 :
4142 122 : auto const obj_ticket(f_tickets.find(object_name));
4143 122 : if(obj_ticket != f_tickets.end())
4144 : {
4145 121 : auto const key_ticket(obj_ticket->second.find(key));
4146 121 : if(key_ticket != obj_ticket->second.end())
4147 : {
4148 : // this ticket exists on this system
4149 : //
4150 120 : auto const obj_entering_ticket(f_entering_tickets.find(object_name));
4151 120 : if(obj_entering_ticket == f_entering_tickets.end())
4152 : {
4153 : // this happens all the time because the entering ticket
4154 : // gets removed on the first TICKET_ADDED we receive so
4155 : // on the second one we get here...
4156 : //
4157 14 : SNAP_LOG_TRACE
4158 : << "called with object \""
4159 : << object_name
4160 : << "\" not present in f_entering_ticket (key: \""
4161 : << key
4162 : << "\")."
4163 : << SNAP_LOG_SEND;
4164 7 : return;
4165 : }
4166 113 : key_ticket->second->ticket_added(obj_entering_ticket->second);
4167 : }
4168 : else
4169 : {
4170 2 : SNAP_LOG_WARNING
4171 : << "found object \""
4172 : << object_name
4173 : << "\" but could not find a corresponding ticket with key \""
4174 : << key
4175 : << "\"..."
4176 : << SNAP_LOG_SEND;
4177 : }
4178 : }
4179 : else
4180 : {
4181 2 : SNAP_LOG_WARNING
4182 : << "object \""
4183 : << object_name
4184 : << "\" not found."
4185 : << SNAP_LOG_SEND;
4186 : }
4187 129 : }
4188 :
4189 :
4190 : /** \brief Let other leaders know that the ticket is ready.
4191 : *
4192 : * This message is received when the owner of a ticket marks a
4193 : * ticket as ready. This means the ticket is available for locking.
4194 : *
4195 : * \param[in] msg The TICKET_READY message.
4196 : */
4197 103 : void cluckd::msg_ticket_ready(ed::message & msg)
4198 : {
4199 103 : std::string object_name;
4200 103 : ed::dispatcher_match::tag_t tag(ed::dispatcher_match::DISPATCHER_MATCH_NO_TAG);
4201 103 : std::string key;
4202 103 : if(!get_parameters(msg, &object_name, &tag, nullptr, nullptr, &key, nullptr))
4203 : {
4204 : return; // LCOV_EXCL_LINE
4205 : }
4206 :
4207 103 : auto obj_ticket(f_tickets.find(object_name));
4208 103 : if(obj_ticket != f_tickets.end())
4209 : {
4210 103 : auto key_ticket(obj_ticket->second.find(key));
4211 103 : if(key_ticket != obj_ticket->second.end())
4212 : {
4213 : // we can mark this ticket as activated
4214 : //
4215 103 : key_ticket->second->set_ready();
4216 : }
4217 : }
4218 103 : }
4219 :
4220 :
4221 : /** \brief Unlock the resource.
4222 : *
4223 : * This function unlocks the resource specified in the call to lock().
4224 : *
4225 : * \param[in] msg The unlock message.
4226 : *
4227 : * \sa msg_lock()
4228 : */
void cluckd::msg_unlock(ed::message & msg)
{
    // too early to deal with locks? (no leaders yet)
    //
    if(!is_daemon_ready())
    {
        SNAP_LOG_ERROR
            << "received an UNLOCK when cluckd is not ready to receive lock related messages."
            << SNAP_LOG_SEND;
        return;
    }

    if(is_leader() == nullptr)
    {
        // we are not a leader, we need to forward to a leader to handle
        // the message properly
        //
        forward_message_to_leader(msg);
        return;
    }

    // extract the object name and the client PID used below to rebuild
    // the entering key of the ticket being unlocked
    //
    std::string object_name;
    ed::dispatcher_match::tag_t tag(ed::dispatcher_match::DISPATCHER_MATCH_NO_TAG);
    pid_t client_pid(0);
    if(!get_parameters(msg, &object_name, &tag, &client_pid, nullptr, nullptr, nullptr))
    {
        return;
    }

    // if the ticket still exists, send the UNLOCKED and then erase it
    //
    auto obj_ticket(f_tickets.find(object_name));
    if(obj_ticket != f_tickets.end())
    {
        // when the lock request went through a proxy, the originator's
        // server name is in "lock_proxy_server_name" rather than in the
        // message's "sent from" fields
        //
        std::string const server_name(msg.has_parameter("lock_proxy_server_name")
                    ? msg.get_parameter("lock_proxy_server_name")
                    : msg.get_sent_from_server());

        //std::string const service_name(msg.has_parameter("lock_proxy_service_name")
        //            ? msg.get_parameter("lock_proxy_service_name")
        //            : msg.get_sent_from_service());

        // tickets are searched by their entering key ("<server>/<pid>")
        //
        std::string const entering_key(server_name + '/' + std::to_string(client_pid));
        auto key_ticket(std::find_if(
              obj_ticket->second.begin()
            , obj_ticket->second.end()
            , [&entering_key](auto const & t)
            {
                return t.second->get_entering_key() == entering_key;
            }));
        if(key_ticket != obj_ticket->second.end())
        {
            // this function will send a DROPTICKET to the other leaders
            // and the UNLOCKED to the source (unless we already sent the
            // UNLOCKED which gets sent at most once.)
            //
            key_ticket->second->drop_ticket();

            obj_ticket->second.erase(key_ticket);
            if(obj_ticket->second.empty())
            {
                // we are done with this one!
                //
                f_tickets.erase(obj_ticket);
            }

            // TBD: the clean up calls this function if it removes a lock
            //      that timed out but not otherwise; maybe we could consider
            //      doing it slightly differently where we pass a flag to
            //      the clean up function to force the call either way
            //
            activate_first_lock(object_name);
        }
        else
        {
            SNAP_LOG_MAJOR
                << "UNLOCK could not find key \""
                << entering_key
                << "\" in object \""
                << object_name
                << "\"."
                << SNAP_LOG_SEND;
        }
    }
    else
    {
        SNAP_LOG_WARNING
            << "UNLOCK could not find object \""
            << object_name
            << "\"."
            << SNAP_LOG_SEND;
    }

    // reset the timeout with the other locks
    //
    cleanup();
}
4324 :
4325 :
4326 :
4327 : } // namespace cluck_daemon
4328 : // vim: ts=4 sw=4 et
|