22#include "cluck/names.h"
27#include <eventdispatcher/names.h>
32#include <communicator/communicator_connection.h>
33#include <communicator/names.h>
38#include <cppthread/guard.h>
39#include <cppthread/mutex.h>
40#include <cppthread/thread.h>
45#include <snapdev/not_reached.h>
50#include <snapdev/poison.h>
98cppthread::mutex
g_mutex = cppthread::mutex();
107 if(m->f_expr !=
nullptr
108 && m->f_expr == msg.get_command()
109 && msg.has_parameter(g_name_cluck_param_tag)
110 && msg.get_integer_parameter(g_name_cluck_param_tag) == m->f_tag)
112 return ed::match_t::MATCH_TRUE;
115 return ed::match_t::MATCH_FALSE;
121 cppthread::guard lock(
g_mutex);
138 cppthread::guard lock(g_mutex);
139 return g_lock_obtention_timeout;
154 cppthread::guard lock(g_mutex);
155 g_lock_obtention_timeout = timeout;
161 cppthread::guard lock(g_mutex);
162 return g_lock_duration_timeout;
177 cppthread::guard lock(g_mutex);
178 g_lock_duration_timeout = timeout;
184 cppthread::guard lock(g_mutex);
185 return g_unlock_timeout;
200 cppthread::guard lock(g_mutex);
201 g_unlock_timeout = timeout;
378 std::string
const & object_name
379 , ed::connection_with_send_message::pointer_t messenger
380 , ed::dispatcher::pointer_t dispatcher
383 , f_object_name(object_name)
384 , f_tag(ed::dispatcher_match::get_next_tag())
385 , f_connection(messenger)
386 , f_dispatcher(dispatcher)
392 if(messenger ==
nullptr
393 || dispatcher ==
nullptr)
395 throw invalid_parameter(
"messenger & dispatcher parameters must be defined in cluck::cluck() constructor.");
399 set_name(
"cluck::" + object_name);
401 f_connection->add_help_callback(std::bind(&
cluck::help,
this, std::placeholders::_1));
415 f_dispatcher->remove_matches(f_tag);
431 return f_lock_obtained_callbacks.add_callback(func, priority);
447 return f_lock_obtained_callbacks.remove_callback(
id);
463 return f_lock_failed_callbacks.add_callback(func, priority);
479 return f_lock_failed_callbacks.remove_callback(
id);
495 return f_finally_callbacks.add_callback(func, priority);
511 return f_finally_callbacks.remove_callback(
id);
525 return f_lock_obtention_timeout;
557 f_lock_obtention_timeout = timeout;
579 return f_lock_duration_timeout;
599 f_lock_duration_timeout = timeout;
618 return f_unlock_timeout;
637 f_unlock_timeout = timeout;
655 return f_object_name;
718 throw busy(
"this cluck object is busy, you cannot change its type at the moment.");
783 commands.insert(g_name_cluck_cmd_locked);
784 commands.insert(g_name_cluck_cmd_lock_failed);
785 commands.insert(g_name_cluck_cmd_unlocked);
786 commands.insert(g_name_cluck_cmd_unlocking);
880 timeout_t obtention_timeout_date(snapdev::now());
889 obtention_timeout_date += f_lock_obtention_timeout;
892 f_serial = get_next_serial();
896 ed::message lock_message;
897 lock_message.set_command(g_name_cluck_cmd_lock);
898 lock_message.set_service(g_name_cluck_service_name);
899 lock_message.add_parameter(g_name_cluck_param_object_name, f_object_name);
900 lock_message.add_parameter(g_name_cluck_param_tag,
static_cast<int>(f_tag));
901 lock_message.add_parameter(g_name_cluck_param_pid, cppthread::gettid());
902 lock_message.add_parameter(ed::g_name_ed_param_serial, f_serial);
903 lock_message.add_parameter(g_name_cluck_param_timeout, obtention_timeout_date);
904 communicator::request_failure(lock_message);
907 lock_message.add_parameter(g_name_cluck_param_duration, f_lock_duration_timeout);
911 lock_message.add_parameter(g_name_cluck_param_unlock_duration, f_unlock_timeout);
915 lock_message.add_parameter(g_name_cluck_param_type,
static_cast<int>(f_type));
917 if(!f_connection->send_message(lock_message))
922 snapdev::NOT_REACHED_IN_TEST();
927 set_timeout_date(obtention_timeout_date);
935 ed::dispatcher_match locked(ed::define_match(
936 ed::Expression(g_name_cluck_cmd_locked)
938 , ed::MatchFunc(&match_command_and_tag)
940 f_dispatcher->add_match(locked);
942 ed::dispatcher_match lock_failed(ed::define_match(
943 ed::Expression(g_name_cluck_cmd_lock_failed)
945 , ed::MatchFunc(&match_command_and_tag)
947 f_dispatcher->add_match(lock_failed);
949 ed::dispatcher_match unlocked(ed::define_match(
950 ed::Expression(g_name_cluck_cmd_unlocked)
952 , ed::MatchFunc(&match_command_and_tag)
954 f_dispatcher->add_match(unlocked);
956 ed::dispatcher_match unlocking(ed::define_match(
957 ed::Expression(g_name_cluck_cmd_unlocking)
959 , ed::MatchFunc(&match_command_and_tag)
961 f_dispatcher->add_match(unlocking);
963 ed::dispatcher_match transmission_report(ed::define_match(
964 ed::Expression(communicator::g_name_communicator_cmd_transmission_report)
966 , ed::MatchFunc(&ed::one_to_one_callback_match)
968 , ed::Priority(ed::dispatcher_match::DISPATCHER_MATCH_CALLBACK_PRIORITY)));
969 f_dispatcher->add_match(transmission_report);
975 f_connection->send_commands();
1001 <<
"this cluck object is not currently locked."
1013 ed::message unlock_message;
1014 unlock_message.set_command(g_name_cluck_cmd_unlock);
1015 unlock_message.set_service(g_name_cluck_service_name);
1016 unlock_message.add_parameter(g_name_cluck_param_object_name, f_object_name);
1017 unlock_message.add_parameter(g_name_cluck_param_tag,
static_cast<int>(f_tag));
1018 unlock_message.add_parameter(g_name_cluck_param_pid, gettid());
1019 unlock_message.add_parameter(ed::g_name_ed_param_serial, f_serial);
1020 if(!f_connection->send_message(unlock_message))
1027 snapdev::NOT_REACHED_IN_TEST();
1035 timeout_t unlock_timeout_date(snapdev::now());
1037 set_timeout_date(unlock_timeout_date);
1079 return f_lock_timeout_date;
1111 && f_lock_timeout_date > snapdev::now();
1165 if(msg.get_integer_parameter(g_name_cluck_param_tag) != f_tag)
1171 throw logic_error(
"tag mismatch in is_cluck_msg().");
1174 if(msg.get_parameter(g_name_cluck_param_object_name) != f_object_name)
1178 ed::message invalid;
1179 invalid.user_data(msg.user_data<
void>());
1180 invalid.reply_to(msg);
1181 invalid.set_command(ed::g_name_ed_cmd_invalid);
1182 invalid.add_parameter(ed::g_name_ed_param_command, msg.get_command());
1183 invalid.add_parameter(
1184 ed::g_name_ed_param_message
1185 ,
"the \"object_name\" parameter does not match this cluck object. Got \""
1186 + msg.get_parameter(
"object_name")
1190 f_connection->send_message(invalid);
1218 <<
"process_timeout() called with state set to CLUCK_STATE_IDLE."
1220 snapdev::NOT_REACHED_IN_TEST();
1250 <<
"process_timeout() called with state set to CLUCK_STATE_FAILED."
1252 snapdev::NOT_REACHED_IN_TEST();
1290 f_lock_obtained_callbacks.call(
this);
1325 f_lock_failed_callbacks.call(
this);
1351 f_dispatcher->remove_matches(f_tag);
1352 f_finally_callbacks.call(
this);
1369 if(!is_cluck_msg(msg))
1378 f_lock_timeout_date = msg.get_timespec_parameter(g_name_cluck_param_timeout_date);
1379 f_unlocked_timeout_date = msg.get_timespec_parameter(g_name_cluck_param_unlocked_date);
1383 set_timeout_date(f_lock_timeout_date);
1400 if(!is_cluck_msg(msg))
1406 std::string
const error(msg.get_parameter(g_name_cluck_param_error));
1407 if(error == g_name_cluck_value_timedout)
1416 <<
"communicatord did not like our LOCK message: "
1452 std::string
const status(msg.get_parameter(communicator::g_name_communicator_param_status));
1453 if(msg.has_parameter(communicator::g_name_communicator_param_command)
1454 && msg.get_parameter(communicator::g_name_communicator_param_command) == g_name_cluck_cmd_lock
1455 && status == communicator::g_name_communicator_value_failed)
1457 SNAP_LOG_RECOVERABLE_ERROR
1458 <<
"the transmission of our \""
1459 << msg.get_parameter(communicator::g_name_communicator_param_command)
1460 <<
"\" message failed to travel to a cluckd service."
1490 if(!is_cluck_msg(msg))
1498 if(snapdev::now() >= f_unlocked_timeout_date)
1531 if(!is_cluck_msg(msg))
1540 if(snapdev::now() >= f_unlocked_timeout_date)
bool help(advgetopt::string_set_t &commands)
Called whenever the HELP message is received or new messages are added.
reason_t get_reason() const
The reason for the last failure.
void set_lock_duration_timeout(timeout_t timeout)
Set how long inter-process locks last.
timeout_t get_unlock_timeout() const
Retrieve the current unlock duration.
callback_manager_t::callback_id_t add_finally_callback(callback_t func, callback_manager_t::priority_t priority=callback_manager_t::DEFAULT_PRIORITY)
Add a callback function to call when done with the lock.
mode_t get_mode() const
Retrieve the mode.
bool remove_finally_callback(callback_manager_t::callback_id_t id)
Remove a callback function from the lock finally list.
void set_lock_obtention_timeout(timeout_t timeout)
Set how long to wait for an inter-process lock to take.
bool is_busy() const
Check whether the object is currently busy.
void msg_lock_failed(ed::message &msg)
Process the LOCK_FAILED message.
timeout_t get_timeout_date() const
Get the exact time when the lock times out.
void msg_locked(ed::message &msg)
Process the LOCKED message.
void set_unlock_timeout(timeout_t timeout)
Set how long we wait on an inter-process unlock acknowledgement.
void msg_unlocked(ed::message &msg)
Process the UNLOCK acknowledgement.
std::string const & get_object_name() const
Retrieve the object name.
virtual void process_timeout() override
Process the timeout event.
bool lock()
Attempt a lock.
void msg_transmission_report(ed::message &msg)
Get a transmission report on errors.
virtual void lock_failed()
The lock did not take or an error was reported.
timeout_t get_lock_duration_timeout() const
Retrieve the current lock duration.
timeout_t get_lock_obtention_timeout() const
Retrieve the current lock obtention duration.
callback_manager_t::callback_id_t add_lock_failed_callback(callback_t func, callback_manager_t::priority_t priority=callback_manager_t::DEFAULT_PRIORITY)
Add a callback function to call when the lock has failed.
bool remove_lock_failed_callback(callback_manager_t::callback_id_t id)
Remove a callback function from the lock failed list.
virtual void finally()
The lock cycle is finally complete.
virtual ~cluck() override
Make sure to clean up the dispatcher.
bool is_cluck_msg(ed::message &msg) const
Verify a message we received.
type_t get_type() const
Retrieve the lock type.
void set_type(type_t type)
Set the lock type.
void set_reason(reason_t reason)
Change the reason why a lock failed.
bool remove_lock_obtained_callback(callback_manager_t::callback_id_t id)
Remove a callback function from the lock obtained list.
void msg_unlocking(ed::message &msg)
The cluckd service sent us an UNLOCKING message.
virtual void lock_obtained()
This function gets called whenever the lock is in effect.
std::function< bool(cluck *)> callback_t
bool is_locked() const
This function checks whether the lock is considered locked.
void unlock()
Release the inter-process lock.
callback_manager_t::callback_id_t add_lock_obtained_callback(callback_t func, callback_manager_t::priority_t priority=callback_manager_t::DEFAULT_PRIORITY)
Add a callback function to call when the lock is obtained.
cluck::serial_t get_next_serial()
timeout_t g_lock_duration_timeout
timeout_t g_unlock_timeout
timeout_t g_lock_obtention_timeout
ed::match_t match_command_and_tag(ed::dispatcher_match const *m, ed::message &msg)
timeout_t CLUCK_UNLOCK_DEFAULT_TIMEOUT
timeout_t CLUCK_LOCK_OBTENTION_MAXIMUM_TIMEOUT
timeout_t get_unlock_timeout()
void set_lock_duration_timeout(timeout_t timeout)
timeout_t get_lock_obtention_timeout()
@ CLUCK_REASON_REMOTE_TIMEOUT
@ CLUCK_REASON_TRANSMISSION_ERROR
@ CLUCK_REASON_LOCAL_TIMEOUT
timeout_t CLUCK_DEFAULT_TIMEOUT
snapdev::timespec_ex timeout_t
A timeout delay.
timeout_t CLUCK_LOCK_OBTENTION_DEFAULT_TIMEOUT
timeout_t CLUCK_UNLOCK_MINIMUM_TIMEOUT
timeout_t CLUCK_MINIMUM_TIMEOUT
timeout_t get_lock_duration_timeout()
void set_lock_obtention_timeout(timeout_t timeout)
timeout_t CLUCK_MAXIMUM_TIMEOUT
void set_unlock_timeout(timeout_t timeout)
timeout_t CLUCK_LOCK_DURATION_DEFAULT_TIMEOUT