Initial version

This commit is contained in:
Alexander Kobjolke 2022-02-26 00:23:43 +01:00
commit 04b878f078
28 changed files with 1771 additions and 0 deletions

View file

@ -0,0 +1,37 @@
#ifndef PROCESS_COMMAND_RECEIVER_HPP
#define PROCESS_COMMAND_RECEIVER_HPP
#include <asap/asap.hpp>
#include "process/process_collection.hpp"
#include "process_command.hpp"
namespace process::controller {
class CommandListener;
class CommandReceiver {
public:
CommandReceiver(const std::shared_ptr<ProcessCollection>& collection,
const std::shared_ptr<asap::Participant>& participant,
const std::string& topic_name);
CommandReceiver(const std::shared_ptr<ProcessCollection>& collection,
const std::shared_ptr<asap::Participant>& participant,
const std::string& topic_name,
const std::function<bool(const ProcessCommand&)>& validator);
virtual ~CommandReceiver() = default;
CommandReceiver(CommandReceiver&&) = delete;
CommandReceiver& operator=(CommandReceiver&&) & = delete;
CommandReceiver(const CommandReceiver&) = delete;
CommandReceiver& operator=(const CommandReceiver&) & = delete;
private:
const std::string m_participant_topic;
std::shared_ptr<CommandListener> m_cmd_listener;
std::shared_ptr<asap::Participant> participant;
std::shared_ptr<asap::Subscriber<ProcessCommand>> subscriber;
};
}
#endif //PROCESS_COMMAND_RECEIVER_HPP

View file

@ -0,0 +1,36 @@
#ifndef PROCESS_DEFAULT_CONTROLLER_HPP
#define PROCESS_DEFAULT_CONTROLLER_HPP
#include <memory>
#include <filesystem>
#include <asap/asap.hpp>
#include <process/watchdog.hpp>
namespace process::controller {
class ProcessCollection;
class CommandReceiver;
class DefinitionReceiver;
class DefaultProcessController {
public:
DefaultProcessController(std::shared_ptr<ProcessCollection> collection, const std::string& node_name,
const std::string& transport, const std::string& ctrl_prefix, const std::string& info_prefix);
virtual ~DefaultProcessController() = default;
void start();
void stop();
private:
std::shared_ptr<const asap::Configuration> m_cfg;
std::shared_ptr<asap::Participant> m_participant;
std::shared_ptr<ProcessCollection> m_collection;
std::shared_ptr<DefinitionReceiver> m_definition_receiver;
std::shared_ptr<CommandReceiver> m_command_receiver;
Watchdog m_watchdog;
};
}
#endif

View file

@ -0,0 +1,21 @@
#ifndef PROCESS_DEFAULT_CONTROLLER_TOPICS_HPP
#define PROCESS_DEFAULT_CONTROLLER_TOPICS_HPP
#include <string>
namespace process::controller {
struct DefaultTopics {
static const std::string TOPIC;
static const std::string TOPIC_COMMAND;
static const std::string TOPIC_DEFINITIONS;
static const std::string TOPIC_NODE;
static const std::string TOPIC_NODE_STATS;
static const std::string TOPIC_PROC;
static const std::string TOPIC_PROC_STATE;
static const std::string TOPIC_PROC_STATS;
};
}
#endif /* PROCESS_DEFAULT_CONTROLLER_TOPICS_HPP */

View file

@ -0,0 +1,24 @@
#ifndef PROCESS_LIBPROCESS_EXT_INCLUDE_DEFINITION_RECEIVER_HPP
#define PROCESS_LIBPROCESS_EXT_INCLUDE_DEFINITION_RECEIVER_HPP
#include <asap/asap.hpp>
#include <process/process_collection.hpp>
#include "process_definition.hpp" // TODO generate to <process/{idl/}process_definition.hpp>
namespace process::controller {
class DefinitionListener;
class DefinitionReceiver {
public:
DefinitionReceiver(std::shared_ptr<ProcessCollection> collection, std::shared_ptr<asap::Participant> participant, const std::string& topic_name);
virtual ~DefinitionReceiver() = default;
protected: //for testing purposes
std::shared_ptr<DefinitionListener> m_def_listener;
std::shared_ptr<asap::Participant> m_participant;
std::shared_ptr<asap::Subscriber<ProcessDefinition>> m_subscriber;
};
}
#endif //PROCESS_LIBPROCESS_EXT_INCLUDE_DEFINITION_RECEIVER_HPP

View file

@ -0,0 +1,28 @@
#ifndef PROCESS_FILESYSTEM_HPP
#define PROCESS_FILESYSTEM_HPP
#include <fstream>
#include <memory>
namespace process {
class Filesystem {
public:
Filesystem() = default;
Filesystem(const Filesystem&) = default;
virtual ~Filesystem() noexcept = default;
Filesystem& operator=(const Filesystem&) & = default;
Filesystem& operator=(Filesystem&&) & = default;
Filesystem(Filesystem&&) = default;
[[nodiscard]] virtual std::shared_ptr<std::istream> open_read(const std::string& path) const;
[[nodiscard]] virtual uint32_t get_folder_size(const std::string& path) const;
};
}
#endif

View file

@ -0,0 +1,42 @@
#ifndef LIBPROCESS_EXPORT_H
#define LIBPROCESS_EXPORT_H
#ifdef LIBPROCESS_STATIC_DEFINE
# define LIBPROCESS_EXPORT
# define LIBPROCESS_NO_EXPORT
#else
# ifndef LIBPROCESS_EXPORT
# ifdef libprocess_EXPORTS
/* We are building this library */
# define LIBPROCESS_EXPORT
# else
/* We are using this library */
# define LIBPROCESS_EXPORT
# endif
# endif
# ifndef LIBPROCESS_NO_EXPORT
# define LIBPROCESS_NO_EXPORT
# endif
#endif
#ifndef LIBPROCESS_DEPRECATED
# define LIBPROCESS_DEPRECATED
#endif
#ifndef LIBPROCESS_DEPRECATED_EXPORT
# define LIBPROCESS_DEPRECATED_EXPORT LIBPROCESS_EXPORT LIBPROCESS_DEPRECATED
#endif
#ifndef LIBPROCESS_DEPRECATED_NO_EXPORT
# define LIBPROCESS_DEPRECATED_NO_EXPORT LIBPROCESS_NO_EXPORT LIBPROCESS_DEPRECATED
#endif
#if 0 /* DEFINE_NO_DEPRECATED */
# ifndef LIBPROCESS_NO_DEPRECATED
# define LIBPROCESS_NO_DEPRECATED
# endif
#endif
#endif /* LIBPROCESS_EXPORT_H */

View file

@ -0,0 +1,31 @@
#ifndef PROCESS_LIBPROCESS_EXT_INCLUDE_NODE_CHECKER_HPP
#define PROCESS_LIBPROCESS_EXT_INCLUDE_NODE_CHECKER_HPP
#include <process/watchdog.hpp>
#include "node_statistic.hpp"
#include <memory>
namespace process::controller {
class NodeOSInterface;
class NodeCheckerCallback {
public:
virtual void on_node_statistic_changed(const ::process::controller::NodeStatistic& ps) = 0;
};
class NodeChecker : public WatchdogChecker<NodeCheckerCallback> {
public:
//TODO default OSInterface in constructor, here should be nullptr if there are include problems/forward declare?
NodeChecker(std::shared_ptr<NodeCheckerCallback> cb,
std::shared_ptr<NodeOSInterface> interface = nullptr);
void check() override;
protected:
std::shared_ptr<NodeOSInterface> m_interface;
::process::controller::NodeStatistic m_stat;
};
}
#endif

View file

@ -0,0 +1,76 @@
#ifndef PROCESS_CONTROLLER_PREDICATE_HPP
#define PROCESS_CONTROLLER_PREDICATE_HPP
#include "process_definition.hpp" // TODO: generate to <build>/idl/process/process_definition.hpp
#include <string>
#include <functional>
#include <type_traits>
namespace process::controller {
namespace predicate {
/**
* combines two predicates into one using a given binary operation
*/
template <typename P1, typename P2, typename Op>
[[nodiscard]] inline auto combine(const P1& p1, const P2& p2, const Op op) noexcept -> std::function<bool(const ProcessDefinition&)>
{
static_assert(std::is_invocable_r_v<bool, decltype(op), bool, bool>);
static_assert(std::is_invocable_r_v<bool, decltype(p1), const ProcessDefinition&>);
static_assert(std::is_invocable_r_v<bool, decltype(p2), const ProcessDefinition&>);
return [p1, p2, op](const ProcessDefinition& arg) noexcept -> bool {
static_assert(std::is_invocable_r_v<bool, decltype(op), bool, bool>);
static_assert(std::is_invocable_r_v<bool, decltype(p1), const ProcessDefinition&>);
static_assert(std::is_invocable_r_v<bool, decltype(p2), const ProcessDefinition&>);
return op(p1(arg), p2(arg));
};
}
/**
* combines two predicates using logical or
*
* returns a function operating on the same argument as p1 and p2
*/
template <typename P1, typename P2>
[[nodiscard]] inline auto Or(const P1& p1, const P2& p2) noexcept -> std::function<bool(const ProcessDefinition&)>
{
return combine(p1, p2, std::logical_or {});
}
/**
* equivalent to `const true`
*/
[[nodiscard]] constexpr bool allow_all(const ProcessDefinition&) noexcept { return true; }
/**
* returns a function that checks whether the node of a ProcessDefinition equals a given string
*/
[[nodiscard]] inline std::function<bool(const ProcessDefinition&)> node_equals(const std::string& node) noexcept
{
return [node](const ProcessDefinition& pd) noexcept -> bool {
return pd.node() == node;
};
}
/**
* returns true iff the node specifies "any node", i.e. '*'
*/
[[nodiscard]] inline std::function<bool(const ProcessDefinition&)> is_any_node() noexcept
{
return node_equals("*");
}
/**
* returns a function that checks whether the node of a ProcessDefinition matches a given pattern
*
* TODO implement more sophisticated glob matching, we currently support '*' and exact match
*/
[[nodiscard]] inline std::function<bool(const ProcessDefinition&)> node_matches(const std::string& pattern) noexcept
{
return Or(is_any_node(), node_equals(pattern));
}
}
}
#endif /* PROCESS_CONTROLLER_PREDICATE_HPP */

View file

@ -0,0 +1,97 @@
#ifndef PROCESS_LIBPROCESS_INCLUDE_PROCESS_HPP
#define PROCESS_LIBPROCESS_INCLUDE_PROCESS_HPP
#include <utility>
#include "process/libprocess_export.hpp"
#include "process_status.hpp"
#include "process_statistic.hpp"
namespace process::controller {
class ProcessDefinition;
}
namespace process {
/**
Process interface expected to be used by library user.
This interface represents a process, be it for a console application,
a window application or a services.
Processes can be started, stopped or terminated using this interfaces.
Instances of this interface are created through the factory method
create_process_for() declared in process_factory.
This file is a public header, that is, one that a user of libprocess is
expected to see and use.
*/
class LIBPROCESS_EXPORT Process {
public:
virtual ~Process() = default;
Process(Process const&) = delete;
Process& operator=(Process const&) & = delete;
Process(Process&&) = delete;
Process& operator=(Process&&) & = delete;
/**
Start this process.
Attempts to start process.
@throws ProcessAlreadyStartedException if process is already running
@throws ProcessStartException if start failed for some reason
@throws ProcessUpdateException if updating the process (essentially calling Process::update()) failed
*/
virtual void start() = 0;
/**
Stop this process.
Attempts to stop process.
@throws ProcessAlreadyStoppedException if process is already stopped
@throws ProcessStopException if stop failed for some reason
@throws ProcessUpdateException if updating the process (essentially calling Process::update()) failed
*/
virtual void stop() = 0;
/**
Terminates this process.
Attempts to terminate this process. THIS IS ONLY A LAST RESORT WAY TO STOP A PROCESS!
@throws ProcessAlreadyStoppedException if process is already stopped
@throws ProcessTerminationException if terminate failed for some reason
@throws ProcessUpdateException if updating the process (essentially calling Process::update()) failed
*/
virtual void terminate() = 0;
/**
Update the process, that is, its state.
Attempts to update the state of the process (currently just whether or not it is running and its PID).
May at a later time also update resource consumption.
@throws ProcessUpdateException if updating the process (essentially calling Process::update()) failed
*/
virtual void update() = 0;
/**
Check whether or not the process is running.
*/
virtual bool is_running() const = 0;
/**
Retrieve the process definition used to create this process.
@returns process definition
*/
virtual const controller::ProcessDefinition& get_definition() const = 0;
/**
Retrieve the process status.
The process status is updated on every call to @ref Process::update().
@param status Struct contains all available status information
*/
virtual std::pair<controller::ProcessStatus, controller::ProcessStatistic> get_status() const = 0;
protected:
Process() = default;
};
}
#endif

View file

@ -0,0 +1,31 @@
#ifndef PROCESS_LIBPROCESS_EXT_INCLUDE_PROCESS_CHECKER_HPP
#define PROCESS_LIBPROCESS_EXT_INCLUDE_PROCESS_CHECKER_HPP
#include <process/watchdog.hpp>
#include "process_statistic.hpp"
#include "process_status.hpp"
namespace process::controller {
class ProcessCheckerCallback {
public:
virtual void on_process_status_changed(const ProcessStatus& ps) = 0;
virtual void on_process_statistic_changed(const ProcessStatistic& ps) = 0;
};
class ProcessChecker : public WatchdogChecker<ProcessCheckerCallback> {
public:
ProcessChecker(std::shared_ptr<ProcessCollection> collection, std::shared_ptr<ProcessCheckerCallback> cb);
void check() override;
private:
void check_process(const std::shared_ptr<Process>& process);
using ProcessDataMap = std::map<std::string, std::pair<ProcessStatus, ProcessStatistic>>;
ProcessDataMap m_process_data;
const std::shared_ptr<ProcessCollection> m_collection;
};
}
#endif

View file

@ -0,0 +1,80 @@
#ifndef PROCESS_PROCESS_COLLECTION_HPP
#define PROCESS_PROCESS_COLLECTION_HPP
#include <shared_mutex>
#include <functional>
#include <process/process.hpp>
#include <process/predicate.hpp>
namespace process::controller {
class ProcessDefinition;
/**
@TODO This collection holds up to N ProcessDefinitions with 0 or 1 Process created.
Each of N definition is unique by definition.
Todo: Check uniqueness of added definition!
Currently there is a scope lock inside for_each() function (because of maybe invalidated iter).
This blocks all concurrent action in all other threads.
ToDo: Reduce lock contention.
Proposal: Change vector to list. Do not remove an element from that list
in case the corresponding process has been stopped. Only insert at end of list.
Unless any element is deleted the collection is thread safe.
Use lock to get valid begin() and end() iterator only. Release lock afterwards
and iter through collection without lock.
*/
class ProcessCollection {
using create_process_fn = std::function<std::shared_ptr<Process>(
const process::controller::ProcessDefinition& pd)>;
public:
/// Initialize the create_process_for function with an own implementation
explicit ProcessCollection(create_process_fn create_process_for_f) noexcept(false);
ProcessCollection(create_process_fn create_process_for_f,
std::function<bool(const process::controller::ProcessDefinition)> predicate) noexcept(false);
/**
* Add a process using its definition.
*
* The definition gets passed to a predicate function deciding whether to
* accept it or not.
*
* @return true iff it was added to the collection, false otherwise
*/
void add(const process::controller::ProcessDefinition& definition);
/**
* Lookup a single process by its id.
*
* @return the process or null if it wasn't found
*/
std::shared_ptr<Process> get(const std::string& pd_id) const;
/**
* Iterate over all processes while holding the monitor lock.
*/
void for_each(const std::function<void(std::shared_ptr<Process>)>& pred) const;
/**
*
* @return the number of registered processes
*/
size_t size() const noexcept;
private:
/**
@return Searches and returns process with a given process definition ID. Nullptr if not found.
@param pd_id ID of process definition
@note Does not lock any mutex.
*/
std::shared_ptr<Process> lookup(const std::string& pd_id) const;
mutable std::shared_mutex m_mutex;
std::vector<std::shared_ptr<Process>> m_processes;
create_process_fn m_create_process;
std::function<bool(const process::controller::ProcessDefinition&)> m_predicate;
};
}
#endif

View file

@ -0,0 +1,114 @@
#ifndef PROCESS_LIBPROCESS_INCLUDE_PROCESS_EXCEPTIONS_HPP
#define PROCESS_LIBPROCESS_INCLUDE_PROCESS_EXCEPTIONS_HPP
#include <stdexcept>
namespace process {
/**
Base class for all libprocess exceptions
*/
class ProcessException : public std::runtime_error {
public:
explicit ProcessException(const std::basic_string<char>& what) noexcept
: std::runtime_error(what)
{
}
~ProcessException() override = default;
ProcessException(ProcessException const&) = delete;
ProcessException& operator=(ProcessException const&) & = delete;
ProcessException(ProcessException&&) = delete;
ProcessException& operator=(ProcessException&&) & = delete;
};
/**
Thrown if something goes wrong during process creation
*/
class ProcessCreationException : public ProcessException {
public:
using ProcessException::ProcessException;
~ProcessCreationException() override = default;
ProcessCreationException(ProcessCreationException const&) = delete;
ProcessCreationException& operator=(ProcessCreationException const&) & = delete;
ProcessCreationException(ProcessCreationException&&) = delete;
ProcessCreationException& operator=(ProcessCreationException&&) & = delete;
};
/**
Thrown if something goes wrong while attempting to start a process
*/
class ProcessStartException : public ProcessException {
public:
using ProcessException::ProcessException;
~ProcessStartException() override = default;
ProcessStartException(ProcessStartException const&) = delete;
ProcessStartException& operator=(ProcessStartException const&) & = delete;
ProcessStartException(ProcessStartException&&) = delete;
ProcessStartException& operator=(ProcessStartException&&) & = delete;
};
/**
Thrown if something goes wrong while attempting to stop a process
*/
class ProcessStopException : public ProcessException {
public:
using ProcessException::ProcessException;
~ProcessStopException() override = default;
ProcessStopException(ProcessStopException const&) = delete;
ProcessStopException& operator=(ProcessStopException const&) & = delete;
ProcessStopException(ProcessStopException&&) = delete;
ProcessStopException& operator=(ProcessStopException&&) & = delete;
};
/**
Thrown if Process::start() is called but the process is already running
*/
class ProcessAlreadyStartedException : public ProcessException {
public:
using ProcessException::ProcessException;
~ProcessAlreadyStartedException() override = default;
ProcessAlreadyStartedException(ProcessAlreadyStartedException const&) = delete;
ProcessAlreadyStartedException& operator=(ProcessAlreadyStartedException const&) & = delete;
ProcessAlreadyStartedException(ProcessAlreadyStartedException&&) = delete;
ProcessAlreadyStartedException& operator=(ProcessAlreadyStartedException&&) & = delete;
};
/**
Thrown if Process::stop() is called but the process is already stopped
*/
class ProcessAlreadyStoppedException : public ProcessException {
public:
using ProcessException::ProcessException;
~ProcessAlreadyStoppedException() override = default;
ProcessAlreadyStoppedException(ProcessAlreadyStoppedException const&) = delete;
ProcessAlreadyStoppedException& operator=(ProcessAlreadyStoppedException const&) & = delete;
ProcessAlreadyStoppedException(ProcessAlreadyStoppedException&&) = delete;
ProcessAlreadyStoppedException& operator=(ProcessAlreadyStoppedException&&) & = delete;
};
/**
Thrown if the state of a process cannot be updated
*/
class ProcessUpdateException : public ProcessException {
public:
using ProcessException::ProcessException;
~ProcessUpdateException() override = default;
ProcessUpdateException(ProcessUpdateException const&) = delete;
ProcessUpdateException& operator=(ProcessUpdateException const&) & = delete;
ProcessUpdateException(ProcessUpdateException&&) = delete;
ProcessUpdateException& operator=(ProcessUpdateException&&) & = delete;
};
/**
Thrown if the termination of a process fails
*/
class ProcessTerminationException : public ProcessException {
public:
using ProcessException::ProcessException;
~ProcessTerminationException() override = default;
ProcessTerminationException(ProcessTerminationException const&) = delete;
ProcessTerminationException& operator=(ProcessTerminationException const&) & = delete;
ProcessTerminationException(ProcessTerminationException&&) = delete;
ProcessTerminationException& operator=(ProcessTerminationException&&) & = delete;
};
}
#endif

View file

@ -0,0 +1,46 @@
#ifndef PROCESS_LIBPROCESS_INCLUDE_PROCESS_FACTORY_HPP
#define PROCESS_LIBPROCESS_INCLUDE_PROCESS_FACTORY_HPP
#include <memory>
#include <string>
#include "process/process.hpp"
#include "process/libprocess_export.hpp"
namespace process::controller {
class ProcessDefinition;
enum class ProcessType : uint32_t;
}
namespace process {
/**
@brief Factory method that creates Process instances for specific process definitions.
In order to control a process, you need an instance of Process controlling a process
based on a process description.
Given a valid process definition, create_process_for will create an appropriate
instance of Process for you.
Note that multiple calls to create_process_for with the same process definition
will create separate instances of Process for this definition.
This file is a public header, that is, one that a user of libprocess is
expected to see and use.
@param pd ProcessDefinition to use for the process object
@param user optional parameter if using this library in an application; Required if running as a service. if specified will try to run the process under that user.
Note that this (at least if using windows) only works if that specific user is logged in.
@note In case function is invoked by multipe threads they must be serialized.
*/
LIBPROCESS_EXPORT extern std::shared_ptr<Process> create_process(const process::controller::ProcessDefinition& pd, const std::string& default_user = "");
/**
* @brief creates a Process instance for an already existing process id
* @param pt Type of process, necessary to order to stop the process correctly
* @return Instance of process for the given process id
*/
LIBPROCESS_EXPORT extern std::shared_ptr<Process> attach_process(const process::controller::ProcessType pt, const os_pid_t os_pid);
}
#endif

View file

@ -0,0 +1,15 @@
#ifndef PROCESS_LIBPROCESS_INCLUDE_STATE_EXTENSIONS_HPP
#define PROCESS_LIBPROCESS_INCLUDE_STATE_EXTENSIONS_HPP
#include <process_status.hpp>
namespace process::controller::extensions {
inline static bool is_running(const ProcessState state)
{
return ((state == ProcessState::RUNNING) || (state == ProcessState::RESTARTED));
}
}
#endif //PROCESS_LIBPROCESS_INCLUDE_STATE_EXTENSIONS_HPP

View file

@ -0,0 +1,22 @@
#ifndef PROCESS_SERVICE_WRAPPER_HPP
#define PROCESS_SERVICE_WRAPPER_HPP
#include <cstdint>
namespace process::servicewrapper {
class ServiceWrapper {
public:
/**
* Tells the wrapper to run the service for execution.
* This will start the service run and will block until
* the service is done or died
* @returns exit code of service implementation; return value for service main
*/
virtual uint32_t run() const = 0;
virtual ~ServiceWrapper() = default;
};
}
#endif //!< PROCESS_SERVICE_WRAPPER_HPP

View file

@ -0,0 +1,23 @@
#ifndef PROCESS_SERVICE_WRAPPER_FACTORY_HPP
#define PROCESS_SERVICE_WRAPPER_FACTORY_HPP
#include <memory>
#include <process/service_wrapper.hpp>
#include <process/service_wrapper_listener.hpp>
namespace process::servicewrapper {
/**
@briefFactory method that creates a service wrapper for a ServicewrapperListener.
@note THIS METHOD CAN BE USED ONCE AND ONLY ONCE PER EXECUTABLE
@param listener the actual service implementation. Since this might do other stuff in other threads, we use a shared_ptr
@returns the service wrapper ready to run the service. Call ServiceWrapper::run() to actually execute
*/
std::unique_ptr<ServiceWrapper> make_wrapper(std::shared_ptr<ServiceWrapperListener> listener);
}
#endif // !PROCESS_SERVICE_WRAPPER_FACTORY_HPP

View file

@ -0,0 +1,38 @@
#ifndef PROCESS_SERVICE_WRAPPER_LISTENER_HPP
#define PROCESS_SERVICE_WRAPPER_LISTENER_HPP
#include <string>
#include <cstdint>
namespace process::servicewrapper {
/**
* @brief class that houses the actual implementation of a service
*/
class ServiceWrapperListener {
public:
/*
* Starts the wrapped service.
*
* This is supposed to be a blocking function. Block until your service has a reason to stop,
* either because it is done, because it encountered a fatal error or because it received a
* call to on_stop and is closing down gracefully.
*
* @returns the exit code you want the service to return.
* Note that this may differ from the actual exit code in case the service wrapper encounters
* some platform specific errors
*/
virtual uint32_t on_start() = 0;
/**
* Indicates to the wrapped service that is shall stop, that is, leave the on_start() method
*/
virtual void on_stop() = 0;
/**
Retrieves the internal name of the service
*/
virtual std::string get_name() = 0;
virtual ~ServiceWrapperListener() = default;
};
}
#endif

View file

@ -0,0 +1,89 @@
#ifndef PROCESS_PROCESS_EXT_INCLUDE_WATCHDOG_HPP
#define PROCESS_PROCESS_EXT_INCLUDE_WATCHDOG_HPP
#include <chrono>
#include <atomic>
#include <thread>
#include <map>
#include <process/process.hpp> // note: Public include from libprocess
#include "process_status.hpp" // note: Though in different folder this is also a public
// include from libprocess. Generated from IDL
#include "process_statistic.hpp"
#include "process/process_collection.hpp"
namespace process::controller {
class WatchdogCheckerBase {
public:
virtual void check() = 0;
};
template <typename T>
class WatchdogChecker : public WatchdogCheckerBase {
// Add assert to satisfy A14-1-1
static_assert(std::is_class<T>::value);
public:
WatchdogChecker(std::shared_ptr<T> cb)
: m_callback(std::move(cb))
{
if (m_callback.get() == nullptr) {
throw std::runtime_error("invalid callback");
}
}
protected:
std::shared_ptr<T> m_callback;
};
/**
@brief Periodically updates all provided processes.
@note
- Callback is used to keep class Watchdog free of specific dependencies and user types (No linkage to ASAP)
- Watchdog may be moved into libprocess as well as ProcessCollection
- API of Watchdog itself is not thread safe
*/
class Watchdog {
public:
/**
@brief Constructs the Watchdog.
@param collection List of all processes which shall be monitored
@param interval_ms All processes are checked periodically using the given interval
@param cb Callback object which will be invoked whenever a status change has been detected
*/
explicit Watchdog(const std::chrono::milliseconds interval_ms);
Watchdog(const Watchdog& other) = delete;
Watchdog(Watchdog&& other) = delete;
Watchdog& operator=(const Watchdog& other) & = delete;
Watchdog& operator=(Watchdog&& other) & = delete;
/** This class is not intended to be derived */
virtual ~Watchdog();
/**/
virtual void add_checker(std::shared_ptr<WatchdogCheckerBase> checker);
/** Watchdog starts processing */
virtual void start();
/** Stops the watchdog worker thread */
virtual void stop();
/** @return true if watchdog has been started and monitors the collection periodically */
virtual bool is_running() const;
protected:
virtual void loop();
std::chrono::milliseconds m_interval_ms;
std::atomic<bool> m_stop_loop;
std::thread m_thread;
std::vector<std::shared_ptr<WatchdogCheckerBase>> m_checker;
};
}
#endif