提交 ebafc241 编写于 作者: P Pavel Kartavyy

BaseDaemon: waitForTerminationRequest() use sigaction instead of sigwait

上级 07a47ecd
......@@ -163,7 +163,7 @@ public:
protected:
int run();
void waitForTerminationRequest();
virtual void waitForTerminationRequest();
#if !defined(_WIN32_WCE)
void defineOptions(OptionSet& options);
#endif
......
......@@ -52,6 +52,8 @@ namespace Poco { class TaskManager; }
class BaseDaemon : public Poco::Util::ServerApplication
{
friend class SignalListener;
public:
BaseDaemon();
~BaseDaemon();
......@@ -132,6 +134,14 @@ protected:
/// Используется при exitOnTaskError()
void handleNotification(Poco::TaskFailedNotification *);
/// thread safe
virtual void handleSignal(int signal_id);
/// реализация обработки сигналов завершения через pipe не требует блокировки сигнала с помощью sigprocmask во всех потоках
void waitForTerminationRequest() override;
/// thread safe
virtual void onInterruptSignals(int signal_id);
std::unique_ptr<Poco::TaskManager> task_manager;
/// Создание и автоматическое удаление pid файла.
......@@ -156,8 +166,7 @@ protected:
PID pid;
/// Получен ли сигнал на завершение? Этот флаг устанавливается в BaseDaemonApplication.
bool is_cancelled = false;
std::atomic_bool is_cancelled{false};
/// Флаг устанавливается по сообщению из Task (при аварийном завершении).
bool task_failed = false;
......@@ -179,4 +188,8 @@ protected:
std::unique_ptr<GraphiteWriter> graphite_writer;
boost::optional<size_t> layer;
std::mutex signal_handler_mutex;
std::condition_variable signal_event;
size_t terminate_signals_counter = 0;
};
......@@ -131,9 +131,9 @@ static void call_default_signal_handler(int sig)
using ThreadNumber = decltype(Poco::ThreadNumber::get());
static const size_t buf_size = sizeof(int) + sizeof(siginfo_t) + sizeof(ucontext_t) + sizeof(ThreadNumber);
using signal_function = void(int, siginfo_t*, void*);
/** Обработчик сигналов HUP / USR1 */
static void close_logs_signal_handler(int sig, siginfo_t * info, void * context)
static void writeSignalIDtoSignalPipe(int sig)
{
char buf[buf_size];
DB::WriteBufferFromFileDescriptor out(signal_pipe.write_fd, buf_size, buf);
......@@ -141,10 +141,21 @@ static void close_logs_signal_handler(int sig, siginfo_t * info, void * context)
out.next();
}
/** Обработчик сигналов HUP / USR1 */
static void closeLogsSignalHandler(int sig, siginfo_t * info, void * context)
{
writeSignalIDtoSignalPipe(sig);
}
static void terminateRequestedSignalHandler(int sig, siginfo_t * info, void * context)
{
writeSignalIDtoSignalPipe(sig);
}
/** Обработчик некоторых сигналов. Выводит информацию в лог (если получится).
*/
static void fault_signal_handler(int sig, siginfo_t * info, void * context)
static void faultSignalHandler(int sig, siginfo_t * info, void * context)
{
char buf[buf_size];
DB::WriteBufferFromFileDescriptor out(signal_pipe.write_fd, buf_size, buf);
......@@ -174,7 +185,9 @@ static bool already_printed_stack_trace = false;
class SignalListener : public Poco::Runnable
{
public:
SignalListener() : log(&Logger::get("BaseDaemon"))
SignalListener(BaseDaemon & daemon_)
: log(&Logger::get("BaseDaemon"))
, daemon(daemon_)
{
}
......@@ -204,6 +217,12 @@ public:
onTerminate(message, thread_num);
}
else if (sig == SIGINT ||
sig == SIGQUIT ||
sig == SIGTERM)
{
daemon.handleSignal(sig);
}
else
{
siginfo_t info;
......@@ -221,8 +240,9 @@ public:
private:
Logger * log;
BaseDaemon & daemon;
private:
void onTerminate(const std::string & message, ThreadNumber thread_num) const
{
LOG_ERROR(log, "(from thread " << thread_num << ") " << message);
......@@ -739,42 +759,31 @@ void BaseDaemon::initialize(Application& self)
std::set_terminate(terminate_handler);
/// Ставим обработчики сигналов
struct sigaction sa;
memset(&sa, 0, sizeof(sa));
sa.sa_sigaction = fault_signal_handler;
sa.sa_flags = SA_SIGINFO;
{
int signals[] = {SIGABRT, SIGSEGV, SIGILL, SIGBUS, SIGSYS, SIGFPE, SIGPIPE, 0};
if (sigemptyset(&sa.sa_mask))
throw Poco::Exception("Cannot set signal handler.");
for (size_t i = 0; signals[i]; ++i)
if (sigaddset(&sa.sa_mask, signals[i]))
throw Poco::Exception("Cannot set signal handler.");
for (size_t i = 0; signals[i]; ++i)
if (sigaction(signals[i], &sa, 0))
throw Poco::Exception("Cannot set signal handler.");
}
sa.sa_sigaction = close_logs_signal_handler;
auto add_signal_handler =
[](const std::vector<int> & signals, signal_function handler)
{
struct sigaction sa;
memset(&sa, 0, sizeof(sa));
sa.sa_sigaction = handler;
sa.sa_flags = SA_SIGINFO;
{
int signals[] = {SIGHUP, SIGUSR1, 0};
{
if (sigemptyset(&sa.sa_mask))
throw Poco::Exception("Cannot set signal handler.");
if (sigemptyset(&sa.sa_mask))
throw Poco::Exception("Cannot set signal handler.");
for (auto signal : signals)
if (sigaddset(&sa.sa_mask, signal))
throw Poco::Exception("Cannot set signal handler.");
for (size_t i = 0; signals[i]; ++i)
if (sigaddset(&sa.sa_mask, signals[i]))
throw Poco::Exception("Cannot set signal handler.");
for (auto signal : signals)
if (sigaction(signal, &sa, 0))
throw Poco::Exception("Cannot set signal handler.");
}
};
for (size_t i = 0; signals[i]; ++i)
if (sigaction(signals[i], &sa, 0))
throw Poco::Exception("Cannot set signal handler.");
}
add_signal_handler({SIGABRT, SIGSEGV, SIGILL, SIGBUS, SIGSYS, SIGFPE, SIGPIPE}, faultSignalHandler);
add_signal_handler({SIGHUP, SIGUSR1}, closeLogsSignalHandler);
add_signal_handler({SIGINT, SIGQUIT, SIGTERM}, terminateRequestedSignalHandler);
/// Ставим ErrorHandler для потоков
static KillingErrorHandler killing_error_handler;
......@@ -783,7 +792,7 @@ void BaseDaemon::initialize(Application& self)
/// Выведем ревизию демона
logRevision();
signal_listener.reset(new SignalListener);
signal_listener.reset(new SignalListener(*this));
signal_listener_thread.start(*signal_listener);
graphite_writer.reset(new GraphiteWriter("graphite"));
......@@ -890,3 +899,35 @@ void BaseDaemon::PID::clear()
file.clear();
}
}
void BaseDaemon::handleSignal(int signal_id)
{
if (signal_id == SIGINT ||
signal_id == SIGQUIT ||
signal_id == SIGTERM)
{
std::unique_lock<std::mutex> lock(signal_handler_mutex);
{
++terminate_signals_counter;
signal_event.notify_all();
}
onInterruptSignals(signal_id);
}
else
throw DB::Exception(std::string("Unsupported signal: ") + strsignal(signal_id));
}
void BaseDaemon::onInterruptSignals(int signal_id)
{
is_cancelled = true;
LOG_INFO(&logger(), "Received termination signal(" << strsignal(signal_id) << ")");
}
void BaseDaemon::waitForTerminationRequest()
{
std::unique_lock<std::mutex> lock(signal_handler_mutex);
signal_event.wait(lock, [this](){ return terminate_signals_counter > 0; });
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册