cluck 1.0.1
The cluster lock service.
cluckd.cpp
Go to the documentation of this file.
1// Copyright (c) 2016-2025 Made to Order Software Corp. All Rights Reserved
2//
3// https://snapwebsites.org/project/cluck
4// contact@m2osw.com
5//
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10//
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15//
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19
20// self
21//
22#include "cluckd.h"
23
24
25
26// cluck
27//
28#include <cluck/exception.h>
29#include <cluck/names.h>
30#include <cluck/version.h>
31
32
33// communicator
34//
35#include <communicator/flags.h>
36#include <communicator/names.h>
37
38
39// as2js
40//
41#include <as2js/json.h>
42
43
44// eventdispatcher
45//
46#include <eventdispatcher/names.h>
47
48
49// snapdev
50//
51#include <snapdev/gethostname.h>
52#include <snapdev/hexadecimal_string.h>
53#include <snapdev/stringize.h>
54#include <snapdev/tokenize_string.h>
55#include <snapdev/to_string_literal.h>
56
57
58// snaplogger
59//
60#include <snaplogger/logger.h>
61#include <snaplogger/options.h>
62#include <snaplogger/severity.h>
63
64
65// advgetopt
66//
67#include <advgetopt/advgetopt.h>
68#include <advgetopt/exception.h>
69
70
71// C++
72//
73#include <algorithm>
74#include <iostream>
75#include <sstream>
76
77
78// openssl
79//
80#include <openssl/rand.h>
81
82
83// last include
84//
85#include <snapdev/poison.h>
86
87
88
136{
137
138
namespace
{


// default value of the --candidate-priority option; generated at compile
// time from computer::PRIORITY_DEFAULT so the option default and the
// constant can never drift apart
//
constexpr std::string_view g_default_candidate_priority =
    snapdev::integer_to_string_literal<computer::PRIORITY_DEFAULT>.data();


// command line options specific to cluckd; the messenger and the logger
// dynamically add further options to the same advgetopt object at runtime
//
advgetopt::option const g_options[] =
{
    advgetopt::define_option(
          advgetopt::Name("candidate-priority")
        , advgetopt::ShortName('p')
        , advgetopt::Flags(advgetopt::all_flags<
                      advgetopt::GETOPT_FLAG_REQUIRED
                    , advgetopt::GETOPT_FLAG_GROUP_OPTIONS>())
        , advgetopt::Help("Define the priority of this candidate (1 to 14) to gain a leader position or \"off\".")
        // .data() is safe here: the literal produced by
        // integer_to_string_literal is NUL terminated
        , advgetopt::DefaultValue(g_default_candidate_priority.data())
    ),
    advgetopt::define_option(
          advgetopt::Name("server-name")
        , advgetopt::ShortName('n')
        , advgetopt::Flags(advgetopt::all_flags<
                      advgetopt::GETOPT_FLAG_DYNAMIC_CONFIGURATION
                    , advgetopt::GETOPT_FLAG_REQUIRED
                    , advgetopt::GETOPT_FLAG_GROUP_OPTIONS>())
        , advgetopt::Help("Set the name of this server instance.")
    ),
    advgetopt::end_options()
};


// groups used to sort the options in the --help output
//
advgetopt::group_description const g_group_descriptions[] =
{
    advgetopt::define_group(
          advgetopt::GroupNumber(advgetopt::GETOPT_FLAG_GROUP_COMMANDS)
        , advgetopt::GroupName("command")
        , advgetopt::GroupDescription("Commands:")
    ),
    advgetopt::define_group(
          advgetopt::GroupNumber(advgetopt::GETOPT_FLAG_GROUP_OPTIONS)
        , advgetopt::GroupName("option")
        , advgetopt::GroupDescription("Options:")
    ),
    advgetopt::end_groups()
};


// configuration files searched for cluckd settings (nullptr terminated)
//
constexpr char const * const g_configuration_files[] =
{
    "/etc/cluck/cluckd.conf",
    nullptr
};


// ties the options, groups, configuration files, environment variable
// and help/version/license strings together for advgetopt
//
advgetopt::options_environment const g_options_environment =
{
    .f_project_name = "cluckd",
    .f_group_name = "cluck",
    .f_options = g_options,
    .f_environment_variable_name = "CLUCKD_OPTIONS",
    .f_configuration_files = g_configuration_files,
    .f_environment_flags = advgetopt::GETOPT_ENVIRONMENT_FLAG_SYSTEM_PARAMETERS
                         | advgetopt::GETOPT_ENVIRONMENT_FLAG_PROCESS_SYSTEM_PARAMETERS,
    .f_help_header = "Usage: %p [-<opt>]\n"
                     "where -<opt> is one or more of:",
    .f_help_footer = "%c",
    .f_version = CLUCK_VERSION_STRING,
    .f_license = "GNU GPL v3",
    // NOTE(review): start year (2013) differs from the file header
    // copyright (2016) -- confirm which is intended
    .f_copyright = "Copyright (c) 2013-"
                   SNAPDEV_STRINGIZE(UTC_BUILD_YEAR)
                   " by Made to Order Software Corporation -- All Rights Reserved",
    .f_groups = g_group_descriptions,
};



}
218// no name namespace
219
220
221
222
223
224
225
226
227
228
229
230
231
/** \brief Initialize the cluck daemon.
 *
 * Parses the command line options, sets up the logger, creates the
 * messenger, and determines the name of this server and the startup
 * timestamp.
 *
 * \param[in] argc  The number of command line arguments.
 * \param[in] argv  The command line arguments.
 *
 * \exception advgetopt::getopt_exit  Raised when the logger options fail.
 */
cluckd::cluckd(int argc, char * argv[])
    : f_opts(g_options_environment)
{
    snaplogger::add_logger_options(f_opts);

    // before we can parse command line arguments, we must create the
    // fluid settings & communicator client objects which happen to
    // dynamically add command line options to f_opts
    //
    f_messenger = std::make_shared<messenger>(this, f_opts);

    f_opts.finish_parsing(argc, argv);
    if(!snaplogger::process_logger_options(f_opts, "/etc/cluck/logger"))
    {
        throw advgetopt::getopt_exit("logger options generated an error.", 1);
    }

    // determine this server name
    //
    // TODO: if the name of the server is changed, we should reboot, but
    //       to the minimum we need to restart cluckd (among other daemons)
    //       remember that snapmanager.cgi gives you that option;
    //       we CANNOT use fluid-settings for this one since each computer
    //       must have a different name
    //
    if(f_opts.is_defined("server-name"))
    {
        f_server_name = f_opts.get_string("server-name");
    }
    // fall back to the hostname when no --server-name was specified
    //
    if(f_server_name.empty())
    {
        f_server_name = snapdev::gethostname();
    }

    // remember when we started; sent along LOCK_STARTED messages
    //
    f_start_time = snapdev::now();
}
359
360
367{
368}
369
370
{
    // NOTE(review): the function signature is not visible in this extract;
    // this body registers all of the event dispatcher connections used by
    // the daemon (interrupt handler, lock timeout timer, messenger)
    //
    f_communicator = ed::communicator::instance();

    // capture Ctrl-C (SIGINT) to get a clean exit by default
    //
    f_interrupt = std::make_shared<interrupt>(this);
    f_communicator->add_connection(f_interrupt);

    // timer so we can timeout locks
    //
    f_timer = std::make_shared<timer>(this);
    f_communicator->add_connection(f_timer);

    // add the messenger used to communicate with the communicator daemon
    // and other services as required
    //
    f_communicator->add_connection(f_messenger);

    // the following call actually connects the messenger to the
    // communicator daemon
    //
    f_messenger->finish_parsing();
}
405
406
/** \brief Record this computer's own IP address.
 *
 * The address is later used, among others, to decide whether this
 * computer conducts leader elections (smallest IP wins).
 *
 * \param[in] a  The IP address of this cluck daemon.
 */
void cluckd::set_my_ip_address(addr::addr const & a)
{
    f_my_ip_address = a;
}
411
412
{
    // NOTE(review): the function signature is not visible in this extract;
    // this body logs the startup marker then enters the event loop, so it
    // only returns once the daemon is asked to quit
    //
    SNAP_LOG_INFO
        << "--------------------------------- cluckd started."
        << SNAP_LOG_SEND;

    // now run our listening loop
    //
    f_communicator->run();
}
431
432
433
434
435
{
    // one entry per cluck daemon currently known in the cluster
    //
    return f_computers.size();
}
449
450
/** \brief Retrieve the name of the server this daemon runs on.
 *
 * \return The server name as determined in the constructor
 *         (--server-name option or the hostname).
 */
std::string const & cluckd::get_server_name() const
{
    return f_server_name;
}
465
466
{
    // NOTE(review): signature not visible in this extract; callers
    // (msg_info, check_lock_status) name this function is_daemon_ready()
    // and it is const (see the const_cast below)

    // we are not yet properly registered
    //
    if(f_messenger == nullptr || !f_messenger->is_ready())
    {
        return false;
    }

    // we need a stable clock otherwise we cannot guarantee that the time
    // between servers is going to be sufficiently close
    //
    if(!f_stable_clock)
    {
        return false;
    }

    // without at least one leader we are definitely not ready
    //
    if(f_leaders.empty())
    {
        SNAP_LOG_TRACE
            << "not considered ready: no leaders."
            << SNAP_LOG_SEND;
        return false;
    }

    // enough leaders for that cluster?
    //
    // we consider that having at least 2 leaders is valid because locks
    // will still work, an election should be happening when we lose a
    // leader fixing that temporary state
    //
    // the test below allows for the case where we have a single computer
    // too (i.e. "one neighbor")
    //
    // notice how not having received the CLUSTERUP would be taken in
    // account here since f_neighbors_count will still be 0 in that case
    // (however, the previous empty() test already takes that in account)
    //
    if(f_leaders.size() == 1
    && f_neighbors_count != 1)
    {
        SNAP_LOG_TRACE
            << "not considered ready: not enough leaders for this cluster."
            << SNAP_LOG_SEND;
        return false;
    }

    // the election_status() function verifies that the quorum is
    // attained, but it can change if the cluster grows or shrinks
    // so we have to check here again as the lock system becomes
    // "unready" when the quorum is lost; see that other function
    // for additional info

    // this one probably looks complicated...
    //
    // if our quorum is 1 or 2 then we need a number of computers
    // equal to the total number of computers (i.e. a CLUSTER_COMPLETE
    // status which we verify here)
    //
    if(f_neighbors_quorum < 3
    && f_computers.size() < f_neighbors_count)
    {
        // NOTE(review): the extraction lost the insertions of
        // f_neighbors_count and f_neighbors_quorum in this log
        // statement -- confirm against the original file
        SNAP_LOG_TRACE
            << "not considered ready: quorum changed, re-election expected soon (number of neighbors: "
            << ", neighbors quorum: "
            << ", number of computers: "
            << f_computers.size()
            << "."
            << SNAP_LOG_SEND;
        return false;
    }

    // the neighbors count & quorum can change over time so
    // we have to verify that the number of computers is
    // still acceptable here
    //
    // NOTE(review): the extraction lost the if(...) condition that
    // guarded this block; the matching test in election_status() reads
    // `if(f_computers.size() < f_neighbors_quorum)` -- confirm against
    // the original file
    //
    {
        SNAP_LOG_TRACE
            << "not considered ready: quorum lost, re-election expected soon."
            << SNAP_LOG_SEND;
        return false;
    }

    // are all leaders connected to us?
    //
    std::size_t ready(0);
    computer::pointer_t last_leader;
    for(auto const & l : f_leaders)
    {
        if(l->get_connected())
        {
            ++ready;
        }
        else
        {
            // attempt resending a LOCK_STARTED because it could be that it
            // did not work quite right and the cluck daemons are not
            // going to ever talk with each others otherwise
            //
            // we also make sure we do not send the message too many times,
            // it should be resolved in five seconds or less...
            //
            time_t const now(time(nullptr));
            if(now >= f_pace_lockstarted)
            {
                // pause for 5 to 6 seconds in case this happens a lot
                //
                f_pace_lockstarted = now + 5;

                // only send it to that specific cluck daemon
                //
                ed::message temporary_message;
                temporary_message.set_sent_from_server(l->get_name());
                temporary_message.set_sent_from_service(cluck::g_name_cluck_service_name);
                // const_cast needed because this function is const but
                // send_lock_started() is not
                const_cast<cluckd *>(this)->send_lock_started(&temporary_message);
            }

            last_leader = l;
        }
    }

    // we only need to have 2 leaders to have a functional system with
    // 2 or 3 leaders total
    //
    if(ready >= 2
    || ready == f_leaders.size())
    {
        return true;
    }

    // we're still not ready
    //
    SNAP_LOG_TRACE
        << "not considered ready: no direct connection with leader: \""
        << last_leader->get_name()
        << "\"."
        << SNAP_LOG_SEND;

    return false;
}
623
624
{
    // NOTE(review): signature not visible in this extract; per the
    // callers this is is_leader(id) returning the matching leader or a
    // null pointer when that identifier is not a leader

    // an empty id means "check whether this very computer is a leader"
    //
    if(id.empty())
    {
        id = f_my_id;
    }

    // linear search is fine: there are at most 3 leaders
    //
    auto const l(std::find_if(
          f_leaders.begin()
        , f_leaders.end()
        , [id](auto const & c){
            return c->get_id() == id;
        }));
    if(l != f_leaders.end())
    {
        return *l;
    }

    // not a leader
    //
    return computer::pointer_t();
}
678
679
{
    // NOTE(review): signature not visible in this extract; the throw
    // messages identify this as get_leader_a() -- it returns the first
    // leader which is not ourselves, or null when we are the only leader

#ifdef _DEBUG
    // only a leader has "other leaders" to talk to
    //
    if(is_leader() == nullptr)
    {
        throw cluck::logic_error("cluckd::get_leader_a(): only a leader can call this function.");
    }
#endif

    switch(f_leaders.size())
    {
    case 0: // LCOV_EXCL_LINE -- because of the debug above, this cannot happen here
    default:
        throw cluck::logic_error("cluckd::get_leader_a(): call this function only when leaders were elected."); // LCOV_EXCL_LINE

    case 1:
        // we are the only leader, there is no "leader A"
        //
        return computer::pointer_t();

    case 2:
    case 3:
        // pick the first leader which is not this computer
        //
        return f_leaders[f_leaders[0]->is_self() ? 1 : 0];

    }
}
725
726
{
    // NOTE(review): signature not visible in this extract; the throw
    // messages identify this as get_leader_b() -- it returns the second
    // "other" leader, which only exists when there are 3 leaders
    //
    // NOTE(review): the default case throws cluck::unexpected_case here
    // while get_leader_a() throws cluck::logic_error for the same
    // situation -- confirm whether that difference is intentional

#ifdef _DEBUG
    if(is_leader() == nullptr)
    {
        throw cluck::logic_error("cluckd::get_leader_b(): only a leader can call this function.");
    }
#endif

    switch(f_leaders.size())
    {
    case 0: // LCOV_EXCL_LINE -- because of the debug above, this cannot happen here
    default:
        throw cluck::unexpected_case("cluckd::get_leader_b(): call this function only when leaders were elected."); // LCOV_EXCL_LINE

    case 1:
    case 2: // we have a leader A but no leader B when we have only 2 leaders
        return computer::pointer_t();

    case 3:
        // pick the remaining leader which is not this computer
        //
        return f_leaders[f_leaders[2]->is_self() ? 1 : 2];

    }
}
772
773
774
790void cluckd::msg_info(ed::message & msg)
791{
792 std::stringstream ss;
793
794 as2js::position p;
795 p.set_filename("cluckd.cpp");
796 p.set_function("msg_info");
797
798 as2js::json::json_value::object_t obj;
799 as2js::json::json_value::pointer_t result(std::make_shared<as2js::json::json_value>(p, obj));
800
801 {
802 as2js::json::json_value::pointer_t value(std::make_shared<as2js::json::json_value>(p, is_daemon_ready()));
803 result->set_member("daemon_ready", value);
804 }
805
806 if(!f_my_id.empty())
807 {
808 as2js::json::json_value::pointer_t value(std::make_shared<as2js::json::json_value>(p, f_my_id));
809 result->set_member("id", value);
810 }
811
812 {
813 addr::addr zero;
814 as2js::json::json_value::pointer_t value(std::make_shared<as2js::json::json_value>(p,
815 f_my_ip_address == zero
816 ? "<not assigned>"
817 : f_my_ip_address.to_ipv4or6_string(addr::STRING_IP_ADDRESS | addr::STRING_IP_BRACKET_ADDRESS)));
818 result->set_member("ip", value);
819 }
820
821 {
822 as2js::json::json_value::pointer_t value(std::make_shared<as2js::json::json_value>(p, as2js::integer(f_neighbors_count)));
823 result->set_member("neighbors_count", value);
824 }
825
826 {
827 as2js::json::json_value::pointer_t value(std::make_shared<as2js::json::json_value>(p, as2js::integer(f_neighbors_quorum)));
828 result->set_member("neighbors_quorum", value);
829 }
830
831 {
832 as2js::json::json_value::pointer_t value(std::make_shared<as2js::json::json_value>(p, as2js::integer(f_leaders.size())));
833 result->set_member("leaders_count", value);
834 }
835
836 if(!f_message_cache.empty())
837 {
838 as2js::json::json_value::pointer_t value(std::make_shared<as2js::json::json_value>(p, as2js::integer(f_message_cache.size())));
839 result->set_member("cache_size", value);
840 }
841
842 {
843 as2js::json::json_value::array_t computers;
844 as2js::json::json_value::pointer_t list(std::make_shared<as2js::json::json_value>(p, computers));
845
846 for(auto const & c : f_computers)
847 {
848 as2js::json::json_value::object_t computer;
849 as2js::json::json_value::pointer_t item(std::make_shared<as2js::json::json_value>(p, computer));
850
851 {
852 as2js::json::json_value::pointer_t value(std::make_shared<as2js::json::json_value>(p, c.second->get_name()));
853 item->set_member("name", value);
854 }
855
856 {
857 as2js::json::json_value::pointer_t value(std::make_shared<as2js::json::json_value>(p, c.second->get_id()));
858 item->set_member("id", value);
859 }
860
861 {
862 as2js::json::json_value::pointer_t value(std::make_shared<as2js::json::json_value>(p, c.second->get_ip_address().to_ipv4or6_string(addr::STRING_IP_ADDRESS | addr::STRING_IP_BRACKET_ADDRESS)));
863 item->set_member("ip", value);
864 }
865
866 {
867 as2js::json::json_value::pointer_t value(std::make_shared<as2js::json::json_value>(p, c.second->get_connected()));
868 item->set_member("connected", value);
869 }
870
871 {
872 auto const it(std::find_if(
873 f_leaders.begin()
874 , f_leaders.end()
875 , [&c](auto const & l)
876 {
877 return c.second == l;
878 }));
879 std::string leader;
880 if(it != f_leaders.end())
881 {
882 as2js::json::json_value::pointer_t value(std::make_shared<as2js::json::json_value>(p, as2js::integer(it - f_leaders.begin())));
883 item->set_member("leader", value);
884 }
885 }
886
887 list->set_item(list->get_array().size(), item);
888 }
889 result->set_member("computers", list);
890 }
891
892 if(msg.has_parameter(cluck::g_name_cluck_param_mode)
893 && msg.get_parameter(cluck::g_name_cluck_param_mode) == cluck::g_name_cluck_value_debug)
894 {
895 // TODO: create a serialization to JSON instead of a specialized string
896 //
897 as2js::json::json_value::pointer_t value(std::make_shared<as2js::json::json_value>(p, serialized_tickets()));
898 result->set_member("tickets", value);
899 }
900
901 ed::message reply_message;
902 reply_message.set_command(cluck::g_name_cluck_cmd_cluckd_status);
903 reply_message.reply_to(msg);
904 reply_message.add_parameter(communicator::g_name_communicator_param_status, result->to_string());
905 f_messenger->send_message(reply_message);
906}
907
908
909
917std::string cluckd::ticket_list() const
918{
919 std::stringstream list;
920 for(auto const & obj_ticket : f_tickets)
921 {
922 for(auto const & key_ticket : obj_ticket.second)
923 {
924 list
925 << "ticket_id: "
926 << key_ticket.second->get_ticket_number()
927 << " object name: \""
928 << key_ticket.second->get_object_name()
929 << "\" key: "
930 << key_ticket.second->get_entering_key();
931
932 cluck::timeout_t const lock_timeout(key_ticket.second->get_lock_timeout_date());
933 if(lock_timeout != cluck::timeout_t())
934 {
935 list
936 << " timeout "
937 << lock_timeout.to_string();
938 }
939 else
940 {
941 cluck::timeout_t const obtention_timeout(key_ticket.second->get_obtention_timeout());
942 list
943 << " obtention "
944 << obtention_timeout.to_string();
945 }
946
947 list << '\n';
948 }
949 }
950
951 return list.str();
952}
953
954
{
    // NOTE(review): signature not visible in this extract; other comments
    // in this file refer to this function as election_status()

    // we already have election results?
    //
    if(!f_leaders.empty())
    {
        // the results may have been "tempered" with (i.e. one of
        // the leaders was lost)
        //
        if(f_leaders.size() == 3
        || (f_neighbors_count < 3 && f_leaders.size() == f_neighbors_count))
        {
            // status is fine
            //
            return;
        }
    }

    // we do not yet know our IP address, we cannot support an election just yet
    //
    if(f_my_ip_address.is_default())
    {
        // this should not be possible since we expect to receive the
        // REGISTER's reply before any other message
        //
        return; // LCOV_EXCL_LINE
    }

    // neighbors count is 0 until we receive a very first CLUSTER_UP
    // (note that it does not go back to zero on CLUSTER_DOWN, however,
    // the quorum as checked in the next if() is never going to be
    // reached if the cluster is down)
    //
    if(f_neighbors_count == 0)
    {
        return;
    }

    // this one probably looks complicated...
    //
    // if our quorum is 1 or 2 then we need a number of computers
    // equal to the total number of computers (i.e. a CLUSTER_COMPLETE
    // status which we compute here)
    //
    if(f_neighbors_quorum < 3
    && f_computers.size() < f_neighbors_count)
    {
        return;
    }

    // since the neighbors count & quorum never go back to zero (on a
    // CLUSTER_DOWN) we have to verify that the number of computers is
    // acceptable here
    //
    // Note: further we will not count computers marked disabled, which
    //       is done below when sorting by ID, however, that does not
    //       prevent the quorum to be attained, even with disabled
    //       computers
    //
    if(f_computers.size() < f_neighbors_quorum)
    {
        return;
    }

    // to proceed with an election we must have the smallest IP address
    // (it is not absolutely required, but that way we avoid many
    // consensus problems, in effect we have one "temporary-leader" that ends
    // up telling us who the final three leaders are)
    //
    // TODO: verify that this works properly in a non-complete cluster
    //
    for(auto & c : f_computers)
    {
        // Note: the test fails when we compare to ourselves so we do not
        //       need any special case
        //
        if(c.second->get_ip_address() < f_my_ip_address)
        {
            return;
        }
    }

    // to select the leaders sort them by identifier and take the first
    // three (i.e. lower priority, random, IP, pid.)
    //
    int off(0);
    computer::map_t sort_by_id;
    for(auto const & c : f_computers)
    {
        // ignore nodes with a priority of 15 (i.e. OFF)
        //
        if(c.second->get_priority() != computer::PRIORITY_OFF)
        {
            sort_by_id[c.second->get_id()] = c.second;
        }
        else
        {
            ++off;
        }
    }

//for(auto const & s : sort_by_id)
//{
//SNAP_LOG_WARNING << "--- sort by ID: " << s.first << SNAP_LOG_SEND;
//}

    // with 3 computers or less, every one of them must be a candidate;
    // with more computers, at least 3 must remain candidates
    //
    bool too_many_computers_off(false);
    if(f_computers.size() <= 3)
    {
        if(off != 0
        && f_computers.size() >= f_neighbors_count)
        {
            SNAP_LOG_FATAL
                << "you cannot have any cluck computer turned OFF when you"
                   " have three or less computers total in your cluster."
                   " The elections cannot be completed in these"
                   " conditions."
                << SNAP_LOG_SEND;
            too_many_computers_off = true;
        }
    }
    else if(f_computers.size() - off < 3
         && f_computers.size() >= f_neighbors_count)
    {
        SNAP_LOG_FATAL
            << "you have a total of "
            << f_computers.size()
            << " computers in your cluster. You turned off "
            << off
            << " of them, which means less than three are left"
               " as candidates for leadership which is not enough."
               " You can have a maximum of "
            << f_computers.size() - 3
            << " that are turned off on this cluster."
            << SNAP_LOG_SEND;
        too_many_computers_off = true;
    }
    if(too_many_computers_off)
    {
        // only generate the flag once we reach the CLUSTER_COMPLETE status
        // (we cannot be sure that the `off` variable is valid until then)
        //
        if(f_computers.size() >= f_neighbors_count)
        {
            // this is something that breaks the entire system so someone
            // needs to fix it and thus it has a really high priority
            //
            communicator::flag::pointer_t flag(COMMUNICATOR_FLAG_UP(
                  "cluckd"
                , "election"
                , "instances-off"
                , "the cluck daemon detected that too many of the"
                  " daemons have their priority set to OFF;"
                  " you must turn some of these back ON."
            ));
            flag->set_priority(99);
            flag->add_tag("settings");
            flag->set_manual_down(true);
            flag->save();
        }
        return;
    }

    // an election works as soon as we have at least 2 leaders
    // or we reached the total number of computers
    //
    if(sort_by_id.size() < 2
    && sort_by_id.size() != f_computers.size())
    {
        return;
    }

//std::cerr << f_communicator_port << " is conducting an election:\n";
//for(auto s : sort_by_id)
//{
//std::cerr << "  " << s.second->get_name() << " " << s.first << "\n";
//}

    // the first three are the new leaders
    //
    ed::message lock_leaders_message;
    lock_leaders_message.set_command(cluck::g_name_cluck_cmd_lock_leaders);
    lock_leaders_message.set_service(communicator::g_name_communicator_server_any);
    f_leaders.clear();
    f_election_date = snapdev::now();
    lock_leaders_message.add_parameter(cluck::g_name_cluck_param_election_date, f_election_date);
    auto leader(sort_by_id.begin());
    std::size_t const max(std::min(static_cast<computer::map_t::size_type>(3), sort_by_id.size()));
    for(std::size_t idx(0); idx < max; ++idx, ++leader)
    {
        lock_leaders_message.add_parameter(
              cluck::g_name_cluck_param_leader + std::to_string(idx)
            , leader->second->get_id());
        f_leaders.push_back(leader->second);
    }
    f_messenger->send_message(lock_leaders_message);

#if 1
SNAP_LOG_WARNING
<< "election status = add leader(s)... "
<< f_computers.size()
<< " computers and "
<< f_leaders.size()
<< " leaders."
<< SNAP_LOG_SEND;
#endif

    // we need to synchronize from this cluckd daemon if it is one of the
    // leaders otherwise we won't get the LOCK_STARTED which would also
    // call that function
    //
    // NOTE(review): the extraction lost the statement that was here
    // (original line 1183); presumably the synchronization call the
    // comment above refers to -- confirm against the original file
}
1185
1186
{
    // NOTE(review): signature not visible in this extract; this body
    // broadcasts LOCK_READY / NO_LOCK whenever the readiness state of
    // the daemon changes, then replays the cached LOCK messages

    // nothing to do if the readiness state did not change
    //
    bool const lock_status(is_daemon_ready());
    if(f_lock_status == lock_status)
    {
        return;
    }
    f_lock_status = lock_status;

    // tell local services whether locks are now available
    //
    ed::message status_message;
    status_message.set_command(f_lock_status
            ? cluck::g_name_cluck_cmd_lock_ready
            : cluck::g_name_cluck_cmd_no_lock);
    status_message.set_service(communicator::g_name_communicator_server_me);
    status_message.add_parameter(communicator::g_name_communicator_param_cache, communicator::g_name_communicator_value_no);
    f_messenger->send_message(status_message);

    if(lock_status
    && !f_message_cache.empty())
    {
        // we still have a cache of locks that can now be processed
        //
        // note:
        // although msg_lock() could re-add some of those messages
        // in the f_message_cache vector, it should not since it
        // calls the same is_daemon_ready() function which we know returns
        // true and therefore no cache is required
        //
        // NOTE(review): the declaration of `cache` (original line 1215)
        // is missing from this extract; it is swapped with
        // f_message_cache so it must be of that same type -- confirm
        // against the original file
        //
        cache.swap(f_message_cache);
        for(auto & mc : cache)
        {
            msg_lock(mc.f_message);
        }
    }
}
1224
1225void cluckd::send_lock_started(ed::message const * msg)
1226{
1227 // tell other cluck daemon instances that are already listening that
1228 // we are ready; this way we can calculate the number of computers
1229 // available in our network and use that to calculate the QUORUM
1230 //
1231 ed::message lock_started_message;
1232 lock_started_message.set_command(cluck::g_name_cluck_cmd_lock_started);
1233 if(msg == nullptr)
1234 {
1235 lock_started_message.set_service(communicator::g_name_communicator_service_public_broadcast);
1236
1237 // unfortunately, the following does NOT work as expected...
1238 // (i.e. the following ends up sending the message to ourselves only
1239 // and does not forward to any remote communicators).
1240 //
1241 //lock_started_message.set_server(communicator::g_name_communicator_server_any);
1242 //lock_started_message.set_service(cluck::g_name_cluck_service_name);
1243 }
1244 else
1245 {
1246 lock_started_message.reply_to(*msg);
1247 }
1248
1249 // our info: server name and id
1250 //
1251 lock_started_message.add_parameter(communicator::g_name_communicator_param_server_name, f_server_name);
1252 lock_started_message.add_parameter(cluck::g_name_cluck_param_lock_id, f_my_id);
1253 lock_started_message.add_parameter(cluck::g_name_cluck_param_start_time, f_start_time);
1254
1255 // include the leaders if present
1256 //
1257 if(!f_leaders.empty())
1258 {
1259 lock_started_message.add_parameter(cluck::g_name_cluck_param_election_date, f_election_date);
1260 for(size_t idx(0); idx < f_leaders.size(); ++idx)
1261 {
1262 lock_started_message.add_parameter(
1263 cluck::g_name_cluck_param_leader + std::to_string(idx)
1264 , f_leaders[idx]->get_id());
1265 }
1266 }
1267
1268 f_messenger->send_message(lock_started_message);
1269}
1270
1271
/** \brief Tear down the daemon's connections so the event loop can exit.
 *
 * Removes the messenger, interrupt and timer connections from the
 * communicator; once no connection is left, ed::communicator::run()
 * returns.
 *
 * \param[in] quitting  Whether the communicator daemon itself is
 *                      quitting (forwarded to the messenger so it can
 *                      unregister accordingly).
 */
void cluckd::stop(bool quitting)
{
    // the messenger must unregister from fluid-settings before we drop it
    //
    if(f_messenger != nullptr)
    {
        f_messenger->unregister_fluid_settings(quitting);
        f_communicator->remove_connection(f_messenger);
        f_messenger.reset();
    }

    if(f_communicator != nullptr)
    {
        f_communicator->remove_connection(f_interrupt);
        f_interrupt.reset();

        f_communicator->remove_connection(f_timer);
        f_timer.reset();
    }
}
1302
1303
/** \brief Try to activate the first ticket of the named object.
 *
 * Looks up the first (non timed out) ticket for \p object_name and,
 * when one exists, moves it forward in the activation process.
 *
 * \param[in] object_name  The name of the locked object to activate.
 */
void cluckd::activate_first_lock(std::string const & object_name)
{
    auto ticket(find_first_lock(object_name));

    if(ticket != nullptr)
    {
        // there is what we think is the first ticket
        // that should be activated now; we need to share
        // with the other 2 leaders to make sure of that
        //
        // NOTE(review): the statement that performed the activation
        // (original line 1338) is missing from this extract -- confirm
        // against the original file
    }
}
1341
1342
/** \brief Find the first still valid ticket for the named object.
 *
 * Walks the tickets attached to \p object_name, dropping tickets that
 * timed out along the way, and returns the first ticket that is still
 * valid (or a null pointer when none remains).
 *
 * \param[in] object_name  The name of the object whose tickets to scan.
 *
 * \return The first valid ticket or a null pointer.
 */
ticket::pointer_t cluckd::find_first_lock(std::string const & object_name)
{
    ticket::pointer_t first_ticket;
    auto const obj_ticket(f_tickets.find(object_name));

    if(obj_ticket != f_tickets.end())
    {
        // loop through making sure that we activate a ticket only
        // if the obtention date was not already reached; if that
        // date was reached before we had the time to activate the
        // lock, then the client should have abandonned the lock
        // request anyway...
        //
        // (this is already done in the cleanup(), but a couple of
        // other functions may call the activate_first_lock()
        // function!)
        //
        for(auto key_ticket(obj_ticket->second.begin()); key_ticket != obj_ticket->second.end(); )
        {
            if(key_ticket->second->timed_out())
            {
                // that ticket timed out, send an UNLOCKING, UNLOCKED,
                // or LOCK_FAILED message and get rid of it
                //
                key_ticket->second->lock_failed("timed out while searching for first lock");
                if(key_ticket->second->timed_out())
                {
                    // still timed out, remove it
                    //
                    // note: erase() returns the iterator to the next
                    // element so the loop advances correctly
                    //
                    key_ticket = obj_ticket->second.erase(key_ticket);
                }
                // else: lock_failed() cleared the timeout; the iterator
                // is deliberately NOT advanced so the same ticket gets
                // re-examined (and then handled by the else branch below)
            }
            else
            {
                if(first_ticket == nullptr)
                {
                    first_ticket = key_ticket->second;
                }
                ++key_ticket;
            }
        }

        if(obj_ticket->second.empty())
        {
            // it is empty now, get rid of that set of tickets
            //
            f_tickets.erase(obj_ticket);
        }
    }

    return first_ticket;
}
1395
1396
1420{
1421 // there is nothing to do if we are by ourselves because we cannot
1422 // gain any type of concensus unless we are expected to be the only
1423 // one in which case there is no synchronization requirements anyway
1424 //
1425 if(f_leaders.size() <= 1)
1426 {
1427 return;
1428 }
1429
1430 // only leaders can synchronize each others
1431 // (other cluck daemons do not have any tickets to synchronize)
1432 //
1433 if(is_leader() == nullptr)
1434 {
1435 return;
1436 }
1437
1438 // determine whether we are leader #0 or not, if zero, then we
1439 // call msg_lock() directly, otherwise we do a f_messenger->send_message()
1440 //
1441 // TODO: review the logic here, I do not think that leader0 has anything
1442 // to do with lock requests and tickets; instead, I think that the
1443 // sharing that needs to happen is between the old and the new
1444 // leaders (i.e. when we lose a leader and assign another computer
1445 // as a new leader to compensate, that new leader needs to get
1446 // all the info which is what the LOCK_TICKETS message is about)
1447 //
1448 bool const leader0(f_leaders[0]->get_id() == f_my_id);
1449
1450 // a vector of messages for which we have to call msg_lock()
1451 //
1452 ed::message::vector_t local_locks;
1453
1454 // if entering a ticket is definitely not locked, although it
1455 // could be ready (one step away from being locked!) we still
1456 // restart the whole process with the new leaders if such
1457 // exist
1458 //
1459 // Note: of course we restart the process only if the owner
1460 // was that one leader that disappeared, not if the
1461 // ticket is owned by a remaining leader
1462 //
1463 for(auto obj_entering(f_entering_tickets.begin()); obj_entering != f_entering_tickets.end(); ++obj_entering)
1464 {
1465 for(auto key_entering(obj_entering->second.begin()); key_entering != obj_entering->second.end(); )
1466 {
1467 std::string const owner_name(key_entering->second->get_owner());
1468 auto key_leader(std::find_if(
1469 f_leaders.begin()
1470 , f_leaders.end()
1471 , [&owner_name](auto const & l)
1472 {
1473 return l->get_name() == owner_name;
1474 }));
1475 if(key_leader == f_leaders.end())
1476 {
1477 // give new ownership to leader[0]
1478 //
1479 ed::message lock_message;
1480 lock_message.set_command(cluck::g_name_cluck_cmd_lock);
1481 lock_message.set_server(f_leaders[0]->get_name());
1482 lock_message.set_service(cluck::g_name_cluck_service_name);
1483 lock_message.set_sent_from_server(key_entering->second->get_server_name());
1484 lock_message.set_sent_from_service(key_entering->second->get_service_name());
1485 lock_message.add_parameter(cluck::g_name_cluck_param_object_name, key_entering->second->get_object_name());
1486 lock_message.add_parameter(cluck::g_name_cluck_param_tag, key_entering->second->get_tag());
1487 lock_message.add_parameter(cluck::g_name_cluck_param_pid, key_entering->second->get_client_pid());
1488 lock_message.add_parameter(cluck::g_name_cluck_param_timeout, key_entering->second->get_obtention_timeout());
1489 lock_message.add_parameter(cluck::g_name_cluck_param_duration, key_entering->second->get_lock_duration());
1490 lock_message.add_parameter(cluck::g_name_cluck_param_unlock_duration, key_entering->second->get_unlock_duration());
1491 if(leader0)
1492 {
1493 // we are leader #0 so directly call msg_lock()
1494 //
1495 // first we remove the entry otherwise we get a duplicate
1496 // error since we try to re-add the same ticket
1497 //
1498 key_entering = obj_entering->second.erase(key_entering);
1499 local_locks.push_back(lock_message);
1500 }
1501 else
1502 {
1503 // we are not leader #0, so send the message to it
1504 //
1505 ++key_entering;
1506 lock_message.add_parameter(cluck::g_name_cluck_param_serial, key_entering->second->get_serial());
1507 f_messenger->send_message(lock_message);
1508 }
1509 }
1510 else
1511 {
1512 ++key_entering;
1513 }
1514 }
1515 }
1516
1517 // a ticket may still be unlocked in which case we want to
1518 // restart the lock process as if still entering
1519 //
1520 // if locked, a ticket is assigned leader0 as its new owner so
1521 // further work on that ticket works as expected
1522 //
1523 std::string serialized;
1524 for(auto obj_ticket(f_tickets.begin()); obj_ticket != f_tickets.end(); ++obj_ticket)
1525 {
1526 for(auto key_ticket(obj_ticket->second.begin()); key_ticket != obj_ticket->second.end(); )
1527 {
1528 std::string const owner_name(key_ticket->second->get_owner());
1529 auto key_leader(std::find_if(
1530 f_leaders.begin()
1531 , f_leaders.end()
1532 , [&owner_name](auto const & l)
1533 {
1534 return l->get_name() == owner_name;
1535 }));
1536 if(key_ticket->second->is_locked())
1537 {
1538 // if ticket was locked by the leader that disappeared, we
1539 // transfer ownership to leader #0
1540 //
1541 if(key_leader == f_leaders.end())
1542 {
1543 key_ticket->second->set_owner(f_leaders[0]->get_name());
1544 }
1545
1546 // and send that ticket to the other leaders to make sure
1547 // they all agree on its current state
1548 //
1549 serialized += key_ticket->second->serialize();
1550 serialized += '\n';
1551
1552 ++key_ticket;
1553 }
1554 else
1555 {
1556 // it was not locked yet, restart the LOCK process from
1557 // the very beginning
1558 //
1559 if(key_leader == f_leaders.end())
1560 {
1561 // give new ownership to leader[0]
1562 //
1563 ed::message lock_message;
1564 lock_message.set_command(cluck::g_name_cluck_cmd_lock);
1565 lock_message.set_server(f_leaders[0]->get_name());
1566 lock_message.set_service(cluck::g_name_cluck_service_name);
1567 lock_message.set_sent_from_server(key_ticket->second->get_server_name());
1568 lock_message.set_sent_from_service(key_ticket->second->get_service_name());
1569 lock_message.add_parameter(cluck::g_name_cluck_param_object_name, key_ticket->second->get_object_name());
1570 lock_message.add_parameter(cluck::g_name_cluck_param_tag, key_ticket->second->get_tag());
1571 lock_message.add_parameter(cluck::g_name_cluck_param_pid, key_ticket->second->get_client_pid());
1572 lock_message.add_parameter(cluck::g_name_cluck_param_timeout, key_ticket->second->get_obtention_timeout());
1573 lock_message.add_parameter(cluck::g_name_cluck_param_duration, key_ticket->second->get_lock_duration());
1574 lock_message.add_parameter(cluck::g_name_cluck_param_unlock_duration, key_ticket->second->get_unlock_duration());
1575 if(leader0)
1576 {
1577 // we are leader #0 so directly call msg_lock()
1578 //
1579 key_ticket = obj_ticket->second.erase(key_ticket);
1580 local_locks.push_back(lock_message);
1581 }
1582 else
1583 {
1584 // we are not leader #0, so send the message to it
1585 //
1586 ++key_ticket;
1587 lock_message.add_parameter(cluck::g_name_cluck_param_serial, key_ticket->second->get_serial());
1588 f_messenger->send_message(lock_message);
1589 }
1590 }
1591 else
1592 {
1593 ++key_ticket;
1594 }
1595 }
1596 }
1597 }
1598
1599 // we send those after the loops above because the msg_lock() is
1600 // not unlikely to change the f_entering_tickets map and looping
1601 // through it when another function is going to modify it is not
1602 // wise
1603 //
1604 for(auto lm : local_locks)
1605 {
1606 msg_lock(lm);
1607 }
1608
1609    // send LOCK_TICKETS if there is serialized ticket data
1610 //
1611 if(!serialized.empty())
1612 {
1613 ed::message lock_tickets_message;
1614 lock_tickets_message.set_command(cluck::g_name_cluck_cmd_lock_tickets);
1615 lock_tickets_message.set_service(cluck::g_name_cluck_service_name);
1616 lock_tickets_message.add_parameter(cluck::g_name_cluck_param_tickets, serialized);
1617
1618 auto const la(get_leader_a());
1619 if(la != nullptr)
1620 {
1621 lock_tickets_message.set_server(la->get_name());
1622 f_messenger->send_message(lock_tickets_message);
1623
1624 auto const lb(get_leader_b());
1625 if(lb != nullptr)
1626 {
1627 lock_tickets_message.set_server(lb->get_name());
1628 f_messenger->send_message(lock_tickets_message);
1629 }
1630 }
1631 }
1632}
1633
1634
{
    // NOTE(review): this method's signature was dropped by the doc
    // extraction; the body forwards `msg` to one of the elected leaders,
    // acting as a proxy -- confirm the exact name against the repository.
    //
    // for safety, we do not call this function if the daemon is not
    // considered ready meaning it has at least one leader
    //
    if(f_leaders.empty())
    {
        return; // LCOV_EXCL_LINE
    }

    // we are not a leader, we work as a proxy by forwarding the
    // message to a leader, we add our trail so the LOCKED and
    // other messages can be proxied back
    //
    // Note: using the get_sent_from_server() means that we may not
    // even see the returned message, it may be proxied to another
    // server directly or through another route
    //
    msg.set_service(cluck::g_name_cluck_service_name);
    msg.add_parameter(cluck::g_name_cluck_param_lock_proxy_server_name, msg.get_sent_from_server());
    msg.add_parameter(cluck::g_name_cluck_param_lock_proxy_service_name, msg.get_sent_from_service());

    // round-robin across the leaders so the load is spread between them
    //
    f_next_leader = (f_next_leader + 1) % f_leaders.size();
    msg.set_server(f_leaders[f_next_leader]->get_name());

    f_messenger->send_message(msg);
}
1675
1676
{
    // NOTE(review): the signature line was dropped by the doc extraction;
    // this is the cleanup() method (see the cluck::logic_error text below
    // and the cleanup() calls made elsewhere in this file).
    //
    // this function removes timed out cached messages, tickets, and
    // entering tickets, then reprograms the timer for the next earliest
    // timeout found
    //
    cluck::timeout_t next_timeout(snapdev::timespec_ex::max());

    // when we receive LOCK requests before we have leaders elected, they
    // get added to our cache, so do some cache clean up when not empty
    //
    cluck::timeout_t const now(snapdev::now());
    for(auto c(f_message_cache.begin()); c != f_message_cache.end(); )
    {
        if(c->f_timeout <= now)
        {
            std::string object_name;
            ed::dispatcher_match::tag_t tag(ed::dispatcher_match::DISPATCHER_MATCH_NO_TAG);
            pid_t client_pid(0);
            cluck::timeout_t timeout;
            if(!get_parameters(c->f_message, &object_name, &tag, &client_pid, &timeout, nullptr, nullptr))
            {
                // we should never cache messages that are invalid
                //
                throw cluck::logic_error("cluck::cleanup() of LOCK message failed get_parameters()."); // LCOV_EXCL_LINE
            }

            SNAP_LOG_WARNING
                << "Lock on \""
                << object_name
                << "\" / \""
                << client_pid
                << "\" / \""
                << tag
                << "\" timed out before leaders were known."
                << SNAP_LOG_SEND;

            // the reply goes to the original requester which may be hidden
            // behind a proxy (see the lock_proxy_* parameters)
            //
            std::string const server_name(c->f_message.has_parameter("lock_proxy_server_name")
                            ? c->f_message.get_parameter("lock_proxy_server_name")
                            : c->f_message.get_sent_from_server());
            std::string const service_name(c->f_message.has_parameter("lock_proxy_service_name")
                            ? c->f_message.get_parameter("lock_proxy_service_name")
                            : c->f_message.get_sent_from_service());
            std::string const entering_key(server_name + '/' + std::to_string(client_pid));

            ed::message lock_failed_message;
            lock_failed_message.set_command(cluck::g_name_cluck_cmd_lock_failed);
            lock_failed_message.set_service(service_name);
            lock_failed_message.set_server(server_name);
            lock_failed_message.add_parameter(cluck::g_name_cluck_param_object_name, object_name);
            lock_failed_message.add_parameter(cluck::g_name_cluck_param_tag, tag);
            lock_failed_message.add_parameter(cluck::g_name_cluck_param_key, entering_key);
            lock_failed_message.add_parameter(cluck::g_name_cluck_param_error, cluck::g_name_cluck_value_timedout);
#ifndef CLUCKD_OPTIMIZATIONS
            lock_failed_message.add_parameter(cluck::g_name_cluck_param_description, "cleanup() found a timed out lock");
#endif
            f_messenger->send_message(lock_failed_message);

            c = f_message_cache.erase(c);
        }
        else
        {
            // keep this message; track its timeout for the timer
            //
            if(c->f_timeout < next_timeout)
            {
                next_timeout = c->f_timeout;
            }
            ++c;
        }
    }

    // remove any f_ticket that timed out
    //
    for(auto obj_ticket(f_tickets.begin()); obj_ticket != f_tickets.end(); )
    {
        bool try_activate(false);
        for(auto key_ticket(obj_ticket->second.begin()); key_ticket != obj_ticket->second.end(); )
        {
            bool move_next(true);
            if(key_ticket->second->timed_out())
            {
                key_ticket->second->lock_failed("ticket: timed out while cleaning up");

                // lock_failed() may change the ticket state (e.g. switch to
                // the unlock period); only drop the ticket if it still
                // reports itself as timed out afterward
                //
                if(key_ticket->second->timed_out())
                {
                    // still timed out, remove it
                    //
                    key_ticket = obj_ticket->second.erase(key_ticket);
                    try_activate = true;
                    move_next = false;
                }
            }
            if(move_next)
            {
                if(key_ticket->second->get_current_timeout_date() < next_timeout)
                {
                    next_timeout = key_ticket->second->get_current_timeout_date();
                }
                ++key_ticket;
            }
        }

        if(obj_ticket->second.empty())
        {
            obj_ticket = f_tickets.erase(obj_ticket);
        }
        else
        {
            if(try_activate)
            {
                // something was erased, a new ticket may be first
                //
                activate_first_lock(obj_ticket->first);
            }

            ++obj_ticket;
        }
    }

    // remove any f_entering_tickets that timed out
    //
    for(auto obj_entering(f_entering_tickets.begin()); obj_entering != f_entering_tickets.end(); )
    {
        for(auto key_entering(obj_entering->second.begin()); key_entering != obj_entering->second.end(); )
        {
            if(key_entering->second->timed_out())
            {
                key_entering->second->lock_failed("entering ticket: timed out while cleanup");

                // if lock_failed() cleared the timed out state the iterator
                // is deliberately not advanced: the next pass falls into the
                // else branch and moves on
                //
                if(key_entering->second->timed_out())
                {
                    // still timed out, remove it
                    //
                    key_entering = obj_entering->second.erase(key_entering);
                }
            }
            else
            {
                if(key_entering->second->get_current_timeout_date() < next_timeout)
                {
                    next_timeout = key_entering->second->get_current_timeout_date();
                }
                ++key_entering;
            }
        }

        if(obj_entering->second.empty())
        {
            obj_entering = f_entering_tickets.erase(obj_entering);
        }
        else
        {
            ++obj_entering;
        }
    }

    // got a new timeout?
    //
    if(next_timeout != snapdev::timespec_ex::max())
    {
        // we add one second to avoid looping like crazy
        // if we timeout just around the "wrong" time
        //
        f_timer->set_timeout_date(next_timeout + cluck::timeout_t(1, 0));
    }
    else
    {
        // nothing pending, disable the timer
        //
        f_timer->set_timeout_date(-1);
    }
}
1851
1852
1871ticket::ticket_id_t cluckd::get_last_ticket(std::string const & object_name)
1872{
1874
1875 // Note: There is no need to check the f_entering_tickets list
1876 // since that one does not yet have any ticket number assigned
1877 // and thus the maximum there would return 0 every time
1878 //
1879 auto obj_ticket(f_tickets.find(object_name));
1880 if(obj_ticket != f_tickets.end())
1881 {
1882 // note:
1883 // the std::max_element() algorithm would require many more
1884 // get_ticket_number() when our loop uses one per ticket
1885 //
1886 for(auto key_ticket : obj_ticket->second)
1887 {
1888 ticket::ticket_id_t const ticket_number(key_ticket.second->get_ticket_number());
1889 if(ticket_number > last_ticket)
1890 {
1891 last_ticket = ticket_number;
1892 }
1893 }
1894 }
1895
1896 return last_ticket;
1897}
1898
1899
    std::string const & object_name
    , std::string const & key
{
    // NOTE(review): the doc extraction dropped this function's first
    // signature line and its last parameter line; per the caller in
    // msg_add_ticket() -- set_ticket(object_name, key, ticket) -- the
    // missing parameter is the ticket pointer stored below; confirm
    // against the repository.
    //
    // register (or replace) the ticket under its object name and key
    //
    f_tickets[object_name][key] = ticket;
}
1917
1918
1930ticket::key_map_t const cluckd::get_entering_tickets(std::string const & object_name)
1931{
1932 auto const it(f_entering_tickets.find(object_name));
1933 if(it == f_entering_tickets.end())
1934 {
1935 // LCOV_EXCL_START
1936 //
1937 // I could not get this covered and I do not think it can happen,
1938 // also the caller function does not verify that the returned map
1939 // is valid so it is safer this way
1940 //
1941 throw std::logic_error(
1942 "could not find entering ticket with object name: \""
1943 + object_name
1944 + "\".");
1945 // LCOV_EXCL_STOP
1946 }
1947
1948 return it->second;
1949}
1950
1951
void cluckd::lock_exiting(ed::message & msg)
{
    // thin pass-through to the LOCK_EXITING message handler
    //
    msg_lock_exiting(msg);
}
1964
1965
1966
1967
1968
1969
1971{
1972 std::stringstream result;
1973
1974 for(auto const & obj_ticket : f_tickets)
1975 {
1976 for(auto const & key_ticket : obj_ticket.second)
1977 {
1978 result
1979 << key_ticket.second->serialize()
1980 << '\n';
1981 }
1982 }
1983
1984 return result.str();
1985}
1986
1987
    ed::message const & msg
    , std::string * object_name
    , ed::dispatcher_match::tag_t * tag
    , pid_t * client_pid
    , cluck::timeout_t * timeout
    , std::string * key
    , std::string * source)
{
    // NOTE(review): the first line of this signature (return type and
    // name) was dropped by the doc extraction; the body returns bool and
    // the error log below names it cluckd::get_parameters(). Each output
    // pointer may be nullptr when the caller does not need that value;
    // the function returns false only when the pid is invalid.
    //
    // get the "object name" (what we are locking)
    // in Snap, the object name is often a URI plus the action we are performing
    //
    if(object_name != nullptr)
    {
        *object_name = msg.get_parameter(cluck::g_name_cluck_param_object_name);
    }

    // the same application may want to hold multiple locks simultaneously
    // and this is made possible by using a tag (a 16 bits number)
    //
    if(tag != nullptr)
    {
        *tag = msg.get_integer_parameter(cluck::g_name_cluck_param_tag);
    }

    // get the pid (process identifier) of the process that is
    // requesting the lock; this is important to be able to distinguish
    // multiple processes on the same computer requesting a lock
    //
    if(client_pid != nullptr)
    {
        *client_pid = msg.get_integer_parameter(cluck::g_name_cluck_param_pid);
        if(*client_pid < 1)
        {
            // invalid pid
            //
            SNAP_LOG_NOISY_ERROR
                << "cluckd::get_parameters(): invalid pid specified for a lock ("
                << std::to_string(*client_pid)
                << "); it must be a positive decimal number."
                << SNAP_LOG_SEND;
            return false;
        }
    }

    // get the time limit we will wait up to before we decide we
    // cannot obtain that lock
    //
    if(timeout != nullptr)
    {
        if(msg.has_parameter(cluck::g_name_cluck_param_timeout))
        {
            // this timeout may already be out of date in which case
            // the lock immediately fails
            //
            *timeout = msg.get_timespec_parameter(cluck::g_name_cluck_param_timeout);
        }
        else
        {
            // no explicit timeout, use the default obtention timeout
            //
            *timeout = snapdev::now() + cluck::get_lock_obtention_timeout();
        }
    }

    // get the key of a ticket or entering object
    //
    if(key != nullptr)
    {
        *key = msg.get_parameter(cluck::g_name_cluck_param_key);
    }

    // get the source of a ticket (i.e. <server> '/' <service>)
    //
    if(source != nullptr)
    {
        *source = msg.get_parameter(cluck::g_name_cluck_param_source);
    }

    return true;
}
2096
2097
2109void cluckd::msg_absolutely(ed::message & msg)
2110{
2111 // we may receive the ABSOLUTELY message from anywhere so don't expect
2112 // the "serial" parameter to be defined
2113 //
2114 if(!msg.has_parameter(ed::g_name_ed_param_serial))
2115 {
2116 return;
2117 }
2118
2119 std::string const serial(msg.get_parameter(ed::g_name_ed_param_serial));
2120 std::vector<std::string> segments;
2121 snapdev::tokenize_string(segments, serial, "/");
2122
2123 if(segments[0] == "cluckd")
2124 {
2125 // check serial as defined in msg_lock()
2126 //
2127 if(segments.size() != 4)
2128 {
2129 SNAP_LOG_WARNING
2130 << "ABSOLUTELY reply has an invalid cluckd serial parameters \""
2131 << serial
2132 << "\" was expected to have exactly 4 segments."
2133 << SNAP_LOG_SEND;
2134
2135 ed::message invalid;
2136 invalid.set_command(ed::g_name_ed_cmd_invalid);
2137 invalid.reply_to(msg);
2138 invalid.add_parameter(
2139 ed::g_name_ed_param_command
2140 , msg.get_command());
2141 invalid.add_parameter(
2142 ed::g_name_ed_param_message
2143 , "invalid number of segments in \""
2144 + serial
2145 + "\".");
2146 f_messenger->send_message(invalid);
2147
2148 return;
2149 }
2150
2151 // notice how the split() re-split the entering key
2152 //
2153 std::string const object_name(segments[1]);
2154 std::string const server_name(segments[2]);
2155 std::string const client_pid(segments[3]);
2156
2157 auto entering_ticket(f_entering_tickets.find(object_name));
2158 if(entering_ticket != f_entering_tickets.end())
2159 {
2160 std::string const entering_key(server_name + '/' + client_pid);
2161 auto key_ticket(entering_ticket->second.find(entering_key));
2162 if(key_ticket != entering_ticket->second.end())
2163 {
2164 // remove the alive timeout
2165 //
2166 key_ticket->second->set_alive_timeout(cluck::timeout_t());
2167
2168 // got it! start the bakery algorithm
2169 //
2170 key_ticket->second->entering();
2171 }
2172 }
2173 }
2174
2175 // ignore other messages
2176}
2177
2178
void cluckd::msg_activate_lock(ed::message & msg)
{
    std::string object_name;
    ed::dispatcher_match::tag_t tag(ed::dispatcher_match::DISPATCHER_MATCH_NO_TAG);
    std::string key;
    if(!get_parameters(msg, &object_name, &tag, nullptr, nullptr, &key, nullptr))
    {
        return; // LCOV_EXCL_LINE
    }

    // "no-key" is the sentinel returned when no first lock exists or the
    // requested key is not the first one
    //
    std::string first_key("no-key");

    auto ticket(find_first_lock(object_name));
    if(ticket != nullptr)
    {
        // found a lock
        //
        first_key = ticket->get_ticket_key();

        if(key == first_key)
        {
            // we can mark this ticket as activated
            //
            // NOTE(review): the doc extraction this file was recovered from
            // dropped the statement that performed the activation here
            // (original listing line 2216) -- confirm against the
            // repository before relying on this block.
        }
    }

    // always reply, if we could not find the key, then we returned 'no-key'
    // as the key parameter
    //
    ed::message lock_activated_message;
    lock_activated_message.set_command(cluck::g_name_cluck_cmd_lock_activated);
    lock_activated_message.reply_to(msg);
    lock_activated_message.add_parameter(cluck::g_name_cluck_param_object_name, object_name);
    lock_activated_message.add_parameter(cluck::g_name_cluck_param_tag, tag);
    lock_activated_message.add_parameter(cluck::g_name_cluck_param_key, key);
    lock_activated_message.add_parameter(cluck::g_name_cluck_param_other_key, first_key);
    f_messenger->send_message(lock_activated_message);

    // the list of tickets is not unlikely changed so we need to make
    // a call to cleanup to make sure the timer is reset appropriately
    //
    cleanup();
}
2237
2238
2254void cluckd::msg_add_ticket(ed::message & msg)
2255{
2256 std::string object_name;
2257 ed::dispatcher_match::tag_t tag(ed::dispatcher_match::DISPATCHER_MATCH_NO_TAG);
2258 std::string key;
2259 cluck::timeout_t timeout;
2260 if(!get_parameters(msg, &object_name, &tag, nullptr, &timeout, &key, nullptr))
2261 {
2262 return; // LCOV_EXCL_LINE
2263 }
2264
2265 // make sure the ticket is unique
2266 //
2267 auto const obj_ticket(f_tickets.find(object_name));
2268 if(obj_ticket != f_tickets.end())
2269 {
2270 auto const key_ticket(obj_ticket->second.find(key));
2271 if(key_ticket != obj_ticket->second.end())
2272 {
2273 SNAP_LOG_ERROR
2274 << "an existing ticket has the same object name \""
2275 << object_name
2276 << "\" ("
2277 << tag
2278 << ") and key \""
2279 << key
2280 << "\"."
2281 << SNAP_LOG_SEND;
2282
2283 ed::message lock_failed_message;
2284 lock_failed_message.set_command(cluck::g_name_cluck_cmd_lock_failed);
2285 lock_failed_message.reply_to(msg);
2286 lock_failed_message.add_parameter(cluck::g_name_cluck_param_object_name, object_name);
2287 lock_failed_message.add_parameter(cluck::g_name_cluck_param_tag, tag);
2288 lock_failed_message.add_parameter(cluck::g_name_cluck_param_key, key);
2289 lock_failed_message.add_parameter(cluck::g_name_cluck_param_error, cluck::g_name_cluck_value_duplicate);
2290#ifndef CLUCKD_OPTIMIZATIONS
2291 lock_failed_message.add_parameter(cluck::g_name_cluck_param_description, "ADD_TICKET called with an existing ticket name and key");
2292#endif
2293 f_messenger->send_message(lock_failed_message);
2294
2295 communicator::flag::pointer_t flag(COMMUNICATOR_FLAG_UP(
2296 "cluckd"
2297 , "ticket"
2298 , "invalid-algorithm"
2299 , "msg_add_ticket() received a second call to add the"
2300 " same ticket. This either means there is a bug in our"
2301 " algorithm or there is a hacker sending us messages"
2302 " trying to create invalid tickets."
2303 ));
2304 flag->set_priority(25);
2305 flag->add_tag("bug");
2306 flag->set_manual_down(true);
2307 flag->save();
2308
2309 return;
2310 }
2311 }
2312
2313 // the client_pid parameter is part of the key (3rd segment)
2314 //
2315 std::vector<std::string> segments;
2316 snapdev::tokenize_string(segments, key, "/");
2317 if(segments.size() != 3)
2318 {
2319 SNAP_LOG_ERROR
2320 << "Expected exactly 3 segments in \""
2321 << key
2322 << "\" to add a ticket."
2323 << SNAP_LOG_SEND;
2324
2325 ed::message lock_failed_message;
2326 lock_failed_message.set_command(cluck::g_name_cluck_cmd_lock_failed);
2327 lock_failed_message.reply_to(msg);
2328 lock_failed_message.add_parameter(cluck::g_name_cluck_param_object_name, object_name);
2329 lock_failed_message.add_parameter(cluck::g_name_cluck_param_tag, tag);
2330 lock_failed_message.add_parameter(cluck::g_name_cluck_param_key, key);
2331 lock_failed_message.add_parameter(cluck::g_name_cluck_param_error, cluck::g_name_cluck_value_invalid);
2332#ifndef CLUCKD_OPTIMIZATIONS
2333 lock_failed_message.add_parameter(cluck::g_name_cluck_param_description, "ADD_TICKET called with an invalid key (expected three segments)");
2334#endif
2335 f_messenger->send_message(lock_failed_message);
2336
2337 return;
2338 }
2339
2340 // TODO: we probably want to look at using a function which returns false
2341 // instead of having to do a try/catch
2342 //
2343 bool ok(true);
2344 std::uint32_t number(0);
2345 try
2346 {
2347 number = snapdev::hex_to_int<std::uint32_t>(segments[0]);
2348 }
2349 catch(snapdev::hexadecimal_string_exception const &)
2350 {
2351 ok = false;
2352 }
2353 catch(snapdev::hexadecimal_string_out_of_range const &)
2354 {
2355 ok = false;
2356 }
2357 if(!ok)
2358 {
2359 SNAP_LOG_ERROR
2360 << "somehow ticket number \""
2361 << segments[0]
2362 << "\" is not a valid hexadecimal number."
2363 << SNAP_LOG_SEND;
2364
2365 ed::message lock_failed_message;
2366 lock_failed_message.set_command(cluck::g_name_cluck_cmd_lock_failed);
2367 lock_failed_message.reply_to(msg);
2368 lock_failed_message.add_parameter(cluck::g_name_cluck_param_object_name, object_name);
2369 lock_failed_message.add_parameter(cluck::g_name_cluck_param_tag, tag);
2370 lock_failed_message.add_parameter(cluck::g_name_cluck_param_key, key);
2371 lock_failed_message.add_parameter(cluck::g_name_cluck_param_error, cluck::g_name_cluck_value_invalid);
2372#ifndef CLUCKD_OPTIMIZATIONS
2373 lock_failed_message.add_parameter(cluck::g_name_cluck_param_description, "ADD_TICKET called with an invalid key (first segment is not a valid hexadecimal number)");
2374#endif
2375 f_messenger->send_message(lock_failed_message);
2376
2377 return;
2378 }
2379
2380 // by now all the leaders should already have
2381 // an entering ticket for that one ticket
2382 //
2383 auto const obj_entering_ticket(f_entering_tickets.find(object_name));
2384 if(obj_entering_ticket == f_entering_tickets.end())
2385 {
2386 SNAP_LOG_ERROR
2387 << "Expected entering ticket object for \""
2388 << object_name
2389 << "\" not found when adding a ticket."
2390 << SNAP_LOG_SEND;
2391
2392 ed::message lock_failed_message;
2393 lock_failed_message.set_command(cluck::g_name_cluck_cmd_lock_failed);
2394 lock_failed_message.reply_to(msg);
2395 lock_failed_message.add_parameter(cluck::g_name_cluck_param_object_name, object_name);
2396 lock_failed_message.add_parameter(cluck::g_name_cluck_param_tag, tag);
2397 lock_failed_message.add_parameter(cluck::g_name_cluck_param_key, key);
2398 lock_failed_message.add_parameter(cluck::g_name_cluck_param_error, cluck::g_name_cluck_value_invalid);
2399#ifndef CLUCKD_OPTIMIZATIONS
2400 lock_failed_message.add_parameter(cluck::g_name_cluck_param_description, "ADD_TICKET could not find an entering ticket with the specified object_name");
2401#endif
2402 f_messenger->send_message(lock_failed_message);
2403
2404 return;
2405 }
2406
2407 // the key we need to search is not the new ticket key but the
2408 // entering key, build it from the segments
2409 //
2410 std::string const entering_key(segments[1] + '/' + segments[2]);
2411 auto const key_entering_ticket(obj_entering_ticket->second.find(entering_key));
2412 if(key_entering_ticket == obj_entering_ticket->second.end())
2413 {
2414 SNAP_LOG_ERROR
2415 << "Expected entering ticket key for \""
2416 << object_name
2417 << "\" not found when adding a ticket."
2418 << SNAP_LOG_SEND;
2419
2420 ed::message lock_failed_message;
2421 lock_failed_message.set_command(cluck::g_name_cluck_cmd_lock_failed);
2422 lock_failed_message.reply_to(msg);
2423 lock_failed_message.add_parameter(cluck::g_name_cluck_param_object_name, object_name);
2424 lock_failed_message.add_parameter(cluck::g_name_cluck_param_tag, tag);
2425 lock_failed_message.add_parameter(cluck::g_name_cluck_param_key, key);
2426 lock_failed_message.add_parameter(cluck::g_name_cluck_param_error, cluck::g_name_cluck_value_invalid);
2427#ifndef CLUCKD_OPTIMIZATIONS
2428 lock_failed_message.add_parameter(cluck::g_name_cluck_param_description, "ADD_TICKET could not find the very entering ticket with the specified key");
2429#endif
2430 f_messenger->send_message(lock_failed_message);
2431
2432 return;
2433 }
2434
2435 // make it an official ticket now
2436 //
2437 // this should happen on all cluck daemon other than the one that
2438 // first received the LOCK message
2439 //
2440 set_ticket(object_name, key, key_entering_ticket->second);
2441
2442 // WARNING: the set_ticket_number() function has the same side
2443 // effects as the add_ticket() function without the
2444 // f_messenger->send_message() call
2445 //
2446 f_tickets[object_name][key]->set_ticket_number(number);
2447
2448 ed::message ticket_added_message;
2449 ticket_added_message.set_command(cluck::g_name_cluck_cmd_ticket_added);
2450 ticket_added_message.reply_to(msg);
2451 ticket_added_message.add_parameter(cluck::g_name_cluck_param_object_name, object_name);
2452 ticket_added_message.add_parameter(cluck::g_name_cluck_param_key, key);
2453 ticket_added_message.add_parameter(cluck::g_name_cluck_param_tag, tag);
2454 f_messenger->send_message(ticket_added_message);
2455}
2456
2457
void cluckd::msg_clock_stable(ed::message & msg)
{
    if(!f_stable_clock)
    {
        // the clock becomes stable once the communicator daemon reports
        // its clock resolution as verified
        //
        f_stable_clock = msg.get_parameter(communicator::g_name_communicator_param_clock_resolution)
                                == communicator::g_name_communicator_value_verified;

        // NOTE(review): the doc extraction dropped at least one statement
        // here (original listing lines 2478/2479) -- likely a follow-up
        // call made once the clock is stable; confirm against the
        // repository.
    }
}
2482
2483
void cluckd::msg_cluster_down(ed::message & msg)
{
    // the CLUSTER_DOWN message carries no parameters we use
    //
    snapdev::NOT_USED(msg);

    SNAP_LOG_INFO
        << "cluster is down, canceling existing locks and we have to"
           " refuse any further lock requests for a while."
        << SNAP_LOG_SEND;

    // in this case, we cannot safely keep the leaders
    //
    f_leaders.clear();

    // in case services listen to the NO_LOCK, let them know it's gone
    //
    // NOTE(review): the doc extraction dropped the statement that was here
    // (original listing line 2507) -- per the comment above it presumably
    // broadcast the lock status change; confirm against the repository.

    // we do not call the lock_gone() because the HANGUP will be sent
    // if required so we do not have to do that twice
}
2512
2513
void cluckd::msg_cluster_up(ed::message & msg)
{
    // the CLUSTER_UP message tells us how many neighbors the communicator
    // daemon knows about
    //
    f_neighbors_count = msg.get_integer_parameter("neighbors_count");

    // NOTE(review): the doc extraction dropped the declaration of
    // `priority` that was here (original listing line 2526) -- presumably
    // a computer priority initialized to its "off" value; confirm against
    // the repository.
    //
    std::string candidate_priority(f_opts.get_string("candidate-priority"));
    if(candidate_priority != "off")
    {
        priority = f_opts.get_long("candidate-priority"
                , 0
        // NOTE(review): the extraction also dropped the remaining
        // arguments and closing parenthesis of this get_long() call
        // (original listing lines 2532/2533).
    }

#ifdef _DEBUG
    // the READY message is expected to happen first and setup this parameter
    //
    if(f_my_ip_address.is_default())
    {
        throw cluck::logic_error("cluckd::msg_cluster_up(): somehow f_my_ip_address is still the default in msg_cluster_up()."); // LCOV_EXCL_LINE
    }
#endif

    // add ourselves to the list of computers; mark us connected; get our ID
    //
    f_computers[f_server_name] = std::make_shared<computer>(f_server_name, priority, f_my_ip_address);
    f_computers[f_server_name]->set_start_time(f_start_time);
    f_computers[f_server_name]->set_connected(true);
    f_my_id = f_computers[f_server_name]->get_id();

    SNAP_LOG_INFO
        << "cluster is up with "
        // NOTE(review): the extraction dropped the neighbor count output
        // that was here (original listing line 2554).
        << " neighbors, attempt an election"
           " then check for leaders by sending a LOCK_STARTED message."
        << SNAP_LOG_SEND;

    // NOTE(review): the extraction dropped the statements surrounding this
    // call (original listing lines 2559 and 2561) -- likely the election
    // and lock status checks mentioned in the log above; confirm against
    // the repository.
    //
    send_lock_started(nullptr);
}
2563
2564
2581void cluckd::msg_drop_ticket(ed::message & msg)
2582{
2583 std::string object_name;
2584 ed::dispatcher_match::tag_t tag(ed::dispatcher_match::DISPATCHER_MATCH_NO_TAG);
2585 std::string key;
2586 if(!get_parameters(msg, &object_name, &tag, nullptr, nullptr, &key, nullptr))
2587 {
2588 return; // LCOV_EXCL_LINE
2589 }
2590
2591 std::vector<std::string> segments;
2592 snapdev::tokenize_string(segments, key, "/");
2593
2594 // drop the regular ticket
2595 //
2596 // if we have only 2 segments, then there is no corresponding ticket
2597 // since tickets are added only once we have a ticket_id
2598 //
2599 std::string entering_key;
2600 if(segments.size() == 3)
2601 {
2602 auto obj_ticket(f_tickets.find(object_name));
2603 if(obj_ticket != f_tickets.end())
2604 {
2605 auto key_ticket(obj_ticket->second.find(key));
2606 if(key_ticket != obj_ticket->second.end())
2607 {
2608 obj_ticket->second.erase(key_ticket);
2609 }
2610
2611 if(obj_ticket->second.empty())
2612 {
2613 f_tickets.erase(obj_ticket);
2614 }
2615
2616 // one ticket was erased, another may be first now
2617 //
2618 activate_first_lock(object_name);
2619 }
2620
2621 // we received the ticket_id in the message, so
2622 // we have to regenerate the entering_key without
2623 // the ticket_id (which is the first element)
2624 //
2625 entering_key = segments[1] + '/' + segments[2];
2626 }
2627 else
2628 {
2629 // we received the entering_key in the message, use as is
2630 //
2631 entering_key = key;
2632 }
2633
2634 // drop the entering ticket
2635 //
2636 auto obj_entering_ticket(f_entering_tickets.find(object_name));
2637 if(obj_entering_ticket != f_entering_tickets.end())
2638 {
2639 auto key_entering_ticket(obj_entering_ticket->second.find(entering_key));
2640 if(key_entering_ticket != obj_entering_ticket->second.end())
2641 {
2642 obj_entering_ticket->second.erase(key_entering_ticket);
2643 }
2644
2645 if(obj_entering_ticket->second.empty())
2646 {
2647 f_entering_tickets.erase(obj_entering_ticket);
2648 }
2649 }
2650
2651 // the list of tickets is not unlikely changed so we need to make
2652 // a call to cleanup to make sure the timer is reset appropriately
2653 //
2654 cleanup();
2655}
2656
2657
2671void cluckd::msg_get_max_ticket(ed::message & msg)
2672{
2673 std::string object_name;
2674 ed::dispatcher_match::tag_t tag(ed::dispatcher_match::DISPATCHER_MATCH_NO_TAG);
2675 std::string key;
2676 if(!get_parameters(msg, &object_name, &tag, nullptr, nullptr, &key, nullptr))
2677 {
2678 return; // LCOV_EXCL_LINE
2679 }
2680
2681 // remove any f_tickets that timed out by now because these should
2682 // not be taken in account in the max. computation
2683 //
2684 cleanup();
2685
2686 ticket::ticket_id_t const last_ticket(get_last_ticket(object_name));
2687
2688 ed::message reply;
2689 reply.set_command(cluck::g_name_cluck_cmd_max_ticket);
2690 reply.reply_to(msg);
2691 reply.add_parameter(cluck::g_name_cluck_param_object_name, object_name);
2692 reply.add_parameter(cluck::g_name_cluck_param_key, key);
2693 reply.add_parameter(cluck::g_name_cluck_param_tag, tag);
2694 reply.add_parameter(cluck::g_name_cluck_param_ticket_id, last_ticket);
2695 f_messenger->send_message(reply);
2696}
2697
2698
2707void cluckd::msg_list_tickets(ed::message & msg)
2708{
2709 ed::message list_message;
2710 list_message.set_command(cluck::g_name_cluck_cmd_ticket_list);
2711 list_message.reply_to(msg);
2712 list_message.add_parameter(cluck::g_name_cluck_param_list, ticket_list());
2713 f_messenger->send_message(list_message);
2714}
2715
2716
/** \brief Process a LOCK message.
 *
 * Validates the request (obtention timeout still in the future, lock
 * duration and optional unlock duration large enough), then either caches
 * the message (daemon not ready), forwards it (not a leader), or creates
 * an "entering" ticket for the named object.
 *
 * \note This listing was recovered from generated documentation; a few
 * interior lines were lost in extraction and are flagged with
 * NOTE(review) comments below.
 *
 * \param[in,out] msg  The LOCK message being processed.
 */
void cluckd::msg_lock(ed::message & msg)
{
    std::string object_name;
    ed::dispatcher_match::tag_t tag(ed::dispatcher_match::DISPATCHER_MATCH_NO_TAG);
    pid_t client_pid(0);
    cluck::timeout_t timeout;
    if(!get_parameters(msg, &object_name, &tag, &client_pid, &timeout, nullptr, nullptr))
    {
        return;
    }

    // do some cleanup as well
    //
    cleanup();

    // the entering key is "<server>/<pid>"; a proxied LOCK carries the
    // original server/service in dedicated parameters, otherwise we use
    // the message envelope
    //
    std::string const server_name(msg.has_parameter(cluck::g_name_cluck_param_lock_proxy_server_name)
                    ? msg.get_parameter(cluck::g_name_cluck_param_lock_proxy_server_name)
                    : msg.get_sent_from_server());

    std::string const service_name(msg.has_parameter(cluck::g_name_cluck_param_lock_proxy_service_name)
                    ? msg.get_parameter(cluck::g_name_cluck_param_lock_proxy_service_name)
                    : msg.get_sent_from_service());

    std::string const entering_key(server_name + '/' + std::to_string(client_pid));

    // obtention timeout already in the past? then fail immediately
    //
    if(timeout <= snapdev::now())
    {
        SNAP_LOG_WARNING
            << "lock on \""
            << object_name
            << "\" ("
            << tag
            << ")/ \""
            << client_pid
            << "\" timed out before we could start the locking process."
            << SNAP_LOG_SEND;

        ed::message lock_failed_message;
        lock_failed_message.set_command(cluck::g_name_cluck_cmd_lock_failed);
        lock_failed_message.reply_to(msg);
        lock_failed_message.add_parameter(cluck::g_name_cluck_param_object_name, object_name);
        lock_failed_message.add_parameter(cluck::g_name_cluck_param_tag, tag);
        lock_failed_message.add_parameter(cluck::g_name_cluck_param_key, entering_key);
        lock_failed_message.add_parameter(cluck::g_name_cluck_param_error, cluck::g_name_cluck_value_timedout);
#ifndef CLUCKD_OPTIMIZATIONS
        lock_failed_message.add_parameter(cluck::g_name_cluck_param_description, "LOCK timeout date is already in the past");
#endif
        f_messenger->send_message(lock_failed_message);

        return;
    }

    // the lock duration must be at least CLUCK_MINIMUM_TIMEOUT
    //
    cluck::timeout_t const duration(msg.get_timespec_parameter(cluck::g_name_cluck_param_duration));
    if(duration < cluck::CLUCK_MINIMUM_TIMEOUT)
    {
        // duration too small
        //
        // NOTE(review): the line printing the minimum value is missing
        // from this listing -- verify against the repository
        //
        SNAP_LOG_ERROR
            << duration
            << " is an invalid duration, the minimum accepted is "
            << '.'
            << SNAP_LOG_SEND;

        ed::message lock_failed_message;
        lock_failed_message.set_command(cluck::g_name_cluck_cmd_lock_failed);
        lock_failed_message.reply_to(msg);
        lock_failed_message.add_parameter(cluck::g_name_cluck_param_object_name, object_name);
        lock_failed_message.add_parameter(cluck::g_name_cluck_param_tag, tag);
        lock_failed_message.add_parameter(cluck::g_name_cluck_param_key, entering_key);
        lock_failed_message.add_parameter(cluck::g_name_cluck_param_error, cluck::g_name_cluck_value_invalid);
#ifndef CLUCKD_OPTIMIZATIONS
        lock_failed_message.add_parameter(cluck::g_name_cluck_param_description, "LOCK called with a duration that is too small");
#endif
        f_messenger->send_message(lock_failed_message);

        return;
    }

    // NOTE(review): the declaration of unlock_duration is not visible in
    // this listing (line lost in extraction); it is used below so it is
    // presumably declared here with a default value -- confirm
    //
    if(msg.has_parameter(cluck::g_name_cluck_param_unlock_duration))
    {
        unlock_duration = msg.get_timespec_parameter(cluck::g_name_cluck_param_unlock_duration);
        if(unlock_duration < cluck::CLUCK_UNLOCK_MINIMUM_TIMEOUT)
        {
            // invalid duration, minimum is cluck::CLUCK_UNLOCK_MINIMUM_TIMEOUT
            //
            SNAP_LOG_ERROR
                << unlock_duration
                << " is an invalid unlock duration, the minimum accepted is "
                << '.'
                << SNAP_LOG_SEND;

            ed::message lock_failed_message;
            lock_failed_message.set_command(cluck::g_name_cluck_cmd_lock_failed);
            lock_failed_message.reply_to(msg);
            lock_failed_message.add_parameter(cluck::g_name_cluck_param_object_name, object_name);
            lock_failed_message.add_parameter(cluck::g_name_cluck_param_tag, tag);
            lock_failed_message.add_parameter(cluck::g_name_cluck_param_key, entering_key);
            lock_failed_message.add_parameter(cluck::g_name_cluck_param_error, cluck::g_name_cluck_value_invalid);
#ifndef CLUCKD_OPTIMIZATIONS
            lock_failed_message.add_parameter(cluck::g_name_cluck_param_description, "LOCK called with an unlock duration that is too small");
#endif
            f_messenger->send_message(lock_failed_message);

            return;
        }
    }

    // not enough leaders yet? cache the message until the daemon is ready
    //
    if(!is_daemon_ready())
    {
        SNAP_LOG_TRACE
            << "caching LOCK message for \""
            << object_name
            << "\" ("
            << tag
            << ") as the cluck system is not yet considered ready."
            << SNAP_LOG_SEND;

        f_message_cache.emplace_back(timeout, msg);

        // make sure the cache gets cleaned up if the message times out
        //
        std::int64_t const timeout_date(f_timer->get_timeout_date());
        if(timeout_date == -1
        || cluck::timeout_t(timeout_date / 1'000'000, timeout_date % 1'000'000) > timeout)
        {
            f_timer->set_timeout_date(timeout);
        }
        return;
    }

    if(is_leader() == nullptr)
    {
        // we are not a leader, we need to forward the message to one
        // of the leaders instead
        //
        // NOTE(review): the forwarding statement itself is missing from
        // this listing (line lost in extraction) -- confirm
        //
        return;
    }

    // make sure this is a new ticket
    //
    auto entering_ticket(f_entering_tickets.find(object_name));
    if(entering_ticket != f_entering_tickets.end())
    {
        auto key_ticket(entering_ticket->second.find(entering_key));
        if(key_ticket != entering_ticket->second.end())
        {
            // if this is a re-LOCK, then it may be a legitimate duplicate
            // in which case we do not want to generate a LOCK_FAILED error
            //
            if(msg.has_parameter(cluck::g_name_cluck_param_serial))
            {
                ticket::serial_t const serial(msg.get_integer_parameter(cluck::g_name_cluck_param_serial));
                if(key_ticket->second->get_serial() == serial)
                {
                    // legitimate double request from leaders
                    // (this happens when a leader dies and we have to restart
                    // a lock negotiation)
                    //
                    return;
                }
            }

            // the object already exists... do not allow duplicates
            //
            SNAP_LOG_ERROR
                << "an entering ticket has the same object name \""
                << object_name
                << "\" ("
                << tag
                << ") and entering key \""
                << entering_key
                << "\"."
                << SNAP_LOG_SEND;

            ed::message lock_failed_message;
            lock_failed_message.set_command(cluck::g_name_cluck_cmd_lock_failed);
            lock_failed_message.reply_to(msg);
            lock_failed_message.add_parameter(cluck::g_name_cluck_param_object_name, object_name);
            lock_failed_message.add_parameter(cluck::g_name_cluck_param_tag, tag);
            lock_failed_message.add_parameter(cluck::g_name_cluck_param_key, entering_key);
            lock_failed_message.add_parameter(cluck::g_name_cluck_param_error, cluck::g_name_cluck_value_duplicate);
#ifndef CLUCKD_OPTIMIZATIONS
            lock_failed_message.add_parameter(cluck::g_name_cluck_param_description, "LOCK called with the same entering ticket object_name and entering_key");
#endif
            f_messenger->send_message(lock_failed_message);

            return;
        }
        if(entering_ticket->second.size() >= cluck::CLUCK_MAXIMUM_ENTERING_LOCKS)
        {
            // this is a failure in the algorithm (unfortunately), if you
            // send LOCK commands without much pause, the number of entering
            // ticket can grow forever; the following is a way to avoid
            // that situation by preventing such inconsiderate growth.
            //
            SNAP_LOG_ERROR
                << "too many entering tickets for object name \""
                << object_name
                << "\"."
                << SNAP_LOG_SEND;

            ed::message lock_failed_message;
            lock_failed_message.set_command(cluck::g_name_cluck_cmd_lock_failed);
            lock_failed_message.reply_to(msg);
            lock_failed_message.add_parameter(cluck::g_name_cluck_param_object_name, object_name);
            lock_failed_message.add_parameter(cluck::g_name_cluck_param_tag, tag);
            lock_failed_message.add_parameter(cluck::g_name_cluck_param_key, entering_key);
            lock_failed_message.add_parameter(cluck::g_name_cluck_param_error, cluck::g_name_cluck_value_overflow);
#ifndef CLUCKD_OPTIMIZATIONS
            lock_failed_message.add_parameter(cluck::g_name_cluck_param_description, "LOCK called too quickly that the number of entering tickets overflowed");
#endif
            f_messenger->send_message(lock_failed_message);

            return;
        }
    }

    // make sure there is not a ticket with the same name already defined
    //
    // (this is is really important so we can actually properly UNLOCK an
    // existing lock since we use the same search and if two entries were
    // to be the same we could not know which to unlock; there are a few
    // other places where such a search is used actually...)
    //
    auto obj_ticket(f_tickets.find(object_name));
    if(obj_ticket != f_tickets.end())
    {
        auto key_ticket(std::find_if(
                  obj_ticket->second.begin()
                , obj_ticket->second.end()
                , [&entering_key](auto const & t)
                {
                    return t.second->get_entering_key() == entering_key;
                }));
        if(key_ticket != obj_ticket->second.end())
        {
            // there is already a ticket with this object name/entering key
            //
            SNAP_LOG_ERROR
                << "a ticket has the same object name \""
                << object_name
                << "\" ("
                << tag
                << ") and entering key \""
                << entering_key
                << "\"."
                << SNAP_LOG_SEND;

            ed::message lock_failed_message;
            lock_failed_message.set_command(cluck::g_name_cluck_cmd_lock_failed);
            lock_failed_message.reply_to(msg);
            lock_failed_message.add_parameter(cluck::g_name_cluck_param_object_name, object_name);
            lock_failed_message.add_parameter(cluck::g_name_cluck_param_tag, tag);
            lock_failed_message.add_parameter(cluck::g_name_cluck_param_key, entering_key);
            lock_failed_message.add_parameter(cluck::g_name_cluck_param_error, cluck::g_name_cluck_value_duplicate);
#ifndef CLUCKD_OPTIMIZATIONS
            lock_failed_message.add_parameter(cluck::g_name_cluck_param_description, "LOCK called with the same ticket object_name and entering_key");
#endif
            f_messenger->send_message(lock_failed_message);

            return;
        }
    }

    // create the entering ticket for this request
    //
    ticket::pointer_t ticket(std::make_shared<ticket>(
              this
            , f_messenger
            , object_name
            , tag
            , entering_key
            , timeout
            , duration
            , server_name
            , service_name));

    f_entering_tickets[object_name][entering_key] = ticket;

    // finish up ticket initialization
    //
    ticket->set_unlock_duration(unlock_duration);

    // generate a serial number for that ticket
    //
    // NOTE(review): the `!=` comparisons below look suspicious (one would
    // expect `==` when determining which leader slot this daemon owns so
    // that the top byte uniquely tags the generating leader); confirm
    // against the upstream algorithm before changing anything
    //
    f_ticket_serial = (f_ticket_serial + 1) & 0x00FFFFFF; // 0 is a valid serial number (-1 is not)
    if(f_leaders[0]->get_id() != f_my_id)
    {
        if(f_leaders.size() >= 2
        && f_leaders[1]->get_id() != f_my_id)
        {
            f_ticket_serial |= 1 << 24;
        }
        else if(f_leaders.size() >= 3
             && f_leaders[2]->get_id() != f_my_id)
        {
            f_ticket_serial |= 2 << 24;
        }
    }
    // NOTE(review): a line is missing from this listing here (presumably
    // saving the serial number in the ticket) -- confirm

    if(msg.has_parameter(cluck::g_name_cluck_param_serial))
    {
        // if we have a "serial" number in that message, we lost a leader
        // and when that happens we are not unlikely to have lost the
        // client that requested the LOCK, send an ALIVE message to make
        // sure that the client still exists before entering the ticket
        //
        // TODO: we may want to make this 5s a parameter that we can change
        //
        ticket->set_alive_timeout(snapdev::now() + cluck::timeout_t(5, 0));

        ed::message alive_message;
        alive_message.set_command(ed::g_name_ed_cmd_alive);
        alive_message.set_server(server_name);
        alive_message.set_service(service_name);
        alive_message.add_parameter(ed::g_name_ed_param_serial, "cluckd/" + object_name + '/' + entering_key);
        alive_message.add_parameter(ed::g_name_ed_param_timestamp, snapdev::now());
        f_messenger->send_message(alive_message);
    }
    else
    {
        // act on the new ticket
        //
        ticket->entering();
    }

    // the list of tickets changed, make sure we update the timeout timer
    //
    cleanup();
}
3088
3089
3098void cluckd::msg_lock_activated(ed::message & msg)
3099{
3100 std::string object_name;
3101 ed::dispatcher_match::tag_t tag(ed::dispatcher_match::DISPATCHER_MATCH_NO_TAG);
3102 std::string key;
3103 if(!get_parameters(msg, &object_name, &tag, nullptr, nullptr, &key, nullptr))
3104 {
3105 return; // LCOV_EXCL_LINE
3106 }
3107
3108 std::string const & other_key(msg.get_parameter(cluck::g_name_cluck_param_other_key));
3109 if(other_key == key)
3110 {
3111 auto obj_ticket(f_tickets.find(object_name));
3112 if(obj_ticket != f_tickets.end())
3113 {
3114 auto key_ticket(obj_ticket->second.find(key));
3115 if(key_ticket != obj_ticket->second.end())
3116 {
3117 // that key is still here!
3118 // time to activate
3119 //
3120 key_ticket->second->lock_activated();
3121 }
3122 }
3123 }
3124}
3125
3126
3135void cluckd::msg_lock_entered(ed::message & msg)
3136{
3137 std::string object_name;
3138 ed::dispatcher_match::tag_t tag(ed::dispatcher_match::DISPATCHER_MATCH_NO_TAG);
3139 std::string key;
3140 if(!get_parameters(msg, &object_name, &tag, nullptr, nullptr, &key, nullptr))
3141 {
3142 return; // LCOV_EXCL_LINE
3143 }
3144
3145 auto const obj_entering_ticket(f_entering_tickets.find(object_name));
3146 if(obj_entering_ticket != f_entering_tickets.end())
3147 {
3148 auto const key_entering_ticket(obj_entering_ticket->second.find(key));
3149 if(key_entering_ticket != obj_entering_ticket->second.end())
3150 {
3151 key_entering_ticket->second->entered();
3152 }
3153 }
3154}
3155
3156
/** \brief Process a LOCK_ENTERING message.
 *
 * Validates the request, creates the local entering ticket when it does
 * not exist yet (it only pre-exists on the originator), and replies with
 * a LOCK_ENTERED message.
 *
 * \note This listing was recovered from generated documentation; a few
 * interior lines were lost in extraction and are flagged with
 * NOTE(review) comments below.
 *
 * \param[in,out] msg  The LOCK_ENTERING message being processed.
 */
void cluckd::msg_lock_entering(ed::message & msg)
{
    std::string object_name;
    ed::dispatcher_match::tag_t tag(ed::dispatcher_match::DISPATCHER_MATCH_NO_TAG);
    cluck::timeout_t timeout;
    std::string key;
    std::string source;
    if(!get_parameters(msg, &object_name, &tag, nullptr, &timeout, &key, &source))
    {
        return; // LCOV_EXCL_LINE
    }

    // lock still in the future?
    //
    if(timeout <= snapdev::now())
    {
        SNAP_LOG_DEBUG
            << "received LOCK_ENTERING for \""
            << object_name
            << "\" that already timed out."
            << SNAP_LOG_SEND;
        return;
    }

    // do we have enough leaders?
    //
    if(!is_daemon_ready())
    {
        SNAP_LOG_DEBUG
            << "received LOCK_ENTERING while we are thinking we are not ready."
            << SNAP_LOG_SEND;
        return;
    }

    // the entering is just a flag (i.e. entering[i] = true)
    // in our case the existance of a ticket is enough to know
    // that we entered
    //
    bool allocate(true);
    auto const obj_ticket(f_entering_tickets.find(object_name));
    if(obj_ticket != f_entering_tickets.end())
    {
        auto const key_ticket(obj_ticket->second.find(key));
        allocate = key_ticket == obj_ticket->second.end();
    }
    if(allocate)
    {
        // ticket does not exist, so create it now
        // (note: ticket should only exist on originator)
        //
        cluck::timeout_t const duration(msg.get_timespec_parameter(cluck::g_name_cluck_param_duration));
        if(duration < cluck::CLUCK_MINIMUM_TIMEOUT)
        {
            // invalid duration
            //
            // NOTE(review): the line printing the minimum value is
            // missing from this listing -- verify against the repository
            //
            SNAP_LOG_ERROR
                << duration
                << " is an invalid duration, the minimum accepted is "
                << "."
                << SNAP_LOG_SEND;

            ed::message lock_failed_message;
            lock_failed_message.set_command(cluck::g_name_cluck_cmd_lock_failed);
            lock_failed_message.reply_to(msg);
            lock_failed_message.add_parameter(cluck::g_name_cluck_param_object_name, object_name);
            lock_failed_message.add_parameter(cluck::g_name_cluck_param_tag, tag);
            lock_failed_message.add_parameter(cluck::g_name_cluck_param_key, key);
            lock_failed_message.add_parameter(cluck::g_name_cluck_param_error, cluck::g_name_cluck_value_invalid);
#ifndef CLUCKD_OPTIMIZATIONS
            lock_failed_message.add_parameter(cluck::g_name_cluck_param_description, "LOCK_ENTERING called with a duration which is too small");
#endif
            f_messenger->send_message(lock_failed_message);

            return;
        }

        // NOTE(review): the declaration of unlock_duration is not visible
        // in this listing (line lost in extraction); it is used below so
        // it is presumably declared here with a default value -- confirm
        //
        if(msg.has_parameter(cluck::g_name_cluck_param_unlock_duration))
        {
            unlock_duration = msg.get_timespec_parameter(cluck::g_name_cluck_param_unlock_duration);
            if(unlock_duration != cluck::CLUCK_DEFAULT_TIMEOUT
            && unlock_duration < cluck::CLUCK_UNLOCK_MINIMUM_TIMEOUT)
            {
                // invalid duration, minimum is 60
                //
                SNAP_LOG_ERROR
                    << unlock_duration
                    << " is an invalid unlock duration, the minimum accepted is "
                    << "."
                    << SNAP_LOG_SEND;

                ed::message lock_failed_message;
                lock_failed_message.set_command(cluck::g_name_cluck_cmd_lock_failed);
                lock_failed_message.reply_to(msg);
                lock_failed_message.add_parameter(cluck::g_name_cluck_param_object_name, object_name);
                lock_failed_message.add_parameter(cluck::g_name_cluck_param_tag, tag);
                lock_failed_message.add_parameter(cluck::g_name_cluck_param_key, key);
                lock_failed_message.add_parameter(cluck::g_name_cluck_param_error, cluck::g_name_cluck_value_invalid);
#ifndef CLUCKD_OPTIMIZATIONS
                lock_failed_message.add_parameter(cluck::g_name_cluck_param_description, "LOCK_ENTERING called with an unlock duration which is too small or set to the default value");
#endif
                f_messenger->send_message(lock_failed_message);

                return;
            }
        }

        // we have to know where this message comes from
        // (the source parameter is expected to be "<server>/<service>")
        //
        std::vector<std::string> source_segments;
        if(snapdev::tokenize_string(source_segments, source, "/") != 2)
        {
            SNAP_LOG_ERROR
                << "Invalid number of parameters in source parameter \""
                << source
                << "\" (found "
                << source_segments.size()
                << ", expected 2)."
                << SNAP_LOG_SEND;

            ed::message lock_failed_message;
            lock_failed_message.set_command(cluck::g_name_cluck_cmd_lock_failed);
            lock_failed_message.reply_to(msg);
            lock_failed_message.add_parameter(cluck::g_name_cluck_param_object_name, object_name);
            lock_failed_message.add_parameter(cluck::g_name_cluck_param_tag, tag);
            lock_failed_message.add_parameter(cluck::g_name_cluck_param_key, key);
            lock_failed_message.add_parameter(cluck::g_name_cluck_param_error, cluck::g_name_cluck_value_invalid);
#ifndef CLUCKD_OPTIMIZATIONS
            lock_failed_message.add_parameter(cluck::g_name_cluck_param_description, "LOCK_ENTERING called with an invalid source parameter");
#endif
            f_messenger->send_message(lock_failed_message);

            return;
        }

        ticket::pointer_t ticket(std::make_shared<ticket>(
                  this
                , f_messenger
                , object_name
                , tag
                , key
                , timeout
                , duration
                , source_segments[0]
                , source_segments[1]));

        f_entering_tickets[object_name][key] = ticket;

        // finish up on ticket initialization
        //
        ticket->set_owner(msg.get_sent_from_server());
        ticket->set_unlock_duration(unlock_duration);
        ticket->set_serial(msg.get_integer_parameter(cluck::g_name_cluck_param_serial));
    }

    // acknowledge with a LOCK_ENTERED reply
    //
    ed::message reply;
    reply.set_command(cluck::g_name_cluck_cmd_lock_entered);
    reply.reply_to(msg);
    reply.add_parameter(cluck::g_name_cluck_param_object_name, object_name);
    reply.add_parameter(cluck::g_name_cluck_param_tag, tag);
    reply.add_parameter(cluck::g_name_cluck_param_key, key);
    f_messenger->send_message(reply);

    cleanup();
}
3343
3344
3353void cluckd::msg_lock_exiting(ed::message & msg)
3354{
3355 std::string object_name;
3356 ed::dispatcher_match::tag_t tag(ed::dispatcher_match::DISPATCHER_MATCH_NO_TAG);
3357 std::string key;
3358 if(!get_parameters(msg, &object_name, &tag, nullptr, nullptr, &key, nullptr))
3359 {
3360 return; // LCOV_EXCL_LINE
3361 }
3362
3363 // when exiting we just remove the entry with that key
3364 //
3365 auto const obj_entering(f_entering_tickets.find(object_name));
3366 if(obj_entering != f_entering_tickets.end())
3367 {
3368 auto const key_entering(obj_entering->second.find(key));
3369 if(key_entering != obj_entering->second.end())
3370 {
3371 obj_entering->second.erase(key_entering);
3372
3373 // we also want to remove it from the ticket f_entering
3374 // map if it is there (older ones are there!)
3375 //
3376 bool run_activation(false);
3377 auto const obj_ticket(f_tickets.find(object_name));
3378 if(obj_ticket != f_tickets.end())
3379 {
3380 for(auto const & key_ticket : obj_ticket->second)
3381 {
3382 key_ticket.second->remove_entering(key);
3383 run_activation = true;
3384 }
3385 }
3386 if(run_activation)
3387 {
3388 // hmm... looking at the code closer, it seems that this is
3389 // still very much necessary
3390 //
3391 // --------------------------------------------------
3392 //
3393 // try to activate the lock right now since it could
3394 // very well be the only ticket and that is exactly
3395 // when it is viewed as active!
3396 //
3397 // Note: this is from my old version, if I am correct
3398 // it cannot happen anymore because (1) this is
3399 // not the owner so the activation would not
3400 // take anyway and (2) the ticket is not going
3401 // to be marked as being ready at this point
3402 // (that happens later)
3403 //
3404 // XXX we probably should remove this statement
3405 // and the run_activation flag which would
3406 // then be useless
3407 //
3408 activate_first_lock(object_name);
3409 }
3410
3411 if(obj_entering->second.empty())
3412 {
3413 f_entering_tickets.erase(obj_entering);
3414 }
3415 }
3416 else
3417 {
3418 SNAP_LOG_WARNING
3419 << "entering lock \""
3420 << object_name
3421 << "\" with key \""
3422 << key
3423 << "\" in LOCK_EXITING specified lock not found."
3424 << SNAP_LOG_SEND;
3425 }
3426 }
3427 else
3428 {
3429 SNAP_LOG_WARNING
3430 << "LOCK_EXITING specified lock \""
3431 << object_name
3432 << "\" not found."
3433 << SNAP_LOG_SEND;
3434 }
3435
3436 // the list of tickets is not unlikely changed so we need to make
3437 // a call to cleanup to make sure the timer is reset appropriately
3438 //
3439 cleanup();
3440}
3441
3442
3468void cluckd::msg_lock_failed(ed::message & msg)
3469{
3470 std::string object_name;
3471 ed::dispatcher_match::tag_t tag(ed::dispatcher_match::DISPATCHER_MATCH_NO_TAG);
3472 std::string key;
3473 if(!get_parameters(msg, &object_name, &tag, nullptr, nullptr, &key, nullptr))
3474 {
3475 return; // LCOV_EXCL_LINE
3476 }
3477
3478 std::string errmsg("get LOCK_FAILED: ");
3479 errmsg += msg.get_parameter("error");
3480
3481 std::string forward_server;
3482 std::string forward_service;
3483
3484 // remove f_entering_tickets entries if we find matches there
3485 //
3486 auto obj_entering(f_entering_tickets.find(object_name));
3487 if(obj_entering != f_entering_tickets.end())
3488 {
3489 auto key_entering(obj_entering->second.find(key));
3490 if(key_entering != obj_entering->second.end())
3491 {
3492 forward_server = key_entering->second->get_server_name();
3493 forward_service = key_entering->second->get_service_name();
3494
3495 obj_entering->second.erase(key_entering);
3496
3497 errmsg += " -- happened while entering";
3498 }
3499
3500 if(obj_entering->second.empty())
3501 {
3502 f_entering_tickets.erase(obj_entering);
3503 }
3504 }
3505
3506 // remove any f_tickets entries if we find matches there
3507 //
3508 auto obj_ticket(f_tickets.find(object_name));
3509 if(obj_ticket != f_tickets.end())
3510 {
3511 bool try_activate(false);
3512 auto key_ticket(obj_ticket->second.find(key));
3513 if(key_ticket == obj_ticket->second.end())
3514 {
3515 key_ticket = std::find_if(
3516 obj_ticket->second.begin()
3517 , obj_ticket->second.end()
3518 , [&key](auto const & t)
3519 {
3520 return t.second->get_entering_key() == key;
3521 });
3522 }
3523 if(key_ticket != obj_ticket->second.end())
3524 {
3525 // Note: if we already found it in the f_entering_tickets then
3526 // the server and service names are going to be exactly
3527 // the same so there is no need to test that here
3528 //
3529 forward_server = key_ticket->second->get_server_name();
3530 forward_service = key_ticket->second->get_service_name();
3531
3532 obj_ticket->second.erase(key_ticket);
3533 try_activate = true;
3534
3535 errmsg += " -- happened when locked"; // TBD: are we really always locked in this case?
3536 }
3537
3538 if(obj_ticket->second.empty())
3539 {
3540 f_tickets.erase(obj_ticket);
3541 }
3542 else if(try_activate)
3543 {
3544 // something was erased, a new ticket may be first
3545 //
3546 activate_first_lock(obj_ticket->first);
3547 }
3548 }
3549
3550 if(!forward_server.empty()
3551 && !forward_service.empty())
3552 {
3553 // we deleted an entry, forward the message to the service
3554 // that requested that lock
3555 //
3556 msg.set_server(forward_server);
3557 msg.set_service(forward_service);
3558 f_messenger->send_message(msg);
3559
3560 errmsg += " -> ";
3561 errmsg += forward_server;
3562 errmsg += ":";
3563 errmsg += forward_service;
3564 }
3565
3566 SNAP_LOG_IMPORTANT
3567 << errmsg
3568 << '.'
3569 << SNAP_LOG_SEND;
3570
3571 // the list of tickets is not unlikely changed so we need to make
3572 // a call to cleanup to make sure the timer is reset appropriately
3573 //
3574 cleanup();
3575}
3576
3577
/** \brief Process a LOCK_LEADERS message.
 *
 * Saves the election date and rebuilds the local list of leaders from
 * the "leaderN" parameters, reusing the existing computer objects when
 * we already know them.
 *
 * \note This listing was recovered from generated documentation; two
 * interior lines were lost in extraction (flagged below).
 *
 * \param[in,out] msg  The LOCK_LEADERS message being processed.
 */
void cluckd::msg_lock_leaders(ed::message & msg)
{
    f_election_date = msg.get_timespec_parameter(cluck::g_name_cluck_param_election_date);

    // save the new leaders in our own list
    //
    f_leaders.clear();
    for(int idx(0); idx < 3; ++idx)
    {
        std::string const param_name(cluck::g_name_cluck_param_leader + std::to_string(idx));
        if(msg.has_parameter(param_name))
        {
            computer::pointer_t leader(std::make_shared<computer>());
            std::string const lockid(msg.get_parameter(param_name));
            if(leader->set_id(lockid))
            {
                computer::map_t::iterator exists(f_computers.find(leader->get_name()));
                if(exists != f_computers.end())
                {
                    // it already exists, use our existing instance
                    //
                    f_leaders.push_back(exists->second);
                }
                else
                {
                    // we do not yet know of that computer, even though
                    // it is a leader! (i.e. we are not yet aware that
                    // somehow we are connected to it)
                    //
                    leader->set_connected(false);
                    f_computers[leader->get_name()] = leader;

                    f_leaders.push_back(leader);
                }
            }
        }
    }

    if(!f_leaders.empty())
    {
        // NOTE(review): a line is missing from this listing here
        // (lost in extraction) -- verify against the repository

        // set the round-robin position to a random value
        //
        // note: I know the result is likely skewed, c will be set to
        //       a number between 0 and 255 and modulo 3 means that you get
        //       one extra zero (255 % 3 == 0); however, there are 85 times
        //       3 in 255 so it probably won't be noticeable.
        //
        std::uint8_t c;
        RAND_bytes(reinterpret_cast<unsigned char *>(&c), sizeof(c));
        f_next_leader = c % f_leaders.size();
    }

    // the is_daemon_ready() function depends on having f_leaders defined
    // and when that happens we may need to empty our cache
    //
    // NOTE(review): the statement this comment refers to is missing from
    // this listing (lost in extraction) -- verify against the repository
}
3643
3644
/** \brief Process a LOCK_STARTED message from another cluckd instance.
 *
 * Registers the sending computer when it is new (or looks new: it
 * reconnected or restarted with a different start time), keeps the
 * newest election results found in the message, and replies with our
 * own LOCK_STARTED when the sender is new to us or our leader changed.
 *
 * \note This listing was recovered from generated documentation; a few
 * interior lines were lost in extraction (flagged below).
 *
 * \param[in,out] msg  The LOCK_STARTED message being processed.
 */
void cluckd::msg_lock_started(ed::message & msg)
{
    // I do not think we would ever message ourselves, but in case it happens
    // the rest of the function does not support that case
    //
    std::string const server_name(msg.get_parameter(communicator::g_name_communicator_param_server_name));
    if(server_name == f_server_name)
    {
        return;
    }

    cluck::timeout_t const start_time(msg.get_timespec_parameter(cluck::g_name_cluck_param_start_time));

    computer::map_t::iterator it(f_computers.find(server_name));
    bool new_computer(it == f_computers.end());
    if(new_computer)
    {
        // create a computer instance so we know it exists
        //
        computer::pointer_t computer(std::make_shared<computer>());

        // fill the fields from the "lock_id" parameter
        //
        if(!computer->set_id(msg.get_parameter(cluck::g_name_cluck_param_lock_id)))
        {
            // this is not a valid identifier, ignore altogether
            //
            return;
        }
        computer->set_start_time(start_time);

#ifdef _DEBUG
        // LCOV_EXCL_START
        if(computer->get_name() != server_name)
        {
            throw cluck::logic_error("cluckd::msg_lock_started(): server_name ("
                + server_name
                + ") does not match the new computer name ("
                + computer->get_name()
                + ").");
        }
        // LCOV_EXCL_STOP
#endif
        // NOTE(review): a line is missing from this listing here
        // (presumably saving `computer` in f_computers) -- verify
        // against the repository
    }
    else
    {
        if(!it->second->get_connected())
        {
            // we heard of this computer (because it is/was a leader) but
            // we had not yet received a LOCK_STARTED message from it; so here
            // we consider it a new computer and will reply to the LOCK_STARTED
            //
            new_computer = true;
            it->second->set_connected(true);
        }

        if(it->second->get_start_time() != start_time)
        {
            // when the start time changes that means cluckd
            // restarted which can happen without communicatord
            // restarting so here we would not know about the feat
            // without this parameter and in this case it is very
            // much the same as a new computer so send it a
            // LOCK_STARTED message back
            //
            new_computer = true;
            it->second->set_start_time(start_time);
        }
    }

    // keep the newest election results
    //
    computer::pointer_t old_leader(is_leader());
    if(msg.has_parameter(cluck::g_name_cluck_param_election_date))
    {
        snapdev::timespec_ex const election_date(msg.get_timespec_parameter(cluck::g_name_cluck_param_election_date));
        if(election_date > f_election_date)
        {
            f_election_date = election_date;
            f_leaders.clear();
        }
    }

    if(f_leaders.empty())
    {
        for(int idx(0); idx < 3; ++idx)
        {
            std::string const param_name(cluck::g_name_cluck_param_leader + std::to_string(idx));
            if(msg.has_parameter(param_name))
            {
                computer::pointer_t leader(std::make_shared<computer>());
                std::string const lockid(msg.get_parameter(param_name));
                if(leader->set_id(lockid))
                {
                    computer::map_t::iterator exists(f_computers.find(leader->get_name()));
                    if(exists != f_computers.end())
                    {
                        // it already exists, use our existing instance
                        //
                        f_leaders.push_back(exists->second);
                    }
                    else
                    {
                        // we do not yet know of that computer, even though
                        // it is a leader! (i.e. we are not yet aware that
                        // somehow we are connected to it)
                        //
                        leader->set_connected(false);
                        f_computers[leader->get_name()] = leader;

                        f_leaders.push_back(leader);
                    }
                }
            }
        }
    }

    // NOTE(review): a line is missing from this listing here (lost in
    // extraction) -- verify against the repository

    // this can have an effect on the lock statuses
    //
    // NOTE(review): the statement this comment refers to is missing from
    // this listing (lost in extraction) -- verify against the repository

    if(new_computer
    || old_leader != is_leader())
    {
        // send a reply if that was a new computer
        //
        send_lock_started(&msg);
    }
}
3787
3788
3801void cluckd::msg_lock_status(ed::message & msg)
3802{
3803 ed::message status_message;
3804 status_message.set_command(is_daemon_ready()
3805 ? cluck::g_name_cluck_cmd_lock_ready
3806 : cluck::g_name_cluck_cmd_no_lock);
3807
3808 status_message.reply_to(msg);
3809 status_message.add_parameter(communicator::g_name_communicator_param_cache, communicator::g_name_communicator_value_no);
3810 f_messenger->send_message(status_message);
3811}
3812
3813
3831void cluckd::msg_lock_tickets(ed::message & msg)
3832{
3833 std::string const tickets(msg.get_parameter(cluck::g_name_cluck_param_tickets));
3834
3835 // we have one ticket per line, so we first split per line and then
3836 // work on lines one at a time
3837 //
3838 bool added_tickets(false);
3839 std::list<std::string> lines;
3840 snapdev::tokenize_string(lines, tickets, "\n", true);
3841 for(auto const & l : lines)
3842 {
3844 std::list<std::string> vars;
3845 snapdev::tokenize_string(vars, tickets, "|", true);
3846 auto object_name_value(std::find_if(
3847 vars.begin()
3848 , vars.end()
3849 , [](std::string const & vv)
3850 {
3851 return vv.starts_with("object_name=");
3852 }));
3853 if(object_name_value != vars.end())
3854 {
3855 auto entering_key_value(std::find_if(
3856 vars.begin()
3857 , vars.end()
3858 , [](std::string const & vv)
3859 {
3860 return vv.starts_with("entering_key=");
3861 }));
3862 if(entering_key_value != vars.end())
3863 {
3864 // extract the values which start after the '=' sign
3865 //
3866 std::string const object_name(object_name_value->substr(12));
3867 std::string const entering_key(entering_key_value->substr(13));
3868
3869 auto entering_ticket(f_entering_tickets.find(object_name));
3870 if(entering_ticket != f_entering_tickets.end())
3871 {
3872 auto key_ticket(entering_ticket->second.find(entering_key));
3873 if(key_ticket != entering_ticket->second.end())
3874 {
3875 t = key_ticket->second;
3876 }
3877 }
3878 if(t == nullptr)
3879 {
3880 auto obj_ticket(f_tickets.find(object_name));
3881 if(obj_ticket != f_tickets.end())
3882 {
3883 auto key_ticket(std::find_if(
3884 obj_ticket->second.begin()
3885 , obj_ticket->second.end()
3886 , [&entering_key](auto const & o)
3887 {
3888 return o.second->get_entering_key() == entering_key;
3889 }));
3890 if(key_ticket != obj_ticket->second.end())
3891 {
3892 t = key_ticket->second;
3893 }
3894 }
3895 }
3896
3897 // ticket exists? if not create a new one
3898 //
3899 bool const new_ticket(t == nullptr);
3900 if(new_ticket)
3901 {
3902 // create a new ticket, some of the parameters are there just
3903 // because they are required; they will be replaced by the
3904 // unserialize call below...
3905 //
3906 t = std::make_shared<ticket>(
3907 this
3908 , f_messenger
3909 , object_name
3910 , ed::dispatcher_match::DISPATCHER_MATCH_NO_TAG
3911 , entering_key
3912 , cluck::CLUCK_DEFAULT_TIMEOUT + snapdev::now()
3915 , cluck::g_name_cluck_service_name);
3916 }
3917
3918 t->unserialize(l);
3919 added_tickets = true;
3920
3921 // do a couple of additional sanity tests to
3922 // make sure that we want to keep new tickets
3923 //
3924 // first make sure it is marked as "locked"
3925 //
3926 // second check that the owner is a leader that
3927 // exists (the sender uses a LOCK message for
3928 // locks that are not yet locked or require
3929 // a new owner)
3930 //
3931 if(new_ticket
3932 && t->is_locked())
3933 {
3934 auto li(std::find_if(
3935 f_leaders.begin()
3936 , f_leaders.end()
3937 , [&t](auto const & c)
3938 {
3939 return t->get_owner() == c->get_name();
3940 }));
3941 if(li != f_leaders.end())
3942 {
3943 f_tickets[object_name][t->get_ticket_key()] = t;
3944 }
3945 }
3946 }
3947 }
3948 }
3949
3950 // if we updated some tickets, we need to make sure our timer is setup
3951 // appropriately
3952 //
3953 if(added_tickets)
3954 {
3955 cleanup();
3956 }
3957}
3958
3959
3978void cluckd::msg_max_ticket(ed::message & msg)
3979{
3980 std::string object_name;
3981 ed::dispatcher_match::tag_t tag(ed::dispatcher_match::DISPATCHER_MATCH_NO_TAG);
3982 std::string key;
3983 if(!get_parameters(msg, &object_name, &tag, nullptr, nullptr, &key, nullptr))
3984 {
3985 return; // LCOV_EXCL_LINE
3986 }
3987
3988 // the MAX_TICKET is an answer that has to go in a still un-added ticket
3989 //
3990 auto const obj_entering_ticket(f_entering_tickets.find(object_name));
3991 if(obj_entering_ticket != f_entering_tickets.end())
3992 {
3993 auto const key_entering_ticket(obj_entering_ticket->second.find(key));
3994 if(key_entering_ticket != obj_entering_ticket->second.end())
3995 {
3996 key_entering_ticket->second->max_ticket(msg.get_integer_parameter(cluck::g_name_cluck_param_ticket_id));
3997 }
3998 }
3999}
4000
4001
4021void cluckd::msg_server_gone(ed::message & msg)
4022{
4023 // if the server is not defined, ignore that message
4024 // (already tested by the dispatcher)
4025 //
4026 if(!msg.has_parameter(communicator::g_name_communicator_param_server_name))
4027 {
4028 return; // LCOV_EXCL_LINE
4029 }
4030
4031 // is it us?
4032 //
4033 std::string const server_name(msg.get_parameter(communicator::g_name_communicator_param_server_name));
4034 if(server_name.empty()
4035 || server_name == f_server_name)
4036 {
4037 // we never want to remove ourselves
4038 //
4039 return;
4040 }
4041
4042 // is "server_name" known?
4043 //
4044 auto it(f_computers.find(server_name));
4045 if(it == f_computers.end())
4046 {
4047 // no computer found, nothing else to do here
4048 //
4049 return;
4050 }
4051 computer::pointer_t c(it->second);
4052
4053 // got it, remove it
4054 //
4055 f_computers.erase(it);
4056SNAP_LOG_WARNING << "removed \"" << server_name << "\"" << SNAP_LOG_SEND;
4057
4058 // is that computer a leader?
4059 //
4060 auto li(std::find(
4061 f_leaders.begin()
4062 , f_leaders.end()
4063 , c));
4064 if(li != f_leaders.end())
4065 {
4066 f_leaders.erase(li);
4067
4068 if(f_messenger != nullptr)
4069 {
4070 // elect another computer in case the one we just erased was a leader
4071 //
4072 // (of course, no elections occur unless we are the computer with the
4073 // smallest IP address)
4074 //
4076
4077 // if too many leaders were dropped, we may go back to the NO_LOCK status
4078 //
4079 // we only send a NO_LOCK if the election could not re-assign another
4080 // computer to replace the missing leader(s)
4081 //
4083 }
4084 }
4085}
4086
4087
4097void cluckd::msg_status(ed::message & msg)
4098{
4099 // check the service name, it has to be one that means it is a remote
4100 // connection with another communicator daemon
4101 //
4102 std::string const service(msg.get_parameter(communicator::g_name_communicator_param_service));
4103
4104 if(service.starts_with(communicator::g_name_communicator_connection_remote_communicator_in) // remote host connected to us
4105 || service.starts_with(communicator::g_name_communicator_connection_remote_communicator_out)) // we connected to remote host
4106 {
4107 // check what the status is now: "up" or "down"
4108 //
4109 std::string const status(msg.get_parameter(communicator::g_name_communicator_param_status));
4110 if(status == communicator::g_name_communicator_value_up)
4111 {
4112 // we already broadcast a LOCK_STARTED from CLUSTER_UP
4113 // and that's enough
4114 //
4115 ;
4116 }
4117 else
4118 {
4119 // host is down, remove from our list of hosts
4120 //
4121 msg_server_gone(msg);
4122 }
4123 }
4124}
4125
4126
4134void cluckd::msg_ticket_added(ed::message & msg)
4135{
4136 std::string object_name;
4137 ed::dispatcher_match::tag_t tag(ed::dispatcher_match::DISPATCHER_MATCH_NO_TAG);
4138 std::string key;
4139 if(!get_parameters(msg, &object_name, &tag, nullptr, nullptr, &key, nullptr))
4140 {
4141 return; // LCOV_EXCL_LINE
4142 }
4143
4144 auto const obj_ticket(f_tickets.find(object_name));
4145 if(obj_ticket != f_tickets.end())
4146 {
4147 auto const key_ticket(obj_ticket->second.find(key));
4148 if(key_ticket != obj_ticket->second.end())
4149 {
4150 // this ticket exists on this system
4151 //
4152 auto const obj_entering_ticket(f_entering_tickets.find(object_name));
4153 if(obj_entering_ticket == f_entering_tickets.end())
4154 {
4155 // this happens all the time because the entering ticket
4156 // gets removed on the first TICKET_ADDED we receive so
4157 // on the second one we get here...
4158 //
4159 SNAP_LOG_TRACE
4160 << "called with object \""
4161 << object_name
4162 << "\" not present in f_entering_ticket (key: \""
4163 << key
4164 << "\")."
4165 << SNAP_LOG_SEND;
4166 return;
4167 }
4168 key_ticket->second->ticket_added(obj_entering_ticket->second);
4169 }
4170 else
4171 {
4172 SNAP_LOG_WARNING
4173 << "found object \""
4174 << object_name
4175 << "\" but could not find a corresponding ticket with key \""
4176 << key
4177 << "\"..."
4178 << SNAP_LOG_SEND;
4179 }
4180 }
4181 else
4182 {
4183 SNAP_LOG_WARNING
4184 << "object \""
4185 << object_name
4186 << "\" not found."
4187 << SNAP_LOG_SEND;
4188 }
4189}
4190
4191
4199void cluckd::msg_ticket_ready(ed::message & msg)
4200{
4201 std::string object_name;
4202 ed::dispatcher_match::tag_t tag(ed::dispatcher_match::DISPATCHER_MATCH_NO_TAG);
4203 std::string key;
4204 if(!get_parameters(msg, &object_name, &tag, nullptr, nullptr, &key, nullptr))
4205 {
4206 return; // LCOV_EXCL_LINE
4207 }
4208
4209 auto obj_ticket(f_tickets.find(object_name));
4210 if(obj_ticket != f_tickets.end())
4211 {
4212 auto key_ticket(obj_ticket->second.find(key));
4213 if(key_ticket != obj_ticket->second.end())
4214 {
4215 // we can mark this ticket as activated
4216 //
4217 key_ticket->second->set_ready();
4218 }
4219 }
4220}
4221
4222
4231void cluckd::msg_unlock(ed::message & msg)
4232{
4233 if(!is_daemon_ready())
4234 {
4235 SNAP_LOG_ERROR
4236 << "received an UNLOCK when cluckd is not ready to receive lock related messages."
4237 << SNAP_LOG_SEND;
4238 return;
4239 }
4240
4241 if(is_leader() == nullptr)
4242 {
4243 // we are not a leader, we need to forward to a leader to handle
4244 // the message properly
4245 //
4247 return;
4248 }
4249
4250 std::string object_name;
4251 ed::dispatcher_match::tag_t tag(ed::dispatcher_match::DISPATCHER_MATCH_NO_TAG);
4252 pid_t client_pid(0);
4253 if(!get_parameters(msg, &object_name, &tag, &client_pid, nullptr, nullptr, nullptr))
4254 {
4255 return;
4256 }
4257
4258 // if the ticket still exists, send the UNLOCKED and then erase it
4259 //
4260 auto obj_ticket(f_tickets.find(object_name));
4261 if(obj_ticket != f_tickets.end())
4262 {
4263 std::string const server_name(msg.has_parameter("lock_proxy_server_name")
4264 ? msg.get_parameter("lock_proxy_server_name")
4265 : msg.get_sent_from_server());
4266
4267 //std::string const service_name(msg.has_parameter("lock_proxy_service_name")
4268 // ? msg.get_parameter("lock_proxy_service_name")
4269 // : msg.get_sent_from_service());
4270
4271 std::string const entering_key(server_name + '/' + std::to_string(client_pid));
4272 auto key_ticket(std::find_if(
4273 obj_ticket->second.begin()
4274 , obj_ticket->second.end()
4275 , [&entering_key](auto const & t)
4276 {
4277 return t.second->get_entering_key() == entering_key;
4278 }));
4279 if(key_ticket != obj_ticket->second.end())
4280 {
4281 // this function will send a DROPTICKET to the other leaders
4282 // and the UNLOCKED to the source (unless we already sent the
4283 // UNLOCKED which gets sent at most once.)
4284 //
4285 key_ticket->second->drop_ticket();
4286
4287 obj_ticket->second.erase(key_ticket);
4288 if(obj_ticket->second.empty())
4289 {
4290 // we are done with this one!
4291 //
4292 f_tickets.erase(obj_ticket);
4293 }
4294
4295 // TBD: the clean up calls this function if it removes a lock
4296 // that timed out but not otherwise; maybe we could consider
4297 // doing it slightly differently where we pass a flag to
4298 // the clean up function to force the call either way
4299 //
4300 activate_first_lock(object_name);
4301 }
4302 else
4303 {
4304 SNAP_LOG_MAJOR
4305 << "UNLOCK could not find key \""
4306 << entering_key
4307 << "\" in object \""
4308 << object_name
4309 << "\"."
4310 << SNAP_LOG_SEND;
4311 }
4312 }
4313 else
4314 {
4315 SNAP_LOG_WARNING
4316 << "UNLOCK could not find object \""
4317 << object_name
4318 << "\"."
4319 << SNAP_LOG_SEND;
4320 }
4321
4322 // reset the timeout with the other locks
4323 //
4324 cleanup();
4325}
4326
4327
4328
4329} // namespace cluck_daemon
4330// vim: ts=4 sw=4 et
Class handling intercomputer locking.
Definition cluckd.h:48
void msg_get_max_ticket(ed::message &msg)
Search for the largest ticket.
Definition cluckd.cpp:2671
bool is_daemon_ready() const
Check whether the cluck daemon is ready to process lock requests.
Definition cluckd.cpp:478
timer::pointer_t f_timer
Definition cluckd.h:131
void add_connections()
Finish the cluck daemon initialization.
Definition cluckd.cpp:381
void msg_lock_status(ed::message &msg)
A service asked about the lock status.
Definition cluckd.cpp:3801
void msg_lock_entered(ed::message &msg)
Tell the specified ticket LOCK_ENTERED was received.
Definition cluckd.cpp:3135
std::string const & get_server_name() const
Get the name of the server we are running on.
Definition cluckd.cpp:461
time_t f_pace_lockstarted
Definition cluckd.h:146
void run()
Run the cluck daemon.
Definition cluckd.cpp:421
void cleanup()
Clean timed out entries if any.
Definition cluckd.cpp:1688
void lock_exiting(ed::message &msg)
Used to simulate a LOCK_EXITING message.
Definition cluckd.cpp:1960
computer::pointer_t get_leader_b() const
Get pointer to leader B.
Definition cluckd.cpp:748
void msg_max_ticket(ed::message &msg)
Got the largest ticket from another leader.
Definition cluckd.cpp:3978
void msg_add_ticket(ed::message &msg)
Add a ticket from another cluckd.
Definition cluckd.cpp:2254
ed::communicator::pointer_t f_communicator
Definition cluckd.h:128
void msg_ticket_ready(ed::message &msg)
Let other leaders know that the ticket is ready.
Definition cluckd.cpp:4199
void msg_cluster_down(ed::message &msg)
The communicatord lost too many connections.
Definition cluckd.cpp:2492
ticket::ticket_id_t get_last_ticket(std::string const &lock_name)
Determine the last ticket defined in this cluck daemon.
Definition cluckd.cpp:1871
void msg_absolutely(ed::message &msg)
Lock the resource after confirmation that client is alive.
Definition cluckd.cpp:2109
ticket::object_map_t f_entering_tickets
Definition cluckd.h:142
std::string ticket_list() const
Generate the output for "cluck-status --list".
Definition cluckd.cpp:917
void msg_activate_lock(ed::message &msg)
Acknowledge the ACTIVATE_LOCK with what we think is our first lock.
Definition cluckd.cpp:2193
std::size_t f_neighbors_count
Definition cluckd.h:132
snapdev::timespec_ex f_election_date
Definition cluckd.h:144
void set_my_ip_address(addr::addr const &a)
Definition cluckd.cpp:407
addr::addr f_my_ip_address
Definition cluckd.h:135
void msg_unlock(ed::message &msg)
Unlock the resource.
Definition cluckd.cpp:4231
computer::map_t f_computers
Definition cluckd.h:138
void msg_info(ed::message &msg)
Return a JSON with the state of this cluckd object.
Definition cluckd.cpp:790
ticket::object_map_t f_tickets
Definition cluckd.h:143
void msg_lock_started(ed::message &msg)
Called whenever a cluck computer is acknowledging itself.
Definition cluckd.cpp:3655
void stop(bool quitting)
Called whenever we receive the STOP command or equivalent.
Definition cluckd.cpp:1284
std::string f_server_name
Definition cluckd.h:127
void synchronize_leaders()
Synchronize leaders.
Definition cluckd.cpp:1419
void msg_lock_leaders(ed::message &msg)
The list of leaders.
Definition cluckd.cpp:3584
void election_status()
Check the status of the election.
Definition cluckd.cpp:972
void msg_lock_activated(ed::message &msg)
Acknowledgement of the lock to activate.
Definition cluckd.cpp:3098
void forward_message_to_leader(ed::message &message)
Forward a user message to a leader.
Definition cluckd.cpp:1648
virtual ~cluckd()
Do some clean ups.
Definition cluckd.cpp:366
ticket::serial_t f_ticket_serial
Definition cluckd.h:145
computer::vector_t f_leaders
Definition cluckd.h:139
std::string serialized_tickets()
Definition cluckd.cpp:1970
void send_lock_started(ed::message const *msg)
Definition cluckd.cpp:1225
void msg_ticket_added(ed::message &msg)
Acknowledgement that the ticket was properly added.
Definition cluckd.cpp:4134
int get_computer_count() const
Return the number of known computers running cluckd.
Definition cluckd.cpp:445
messenger::pointer_t f_messenger
Definition cluckd.h:129
void msg_lock_entering(ed::message &msg)
Create an entering ticket.
Definition cluckd.cpp:3176
ticket::key_map_t const get_entering_tickets(std::string const &lock_name)
Get a reference to the list of entering tickets.
Definition cluckd.cpp:1930
void msg_list_tickets(ed::message &msg)
Reply to the LIST_TICKETS message with the TICKET_LIST.
Definition cluckd.cpp:2707
std::string f_my_id
Definition cluckd.h:134
void msg_lock_exiting(ed::message &msg)
Exit a ticket.
Definition cluckd.cpp:3353
void msg_server_gone(ed::message &msg)
Called whenever a remote connection is disconnected.
Definition cluckd.cpp:4021
void msg_lock(ed::message &msg)
Lock the named resource.
Definition cluckd.cpp:2753
void msg_clock_stable(ed::message &msg)
Message telling us whether the clock is stable.
Definition cluckd.cpp:2472
computer::pointer_t is_leader(std::string id=std::string()) const
Search for a leader.
Definition cluckd.cpp:658
cluck::timeout_t f_start_time
Definition cluckd.h:126
void activate_first_lock(std::string const &object_name)
Make sure the very first ticket is marked as LOCKED.
Definition cluckd.cpp:1328
cluckd(int argc, char *argv[])
Initializes a cluckd object.
Definition cluckd.cpp:323
advgetopt::getopt f_opts
Definition cluckd.h:124
void set_ticket(std::string const &object_name, std::string const &key, ticket::pointer_t ticket)
Set the ticket.
Definition cluckd.cpp:1910
void msg_cluster_up(ed::message &msg)
Cluster is ready, send the LOCK_STARTED message.
Definition cluckd.cpp:2521
void msg_drop_ticket(ed::message &msg)
One of the cluckd processes asked for a ticket to be dropped.
Definition cluckd.cpp:2581
bool get_parameters(ed::message const &message, std::string *object_name, ed::dispatcher_match::tag_t *tag, pid_t *client_pid, cluck::timeout_t *timeout, std::string *key, std::string *source)
Try to get a set of parameters.
Definition cluckd.cpp:2017
void msg_lock_failed(ed::message &msg)
Acknowledge a lock failure.
Definition cluckd.cpp:3468
interrupt::pointer_t f_interrupt
Definition cluckd.h:130
void msg_lock_tickets(ed::message &msg)
Another cluckd is sending us its list of tickets.
Definition cluckd.cpp:3831
void msg_status(ed::message &msg)
With the STATUS message we know of new communicatord services.
Definition cluckd.cpp:4097
computer::pointer_t get_leader_a() const
Get pointer to leader A.
Definition cluckd.cpp:701
std::size_t f_neighbors_quorum
Definition cluckd.h:133
message_cache::list_t f_message_cache
Definition cluckd.h:141
ticket::pointer_t find_first_lock(std::string const &lock_name)
Definition cluckd.cpp:1343
void set_start_time(snapdev::timespec_ex const &start_time)
Definition computer.cpp:277
void set_connected(bool connected)
Definition computer.cpp:125
std::string const & get_name() const
Definition computer.cpp:289
static priority_t const PRIORITY_USER_MIN
Definition computer.h:56
std::map< std::string, pointer_t > map_t
Definition computer.h:50
static priority_t const PRIORITY_MAX
Definition computer.h:59
static priority_t const PRIORITY_OFF
Definition computer.h:58
std::int8_t priority_t
Definition computer.h:47
bool set_id(std::string const &id)
Initialize this computer object from id.
Definition computer.cpp:153
std::shared_ptr< computer > pointer_t
Definition computer.h:49
Handle the ticket messages.
Definition ticket.h:43
void set_owner(std::string const &owner)
Define whether this ticket is the owner of that lock.
Definition ticket.cpp:1073
void set_serial(serial_t owner)
Give the lock a serial number for some form of unicity.
Definition ticket.cpp:1147
void entering()
Enter the mode that lets us retrieve our ticket number.
Definition ticket.cpp:471
void set_alive_timeout(cluck::timeout_t timeout)
Define a time when the ticket times out while waiting.
Definition ticket.cpp:1353
static ticket_id_t const NO_TICKET
Definition ticket.h:53
void set_unlock_duration(cluck::timeout_t duration)
Change the unlock duration to the specified value.
Definition ticket.cpp:1188
void activate_lock()
Check whether this ticket can be activated and do so if so.
Definition ticket.cpp:750
std::uint32_t ticket_id_t
Definition ticket.h:50
void lock_activated()
Check whether this ticket can be activated and do so if so.
Definition ticket.cpp:779
std::string const & get_ticket_key() const
Retrieve a reference to the ticket key.
Definition ticket.cpp:1561
std::shared_ptr< ticket > pointer_t
Definition ticket.h:45
std::int32_t serial_t
Definition ticket.h:49
std::map< std::string, pointer_t > key_map_t
Definition ticket.h:47
Daemon handling inter-computer locking.
constexpr std::string_view g_default_candidate_priority
Definition cluckd.cpp:144
constexpr char const *const g_configuration_files[]
Definition cluckd.cpp:188
advgetopt::group_description const g_group_descriptions[]
Definition cluckd.cpp:172
advgetopt::options_environment const g_options_environment
Definition cluckd.cpp:195
timeout_t get_lock_obtention_timeout()
Definition cluck.cpp:136
timeout_t CLUCK_DEFAULT_TIMEOUT
Definition cluck.h:82
snapdev::timespec_ex timeout_t
A timeout delay.
Definition cluck.h:80
timeout_t CLUCK_UNLOCK_MINIMUM_TIMEOUT
Definition cluck.h:89
timeout_t CLUCK_MINIMUM_TIMEOUT
Definition cluck.h:83
std::size_t CLUCK_MAXIMUM_ENTERING_LOCKS
Definition cluck.h:92
std::list< message_cache > list_t
Version header.
#define CLUCK_VERSION_STRING
Definition version.h:27

This document is part of the Snap! Websites Project.

Copyright by Made to Order Software Corp.