提交 9d540abc 编写于 作者: N Nikita Lapkov

refactor

上级 e935cb08
......@@ -295,7 +295,7 @@
</query_log>
<!-- Trace log. Stores stack traces collected by query profilers.
See query_profiler_real_time_period and query_profiler_cpu_time_period settings. -->
See query_profiler_real_time_period_ns and query_profiler_cpu_time_period_ns settings. -->
<trace_log>
<database>system</database>
<table>trace_log</table>
......
......@@ -3,6 +3,7 @@
#include <memory>
#include <string>
#include <common/likely.h>
#include <common/StringRef.h>
#include <Common/ThreadStatus.h>
......@@ -70,7 +71,12 @@ public:
static void finalizePerformanceCounters();
/// Returns a non-empty string if the thread is attached to a query
static StringRef getQueryId();
static StringRef getQueryId()
{
if (unlikely(!current_thread))
return {};
return current_thread->getQueryId();
}
/// Non-master threads call this method in destructor automatically
static void detachQuery();
......
......@@ -434,6 +434,9 @@ namespace ErrorCodes
extern const int BAD_QUERY_PARAMETER = 457;
extern const int CANNOT_UNLINK = 458;
extern const int CANNOT_SET_THREAD_PRIORITY = 459;
extern const int CANNOT_CREATE_TIMER = 460;
extern const int CANNOT_SET_TIMER_PERIOD = 461;
extern const int CANNOT_DELETE_TIMER = 462;
extern const int KEEPER_EXCEPTION = 999;
extern const int POCO_EXCEPTION = 1000;
......
#include "QueryProfiler.h"
#include <common/Pipe.h>
#include <common/StackTrace.h>
#include <common/StringRef.h>
#include <common/logger_useful.h>
#include <Common/CurrentThread.h>
#include <Common/Exception.h>
#include <IO/WriteHelpers.h>
#include <IO/WriteBufferFromFileDescriptor.h>
namespace DB
{
LazyPipe trace_pipe;
extern LazyPipe trace_pipe;
namespace
{
/// Normally query_id is a UUID (string with a fixed length) but user can provide custom query_id.
/// Thus upper bound on query_id length should be introduced to avoid buffer overflow in signal handler.
constexpr size_t QUERY_ID_MAX_LEN = 1024;
void writeTraceInfo(TimerType timer_type, int /* sig */, siginfo_t * /* info */, void * context)
{
constexpr size_t buf_size = sizeof(char) + // TraceCollector stop flag
8 * sizeof(char) + // maximum VarUInt length for string size
QUERY_ID_MAX_LEN * sizeof(char) + // maximum query_id length
sizeof(StackTrace) + // collected stack trace
sizeof(TimerType); // timer type
char buffer[buf_size];
WriteBufferFromFileDescriptor out(trace_pipe.fds_rw[1], buf_size, buffer);
StringRef query_id = CurrentThread::getQueryId();
query_id.size = std::min(query_id.size, QUERY_ID_MAX_LEN);
const auto signal_context = *reinterpret_cast<ucontext_t *>(context);
const StackTrace stack_trace(signal_context);
writeChar(false, out);
writeStringBinary(query_id, out);
writePODBinary(stack_trace, out);
writePODBinary(timer_type, out);
out.next();
}
const UInt32 TIMER_PRECISION = 1e9;
}
namespace ErrorCodes
{
extern const int CANNOT_MANIPULATE_SIGSET;
extern const int CANNOT_SET_SIGNAL_HANDLER;
extern const int CANNOT_CREATE_TIMER;
extern const int CANNOT_SET_TIMER_PERIOD;
extern const int CANNOT_DELETE_TIMER;
}
template <typename ProfilerImpl>
QueryProfilerBase<ProfilerImpl>::QueryProfilerBase(const Int32 thread_id, const int clock_type, const UInt32 period, const int pause_signal)
: log(&Logger::get("QueryProfiler"))
, pause_signal(pause_signal)
{
struct sigaction sa{};
sa.sa_sigaction = ProfilerImpl::signalHandler;
sa.sa_flags = SA_SIGINFO | SA_RESTART;
if (sigemptyset(&sa.sa_mask))
throwFromErrno("Failed to clean signal mask for query profiler", ErrorCodes::CANNOT_MANIPULATE_SIGSET);
if (sigaddset(&sa.sa_mask, pause_signal))
throwFromErrno("Failed to add signal to mask for query profiler", ErrorCodes::CANNOT_MANIPULATE_SIGSET);
if (sigaction(pause_signal, &sa, previous_handler))
throwFromErrno("Failed to setup signal handler for query profiler", ErrorCodes::CANNOT_SET_SIGNAL_HANDLER);
try
{
struct sigevent sev;
sev.sigev_notify = SIGEV_THREAD_ID;
sev.sigev_signo = pause_signal;
sev._sigev_un._tid = thread_id;
if (timer_create(clock_type, &sev, &timer_id))
throwFromErrno("Failed to create thread timer", ErrorCodes::CANNOT_CREATE_TIMER);
struct timespec interval{.tv_sec = period / TIMER_PRECISION, .tv_nsec = period % TIMER_PRECISION};
struct itimerspec timer_spec = {.it_interval = interval, .it_value = interval};
if (timer_settime(timer_id, 0, &timer_spec, nullptr))
throwFromErrno("Failed to set thread timer period", ErrorCodes::CANNOT_SET_TIMER_PERIOD);
}
catch (...)
{
tryCleanup();
throw;
}
}
template <typename ProfilerImpl>
QueryProfilerBase<ProfilerImpl>::~QueryProfilerBase()
{
tryCleanup();
}
template <typename ProfilerImpl>
void QueryProfilerBase<ProfilerImpl>::tryCleanup()
{
if (timer_id != nullptr && timer_delete(timer_id))
LOG_ERROR(log, "Failed to delete query profiler timer " + errnoToString(ErrorCodes::CANNOT_DELETE_TIMER));
if (previous_handler != nullptr && sigaction(pause_signal, previous_handler, nullptr))
LOG_ERROR(log, "Failed to restore signal handler after query profiler " + errnoToString(ErrorCodes::CANNOT_SET_SIGNAL_HANDLER));
}
template class QueryProfilerBase<QueryProfilerReal>;
template class QueryProfilerBase<QueryProfilerCpu>;
QueryProfilerReal::QueryProfilerReal(const Int32 thread_id, const UInt32 period)
: QueryProfilerBase(thread_id, CLOCK_REALTIME, period, SIGUSR1)
{}
void QueryProfilerReal::signalHandler(int sig, siginfo_t * info, void * context)
{
writeTraceInfo(TimerType::Real, sig, info, context);
}
QueryProfilerCpu::QueryProfilerCpu(const Int32 thread_id, const UInt32 period)
: QueryProfilerBase(thread_id, CLOCK_THREAD_CPUTIME_ID, period, SIGUSR2)
{}
void QueryProfilerCpu::signalHandler(int sig, siginfo_t * info, void * context)
{
writeTraceInfo(TimerType::Cpu, sig, info, context);
}
}
#pragma once
#include <common/Pipe.h>
#include <common/StackTrace.h>
#include <common/StringRef.h>
#include <common/logger_useful.h>
#include <Common/CurrentThread.h>
#include <IO/WriteHelpers.h>
#include <IO/WriteBufferFromFileDescriptor.h>
#include <Core/Types.h>
#include <signal.h>
#include <time.h>
......@@ -19,47 +13,12 @@ namespace Poco
namespace DB
{
extern LazyPipe trace_pipe;
enum class TimerType : UInt8
{
Real,
Cpu,
};
namespace
{
/// Normally query_id is a UUID (string with a fixed length) but user can provide custom query_id.
/// Thus upper bound on query_id length should be introduced to avoid buffer overflow in signal handler.
constexpr size_t QUERY_ID_MAX_LEN = 1024;
void writeTraceInfo(TimerType timer_type, int /* sig */, siginfo_t * /* info */, void * context)
{
constexpr size_t buf_size = sizeof(char) + // TraceCollector stop flag
8 * sizeof(char) + // maximum VarUInt length for string size
QUERY_ID_MAX_LEN * sizeof(char) + // maximum query_id length
sizeof(StackTrace) + // collected stack trace
sizeof(TimerType); // timer type
char buffer[buf_size];
DB::WriteBufferFromFileDescriptor out(trace_pipe.fds_rw[1], buf_size, buffer);
StringRef query_id = CurrentThread::getQueryId();
query_id.size = std::min(query_id.size, QUERY_ID_MAX_LEN);
const auto signal_context = *reinterpret_cast<ucontext_t *>(context);
const StackTrace stack_trace(signal_context);
DB::writeChar(false, out);
DB::writeStringBinary(query_id, out);
DB::writePODBinary(stack_trace, out);
DB::writePODBinary(timer_type, out);
out.next();
}
const UInt32 TIMER_PRECISION = 1e9;
}
/**
* Query profiler implementation for selected thread.
*
......@@ -75,46 +34,13 @@ template <typename ProfilerImpl>
class QueryProfilerBase
{
public:
QueryProfilerBase(const Int32 thread_id, const int clock_type, const UInt32 period, const int pause_signal = SIGALRM)
: log(&Logger::get("QueryProfiler"))
, pause_signal(pause_signal)
{
struct sigaction sa{};
sa.sa_sigaction = ProfilerImpl::signalHandler;
sa.sa_flags = SA_SIGINFO | SA_RESTART;
if (sigemptyset(&sa.sa_mask))
throw Poco::Exception("Failed to clean signal mask for query profiler");
if (sigaddset(&sa.sa_mask, pause_signal))
throw Poco::Exception("Failed to add signal to mask for query profiler");
if (sigaction(pause_signal, &sa, previous_handler))
throw Poco::Exception("Failed to setup signal handler for query profiler");
struct sigevent sev;
sev.sigev_notify = SIGEV_THREAD_ID;
sev.sigev_signo = pause_signal;
sev._sigev_un._tid = thread_id;
if (timer_create(clock_type, &sev, &timer_id))
throw Poco::Exception("Failed to create thread timer");
struct timespec interval{.tv_sec = period / TIMER_PRECISION, .tv_nsec = period % TIMER_PRECISION};
struct itimerspec timer_spec = {.it_interval = interval, .it_value = interval};
if (timer_settime(timer_id, 0, &timer_spec, nullptr))
throw Poco::Exception("Failed to set thread timer");
}
~QueryProfilerBase()
{
if (timer_delete(timer_id))
LOG_ERROR(log, "Failed to delete query profiler timer");
if (sigaction(pause_signal, previous_handler, nullptr))
LOG_ERROR(log, "Failed to restore signal handler after query profiler");
}
QueryProfilerBase(const Int32 thread_id, const int clock_type, const UInt32 period, const int pause_signal = SIGALRM);
~QueryProfilerBase();
private:
void tryCleanup();
Poco::Logger * log;
/// Timer id from timer_create(2)
......@@ -131,28 +57,18 @@ private:
class QueryProfilerReal : public QueryProfilerBase<QueryProfilerReal>
{
public:
QueryProfilerReal(const Int32 thread_id, const UInt32 period)
: QueryProfilerBase(thread_id, CLOCK_REALTIME, period, SIGUSR1)
{}
static void signalHandler(int sig, siginfo_t * info, void * context)
{
writeTraceInfo(TimerType::Real, sig, info, context);
}
QueryProfilerReal(const Int32 thread_id, const UInt32 period);
static void signalHandler(int sig, siginfo_t * info, void * context);
};
/// Query profiler with timer based on CPU clock
class QueryProfilerCpu : public QueryProfilerBase<QueryProfilerCpu>
{
public:
QueryProfilerCpu(const Int32 thread_id, const UInt32 period)
: QueryProfilerBase(thread_id, CLOCK_THREAD_CPUTIME_ID, period, SIGUSR2)
{}
static void signalHandler(int sig, siginfo_t * info, void * context)
{
writeTraceInfo(TimerType::Cpu, sig, info, context);
}
QueryProfilerCpu(const Int32 thread_id, const UInt32 period);
static void signalHandler(int sig, siginfo_t * info, void * context);
};
}
......@@ -119,7 +119,10 @@ public:
return thread_state.load(std::memory_order_relaxed);
}
StringRef getQueryId() const;
StringRef getQueryId() const
{
return query_id;
}
/// Starts new query and create new thread group for it, current thread becomes master thread of the query
void initializeQuery();
......
......@@ -6,7 +6,7 @@
#include <Common/Stopwatch.h>
#include <Common/Exception.h>
#include <Common/ProfileEvents.h>
#include <common/Sleep.h>
#include <common/sleep.h>
#include <IO/WriteHelpers.h>
#include <port/clock.h>
......@@ -77,7 +77,7 @@ public:
if (desired_ns > elapsed_ns)
{
UInt64 sleep_ns = desired_ns - elapsed_ns;
SleepForNanoseconds(sleep_ns);
sleepForNanoseconds(sleep_ns);
ProfileEvents::increment(ProfileEvents::ThrottlerSleepMicroseconds, sleep_ns / 1000UL);
}
......
......@@ -2,15 +2,50 @@
#include <Core/Field.h>
#include <Poco/Logger.h>
#include <common/Pipe.h>
#include <common/StackTrace.h>
#include <common/logger_useful.h>
#include <IO/ReadHelpers.h>
#include <IO/ReadBufferFromFileDescriptor.h>
#include <IO/WriteHelpers.h>
#include <IO/WriteBufferFromFileDescriptor.h>
#include <Common/Exception.h>
#include <Common/QueryProfiler.h>
#include <Interpreters/TraceLog.h>
using namespace DB;
namespace DB
{
LazyPipe trace_pipe;
namespace ErrorCodes
{
extern const int NULL_POINTER_DEREFERENCE;
extern const int THREAD_IS_NOT_JOINABLE;
}
TraceCollector::TraceCollector(std::shared_ptr<TraceLog> & trace_log)
: log(&Poco::Logger::get("TraceCollector"))
, trace_log(trace_log)
{
if (trace_log == nullptr)
throw Exception("Invalid trace log pointer passed", ErrorCodes::NULL_POINTER_DEREFERENCE);
trace_pipe.open();
thread = ThreadFromGlobalPool(&TraceCollector::run, this);
}
TraceCollector::~TraceCollector()
{
if (!thread.joinable())
LOG_ERROR(log, "TraceCollector thread is malformed and cannot be joined");
else
{
TraceCollector::notifyToStop();
thread.join();
}
trace_pipe.close();
}
/**
* Sends TraceCollector stop message
......@@ -22,21 +57,13 @@ using namespace DB;
* NOTE: TraceCollector will NOT stop immediately as there may be some data left in the pipe
* before stop message.
*/
void DB::NotifyTraceCollectorToStop()
void TraceCollector::notifyToStop()
{
WriteBufferFromFileDescriptor out(trace_pipe.fds_rw[1]);
writeIntBinary(true, out);
writeChar(true, out);
out.next();
}
TraceCollector::TraceCollector(std::shared_ptr<TraceLog> trace_log)
: log(&Poco::Logger::get("TraceCollector"))
, trace_log(trace_log)
{
if (trace_log == nullptr)
throw Poco::Exception("Invalid trace log pointer passed");
}
void TraceCollector::run()
{
ReadBufferFromFileDescriptor in(trace_pipe.fds_rw[0]);
......@@ -57,7 +84,7 @@ void TraceCollector::run()
readPODBinary(timer_type, in);
const auto size = stack_trace.getSize();
const auto& frames = stack_trace.getFrames();
const auto & frames = stack_trace.getFrames();
Array trace;
trace.reserve(size);
......@@ -69,3 +96,5 @@ void TraceCollector::run()
trace_log->add(element);
}
}
}
#pragma once
#include <Poco/Runnable.h>
#include <Interpreters/Context.h>
#include <Common/ThreadPool.h>
namespace Poco
{
......@@ -11,18 +10,23 @@ namespace Poco
namespace DB
{
void NotifyTraceCollectorToStop();
class TraceLog;
class TraceCollector : public Poco::Runnable
class TraceCollector
{
private:
Poco::Logger * log;
std::shared_ptr<TraceLog> trace_log;
ThreadFromGlobalPool thread;
void run();
static void notifyToStop();
public:
TraceCollector(std::shared_ptr<TraceLog> trace_log);
TraceCollector(std::shared_ptr<TraceLog> & trace_log);
void run() override;
~TraceCollector();
};
}
......@@ -220,8 +220,8 @@ struct Settings : public SettingsCollection<Settings>
M(SettingBool, empty_result_for_aggregation_by_empty_set, false, "Return empty result when aggregating without keys on empty set.") \
M(SettingBool, allow_distributed_ddl, true, "If it is set to true, then a user is allowed to executed distributed DDL queries.") \
M(SettingUInt64, odbc_max_field_size, 1024, "Max size of filed can be read from ODBC dictionary. Long strings are truncated.") \
M(SettingUInt64, query_profiler_real_time_period, 500000000, "Period for real clock timer of query profiler (in nanoseconds). Set 0 value to turn off real clock query profiler") \
M(SettingUInt64, query_profiler_cpu_time_period, 500000000, "Period for CPU clock timer of query profiler (in nanoseconds). Set 0 value to turn off CPU clock query profiler") \
M(SettingUInt64, query_profiler_real_time_period_ns, 500000000, "Period for real clock timer of query profiler (in nanoseconds). Set 0 value to turn off real clock query profiler") \
M(SettingUInt64, query_profiler_cpu_time_period_ns, 500000000, "Period for CPU clock timer of query profiler (in nanoseconds). Set 0 value to turn off CPU clock query profiler") \
\
\
/** Limits during query execution are part of the settings. \
......
......@@ -3,7 +3,7 @@
#include <Interpreters/ProcessList.h>
#include <Interpreters/Quota.h>
#include <Common/CurrentThread.h>
#include <common/Sleep.h>
#include <common/sleep.h>
namespace ProfileEvents
{
......@@ -255,7 +255,7 @@ static void limitProgressingSpeed(size_t total_progress_size, size_t max_speed_i
if (desired_microseconds > total_elapsed_microseconds)
{
UInt64 sleep_microseconds = desired_microseconds - total_elapsed_microseconds;
SleepForMicroseconds(sleep_microseconds);
sleepForMicroseconds(sleep_microseconds);
ProfileEvents::increment(ProfileEvents::ThrottlerSleepMicroseconds, sleep_microseconds);
}
......
......@@ -4,7 +4,7 @@
#include <Columns/ColumnConst.h>
#include <DataTypes/DataTypesNumber.h>
#include <Common/FieldVisitors.h>
#include <common/Sleep.h>
#include <common/sleep.h>
#include <IO/WriteHelpers.h>
......@@ -88,7 +88,7 @@ public:
throw Exception("The maximum sleep time is 3 seconds. Requested: " + toString(seconds), ErrorCodes::TOO_SLOW);
UInt64 microseconds = seconds * (variant == FunctionSleepVariant::PerBlock ? 1 : size) * 1e6;
SleepForMicroseconds(microseconds);
sleepForMicroseconds(microseconds);
}
/// convertToFullColumn needed, because otherwise (constant expression case) function will not get called on each block.
......
......@@ -54,8 +54,8 @@
#include <Common/Config/ConfigProcessor.h>
#include <Common/ZooKeeper/ZooKeeper.h>
#include <Common/ShellCommand.h>
#include <Common/TraceCollector.h>
#include <common/logger_useful.h>
#include "TraceCollector.h"
namespace ProfileEvents
......@@ -155,8 +155,7 @@ struct ContextShared
ActionLocksManagerPtr action_locks_manager; /// Set of storages' action lockers
std::optional<SystemLogs> system_logs; /// Used to log queries and operations on parts
Poco::Thread trace_collector_thread; /// Thread collecting traces from threads executing queries
std::unique_ptr<Poco::Runnable> trace_collector;
std::unique_ptr<TraceCollector> trace_collector; /// Thread collecting traces from threads executing queries
/// Named sessions. The user could specify session identifier to reuse settings and temporary tables in subsequent requests.
......@@ -291,17 +290,8 @@ struct ContextShared
schedule_pool.reset();
ddl_worker.reset();
/// Trace collector is only initialized in server program
if (hasTraceCollector())
{
/// Stop trace collector
NotifyTraceCollectorToStop();
trace_collector_thread.join();
/// Close trace pipe - definitely nobody needs to write there after
/// databases shutdown
trace_pipe.close();
}
/// Stop trace collector if any
trace_collector.reset();
}
bool hasTraceCollector()
......@@ -314,9 +304,7 @@ struct ContextShared
if (trace_log == nullptr)
return;
trace_pipe.open();
trace_collector.reset(new TraceCollector(trace_log));
trace_collector_thread.start(*trace_collector);
trace_collector = std::make_unique<TraceCollector>(trace_log);
}
private:
......
......@@ -136,7 +136,6 @@ private:
Context * session_context = nullptr; /// Session context or nullptr. Could be equal to this.
Context * global_context = nullptr; /// Global context or nullptr. Could be equal to this.
UInt64 session_close_cycle = 0;
bool session_is_used = false;
......
......@@ -22,7 +22,7 @@
#include <Common/setThreadName.h>
#include <Common/Stopwatch.h>
#include <Common/randomSeed.h>
#include <common/Sleep.h>
#include <common/sleep.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypeArray.h>
......@@ -953,7 +953,7 @@ void DDLWorker::runMainThread()
tryLogCurrentException(__PRETTY_FUNCTION__);
/// Avoid busy loop when ZooKeeper is not available.
SleepForSeconds(1);
sleepForSeconds(1);
}
}
catch (...)
......
......@@ -46,11 +46,6 @@ void ThreadStatus::attachQueryContext(Context & query_context_)
initQueryProfiler();
}
StringRef ThreadStatus::getQueryId() const
{
return query_id;
}
void CurrentThread::defaultThreadDeleter()
{
if (unlikely(!current_thread))
......@@ -161,18 +156,18 @@ void ThreadStatus::initQueryProfiler()
if (!global_context->hasTraceCollector())
return;
auto & settings = query_context->getSettingsRef();
const auto & settings = query_context->getSettingsRef();
if (settings.query_profiler_real_time_period > 0)
if (settings.query_profiler_real_time_period_ns > 0)
query_profiler_real = std::make_unique<QueryProfilerReal>(
/* thread_id */ os_thread_id,
/* period */ static_cast<UInt32>(settings.query_profiler_real_time_period)
/* period */ static_cast<UInt32>(settings.query_profiler_real_time_period_ns)
);
if (settings.query_profiler_cpu_time_period > 0)
if (settings.query_profiler_cpu_time_period_ns > 0)
query_profiler_cpu = std::make_unique<QueryProfilerCpu>(
/* thread_id */ os_thread_id,
/* period */ static_cast<UInt32>(settings.query_profiler_cpu_time_period)
/* period */ static_cast<UInt32>(settings.query_profiler_cpu_time_period_ns)
);
}
......@@ -291,13 +286,6 @@ void CurrentThread::attachToIfDetached(const ThreadGroupStatusPtr & thread_group
current_thread->deleter = CurrentThread::defaultThreadDeleter;
}
StringRef CurrentThread::getQueryId()
{
if (unlikely(!current_thread))
return {};
return current_thread->getQueryId();
}
void CurrentThread::attachQueryContext(Context & query_context)
{
if (unlikely(!current_thread))
......
......@@ -21,9 +21,10 @@ add_library (common
src/demangle.cpp
src/setTerminalEcho.cpp
src/getThreadNumber.cpp
src/Sleep.cpp
src/sleep.cpp
src/argsToConfig.cpp
src/StackTrace.cpp
src/Pipe.cpp
include/common/SimpleCache.h
include/common/StackTrace.h
......@@ -48,7 +49,7 @@ add_library (common
include/common/constexpr_helpers.h
include/common/Pipe.h
include/common/getThreadNumber.h
include/common/Sleep.h
include/common/sleep.h
include/common/SimpleCache.h
include/ext/bit_cast.h
......
......@@ -4,56 +4,31 @@
#include <fcntl.h>
#include <stdexcept>
/**
* Struct containing a pipe with lazy initialization.
* Use `open` and `close` methods to manipulate pipe and `fds_rw` field to access
* pipe's file descriptors.
*/
struct LazyPipe
{
int fds_rw[2] = {-1, -1};
LazyPipe() = default;
virtual void open()
{
for (int &fd : fds_rw)
{
if (fd >= 0)
{
throw std::logic_error("Pipe is already opened");
}
}
#ifndef __APPLE__
if (0 != pipe2(fds_rw, O_CLOEXEC))
throw std::runtime_error("Cannot create pipe");
#else
if (0 != pipe(fds_rw))
throw std::runtime_error("Cannot create pipe");
if (0 != fcntl(fds_rw[0], F_SETFD, FD_CLOEXEC))
throw std::runtime_error("Cannot setup auto-close on exec for read end of pipe");
if (0 != fcntl(fds_rw[1], F_SETFD, FD_CLOEXEC))
throw std::runtime_error("Cannot setup auto-close on exec for write end of pipe");
#endif
}
virtual void close() {
for (int fd : fds_rw)
{
if (fd >= 0)
{
::close(fd);
}
}
}
void open();
void close();
virtual ~LazyPipe() = default;
};
struct Pipe : public LazyPipe {
Pipe()
{
open();
}
/**
* Struct which opens new pipe on creation and closes it on destruction.
* Use `fds_rw` field to access pipe's file descriptors.
*/
struct Pipe : public LazyPipe
{
Pipe();
~Pipe()
{
close();
}
~Pipe();
};
#pragma once
#include <cstdint>
void SleepForNanoseconds(uint64_t nanoseconds);
void SleepForMicroseconds(uint64_t microseconds);
void SleepForMilliseconds(uint64_t milliseconds);
void SleepForSeconds(uint64_t seconds);
#pragma once
#include <cstdint>
/**
* Sleep functions tolerant to signal interruptions (which can happen
* when query profiler is turned on for example)
*/
void sleepForNanoseconds(uint64_t nanoseconds);
void sleepForMicroseconds(uint64_t microseconds);
void sleepForMilliseconds(uint64_t milliseconds);
void sleepForSeconds(uint64_t seconds);
#include "common/Pipe.h"
void LazyPipe::open()
{
for (int & fd : fds_rw)
{
if (fd >= 0)
{
throw std::logic_error("Pipe is already opened");
}
}
#ifndef __APPLE__
if (0 != pipe2(fds_rw, O_CLOEXEC))
throw std::runtime_error("Cannot create pipe");
#else
if (0 != pipe(fds_rw))
throw std::runtime_error("Cannot create pipe");
if (0 != fcntl(fds_rw[0], F_SETFD, FD_CLOEXEC))
throw std::runtime_error("Cannot setup auto-close on exec for read end of pipe");
if (0 != fcntl(fds_rw[1], F_SETFD, FD_CLOEXEC))
throw std::runtime_error("Cannot setup auto-close on exec for write end of pipe");
#endif
}
void LazyPipe::close()
{
for (int fd : fds_rw)
{
if (fd >= 0)
{
::close(fd);
}
}
}
Pipe::Pipe()
{
open();
}
Pipe::~Pipe()
{
close();
}
#include "common/Sleep.h"
#include "common/sleep.h"
#include <time.h>
#include <errno.h>
/**
* Sleep with nanoseconds precision
* Sleep with nanoseconds precision. Tolerant to signal interruptions
*
* In case query profiler is turned on, all threads spawned for
* query execution are repeatedly interrupted by signals from timer.
......@@ -12,14 +12,14 @@
* problems in this setup and man page for nanosleep(2) suggests
* using absolute deadlines, for instance clock_nanosleep(2).
*/
void SleepForNanoseconds(uint64_t nanoseconds)
void sleepForNanoseconds(uint64_t nanoseconds)
{
const auto clock_type = CLOCK_REALTIME;
constexpr auto clock_type = CLOCK_MONOTONIC;
struct timespec current_time;
clock_gettime(clock_type, &current_time);
const uint64_t resolution = 1'000'000'000;
constexpr uint64_t resolution = 1'000'000'000;
struct timespec finish_time = current_time;
finish_time.tv_nsec += nanoseconds % resolution;
......@@ -31,17 +31,17 @@ void SleepForNanoseconds(uint64_t nanoseconds)
while (clock_nanosleep(clock_type, TIMER_ABSTIME, &finish_time, nullptr) == EINTR);
}
void SleepForMicroseconds(uint64_t microseconds)
void sleepForMicroseconds(uint64_t microseconds)
{
SleepForNanoseconds(microseconds * 1000);
sleepForNanoseconds(microseconds * 1000);
}
void SleepForMilliseconds(uint64_t milliseconds)
void sleepForMilliseconds(uint64_t milliseconds)
{
SleepForMicroseconds(milliseconds * 1000);
sleepForMicroseconds(milliseconds * 1000);
}
void SleepForSeconds(uint64_t seconds)
void sleepForSeconds(uint64_t seconds)
{
SleepForMilliseconds(seconds * 1000);
sleepForMilliseconds(seconds * 1000);
}
......@@ -8,7 +8,7 @@
#include <mysqlxx/Pool.h>
#include <common/Sleep.h>
#include <common/sleep.h>
#include <Poco/Util/Application.h>
#include <Poco/Util/LayeredConfiguration.h>
......@@ -135,7 +135,7 @@ Pool::Entry Pool::Get()
}
lock.unlock();
SleepForSeconds(MYSQLXX_POOL_SLEEP_ON_CONNECT_FAIL);
sleepForSeconds(MYSQLXX_POOL_SLEEP_ON_CONNECT_FAIL);
lock.lock();
}
}
......@@ -195,7 +195,7 @@ void Pool::Entry::forceConnected() const
if (first)
first = false;
else
SleepForSeconds(MYSQLXX_POOL_SLEEP_ON_CONNECT_FAIL);
sleepForSeconds(MYSQLXX_POOL_SLEEP_ON_CONNECT_FAIL);
app.logger().information("MYSQL: Reconnecting to " + pool->description);
data->conn.connect(
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册