29#include <cluck/names.h>
35#include <communicator/flags.h>
36#include <communicator/names.h>
41#include <as2js/json.h>
46#include <eventdispatcher/names.h>
51#include <snapdev/gethostname.h>
52#include <snapdev/hexadecimal_string.h>
53#include <snapdev/stringize.h>
54#include <snapdev/tokenize_string.h>
55#include <snapdev/to_string_literal.h>
60#include <snaplogger/logger.h>
61#include <snaplogger/options.h>
62#include <snaplogger/severity.h>
67#include <advgetopt/advgetopt.h>
68#include <advgetopt/exception.h>
80#include <openssl/rand.h>
85#include <snapdev/poison.h>
145 snapdev::integer_to_string_literal<computer::PRIORITY_DEFAULT>.data();
150 advgetopt::define_option(
151 advgetopt::Name(
"candidate-priority")
152 , advgetopt::ShortName(
'p')
153 , advgetopt::Flags(advgetopt::all_flags<
154 advgetopt::GETOPT_FLAG_REQUIRED
155 , advgetopt::GETOPT_FLAG_GROUP_OPTIONS>())
156 , advgetopt::Help(
"Define the priority of this candidate (1 to 14) to gain a leader position or \"off\".")
159 advgetopt::define_option(
160 advgetopt::Name(
"server-name")
161 , advgetopt::ShortName(
'n')
162 , advgetopt::Flags(advgetopt::all_flags<
163 advgetopt::GETOPT_FLAG_DYNAMIC_CONFIGURATION
164 , advgetopt::GETOPT_FLAG_REQUIRED
165 , advgetopt::GETOPT_FLAG_GROUP_OPTIONS>())
166 , advgetopt::Help(
"Set the name of this server instance.")
168 advgetopt::end_options()
174 advgetopt::define_group(
175 advgetopt::GroupNumber(advgetopt::GETOPT_FLAG_GROUP_COMMANDS)
176 , advgetopt::GroupName(
"command")
177 , advgetopt::GroupDescription(
"Commands:")
179 advgetopt::define_group(
180 advgetopt::GroupNumber(advgetopt::GETOPT_FLAG_GROUP_OPTIONS)
181 , advgetopt::GroupName(
"option")
182 , advgetopt::GroupDescription(
"Options:")
184 advgetopt::end_groups()
190 "/etc/cluck/cluckd.conf",
197 .f_project_name =
"cluckd",
198 .f_group_name =
"cluck",
200 .f_environment_variable_name =
"CLUCKD_OPTIONS",
202 .f_environment_flags = advgetopt::GETOPT_ENVIRONMENT_FLAG_SYSTEM_PARAMETERS
203 | advgetopt::GETOPT_ENVIRONMENT_FLAG_PROCESS_SYSTEM_PARAMETERS,
204 .f_help_header =
"Usage: %p [-<opt>]\n"
205 "where -<opt> is one or more of:",
206 .f_help_footer =
"%c",
208 .f_license =
"GNU GPL v3",
209 .f_copyright =
"Copyright (c) 2013-"
210 SNAPDEV_STRINGIZE(UTC_BUILD_YEAR)
211 " by Made to Order Software Corporation -- All Rights Reserved",
324 : f_opts(g_options_environment)
326 snaplogger::add_logger_options(
f_opts);
334 f_opts.finish_parsing(argc, argv);
335 if(!snaplogger::process_logger_options(
f_opts,
"/etc/cluck/logger"))
337 throw advgetopt::getopt_exit(
"logger options generated an error.", 1);
348 if(
f_opts.is_defined(
"server-name"))
392 f_timer = std::make_shared<timer>(
this);
424 <<
"--------------------------------- cluckd started."
500 <<
"not considered ready: no leaders."
522 <<
"not considered ready: not enough leaders for this cluster."
543 <<
"not considered ready: quorum changed, re-election expected soon (number of neighbors: "
545 <<
", neighbors quorum: "
547 <<
", number of computers: "
561 <<
"not considered ready: quorum lost, re-election expected soon."
568 std::size_t ready(0);
572 if(l->get_connected())
585 time_t
const now(time(
nullptr));
594 ed::message temporary_message;
595 temporary_message.set_sent_from_server(l->get_name());
596 temporary_message.set_sent_from_service(cluck::g_name_cluck_service_name);
616 <<
"not considered ready: no direct connection with leader: \""
617 << last_leader->get_name()
665 auto const l(std::find_if(
668 , [
id](
auto const & c){
669 return c->get_id() == id;
706 throw cluck::logic_error(
"cluckd::get_leader_a(): only a leader can call this function.");
714 throw cluck::logic_error(
"cluckd::get_leader_a(): call this function only when leaders were elected.");
753 throw cluck::logic_error(
"cluckd::get_leader_b(): only a leader can call this function.");
761 throw cluck::unexpected_case(
"cluckd::get_leader_b(): call this function only when leaders were elected.");
792 std::stringstream ss;
795 p.set_filename(
"cluckd.cpp");
796 p.set_function(
"msg_info");
798 as2js::json::json_value::object_t obj;
799 as2js::json::json_value::pointer_t result(std::make_shared<as2js::json::json_value>(p, obj));
802 as2js::json::json_value::pointer_t value(std::make_shared<as2js::json::json_value>(p,
is_daemon_ready()));
803 result->set_member(
"daemon_ready", value);
808 as2js::json::json_value::pointer_t value(std::make_shared<as2js::json::json_value>(p,
f_my_id));
809 result->set_member(
"id", value);
814 as2js::json::json_value::pointer_t value(std::make_shared<as2js::json::json_value>(p,
817 :
f_my_ip_address.to_ipv4or6_string(addr::STRING_IP_ADDRESS | addr::STRING_IP_BRACKET_ADDRESS)));
818 result->set_member(
"ip", value);
822 as2js::json::json_value::pointer_t value(std::make_shared<as2js::json::json_value>(p, as2js::integer(
f_neighbors_count)));
823 result->set_member(
"neighbors_count", value);
827 as2js::json::json_value::pointer_t value(std::make_shared<as2js::json::json_value>(p, as2js::integer(
f_neighbors_quorum)));
828 result->set_member(
"neighbors_quorum", value);
832 as2js::json::json_value::pointer_t value(std::make_shared<as2js::json::json_value>(p, as2js::integer(
f_leaders.size())));
833 result->set_member(
"leaders_count", value);
838 as2js::json::json_value::pointer_t value(std::make_shared<as2js::json::json_value>(p, as2js::integer(
f_message_cache.size())));
839 result->set_member(
"cache_size", value);
843 as2js::json::json_value::array_t computers;
844 as2js::json::json_value::pointer_t list(std::make_shared<as2js::json::json_value>(p, computers));
848 as2js::json::json_value::object_t
computer;
849 as2js::json::json_value::pointer_t item(std::make_shared<as2js::json::json_value>(p,
computer));
852 as2js::json::json_value::pointer_t value(std::make_shared<as2js::json::json_value>(p, c.second->get_name()));
853 item->set_member(
"name", value);
857 as2js::json::json_value::pointer_t value(std::make_shared<as2js::json::json_value>(p, c.second->get_id()));
858 item->set_member(
"id", value);
862 as2js::json::json_value::pointer_t value(std::make_shared<as2js::json::json_value>(p, c.second->get_ip_address().to_ipv4or6_string(addr::STRING_IP_ADDRESS | addr::STRING_IP_BRACKET_ADDRESS)));
863 item->set_member(
"ip", value);
867 as2js::json::json_value::pointer_t value(std::make_shared<as2js::json::json_value>(p, c.second->get_connected()));
868 item->set_member(
"connected", value);
872 auto const it(std::find_if(
875 , [&c](
auto const & l)
877 return c.second == l;
882 as2js::json::json_value::pointer_t value(std::make_shared<as2js::json::json_value>(p, as2js::integer(it -
f_leaders.begin())));
883 item->set_member(
"leader", value);
887 list->set_item(list->get_array().size(), item);
889 result->set_member(
"computers", list);
892 if(msg.has_parameter(cluck::g_name_cluck_param_mode)
893 && msg.get_parameter(cluck::g_name_cluck_param_mode) == cluck::g_name_cluck_value_debug)
897 as2js::json::json_value::pointer_t value(std::make_shared<as2js::json::json_value>(p,
serialized_tickets()));
898 result->set_member(
"tickets", value);
901 ed::message reply_message;
902 reply_message.set_command(cluck::g_name_cluck_cmd_cluckd_status);
903 reply_message.reply_to(msg);
904 reply_message.add_parameter(communicator::g_name_communicator_param_status, result->to_string());
919 std::stringstream list;
922 for(
auto const & key_ticket : obj_ticket.second)
926 << key_ticket.second->get_ticket_number()
927 <<
" object name: \""
928 << key_ticket.second->get_object_name()
930 << key_ticket.second->get_entering_key();
932 cluck::timeout_t const lock_timeout(key_ticket.second->get_lock_timeout_date());
937 << lock_timeout.to_string();
941 cluck::timeout_t const obtention_timeout(key_ticket.second->get_obtention_timeout());
944 << obtention_timeout.to_string();
1065 sort_by_id[c.second->get_id()] = c.second;
1078 bool too_many_computers_off(
false);
1085 <<
"you cannot have any cluck computer turned OFF when you"
1086 " have three or less computers total in your cluster."
1087 " The elections cannot be completed in these"
1090 too_many_computers_off =
true;
1097 <<
"you have a total of "
1099 <<
" computers in your cluster. You turned off "
1101 <<
" of them, which means less than three are left"
1102 " as candidates for leadership which is not enough."
1103 " You can have a maximum of "
1105 <<
" that are turned off on this cluster."
1107 too_many_computers_off =
true;
1109 if(too_many_computers_off)
1119 communicator::flag::pointer_t flag(COMMUNICATOR_FLAG_UP(
1123 ,
"the cluck daemon detected that too many of the"
1124 " daemons have their priority set to OFF;"
1125 " you must turn some of these back ON."
1127 flag->set_priority(99);
1128 flag->add_tag(
"settings");
1129 flag->set_manual_down(
true);
1138 if(sort_by_id.size() < 2
1152 ed::message lock_leaders_message;
1153 lock_leaders_message.set_command(cluck::g_name_cluck_cmd_lock_leaders);
1154 lock_leaders_message.set_service(communicator::g_name_communicator_server_any);
1157 lock_leaders_message.add_parameter(cluck::g_name_cluck_param_election_date,
f_election_date);
1158 auto leader(sort_by_id.begin());
1159 std::size_t
const max(std::min(
static_cast<computer::map_t::size_type
>(3), sort_by_id.size()));
1160 for(std::size_t idx(0); idx < max; ++idx, ++leader)
1162 lock_leaders_message.add_parameter(
1163 cluck::g_name_cluck_param_leader + std::to_string(idx)
1164 , leader->second->get_id());
1171<<
"election status = add leader(s)... "
1196 ed::message status_message;
1198 ? cluck::g_name_cluck_cmd_lock_ready
1199 : cluck::g_name_cluck_cmd_no_lock);
1200 status_message.set_service(communicator::g_name_communicator_server_me);
1201 status_message.add_parameter(communicator::g_name_communicator_param_cache, communicator::g_name_communicator_value_no);
1217 for(
auto & mc : cache)
1231 ed::message lock_started_message;
1232 lock_started_message.set_command(cluck::g_name_cluck_cmd_lock_started);
1235 lock_started_message.set_service(communicator::g_name_communicator_service_public_broadcast);
1246 lock_started_message.reply_to(*msg);
1251 lock_started_message.add_parameter(communicator::g_name_communicator_param_server_name,
f_server_name);
1252 lock_started_message.add_parameter(cluck::g_name_cluck_param_lock_id,
f_my_id);
1253 lock_started_message.add_parameter(cluck::g_name_cluck_param_start_time,
f_start_time);
1259 lock_started_message.add_parameter(cluck::g_name_cluck_param_election_date,
f_election_date);
1260 for(
size_t idx(0); idx <
f_leaders.size(); ++idx)
1262 lock_started_message.add_parameter(
1263 cluck::g_name_cluck_param_leader + std::to_string(idx)
1346 auto const obj_ticket(
f_tickets.find(object_name));
1360 for(
auto key_ticket(obj_ticket->second.begin()); key_ticket != obj_ticket->second.end(); )
1362 if(key_ticket->second->timed_out())
1367 key_ticket->second->lock_failed(
"timed out while searching for first lock");
1368 if(key_ticket->second->timed_out())
1372 key_ticket = obj_ticket->second.erase(key_ticket);
1377 if(first_ticket ==
nullptr)
1379 first_ticket = key_ticket->second;
1385 if(obj_ticket->second.empty())
1393 return first_ticket;
1452 ed::message::vector_t local_locks;
1465 for(
auto key_entering(obj_entering->second.begin()); key_entering != obj_entering->second.end(); )
1467 std::string
const owner_name(key_entering->second->get_owner());
1468 auto key_leader(std::find_if(
1471 , [&owner_name](
auto const & l)
1473 return l->get_name() == owner_name;
1479 ed::message lock_message;
1480 lock_message.set_command(cluck::g_name_cluck_cmd_lock);
1481 lock_message.set_server(
f_leaders[0]->get_name());
1482 lock_message.set_service(cluck::g_name_cluck_service_name);
1483 lock_message.set_sent_from_server(key_entering->second->get_server_name());
1484 lock_message.set_sent_from_service(key_entering->second->get_service_name());
1485 lock_message.add_parameter(cluck::g_name_cluck_param_object_name, key_entering->second->get_object_name());
1486 lock_message.add_parameter(cluck::g_name_cluck_param_tag, key_entering->second->get_tag());
1487 lock_message.add_parameter(cluck::g_name_cluck_param_pid, key_entering->second->get_client_pid());
1488 lock_message.add_parameter(cluck::g_name_cluck_param_timeout, key_entering->second->get_obtention_timeout());
1489 lock_message.add_parameter(cluck::g_name_cluck_param_duration, key_entering->second->get_lock_duration());
1490 lock_message.add_parameter(cluck::g_name_cluck_param_unlock_duration, key_entering->second->get_unlock_duration());
1498 key_entering = obj_entering->second.erase(key_entering);
1499 local_locks.push_back(lock_message);
1506 lock_message.add_parameter(cluck::g_name_cluck_param_serial, key_entering->second->get_serial());
1523 std::string serialized;
1526 for(
auto key_ticket(obj_ticket->second.begin()); key_ticket != obj_ticket->second.end(); )
1528 std::string
const owner_name(key_ticket->second->get_owner());
1529 auto key_leader(std::find_if(
1532 , [&owner_name](
auto const & l)
1534 return l->get_name() == owner_name;
1536 if(key_ticket->second->is_locked())
1543 key_ticket->second->set_owner(
f_leaders[0]->get_name());
1549 serialized += key_ticket->second->serialize();
1563 ed::message lock_message;
1564 lock_message.set_command(cluck::g_name_cluck_cmd_lock);
1565 lock_message.set_server(
f_leaders[0]->get_name());
1566 lock_message.set_service(cluck::g_name_cluck_service_name);
1567 lock_message.set_sent_from_server(key_ticket->second->get_server_name());
1568 lock_message.set_sent_from_service(key_ticket->second->get_service_name());
1569 lock_message.add_parameter(cluck::g_name_cluck_param_object_name, key_ticket->second->get_object_name());
1570 lock_message.add_parameter(cluck::g_name_cluck_param_tag, key_ticket->second->get_tag());
1571 lock_message.add_parameter(cluck::g_name_cluck_param_pid, key_ticket->second->get_client_pid());
1572 lock_message.add_parameter(cluck::g_name_cluck_param_timeout, key_ticket->second->get_obtention_timeout());
1573 lock_message.add_parameter(cluck::g_name_cluck_param_duration, key_ticket->second->get_lock_duration());
1574 lock_message.add_parameter(cluck::g_name_cluck_param_unlock_duration, key_ticket->second->get_unlock_duration());
1579 key_ticket = obj_ticket->second.erase(key_ticket);
1580 local_locks.push_back(lock_message);
1587 lock_message.add_parameter(cluck::g_name_cluck_param_serial, key_ticket->second->get_serial());
1604 for(
auto lm : local_locks)
1611 if(!serialized.empty())
1613 ed::message lock_tickets_message;
1614 lock_tickets_message.set_command(cluck::g_name_cluck_cmd_lock_tickets);
1615 lock_tickets_message.set_service(cluck::g_name_cluck_service_name);
1616 lock_tickets_message.add_parameter(cluck::g_name_cluck_param_tickets, serialized);
1621 lock_tickets_message.set_server(la->get_name());
1627 lock_tickets_message.set_server(lb->get_name());
1666 msg.set_service(cluck::g_name_cluck_service_name);
1667 msg.add_parameter(cluck::g_name_cluck_param_lock_proxy_server_name, msg.get_sent_from_server());
1668 msg.add_parameter(cluck::g_name_cluck_param_lock_proxy_service_name, msg.get_sent_from_service());
1698 if(c->f_timeout <= now)
1700 std::string object_name;
1701 ed::dispatcher_match::tag_t tag(ed::dispatcher_match::DISPATCHER_MATCH_NO_TAG);
1702 pid_t client_pid(0);
1704 if(!
get_parameters(c->f_message, &object_name, &tag, &client_pid, &timeout,
nullptr,
nullptr))
1708 throw cluck::logic_error(
"cluck::cleanup() of LOCK message failed get_parameters().");
1718 <<
"\" timed out before leaders were known."
1721 std::string
const server_name(c->f_message.has_parameter(
"lock_proxy_server_name")
1722 ? c->f_message.get_parameter(
"lock_proxy_server_name")
1723 : c->f_message.get_sent_from_server());
1724 std::string
const service_name(c->f_message.has_parameter(
"lock_proxy_service_name")
1725 ? c->f_message.get_parameter(
"lock_proxy_service_name")
1726 : c->f_message.get_sent_from_service());
1727 std::string
const entering_key(server_name +
'/' + std::to_string(client_pid));
1729 ed::message lock_failed_message;
1730 lock_failed_message.set_command(cluck::g_name_cluck_cmd_lock_failed);
1731 lock_failed_message.set_service(service_name);
1732 lock_failed_message.set_server(server_name);
1733 lock_failed_message.add_parameter(cluck::g_name_cluck_param_object_name, object_name);
1734 lock_failed_message.add_parameter(cluck::g_name_cluck_param_tag, tag);
1735 lock_failed_message.add_parameter(cluck::g_name_cluck_param_key, entering_key);
1736 lock_failed_message.add_parameter(cluck::g_name_cluck_param_error, cluck::g_name_cluck_value_timedout);
1737#ifndef CLUCKD_OPTIMIZATIONS
1738 lock_failed_message.add_parameter(cluck::g_name_cluck_param_description,
"cleanup() found a timed out lock");
1746 if(c->f_timeout < next_timeout)
1748 next_timeout = c->f_timeout;
1758 bool try_activate(
false);
1759 for(
auto key_ticket(obj_ticket->second.begin()); key_ticket != obj_ticket->second.end(); )
1761 bool move_next(
true);
1762 if(key_ticket->second->timed_out())
1764 key_ticket->second->lock_failed(
"ticket: timed out while cleaning up");
1765 if(key_ticket->second->timed_out())
1769 key_ticket = obj_ticket->second.erase(key_ticket);
1770 try_activate =
true;
1776 if(key_ticket->second->get_current_timeout_date() < next_timeout)
1778 next_timeout = key_ticket->second->get_current_timeout_date();
1784 if(obj_ticket->second.empty())
1786 obj_ticket =
f_tickets.erase(obj_ticket);
1805 for(
auto key_entering(obj_entering->second.begin()); key_entering != obj_entering->second.end(); )
1807 if(key_entering->second->timed_out())
1809 key_entering->second->lock_failed(
"entering ticket: timed out while cleanup");
1810 if(key_entering->second->timed_out())
1814 key_entering = obj_entering->second.erase(key_entering);
1819 if(key_entering->second->get_current_timeout_date() < next_timeout)
1821 next_timeout = key_entering->second->get_current_timeout_date();
1827 if(obj_entering->second.empty())
1839 if(next_timeout != snapdev::timespec_ex::max())
1848 f_timer->set_timeout_date(-1);
1879 auto obj_ticket(
f_tickets.find(object_name));
1886 for(
auto key_ticket : obj_ticket->second)
1889 if(ticket_number > last_ticket)
1891 last_ticket = ticket_number;
1911 std::string
const & object_name
1912 , std::string
const & key
1941 throw std::logic_error(
1942 "could not find entering ticket with object name: \""
1972 std::stringstream result;
1974 for(
auto const & obj_ticket :
f_tickets)
1976 for(
auto const & key_ticket : obj_ticket.second)
1979 << key_ticket.second->serialize()
1984 return result.str();
2018 ed::message
const & msg
2019 , std::string * object_name
2020 , ed::dispatcher_match::tag_t * tag
2021 , pid_t * client_pid
2024 , std::string * source)
2029 if(object_name !=
nullptr)
2031 *object_name = msg.get_parameter(cluck::g_name_cluck_param_object_name);
2039 *tag = msg.get_integer_parameter(cluck::g_name_cluck_param_tag);
2046 if(client_pid !=
nullptr)
2048 *client_pid = msg.get_integer_parameter(cluck::g_name_cluck_param_pid);
2053 SNAP_LOG_NOISY_ERROR
2054 <<
"cluckd::get_parameters(): invalid pid specified for a lock ("
2055 << std::to_string(*client_pid)
2056 <<
"); it must be a positive decimal number."
2065 if(timeout !=
nullptr)
2067 if(msg.has_parameter(cluck::g_name_cluck_param_timeout))
2072 *timeout = msg.get_timespec_parameter(cluck::g_name_cluck_param_timeout);
2084 *key = msg.get_parameter(cluck::g_name_cluck_param_key);
2089 if(source !=
nullptr)
2091 *source = msg.get_parameter(cluck::g_name_cluck_param_source);
2114 if(!msg.has_parameter(ed::g_name_ed_param_serial))
2119 std::string
const serial(msg.get_parameter(ed::g_name_ed_param_serial));
2120 std::vector<std::string> segments;
2121 snapdev::tokenize_string(segments, serial,
"/");
2123 if(segments[0] ==
"cluckd")
2127 if(segments.size() != 4)
2130 <<
"ABSOLUTELY reply has an invalid cluckd serial parameters \""
2132 <<
"\" was expected to have exactly 4 segments."
2135 ed::message invalid;
2136 invalid.set_command(ed::g_name_ed_cmd_invalid);
2137 invalid.reply_to(msg);
2138 invalid.add_parameter(
2139 ed::g_name_ed_param_command
2140 , msg.get_command());
2141 invalid.add_parameter(
2142 ed::g_name_ed_param_message
2143 ,
"invalid number of segments in \""
2153 std::string
const object_name(segments[1]);
2154 std::string
const server_name(segments[2]);
2155 std::string
const client_pid(segments[3]);
2160 std::string
const entering_key(server_name +
'/' + client_pid);
2161 auto key_ticket(entering_ticket->second.find(entering_key));
2162 if(key_ticket != entering_ticket->second.end())
2170 key_ticket->second->entering();
2195 std::string object_name;
2196 ed::dispatcher_match::tag_t tag(ed::dispatcher_match::DISPATCHER_MATCH_NO_TAG);
2198 if(!
get_parameters(msg, &object_name, &tag,
nullptr,
nullptr, &key,
nullptr))
2203 std::string first_key(
"no-key");
2212 if(key == first_key)
2223 ed::message lock_activated_message;
2224 lock_activated_message.set_command(cluck::g_name_cluck_cmd_lock_activated);
2225 lock_activated_message.reply_to(msg);
2226 lock_activated_message.add_parameter(cluck::g_name_cluck_param_object_name, object_name);
2227 lock_activated_message.add_parameter(cluck::g_name_cluck_param_tag, tag);
2228 lock_activated_message.add_parameter(cluck::g_name_cluck_param_key, key);
2229 lock_activated_message.add_parameter(cluck::g_name_cluck_param_other_key, first_key);
2230 f_messenger->send_message(lock_activated_message);
2256 std::string object_name;
2257 ed::dispatcher_match::tag_t tag(ed::dispatcher_match::DISPATCHER_MATCH_NO_TAG);
2260 if(!
get_parameters(msg, &object_name, &tag,
nullptr, &timeout, &key,
nullptr))
2267 auto const obj_ticket(
f_tickets.find(object_name));
2270 auto const key_ticket(obj_ticket->second.find(key));
2271 if(key_ticket != obj_ticket->second.end())
2274 <<
"an existing ticket has the same object name \""
2283 ed::message lock_failed_message;
2284 lock_failed_message.set_command(cluck::g_name_cluck_cmd_lock_failed);
2285 lock_failed_message.reply_to(msg);
2286 lock_failed_message.add_parameter(cluck::g_name_cluck_param_object_name, object_name);
2287 lock_failed_message.add_parameter(cluck::g_name_cluck_param_tag, tag);
2288 lock_failed_message.add_parameter(cluck::g_name_cluck_param_key, key);
2289 lock_failed_message.add_parameter(cluck::g_name_cluck_param_error, cluck::g_name_cluck_value_duplicate);
2290#ifndef CLUCKD_OPTIMIZATIONS
2291 lock_failed_message.add_parameter(cluck::g_name_cluck_param_description,
"ADD_TICKET called with an existing ticket name and key");
2295 communicator::flag::pointer_t flag(COMMUNICATOR_FLAG_UP(
2298 ,
"invalid-algorithm"
2299 ,
"msg_add_ticket() received a second call to add the"
2300 " same ticket. This either means there is a bug in our"
2301 " algorithm or there is a hacker sending us messages"
2302 " trying to create invalid tickets."
2304 flag->set_priority(25);
2305 flag->add_tag(
"bug");
2306 flag->set_manual_down(
true);
2315 std::vector<std::string> segments;
2316 snapdev::tokenize_string(segments, key,
"/");
2317 if(segments.size() != 3)
2320 <<
"Expected exactly 3 segments in \""
2322 <<
"\" to add a ticket."
2325 ed::message lock_failed_message;
2326 lock_failed_message.set_command(cluck::g_name_cluck_cmd_lock_failed);
2327 lock_failed_message.reply_to(msg);
2328 lock_failed_message.add_parameter(cluck::g_name_cluck_param_object_name, object_name);
2329 lock_failed_message.add_parameter(cluck::g_name_cluck_param_tag, tag);
2330 lock_failed_message.add_parameter(cluck::g_name_cluck_param_key, key);
2331 lock_failed_message.add_parameter(cluck::g_name_cluck_param_error, cluck::g_name_cluck_value_invalid);
2332#ifndef CLUCKD_OPTIMIZATIONS
2333 lock_failed_message.add_parameter(cluck::g_name_cluck_param_description,
"ADD_TICKET called with an invalid key (expected three segments)");
2344 std::uint32_t number(0);
2347 number = snapdev::hex_to_int<std::uint32_t>(segments[0]);
2349 catch(snapdev::hexadecimal_string_exception
const &)
2353 catch(snapdev::hexadecimal_string_out_of_range
const &)
2360 <<
"somehow ticket number \""
2362 <<
"\" is not a valid hexadecimal number."
2365 ed::message lock_failed_message;
2366 lock_failed_message.set_command(cluck::g_name_cluck_cmd_lock_failed);
2367 lock_failed_message.reply_to(msg);
2368 lock_failed_message.add_parameter(cluck::g_name_cluck_param_object_name, object_name);
2369 lock_failed_message.add_parameter(cluck::g_name_cluck_param_tag, tag);
2370 lock_failed_message.add_parameter(cluck::g_name_cluck_param_key, key);
2371 lock_failed_message.add_parameter(cluck::g_name_cluck_param_error, cluck::g_name_cluck_value_invalid);
2372#ifndef CLUCKD_OPTIMIZATIONS
2373 lock_failed_message.add_parameter(cluck::g_name_cluck_param_description,
"ADD_TICKET called with an invalid key (first segment is not a valid hexadecimal number)");
2387 <<
"Expected entering ticket object for \""
2389 <<
"\" not found when adding a ticket."
2392 ed::message lock_failed_message;
2393 lock_failed_message.set_command(cluck::g_name_cluck_cmd_lock_failed);
2394 lock_failed_message.reply_to(msg);
2395 lock_failed_message.add_parameter(cluck::g_name_cluck_param_object_name, object_name);
2396 lock_failed_message.add_parameter(cluck::g_name_cluck_param_tag, tag);
2397 lock_failed_message.add_parameter(cluck::g_name_cluck_param_key, key);
2398 lock_failed_message.add_parameter(cluck::g_name_cluck_param_error, cluck::g_name_cluck_value_invalid);
2399#ifndef CLUCKD_OPTIMIZATIONS
2400 lock_failed_message.add_parameter(cluck::g_name_cluck_param_description,
"ADD_TICKET could not find an entering ticket with the specified object_name");
2410 std::string
const entering_key(segments[1] +
'/' + segments[2]);
2411 auto const key_entering_ticket(obj_entering_ticket->second.find(entering_key));
2412 if(key_entering_ticket == obj_entering_ticket->second.end())
2415 <<
"Expected entering ticket key for \""
2417 <<
"\" not found when adding a ticket."
2420 ed::message lock_failed_message;
2421 lock_failed_message.set_command(cluck::g_name_cluck_cmd_lock_failed);
2422 lock_failed_message.reply_to(msg);
2423 lock_failed_message.add_parameter(cluck::g_name_cluck_param_object_name, object_name);
2424 lock_failed_message.add_parameter(cluck::g_name_cluck_param_tag, tag);
2425 lock_failed_message.add_parameter(cluck::g_name_cluck_param_key, key);
2426 lock_failed_message.add_parameter(cluck::g_name_cluck_param_error, cluck::g_name_cluck_value_invalid);
2427#ifndef CLUCKD_OPTIMIZATIONS
2428 lock_failed_message.add_parameter(cluck::g_name_cluck_param_description,
"ADD_TICKET could not find the very entering ticket with the specified key");
2440 set_ticket(object_name, key, key_entering_ticket->second);
2446 f_tickets[object_name][key]->set_ticket_number(number);
2448 ed::message ticket_added_message;
2449 ticket_added_message.set_command(cluck::g_name_cluck_cmd_ticket_added);
2450 ticket_added_message.reply_to(msg);
2451 ticket_added_message.add_parameter(cluck::g_name_cluck_param_object_name, object_name);
2452 ticket_added_message.add_parameter(cluck::g_name_cluck_param_key, key);
2453 ticket_added_message.add_parameter(cluck::g_name_cluck_param_tag, tag);
2476 f_stable_clock = msg.get_parameter(communicator::g_name_communicator_param_clock_resolution)
2477 == communicator::g_name_communicator_value_verified;
2494 snapdev::NOT_USED(msg);
2497 <<
"cluster is down, canceling existing locks and we have to"
2498 " refuse any further lock requests for a while."
2527 std::string candidate_priority(
f_opts.get_string(
"candidate-priority"));
2528 if(candidate_priority !=
"off")
2530 priority =
f_opts.get_long(
"candidate-priority"
2541 throw cluck::logic_error(
"cluckd::msg_cluster_up(): somehow f_my_ip_address is still the default in msg_cluster_up().");
2553 <<
"cluster is up with "
2555 <<
" neighbors, attempt an election"
2556 " then check for leaders by sending a LOCK_STARTED message."
2583 std::string object_name;
2584 ed::dispatcher_match::tag_t tag(ed::dispatcher_match::DISPATCHER_MATCH_NO_TAG);
2586 if(!
get_parameters(msg, &object_name, &tag,
nullptr,
nullptr, &key,
nullptr))
2591 std::vector<std::string> segments;
2592 snapdev::tokenize_string(segments, key,
"/");
2599 std::string entering_key;
2600 if(segments.size() == 3)
2602 auto obj_ticket(
f_tickets.find(object_name));
2605 auto key_ticket(obj_ticket->second.find(key));
2606 if(key_ticket != obj_ticket->second.end())
2608 obj_ticket->second.erase(key_ticket);
2611 if(obj_ticket->second.empty())
2625 entering_key = segments[1] +
'/' + segments[2];
2639 auto key_entering_ticket(obj_entering_ticket->second.find(entering_key));
2640 if(key_entering_ticket != obj_entering_ticket->second.end())
2642 obj_entering_ticket->second.erase(key_entering_ticket);
2645 if(obj_entering_ticket->second.empty())
2673 std::string object_name;
2674 ed::dispatcher_match::tag_t tag(ed::dispatcher_match::DISPATCHER_MATCH_NO_TAG);
2676 if(!
get_parameters(msg, &object_name, &tag,
nullptr,
nullptr, &key,
nullptr))
2689 reply.set_command(cluck::g_name_cluck_cmd_max_ticket);
2690 reply.reply_to(msg);
2691 reply.add_parameter(cluck::g_name_cluck_param_object_name, object_name);
2692 reply.add_parameter(cluck::g_name_cluck_param_key, key);
2693 reply.add_parameter(cluck::g_name_cluck_param_tag, tag);
2694 reply.add_parameter(cluck::g_name_cluck_param_ticket_id, last_ticket);
2709 ed::message list_message;
2710 list_message.set_command(cluck::g_name_cluck_cmd_ticket_list);
2711 list_message.reply_to(msg);
2712 list_message.add_parameter(cluck::g_name_cluck_param_list,
ticket_list());
2755 std::string object_name;
2756 ed::dispatcher_match::tag_t tag(ed::dispatcher_match::DISPATCHER_MATCH_NO_TAG);
2757 pid_t client_pid(0);
2759 if(!
get_parameters(msg, &object_name, &tag, &client_pid, &timeout,
nullptr,
nullptr))
2770 std::string
const server_name(msg.has_parameter(cluck::g_name_cluck_param_lock_proxy_server_name)
2771 ? msg.get_parameter(cluck::g_name_cluck_param_lock_proxy_server_name)
2772 : msg.get_sent_from_server());
2774 std::string
const service_name(msg.has_parameter(cluck::g_name_cluck_param_lock_proxy_service_name)
2775 ? msg.get_parameter(cluck::g_name_cluck_param_lock_proxy_service_name)
2776 : msg.get_sent_from_service());
2778 std::string
const entering_key(server_name +
'/' + std::to_string(client_pid));
2780 if(timeout <= snapdev::now())
2789 <<
"\" timed out before we could start the locking process."
2792 ed::message lock_failed_message;
2793 lock_failed_message.set_command(cluck::g_name_cluck_cmd_lock_failed);
2794 lock_failed_message.reply_to(msg);
2795 lock_failed_message.add_parameter(cluck::g_name_cluck_param_object_name, object_name);
2796 lock_failed_message.add_parameter(cluck::g_name_cluck_param_tag, tag);
2797 lock_failed_message.add_parameter(cluck::g_name_cluck_param_key, entering_key);
2798 lock_failed_message.add_parameter(cluck::g_name_cluck_param_error, cluck::g_name_cluck_value_timedout);
2799#ifndef CLUCKD_OPTIMIZATIONS
2800 lock_failed_message.add_parameter(cluck::g_name_cluck_param_description,
"LOCK timeout date is already in the past");
2807 cluck::timeout_t const duration(msg.get_timespec_parameter(cluck::g_name_cluck_param_duration));
2814 <<
" is an invalid duration, the minimum accepted is "
2819 ed::message lock_failed_message;
2820 lock_failed_message.set_command(cluck::g_name_cluck_cmd_lock_failed);
2821 lock_failed_message.reply_to(msg);
2822 lock_failed_message.add_parameter(cluck::g_name_cluck_param_object_name, object_name);
2823 lock_failed_message.add_parameter(cluck::g_name_cluck_param_tag, tag);
2824 lock_failed_message.add_parameter(cluck::g_name_cluck_param_key, entering_key);
2825 lock_failed_message.add_parameter(cluck::g_name_cluck_param_error, cluck::g_name_cluck_value_invalid);
2826#ifndef CLUCKD_OPTIMIZATIONS
2827 lock_failed_message.add_parameter(cluck::g_name_cluck_param_description,
"LOCK called with a duration that is too small");
2835 if(msg.has_parameter(cluck::g_name_cluck_param_unlock_duration))
2837 unlock_duration = msg.get_timespec_parameter(cluck::g_name_cluck_param_unlock_duration);
2844 <<
" is an invalid unlock duration, the minimum accepted is "
2849 ed::message lock_failed_message;
2850 lock_failed_message.set_command(cluck::g_name_cluck_cmd_lock_failed);
2851 lock_failed_message.reply_to(msg);
2852 lock_failed_message.add_parameter(cluck::g_name_cluck_param_object_name, object_name);
2853 lock_failed_message.add_parameter(cluck::g_name_cluck_param_tag, tag);
2854 lock_failed_message.add_parameter(cluck::g_name_cluck_param_key, entering_key);
2855 lock_failed_message.add_parameter(cluck::g_name_cluck_param_error, cluck::g_name_cluck_value_invalid);
2856#ifndef CLUCKD_OPTIMIZATIONS
2857 lock_failed_message.add_parameter(cluck::g_name_cluck_param_description,
"LOCK called with an unlock duration that is too small");
2868 <<
"caching LOCK message for \""
2872 <<
") as the cluck system is not yet considered ready."
2879 std::int64_t
const timeout_date(
f_timer->get_timeout_date());
2880 if(timeout_date == -1
2881 ||
cluck::timeout_t(timeout_date / 1'000'000, timeout_date % 1'000'000) > timeout)
2883 f_timer->set_timeout_date(timeout);
2902 auto key_ticket(entering_ticket->second.find(entering_key));
2903 if(key_ticket != entering_ticket->second.end())
2908 if(msg.has_parameter(cluck::g_name_cluck_param_serial))
2910 ticket::serial_t const serial(msg.get_integer_parameter(cluck::g_name_cluck_param_serial));
2911 if(key_ticket->second->get_serial() == serial)
2924 <<
"an entering ticket has the same object name \""
2928 <<
") and entering key \""
2933 ed::message lock_failed_message;
2934 lock_failed_message.set_command(cluck::g_name_cluck_cmd_lock_failed);
2935 lock_failed_message.reply_to(msg);
2936 lock_failed_message.add_parameter(cluck::g_name_cluck_param_object_name, object_name);
2937 lock_failed_message.add_parameter(cluck::g_name_cluck_param_tag, tag);
2938 lock_failed_message.add_parameter(cluck::g_name_cluck_param_key, entering_key);
2939 lock_failed_message.add_parameter(cluck::g_name_cluck_param_error, cluck::g_name_cluck_value_duplicate);
2940#ifndef CLUCKD_OPTIMIZATIONS
2941 lock_failed_message.add_parameter(cluck::g_name_cluck_param_description,
"LOCK called with the same entering ticket object_name and entering_key");
2955 <<
"too many entering tickets for object name \""
2960 ed::message lock_failed_message;
2961 lock_failed_message.set_command(cluck::g_name_cluck_cmd_lock_failed);
2962 lock_failed_message.reply_to(msg);
2963 lock_failed_message.add_parameter(cluck::g_name_cluck_param_object_name, object_name);
2964 lock_failed_message.add_parameter(cluck::g_name_cluck_param_tag, tag);
2965 lock_failed_message.add_parameter(cluck::g_name_cluck_param_key, entering_key);
2966 lock_failed_message.add_parameter(cluck::g_name_cluck_param_error, cluck::g_name_cluck_value_overflow);
2967#ifndef CLUCKD_OPTIMIZATIONS
2968 lock_failed_message.add_parameter(cluck::g_name_cluck_param_description,
"LOCK called too quickly that the number of entering tickets overflowed");
2983 auto obj_ticket(
f_tickets.find(object_name));
2986 auto key_ticket(std::find_if(
2987 obj_ticket->second.begin()
2988 , obj_ticket->second.end()
2989 , [&entering_key](
auto const & t)
2991 return t.second->get_entering_key() == entering_key;
2993 if(key_ticket != obj_ticket->second.end())
2998 <<
"a ticket has the same object name \""
3002 <<
") and entering key \""
3007 ed::message lock_failed_message;
3008 lock_failed_message.set_command(cluck::g_name_cluck_cmd_lock_failed);
3009 lock_failed_message.reply_to(msg);
3010 lock_failed_message.add_parameter(cluck::g_name_cluck_param_object_name, object_name);
3011 lock_failed_message.add_parameter(cluck::g_name_cluck_param_tag, tag);
3012 lock_failed_message.add_parameter(cluck::g_name_cluck_param_key, entering_key);
3013 lock_failed_message.add_parameter(cluck::g_name_cluck_param_error, cluck::g_name_cluck_value_duplicate);
3014#ifndef CLUCKD_OPTIMIZATIONS
3015 lock_failed_message.add_parameter(cluck::g_name_cluck_param_description,
"LOCK called with the same ticket object_name and entering_key");
3058 if(msg.has_parameter(cluck::g_name_cluck_param_serial))
3069 ed::message alive_message;
3070 alive_message.set_command(ed::g_name_ed_cmd_alive);
3071 alive_message.set_server(server_name);
3072 alive_message.set_service(service_name);
3073 alive_message.add_parameter(ed::g_name_ed_param_serial,
"cluckd/" + object_name +
'/' + entering_key);
3074 alive_message.add_parameter(ed::g_name_ed_param_timestamp, snapdev::now());
3100 std::string object_name;
3101 ed::dispatcher_match::tag_t tag(ed::dispatcher_match::DISPATCHER_MATCH_NO_TAG);
3103 if(!
get_parameters(msg, &object_name, &tag,
nullptr,
nullptr, &key,
nullptr))
3108 std::string
const & other_key(msg.get_parameter(cluck::g_name_cluck_param_other_key));
3109 if(other_key == key)
3111 auto obj_ticket(
f_tickets.find(object_name));
3114 auto key_ticket(obj_ticket->second.find(key));
3115 if(key_ticket != obj_ticket->second.end())
3120 key_ticket->second->lock_activated();
3137 std::string object_name;
3138 ed::dispatcher_match::tag_t tag(ed::dispatcher_match::DISPATCHER_MATCH_NO_TAG);
3140 if(!
get_parameters(msg, &object_name, &tag,
nullptr,
nullptr, &key,
nullptr))
3148 auto const key_entering_ticket(obj_entering_ticket->second.find(key));
3149 if(key_entering_ticket != obj_entering_ticket->second.end())
3151 key_entering_ticket->second->entered();
3178 std::string object_name;
3179 ed::dispatcher_match::tag_t tag(ed::dispatcher_match::DISPATCHER_MATCH_NO_TAG);
3183 if(!
get_parameters(msg, &object_name, &tag,
nullptr, &timeout, &key, &source))
3190 if(timeout <= snapdev::now())
3193 <<
"received LOCK_ENTERING for \""
3195 <<
"\" that already timed out."
3205 <<
"received LOCK_ENTERING while we are thinking we are not ready."
3214 bool allocate(
true);
3218 auto const key_ticket(obj_ticket->second.find(key));
3219 allocate = key_ticket == obj_ticket->second.end();
3226 cluck::timeout_t const duration(msg.get_timespec_parameter(cluck::g_name_cluck_param_duration));
3233 <<
" is an invalid duration, the minimum accepted is "
3238 ed::message lock_failed_message;
3239 lock_failed_message.set_command(cluck::g_name_cluck_cmd_lock_failed);
3240 lock_failed_message.reply_to(msg);
3241 lock_failed_message.add_parameter(cluck::g_name_cluck_param_object_name, object_name);
3242 lock_failed_message.add_parameter(cluck::g_name_cluck_param_tag, tag);
3243 lock_failed_message.add_parameter(cluck::g_name_cluck_param_key, key);
3244 lock_failed_message.add_parameter(cluck::g_name_cluck_param_error, cluck::g_name_cluck_value_invalid);
3245#ifndef CLUCKD_OPTIMIZATIONS
3246 lock_failed_message.add_parameter(cluck::g_name_cluck_param_description,
"LOCK_ENTERING called with a duration which is too small");
3254 if(msg.has_parameter(cluck::g_name_cluck_param_unlock_duration))
3256 unlock_duration = msg.get_timespec_parameter(cluck::g_name_cluck_param_unlock_duration);
3264 <<
" is an invalid unlock duration, the minimum accepted is "
3269 ed::message lock_failed_message;
3270 lock_failed_message.set_command(cluck::g_name_cluck_cmd_lock_failed);
3271 lock_failed_message.reply_to(msg);
3272 lock_failed_message.add_parameter(cluck::g_name_cluck_param_object_name, object_name);
3273 lock_failed_message.add_parameter(cluck::g_name_cluck_param_tag, tag);
3274 lock_failed_message.add_parameter(cluck::g_name_cluck_param_key, key);
3275 lock_failed_message.add_parameter(cluck::g_name_cluck_param_error, cluck::g_name_cluck_value_invalid);
3276#ifndef CLUCKD_OPTIMIZATIONS
3277 lock_failed_message.add_parameter(cluck::g_name_cluck_param_description,
"LOCK_ENTERING called with an unlock duration which is too small or set to the default value");
3287 std::vector<std::string> source_segments;
3288 if(snapdev::tokenize_string(source_segments, source,
"/") != 2)
3291 <<
"Invalid number of parameters in source parameter \""
3294 << source_segments.size()
3298 ed::message lock_failed_message;
3299 lock_failed_message.set_command(cluck::g_name_cluck_cmd_lock_failed);
3300 lock_failed_message.reply_to(msg);
3301 lock_failed_message.add_parameter(cluck::g_name_cluck_param_object_name, object_name);
3302 lock_failed_message.add_parameter(cluck::g_name_cluck_param_tag, tag);
3303 lock_failed_message.add_parameter(cluck::g_name_cluck_param_key, key);
3304 lock_failed_message.add_parameter(cluck::g_name_cluck_param_error, cluck::g_name_cluck_value_invalid);
3305#ifndef CLUCKD_OPTIMIZATIONS
3306 lock_failed_message.add_parameter(cluck::g_name_cluck_param_description,
"LOCK_ENTERING called with an invalid source parameter");
3321 , source_segments[0]
3322 , source_segments[1]));
3330 ticket->
set_serial(msg.get_integer_parameter(cluck::g_name_cluck_param_serial));
3334 reply.set_command(cluck::g_name_cluck_cmd_lock_entered);
3335 reply.reply_to(msg);
3336 reply.add_parameter(cluck::g_name_cluck_param_object_name, object_name);
3337 reply.add_parameter(cluck::g_name_cluck_param_tag, tag);
3338 reply.add_parameter(cluck::g_name_cluck_param_key, key);
3355 std::string object_name;
3356 ed::dispatcher_match::tag_t tag(ed::dispatcher_match::DISPATCHER_MATCH_NO_TAG);
3358 if(!
get_parameters(msg, &object_name, &tag,
nullptr,
nullptr, &key,
nullptr))
3368 auto const key_entering(obj_entering->second.find(key));
3369 if(key_entering != obj_entering->second.end())
3371 obj_entering->second.erase(key_entering);
3376 bool run_activation(
false);
3377 auto const obj_ticket(
f_tickets.find(object_name));
3380 for(
auto const & key_ticket : obj_ticket->second)
3382 key_ticket.second->remove_entering(key);
3383 run_activation =
true;
3411 if(obj_entering->second.empty())
3419 <<
"entering lock \""
3423 <<
"\" in LOCK_EXITING specified lock not found."
3430 <<
"LOCK_EXITING specified lock \""
3470 std::string object_name;
3471 ed::dispatcher_match::tag_t tag(ed::dispatcher_match::DISPATCHER_MATCH_NO_TAG);
3473 if(!
get_parameters(msg, &object_name, &tag,
nullptr,
nullptr, &key,
nullptr))
3478 std::string errmsg(
"get LOCK_FAILED: ");
3479 errmsg += msg.get_parameter(
"error");
3481 std::string forward_server;
3482 std::string forward_service;
3489 auto key_entering(obj_entering->second.find(key));
3490 if(key_entering != obj_entering->second.end())
3492 forward_server = key_entering->second->get_server_name();
3493 forward_service = key_entering->second->get_service_name();
3495 obj_entering->second.erase(key_entering);
3497 errmsg +=
" -- happened while entering";
3500 if(obj_entering->second.empty())
3508 auto obj_ticket(
f_tickets.find(object_name));
3511 bool try_activate(
false);
3512 auto key_ticket(obj_ticket->second.find(key));
3513 if(key_ticket == obj_ticket->second.end())
3515 key_ticket = std::find_if(
3516 obj_ticket->second.begin()
3517 , obj_ticket->second.end()
3518 , [&key](
auto const & t)
3520 return t.second->get_entering_key() == key;
3523 if(key_ticket != obj_ticket->second.end())
3529 forward_server = key_ticket->second->get_server_name();
3530 forward_service = key_ticket->second->get_service_name();
3532 obj_ticket->second.erase(key_ticket);
3533 try_activate =
true;
3535 errmsg +=
" -- happened when locked";
3538 if(obj_ticket->second.empty())
3542 else if(try_activate)
3550 if(!forward_server.empty()
3551 && !forward_service.empty())
3556 msg.set_server(forward_server);
3557 msg.set_service(forward_service);
3561 errmsg += forward_server;
3563 errmsg += forward_service;
3586 f_election_date = msg.get_timespec_parameter(cluck::g_name_cluck_param_election_date);
3591 for(
int idx(0); idx < 3; ++idx)
3593 std::string
const param_name(cluck::g_name_cluck_param_leader + std::to_string(idx));
3594 if(msg.has_parameter(param_name))
3597 std::string
const lockid(msg.get_parameter(param_name));
3598 if(leader->set_id(lockid))
3600 computer::map_t::iterator exists(
f_computers.find(leader->get_name()));
3613 leader->set_connected(
false);
3634 RAND_bytes(
reinterpret_cast<unsigned char *
>(&c),
sizeof(c));
3660 std::string
const server_name(msg.get_parameter(communicator::g_name_communicator_param_server_name));
3666 cluck::timeout_t const start_time(msg.get_timespec_parameter(cluck::g_name_cluck_param_start_time));
3668 computer::map_t::iterator it(
f_computers.find(server_name));
3678 if(!
computer->
set_id(msg.get_parameter(cluck::g_name_cluck_param_lock_id)))
3690 throw cluck::logic_error(
"cluckd::msg_lock_started(): server_name ("
3692 +
") does not match the new computer name ("
3702 if(!it->second->get_connected())
3708 new_computer =
true;
3712 if(it->second->get_start_time() != start_time)
3721 new_computer =
true;
3722 it->second->set_start_time(start_time);
3729 if(msg.has_parameter(cluck::g_name_cluck_param_election_date))
3731 snapdev::timespec_ex
const election_date(msg.get_timespec_parameter(cluck::g_name_cluck_param_election_date));
3741 for(
int idx(0); idx < 3; ++idx)
3743 std::string
const param_name(cluck::g_name_cluck_param_leader + std::to_string(idx));
3744 if(msg.has_parameter(param_name))
3747 std::string
const lockid(msg.get_parameter(param_name));
3748 if(leader->set_id(lockid))
3750 computer::map_t::iterator exists(
f_computers.find(leader->get_name()));
3763 leader->set_connected(
false);
3803 ed::message status_message;
3805 ? cluck::g_name_cluck_cmd_lock_ready
3806 : cluck::g_name_cluck_cmd_no_lock);
3808 status_message.reply_to(msg);
3809 status_message.add_parameter(communicator::g_name_communicator_param_cache, communicator::g_name_communicator_value_no);
3833 std::string
const tickets(msg.get_parameter(cluck::g_name_cluck_param_tickets));
3838 bool added_tickets(
false);
3839 std::list<std::string> lines;
3840 snapdev::tokenize_string(lines, tickets,
"\n",
true);
3841 for(
auto const & l : lines)
3844 std::list<std::string> vars;
3845 snapdev::tokenize_string(vars, tickets,
"|",
true);
3846 auto object_name_value(std::find_if(
3849 , [](std::string
const & vv)
3851 return vv.starts_with(
"object_name=");
3853 if(object_name_value != vars.end())
3855 auto entering_key_value(std::find_if(
3858 , [](std::string
const & vv)
3860 return vv.starts_with(
"entering_key=");
3862 if(entering_key_value != vars.end())
3866 std::string
const object_name(object_name_value->substr(12));
3867 std::string
const entering_key(entering_key_value->substr(13));
3872 auto key_ticket(entering_ticket->second.find(entering_key));
3873 if(key_ticket != entering_ticket->second.end())
3875 t = key_ticket->second;
3880 auto obj_ticket(
f_tickets.find(object_name));
3883 auto key_ticket(std::find_if(
3884 obj_ticket->second.begin()
3885 , obj_ticket->second.end()
3886 , [&entering_key](
auto const & o)
3888 return o.second->get_entering_key() == entering_key;
3890 if(key_ticket != obj_ticket->second.end())
3892 t = key_ticket->second;
3899 bool const new_ticket(t ==
nullptr);
3906 t = std::make_shared<ticket>(
3910 , ed::dispatcher_match::DISPATCHER_MATCH_NO_TAG
3915 , cluck::g_name_cluck_service_name);
3919 added_tickets =
true;
3934 auto li(std::find_if(
3937 , [&t](
auto const & c)
3939 return t->get_owner() == c->get_name();
3943 f_tickets[object_name][t->get_ticket_key()] = t;
3980 std::string object_name;
3981 ed::dispatcher_match::tag_t tag(ed::dispatcher_match::DISPATCHER_MATCH_NO_TAG);
3983 if(!
get_parameters(msg, &object_name, &tag,
nullptr,
nullptr, &key,
nullptr))
3993 auto const key_entering_ticket(obj_entering_ticket->second.find(key));
3994 if(key_entering_ticket != obj_entering_ticket->second.end())
3996 key_entering_ticket->second->max_ticket(msg.get_integer_parameter(cluck::g_name_cluck_param_ticket_id));
4026 if(!msg.has_parameter(communicator::g_name_communicator_param_server_name))
4033 std::string
const server_name(msg.get_parameter(communicator::g_name_communicator_param_server_name));
4034 if(server_name.empty()
4056SNAP_LOG_WARNING <<
"removed \"" << server_name <<
"\"" << SNAP_LOG_SEND;
4102 std::string
const service(msg.get_parameter(communicator::g_name_communicator_param_service));
4104 if(service.starts_with(communicator::g_name_communicator_connection_remote_communicator_in)
4105 || service.starts_with(communicator::g_name_communicator_connection_remote_communicator_out))
4109 std::string
const status(msg.get_parameter(communicator::g_name_communicator_param_status));
4110 if(status == communicator::g_name_communicator_value_up)
4136 std::string object_name;
4137 ed::dispatcher_match::tag_t tag(ed::dispatcher_match::DISPATCHER_MATCH_NO_TAG);
4139 if(!
get_parameters(msg, &object_name, &tag,
nullptr,
nullptr, &key,
nullptr))
4144 auto const obj_ticket(
f_tickets.find(object_name));
4147 auto const key_ticket(obj_ticket->second.find(key));
4148 if(key_ticket != obj_ticket->second.end())
4160 <<
"called with object \""
4162 <<
"\" not present in f_entering_ticket (key: \""
4168 key_ticket->second->ticket_added(obj_entering_ticket->second);
4173 <<
"found object \""
4175 <<
"\" but could not find a corresponding ticket with key \""
4201 std::string object_name;
4202 ed::dispatcher_match::tag_t tag(ed::dispatcher_match::DISPATCHER_MATCH_NO_TAG);
4204 if(!
get_parameters(msg, &object_name, &tag,
nullptr,
nullptr, &key,
nullptr))
4209 auto obj_ticket(
f_tickets.find(object_name));
4212 auto key_ticket(obj_ticket->second.find(key));
4213 if(key_ticket != obj_ticket->second.end())
4217 key_ticket->second->set_ready();
4236 <<
"received an UNLOCK when cluckd is not ready to receive lock related messages."
4250 std::string object_name;
4251 ed::dispatcher_match::tag_t tag(ed::dispatcher_match::DISPATCHER_MATCH_NO_TAG);
4252 pid_t client_pid(0);
4253 if(!
get_parameters(msg, &object_name, &tag, &client_pid,
nullptr,
nullptr,
nullptr))
4260 auto obj_ticket(
f_tickets.find(object_name));
4263 std::string
const server_name(msg.has_parameter(
"lock_proxy_server_name")
4264 ? msg.get_parameter(
"lock_proxy_server_name")
4265 : msg.get_sent_from_server());
4271 std::string
const entering_key(server_name +
'/' + std::to_string(client_pid));
4272 auto key_ticket(std::find_if(
4273 obj_ticket->second.begin()
4274 , obj_ticket->second.end()
4275 , [&entering_key](
auto const & t)
4277 return t.second->get_entering_key() == entering_key;
4279 if(key_ticket != obj_ticket->second.end())
4285 key_ticket->second->drop_ticket();
4287 obj_ticket->second.erase(key_ticket);
4288 if(obj_ticket->second.empty())
4305 <<
"UNLOCK could not find key \""
4307 <<
"\" in object \""
4316 <<
"UNLOCK could not find object \""
Class handling inter-computer locking.
void msg_get_max_ticket(ed::message &msg)
Search for the largest ticket.
bool is_daemon_ready() const
Check whether the cluck daemon is ready to process lock requests.
void add_connections()
Finish the cluck daemon initialization.
void msg_lock_status(ed::message &msg)
A service asked about the lock status.
void msg_lock_entered(ed::message &msg)
Tell the specified ticket LOCK_ENTERED was received.
std::string const & get_server_name() const
Get the name of the server we are running on.
time_t f_pace_lockstarted
void run()
Run the cluck daemon.
void cleanup()
Clean timed out entries if any.
void lock_exiting(ed::message &msg)
Used to simulate a LOCK_EXITING message.
computer::pointer_t get_leader_b() const
Get pointer to leader B.
void msg_max_ticket(ed::message &msg)
Got the largest ticket from another leader.
void msg_add_ticket(ed::message &msg)
Add a ticket from another cluckd.
ed::communicator::pointer_t f_communicator
void msg_ticket_ready(ed::message &msg)
Let other leaders know that the ticket is ready.
void msg_cluster_down(ed::message &msg)
The communicatord lost too many connections.
ticket::ticket_id_t get_last_ticket(std::string const &lock_name)
Determine the last ticket defined in this cluck daemon.
void msg_absolutely(ed::message &msg)
Lock the resource after confirmation that client is alive.
ticket::object_map_t f_entering_tickets
std::string ticket_list() const
Generate the output for "cluck-status --list".
void msg_activate_lock(ed::message &msg)
Acknowledge the ACTIVATE_LOCK with what we think is our first lock.
std::size_t f_neighbors_count
snapdev::timespec_ex f_election_date
void set_my_ip_address(addr::addr const &a)
addr::addr f_my_ip_address
void msg_unlock(ed::message &msg)
Unlock the resource.
computer::map_t f_computers
void msg_info(ed::message &msg)
Return a JSON with the state of this cluckd object.
ticket::object_map_t f_tickets
void msg_lock_started(ed::message &msg)
Called whenever a cluck computer is acknowledging itself.
void stop(bool quitting)
Called whenever we receive the STOP command or equivalent.
std::string f_server_name
void synchronize_leaders()
Synchronize leaders.
void msg_lock_leaders(ed::message &msg)
The list of leaders.
void election_status()
Check the status of the election.
void msg_lock_activated(ed::message &msg)
Acknowledgement of the lock to activate.
void forward_message_to_leader(ed::message &message)
Forward a user message to a leader.
virtual ~cluckd()
Do some cleanup.
ticket::serial_t f_ticket_serial
computer::vector_t f_leaders
std::string serialized_tickets()
void send_lock_started(ed::message const *msg)
void msg_ticket_added(ed::message &msg)
Acknowledgement that the ticket was properly added.
int get_computer_count() const
Return the number of known computers running cluckd.
messenger::pointer_t f_messenger
void msg_lock_entering(ed::message &msg)
Create an entering ticket.
ticket::key_map_t const get_entering_tickets(std::string const &lock_name)
Get a reference to the list of entering tickets.
void msg_list_tickets(ed::message &msg)
Reply to the LIST_TICKETS message with the TICKET_LIST.
void msg_lock_exiting(ed::message &msg)
Exit a ticket.
void msg_server_gone(ed::message &msg)
Called whenever a remote connection is disconnected.
void msg_lock(ed::message &msg)
Lock the named resource.
void msg_clock_stable(ed::message &msg)
Message telling us whether the clock is stable.
computer::pointer_t is_leader(std::string id=std::string()) const
Search for a leader.
cluck::timeout_t f_start_time
void activate_first_lock(std::string const &object_name)
Make sure the very first ticket is marked as LOCKED.
cluckd(int argc, char *argv[])
Initializes a cluckd object.
void set_ticket(std::string const &object_name, std::string const &key, ticket::pointer_t ticket)
Set the ticket.
void msg_cluster_up(ed::message &msg)
Cluster is ready, send the LOCK_STARTED message.
void msg_drop_ticket(ed::message &msg)
One of the cluckd processes asked for a ticket to be dropped.
bool get_parameters(ed::message const &message, std::string *object_name, ed::dispatcher_match::tag_t *tag, pid_t *client_pid, cluck::timeout_t *timeout, std::string *key, std::string *source)
Try to get a set of parameters.
void msg_lock_failed(ed::message &msg)
Acknowledge a lock failure.
interrupt::pointer_t f_interrupt
void msg_lock_tickets(ed::message &msg)
Another cluckd is sending us its list of tickets.
void msg_status(ed::message &msg)
With the STATUS message we know of new communicatord services.
computer::pointer_t get_leader_a() const
Get pointer to leader A.
std::size_t f_neighbors_quorum
message_cache::list_t f_message_cache
ticket::pointer_t find_first_lock(std::string const &lock_name)
void set_start_time(snapdev::timespec_ex const &start_time)
void set_connected(bool connected)
std::string const & get_name() const
static priority_t const PRIORITY_USER_MIN
std::map< std::string, pointer_t > map_t
static priority_t const PRIORITY_MAX
static priority_t const PRIORITY_OFF
bool set_id(std::string const &id)
Initialize this computer object from id.
std::shared_ptr< computer > pointer_t
Handle the ticket messages.
void set_owner(std::string const &owner)
Define whether this ticket is the owner of that lock.
void set_serial(serial_t owner)
Give the lock a serial number for some form of uniqueness.
void entering()
Enter the mode that lets us retrieve our ticket number.
void set_alive_timeout(cluck::timeout_t timeout)
Define a time when the ticket times out while waiting.
static ticket_id_t const NO_TICKET
void set_unlock_duration(cluck::timeout_t duration)
Change the unlock duration to the specified value.
void activate_lock()
Check whether this ticket can be activated and, if so, activate it.
std::uint32_t ticket_id_t
void lock_activated()
Check whether this ticket can be activated and, if so, activate it.
std::string const & get_ticket_key() const
Retrieve a reference to the ticket key.
std::shared_ptr< ticket > pointer_t
std::map< std::string, pointer_t > key_map_t
Daemon handling inter-computer locking.
constexpr std::string_view g_default_candidate_priority
constexpr char const *const g_configuration_files[]
advgetopt::group_description const g_group_descriptions[]
advgetopt::option const g_options[]
advgetopt::options_environment const g_options_environment
timeout_t get_lock_obtention_timeout()
timeout_t CLUCK_DEFAULT_TIMEOUT
snapdev::timespec_ex timeout_t
A timeout delay.
timeout_t CLUCK_UNLOCK_MINIMUM_TIMEOUT
timeout_t CLUCK_MINIMUM_TIMEOUT
std::size_t CLUCK_MAXIMUM_ENTERING_LOCKS
std::list< message_cache > list_t
#define CLUCK_VERSION_STRING