From 9d540abc84ca2b8ba648b7dd78e5b6641b5b73d3 Mon Sep 17 00:00:00 2001 From: Nikita Lapkov Date: Wed, 10 Jul 2019 20:47:39 +0000 Subject: [PATCH] refactor --- dbms/programs/server/config.xml | 2 +- dbms/src/Common/CurrentThread.h | 8 +- dbms/src/Common/ErrorCodes.cpp | 3 + dbms/src/Common/QueryProfiler.cpp | 128 +++++++++++++++++- dbms/src/Common/QueryProfiler.h | 108 ++------------- dbms/src/Common/ThreadStatus.h | 5 +- dbms/src/Common/Throttler.h | 4 +- .../TraceCollector.cpp | 55 ++++++-- dbms/src/Common/TraceCollector.h | 32 +++++ dbms/src/Core/Settings.h | 4 +- dbms/src/DataStreams/IBlockInputStream.cpp | 4 +- dbms/src/Functions/sleep.h | 4 +- dbms/src/Interpreters/Context.cpp | 22 +-- dbms/src/Interpreters/Context.h | 1 - dbms/src/Interpreters/DDLWorker.cpp | 4 +- dbms/src/Interpreters/ThreadStatusExt.cpp | 22 +-- dbms/src/Interpreters/TraceCollector.h | 28 ---- libs/libcommon/CMakeLists.txt | 5 +- libs/libcommon/include/common/Pipe.h | 57 +++----- libs/libcommon/include/common/Sleep.h | 11 -- libs/libcommon/include/common/sleep.h | 16 +++ libs/libcommon/src/Pipe.cpp | 45 ++++++ libs/libcommon/src/{Sleep.cpp => sleep.cpp} | 22 +-- libs/libmysqlxx/src/Pool.cpp | 6 +- 24 files changed, 342 insertions(+), 254 deletions(-) rename dbms/src/{Interpreters => Common}/TraceCollector.cpp (65%) create mode 100644 dbms/src/Common/TraceCollector.h delete mode 100644 dbms/src/Interpreters/TraceCollector.h delete mode 100644 libs/libcommon/include/common/Sleep.h create mode 100644 libs/libcommon/include/common/sleep.h create mode 100644 libs/libcommon/src/Pipe.cpp rename libs/libcommon/src/{Sleep.cpp => sleep.cpp} (62%) diff --git a/dbms/programs/server/config.xml b/dbms/programs/server/config.xml index e78e3c255f..a8f37a2cd7 100644 --- a/dbms/programs/server/config.xml +++ b/dbms/programs/server/config.xml @@ -295,7 +295,7 @@ + See query_profiler_real_time_period_ns and query_profiler_cpu_time_period_ns settings. --> system trace_log
diff --git a/dbms/src/Common/CurrentThread.h b/dbms/src/Common/CurrentThread.h index 3c248ad903..8b15bc7c3e 100644 --- a/dbms/src/Common/CurrentThread.h +++ b/dbms/src/Common/CurrentThread.h @@ -3,6 +3,7 @@ #include #include +#include #include #include @@ -70,7 +71,12 @@ public: static void finalizePerformanceCounters(); /// Returns a non-empty string if the thread is attached to a query - static StringRef getQueryId(); + static StringRef getQueryId() + { + if (unlikely(!current_thread)) + return {}; + return current_thread->getQueryId(); + } /// Non-master threads call this method in destructor automatically static void detachQuery(); diff --git a/dbms/src/Common/ErrorCodes.cpp b/dbms/src/Common/ErrorCodes.cpp index a3b788c230..c472a336d7 100644 --- a/dbms/src/Common/ErrorCodes.cpp +++ b/dbms/src/Common/ErrorCodes.cpp @@ -434,6 +434,9 @@ namespace ErrorCodes extern const int BAD_QUERY_PARAMETER = 457; extern const int CANNOT_UNLINK = 458; extern const int CANNOT_SET_THREAD_PRIORITY = 459; + extern const int CANNOT_CREATE_TIMER = 460; + extern const int CANNOT_SET_TIMER_PERIOD = 461; + extern const int CANNOT_DELETE_TIMER = 462; extern const int KEEPER_EXCEPTION = 999; extern const int POCO_EXCEPTION = 1000; diff --git a/dbms/src/Common/QueryProfiler.cpp b/dbms/src/Common/QueryProfiler.cpp index ce0ddef094..9832b64ee8 100644 --- a/dbms/src/Common/QueryProfiler.cpp +++ b/dbms/src/Common/QueryProfiler.cpp @@ -1,8 +1,134 @@ #include "QueryProfiler.h" +#include +#include +#include +#include +#include +#include +#include +#include + namespace DB { -LazyPipe trace_pipe; +extern LazyPipe trace_pipe; + +namespace +{ + /// Normally query_id is a UUID (string with a fixed length) but user can provide custom query_id. + /// Thus upper bound on query_id length should be introduced to avoid buffer overflow in signal handler. + constexpr size_t QUERY_ID_MAX_LEN = 1024; + + void writeTraceInfo(TimerType timer_type, int /* sig */, siginfo_t * /* info */, void * context) + { + constexpr size_t buf_size = sizeof(char) + // TraceCollector stop flag + 8 * sizeof(char) + // maximum VarUInt length for string size + QUERY_ID_MAX_LEN * sizeof(char) + // maximum query_id length + sizeof(StackTrace) + // collected stack trace + sizeof(TimerType); // timer type + char buffer[buf_size]; + WriteBufferFromFileDescriptor out(trace_pipe.fds_rw[1], buf_size, buffer); + + StringRef query_id = CurrentThread::getQueryId(); + query_id.size = std::min(query_id.size, QUERY_ID_MAX_LEN); + + const auto signal_context = *reinterpret_cast(context); + const StackTrace stack_trace(signal_context); + + writeChar(false, out); + writeStringBinary(query_id, out); + writePODBinary(stack_trace, out); + writePODBinary(timer_type, out); + out.next(); + } + + const UInt32 TIMER_PRECISION = 1e9; +} + +namespace ErrorCodes +{ + extern const int CANNOT_MANIPULATE_SIGSET; + extern const int CANNOT_SET_SIGNAL_HANDLER; + extern const int CANNOT_CREATE_TIMER; + extern const int CANNOT_SET_TIMER_PERIOD; + extern const int CANNOT_DELETE_TIMER; +} + +template +QueryProfilerBase::QueryProfilerBase(const Int32 thread_id, const int clock_type, const UInt32 period, const int pause_signal) + : log(&Logger::get("QueryProfiler")) + , pause_signal(pause_signal) +{ + struct sigaction sa{}; + sa.sa_sigaction = ProfilerImpl::signalHandler; + sa.sa_flags = SA_SIGINFO | SA_RESTART; + + if (sigemptyset(&sa.sa_mask)) + throwFromErrno("Failed to clean signal mask for query profiler", ErrorCodes::CANNOT_MANIPULATE_SIGSET); + + if (sigaddset(&sa.sa_mask, pause_signal)) + throwFromErrno("Failed to add signal to mask for query profiler", ErrorCodes::CANNOT_MANIPULATE_SIGSET); + + if (sigaction(pause_signal, &sa, previous_handler)) + throwFromErrno("Failed to setup signal handler for query profiler", ErrorCodes::CANNOT_SET_SIGNAL_HANDLER); + + try + { + struct sigevent sev; + sev.sigev_notify = SIGEV_THREAD_ID; + sev.sigev_signo = pause_signal; + sev._sigev_un._tid = thread_id; + if (timer_create(clock_type, &sev, &timer_id)) + throwFromErrno("Failed to create thread timer", ErrorCodes::CANNOT_CREATE_TIMER); + + struct timespec interval{.tv_sec = period / TIMER_PRECISION, .tv_nsec = period % TIMER_PRECISION}; + struct itimerspec timer_spec = {.it_interval = interval, .it_value = interval}; + if (timer_settime(timer_id, 0, &timer_spec, nullptr)) + throwFromErrno("Failed to set thread timer period", ErrorCodes::CANNOT_SET_TIMER_PERIOD); + } + catch (...) + { + tryCleanup(); + throw; + } +} + +template +QueryProfilerBase::~QueryProfilerBase() +{ + tryCleanup(); +} + +template +void QueryProfilerBase::tryCleanup() +{ + if (timer_id != nullptr && timer_delete(timer_id)) + LOG_ERROR(log, "Failed to delete query profiler timer " + errnoToString(ErrorCodes::CANNOT_DELETE_TIMER)); + + if (previous_handler != nullptr && sigaction(pause_signal, previous_handler, nullptr)) + LOG_ERROR(log, "Failed to restore signal handler after query profiler " + errnoToString(ErrorCodes::CANNOT_SET_SIGNAL_HANDLER)); +} + +template class QueryProfilerBase; +template class QueryProfilerBase; + +QueryProfilerReal::QueryProfilerReal(const Int32 thread_id, const UInt32 period) + : QueryProfilerBase(thread_id, CLOCK_REALTIME, period, SIGUSR1) +{} + +void QueryProfilerReal::signalHandler(int sig, siginfo_t * info, void * context) +{ + writeTraceInfo(TimerType::Real, sig, info, context); +} + +QueryProfilerCpu::QueryProfilerCpu(const Int32 thread_id, const UInt32 period) + : QueryProfilerBase(thread_id, CLOCK_THREAD_CPUTIME_ID, period, SIGUSR2) +{} + +void QueryProfilerCpu::signalHandler(int sig, siginfo_t * info, void * context) +{ + writeTraceInfo(TimerType::Cpu, sig, info, context); +} } diff --git a/dbms/src/Common/QueryProfiler.h b/dbms/src/Common/QueryProfiler.h index eea1bb0374..d4e92f25a1 100644 --- a/dbms/src/Common/QueryProfiler.h +++ b/dbms/src/Common/QueryProfiler.h @@ -1,12 +1,6 @@ #pragma once -#include -#include -#include -#include -#include -#include -#include +#include #include #include @@ -19,47 +13,12 @@ namespace Poco namespace DB { -extern LazyPipe trace_pipe; - enum class TimerType : UInt8 { Real, Cpu, }; -namespace -{ - /// Normally query_id is a UUID (string with a fixed length) but user can provide custom query_id. - /// Thus upper bound on query_id length should be introduced to avoid buffer overflow in signal handler. - constexpr size_t QUERY_ID_MAX_LEN = 1024; - - void writeTraceInfo(TimerType timer_type, int /* sig */, siginfo_t * /* info */, void * context) - { - constexpr size_t buf_size = sizeof(char) + // TraceCollector stop flag - 8 * sizeof(char) + // maximum VarUInt length for string size - QUERY_ID_MAX_LEN * sizeof(char) + // maximum query_id length - sizeof(StackTrace) + // collected stack trace - sizeof(TimerType); // timer type - char buffer[buf_size]; - DB::WriteBufferFromFileDescriptor out(trace_pipe.fds_rw[1], buf_size, buffer); - - StringRef query_id = CurrentThread::getQueryId(); - query_id.size = std::min(query_id.size, QUERY_ID_MAX_LEN); - - const auto signal_context = *reinterpret_cast(context); - const StackTrace stack_trace(signal_context); - - DB::writeChar(false, out); - DB::writeStringBinary(query_id, out); - DB::writePODBinary(stack_trace, out); - DB::writePODBinary(timer_type, out); - out.next(); - } - - const UInt32 TIMER_PRECISION = 1e9; -} - - /** * Query profiler implementation for selected thread. * @@ -75,46 +34,13 @@ template class QueryProfilerBase { public: - QueryProfilerBase(const Int32 thread_id, const int clock_type, const UInt32 period, const int pause_signal = SIGALRM) - : log(&Logger::get("QueryProfiler")) - , pause_signal(pause_signal) - { - struct sigaction sa{}; - sa.sa_sigaction = ProfilerImpl::signalHandler; - sa.sa_flags = SA_SIGINFO | SA_RESTART; - - if (sigemptyset(&sa.sa_mask)) - throw Poco::Exception("Failed to clean signal mask for query profiler"); - - if (sigaddset(&sa.sa_mask, pause_signal)) - throw Poco::Exception("Failed to add signal to mask for query profiler"); - - if (sigaction(pause_signal, &sa, previous_handler)) - throw Poco::Exception("Failed to setup signal handler for query profiler"); - - struct sigevent sev; - sev.sigev_notify = SIGEV_THREAD_ID; - sev.sigev_signo = pause_signal; - sev._sigev_un._tid = thread_id; - if (timer_create(clock_type, &sev, &timer_id)) - throw Poco::Exception("Failed to create thread timer"); - - struct timespec interval{.tv_sec = period / TIMER_PRECISION, .tv_nsec = period % TIMER_PRECISION}; - struct itimerspec timer_spec = {.it_interval = interval, .it_value = interval}; - if (timer_settime(timer_id, 0, &timer_spec, nullptr)) - throw Poco::Exception("Failed to set thread timer"); - } - - ~QueryProfilerBase() - { - if (timer_delete(timer_id)) - LOG_ERROR(log, "Failed to delete query profiler timer"); - - if (sigaction(pause_signal, previous_handler, nullptr)) - LOG_ERROR(log, "Failed to restore signal handler after query profiler"); - } + QueryProfilerBase(const Int32 thread_id, const int clock_type, const UInt32 period, const int pause_signal = SIGALRM); + + ~QueryProfilerBase(); private: + void tryCleanup(); + Poco::Logger * log; /// Timer id from timer_create(2) @@ -131,28 +57,18 @@ private: class QueryProfilerReal : public QueryProfilerBase { public: - QueryProfilerReal(const Int32 thread_id, const UInt32 period) - : QueryProfilerBase(thread_id, CLOCK_REALTIME, period, SIGUSR1) - {} - - static void signalHandler(int sig, siginfo_t * info, void * context) - { - writeTraceInfo(TimerType::Real, sig, info, context); - } + QueryProfilerReal(const Int32 thread_id, const UInt32 period); + + static void signalHandler(int sig, siginfo_t * info, void * context); }; /// Query profiler with timer based on CPU clock class QueryProfilerCpu : public QueryProfilerBase { public: - QueryProfilerCpu(const Int32 thread_id, const UInt32 period) - : QueryProfilerBase(thread_id, CLOCK_THREAD_CPUTIME_ID, period, SIGUSR2) - {} - - static void signalHandler(int sig, siginfo_t * info, void * context) - { - writeTraceInfo(TimerType::Cpu, sig, info, context); - } + QueryProfilerCpu(const Int32 thread_id, const UInt32 period); + + static void signalHandler(int sig, siginfo_t * info, void * context); }; } diff --git a/dbms/src/Common/ThreadStatus.h b/dbms/src/Common/ThreadStatus.h index 87ddd32044..0b737ccb6a 100644 --- a/dbms/src/Common/ThreadStatus.h +++ b/dbms/src/Common/ThreadStatus.h @@ -119,7 +119,10 @@ public: return thread_state.load(std::memory_order_relaxed); } - StringRef getQueryId() const; + StringRef getQueryId() const + { + return query_id; + } /// Starts new query and create new thread group for it, current thread becomes master thread of the query void initializeQuery(); diff --git a/dbms/src/Common/Throttler.h b/dbms/src/Common/Throttler.h index 6fd5a7f16c..3ad50215b9 100644 --- a/dbms/src/Common/Throttler.h +++ b/dbms/src/Common/Throttler.h @@ -6,7 +6,7 @@ #include #include #include -#include +#include #include #include @@ -77,7 +77,7 @@ public: if (desired_ns > elapsed_ns) { UInt64 sleep_ns = desired_ns - elapsed_ns; - SleepForNanoseconds(sleep_ns); + sleepForNanoseconds(sleep_ns); ProfileEvents::increment(ProfileEvents::ThrottlerSleepMicroseconds, sleep_ns / 1000UL); } diff --git a/dbms/src/Interpreters/TraceCollector.cpp b/dbms/src/Common/TraceCollector.cpp similarity index 65% rename from dbms/src/Interpreters/TraceCollector.cpp rename to dbms/src/Common/TraceCollector.cpp index 671b77eee8..293e4c38e9 100644 --- a/dbms/src/Interpreters/TraceCollector.cpp +++ b/dbms/src/Common/TraceCollector.cpp @@ -2,15 +2,50 @@ #include #include +#include #include #include #include #include +#include +#include #include -#include #include -using namespace DB; +namespace DB +{ + +LazyPipe trace_pipe; + +namespace ErrorCodes +{ + extern const int NULL_POINTER_DEREFERENCE; + extern const int THREAD_IS_NOT_JOINABLE; +} + +TraceCollector::TraceCollector(std::shared_ptr & trace_log) + : log(&Poco::Logger::get("TraceCollector")) + , trace_log(trace_log) +{ + if (trace_log == nullptr) + throw Exception("Invalid trace log pointer passed", ErrorCodes::NULL_POINTER_DEREFERENCE); + + trace_pipe.open(); + thread = ThreadFromGlobalPool(&TraceCollector::run, this); +} + +TraceCollector::~TraceCollector() +{ + if (!thread.joinable()) + LOG_ERROR(log, "TraceCollector thread is malformed and cannot be joined"); + else + { + TraceCollector::notifyToStop(); + thread.join(); + } + + trace_pipe.close(); +} /** * Sends TraceCollector stop message @@ -22,21 +57,13 @@ using namespace DB; * NOTE: TraceCollector will NOT stop immediately as there may be some data left in the pipe * before stop message. */ -void DB::NotifyTraceCollectorToStop() +void TraceCollector::notifyToStop() { WriteBufferFromFileDescriptor out(trace_pipe.fds_rw[1]); - writeIntBinary(true, out); + writeChar(true, out); out.next(); } -TraceCollector::TraceCollector(std::shared_ptr trace_log) - : log(&Poco::Logger::get("TraceCollector")) - , trace_log(trace_log) -{ - if (trace_log == nullptr) - throw Poco::Exception("Invalid trace log pointer passed"); -} - void TraceCollector::run() { ReadBufferFromFileDescriptor in(trace_pipe.fds_rw[0]); @@ -57,7 +84,7 @@ void TraceCollector::run() readPODBinary(timer_type, in); const auto size = stack_trace.getSize(); - const auto& frames = stack_trace.getFrames(); + const auto & frames = stack_trace.getFrames(); Array trace; trace.reserve(size); @@ -69,3 +96,5 @@ void TraceCollector::run() trace_log->add(element); } } + +} diff --git a/dbms/src/Common/TraceCollector.h b/dbms/src/Common/TraceCollector.h new file mode 100644 index 0000000000..7c07f48776 --- /dev/null +++ b/dbms/src/Common/TraceCollector.h @@ -0,0 +1,32 @@ +#pragma once + +#include + +namespace Poco +{ + class Logger; +} + +namespace DB +{ + +class TraceLog; + +class TraceCollector +{ +private: + Poco::Logger * log; + std::shared_ptr trace_log; + ThreadFromGlobalPool thread; + + void run(); + + static void notifyToStop(); + +public: + TraceCollector(std::shared_ptr & trace_log); + + ~TraceCollector(); +}; + +} diff --git a/dbms/src/Core/Settings.h b/dbms/src/Core/Settings.h index e718da6fa9..967615d6da 100644 --- a/dbms/src/Core/Settings.h +++ b/dbms/src/Core/Settings.h @@ -220,8 +220,8 @@ struct Settings : public SettingsCollection M(SettingBool, empty_result_for_aggregation_by_empty_set, false, "Return empty result when aggregating without keys on empty set.") \ M(SettingBool, allow_distributed_ddl, true, "If it is set to true, then a user is allowed to executed distributed DDL queries.") \ M(SettingUInt64, odbc_max_field_size, 1024, "Max size of filed can be read from ODBC dictionary. Long strings are truncated.") \ - M(SettingUInt64, query_profiler_real_time_period, 500000000, "Period for real clock timer of query profiler (in nanoseconds). Set 0 value to turn off real clock query profiler") \ - M(SettingUInt64, query_profiler_cpu_time_period, 500000000, "Period for CPU clock timer of query profiler (in nanoseconds). Set 0 value to turn off CPU clock query profiler") \ + M(SettingUInt64, query_profiler_real_time_period_ns, 500000000, "Period for real clock timer of query profiler (in nanoseconds). Set 0 value to turn off real clock query profiler") \ + M(SettingUInt64, query_profiler_cpu_time_period_ns, 500000000, "Period for CPU clock timer of query profiler (in nanoseconds). Set 0 value to turn off CPU clock query profiler") \ \ \ /** Limits during query execution are part of the settings. \ diff --git a/dbms/src/DataStreams/IBlockInputStream.cpp b/dbms/src/DataStreams/IBlockInputStream.cpp index a2d52dfc36..406a660879 100644 --- a/dbms/src/DataStreams/IBlockInputStream.cpp +++ b/dbms/src/DataStreams/IBlockInputStream.cpp @@ -3,7 +3,7 @@ #include #include #include -#include +#include namespace ProfileEvents { @@ -255,7 +255,7 @@ static void limitProgressingSpeed(size_t total_progress_size, size_t max_speed_i if (desired_microseconds > total_elapsed_microseconds) { UInt64 sleep_microseconds = desired_microseconds - total_elapsed_microseconds; - SleepForMicroseconds(sleep_microseconds); + sleepForMicroseconds(sleep_microseconds); ProfileEvents::increment(ProfileEvents::ThrottlerSleepMicroseconds, sleep_microseconds); } diff --git a/dbms/src/Functions/sleep.h b/dbms/src/Functions/sleep.h index ff7cb36786..5e9732d59f 100644 --- a/dbms/src/Functions/sleep.h +++ b/dbms/src/Functions/sleep.h @@ -4,7 +4,7 @@ #include #include #include -#include +#include #include @@ -88,7 +88,7 @@ public: throw Exception("The maximum sleep time is 3 seconds. Requested: " + toString(seconds), ErrorCodes::TOO_SLOW); UInt64 microseconds = seconds * (variant == FunctionSleepVariant::PerBlock ? 1 : size) * 1e6; - SleepForMicroseconds(microseconds); + sleepForMicroseconds(microseconds); } /// convertToFullColumn needed, because otherwise (constant expression case) function will not get called on each block. diff --git a/dbms/src/Interpreters/Context.cpp b/dbms/src/Interpreters/Context.cpp index 407cb9366d..bd2c03cb78 100644 --- a/dbms/src/Interpreters/Context.cpp +++ b/dbms/src/Interpreters/Context.cpp @@ -54,8 +54,8 @@ #include #include #include +#include #include -#include "TraceCollector.h" namespace ProfileEvents @@ -155,8 +155,7 @@ struct ContextShared ActionLocksManagerPtr action_locks_manager; /// Set of storages' action lockers std::optional system_logs; /// Used to log queries and operations on parts - Poco::Thread trace_collector_thread; /// Thread collecting traces from threads executing queries - std::unique_ptr trace_collector; + std::unique_ptr trace_collector; /// Thread collecting traces from threads executing queries /// Named sessions. The user could specify session identifier to reuse settings and temporary tables in subsequent requests. @@ -291,17 +290,8 @@ struct ContextShared schedule_pool.reset(); ddl_worker.reset(); - /// Trace collector is only initialized in server program - if (hasTraceCollector()) - { - /// Stop trace collector - NotifyTraceCollectorToStop(); - trace_collector_thread.join(); - - /// Close trace pipe - definitely nobody needs to write there after - /// databases shutdown - trace_pipe.close(); - } + /// Stop trace collector if any + trace_collector.reset(); } bool hasTraceCollector() @@ -314,9 +304,7 @@ struct ContextShared if (trace_log == nullptr) return; - trace_pipe.open(); - trace_collector.reset(new TraceCollector(trace_log)); - trace_collector_thread.start(*trace_collector); + trace_collector = std::make_unique(trace_log); } private: diff --git a/dbms/src/Interpreters/Context.h b/dbms/src/Interpreters/Context.h index 4f622a0eae..58745d97ad 100644 --- a/dbms/src/Interpreters/Context.h +++ b/dbms/src/Interpreters/Context.h @@ -136,7 +136,6 @@ private: Context * session_context = nullptr; /// Session context or nullptr. Could be equal to this. Context * global_context = nullptr; /// Global context or nullptr. Could be equal to this. - UInt64 session_close_cycle = 0; bool session_is_used = false; diff --git a/dbms/src/Interpreters/DDLWorker.cpp b/dbms/src/Interpreters/DDLWorker.cpp index 4a3665f3d5..e445b7503c 100644 --- a/dbms/src/Interpreters/DDLWorker.cpp +++ b/dbms/src/Interpreters/DDLWorker.cpp @@ -22,7 +22,7 @@ #include #include #include -#include +#include #include #include #include @@ -953,7 +953,7 @@ void DDLWorker::runMainThread() tryLogCurrentException(__PRETTY_FUNCTION__); /// Avoid busy loop when ZooKeeper is not available. - SleepForSeconds(1); + sleepForSeconds(1); } } catch (...) diff --git a/dbms/src/Interpreters/ThreadStatusExt.cpp b/dbms/src/Interpreters/ThreadStatusExt.cpp index d0d1badc8a..c4f1fa055b 100644 --- a/dbms/src/Interpreters/ThreadStatusExt.cpp +++ b/dbms/src/Interpreters/ThreadStatusExt.cpp @@ -46,11 +46,6 @@ void ThreadStatus::attachQueryContext(Context & query_context_) initQueryProfiler(); } -StringRef ThreadStatus::getQueryId() const -{ - return query_id; -} - void CurrentThread::defaultThreadDeleter() { if (unlikely(!current_thread)) @@ -161,18 +156,18 @@ void ThreadStatus::initQueryProfiler() if (!global_context->hasTraceCollector()) return; - auto & settings = query_context->getSettingsRef(); + const auto & settings = query_context->getSettingsRef(); - if (settings.query_profiler_real_time_period > 0) + if (settings.query_profiler_real_time_period_ns > 0) query_profiler_real = std::make_unique( /* thread_id */ os_thread_id, - /* period */ static_cast(settings.query_profiler_real_time_period) + /* period */ static_cast(settings.query_profiler_real_time_period_ns) ); - if (settings.query_profiler_cpu_time_period > 0) + if (settings.query_profiler_cpu_time_period_ns > 0) query_profiler_cpu = std::make_unique( /* thread_id */ os_thread_id, - /* period */ static_cast(settings.query_profiler_cpu_time_period) + /* period */ static_cast(settings.query_profiler_cpu_time_period_ns) ); } @@ -291,13 +286,6 @@ void CurrentThread::attachToIfDetached(const ThreadGroupStatusPtr & thread_group current_thread->deleter = CurrentThread::defaultThreadDeleter; } -StringRef CurrentThread::getQueryId() -{ - if (unlikely(!current_thread)) - return {}; - return current_thread->getQueryId(); -} - void CurrentThread::attachQueryContext(Context & query_context) { if (unlikely(!current_thread)) diff --git a/dbms/src/Interpreters/TraceCollector.h b/dbms/src/Interpreters/TraceCollector.h deleted file mode 100644 index 43a712a981..0000000000 --- a/dbms/src/Interpreters/TraceCollector.h +++ /dev/null @@ -1,28 +0,0 @@ -#pragma once - -#include -#include - -namespace Poco -{ - class Logger; -} - -namespace DB -{ - -void NotifyTraceCollectorToStop(); - -class TraceCollector : public Poco::Runnable -{ -private: - Poco::Logger * log; - std::shared_ptr trace_log; - -public: - TraceCollector(std::shared_ptr trace_log); - - void run() override; -}; - -} diff --git a/libs/libcommon/CMakeLists.txt b/libs/libcommon/CMakeLists.txt index 7469aabcee..eb77f43a37 100644 --- a/libs/libcommon/CMakeLists.txt +++ b/libs/libcommon/CMakeLists.txt @@ -21,9 +21,10 @@ add_library (common src/demangle.cpp src/setTerminalEcho.cpp src/getThreadNumber.cpp - src/Sleep.cpp + src/sleep.cpp src/argsToConfig.cpp src/StackTrace.cpp + src/Pipe.cpp include/common/SimpleCache.h include/common/StackTrace.h @@ -48,7 +49,7 @@ add_library (common include/common/constexpr_helpers.h include/common/Pipe.h include/common/getThreadNumber.h - include/common/Sleep.h + include/common/sleep.h include/common/SimpleCache.h include/ext/bit_cast.h diff --git a/libs/libcommon/include/common/Pipe.h b/libs/libcommon/include/common/Pipe.h index 3904099c9c..0137c3d97a 100644 --- a/libs/libcommon/include/common/Pipe.h +++ b/libs/libcommon/include/common/Pipe.h @@ -4,56 +4,31 @@ #include #include +/** + * Struct containing a pipe with lazy initialization. + * Use `open` and `close` methods to manipulate pipe and `fds_rw` field to access + * pipe's file descriptors. + */ struct LazyPipe { int fds_rw[2] = {-1, -1}; LazyPipe() = default; - virtual void open() - { - for (int &fd : fds_rw) - { - if (fd >= 0) - { - throw std::logic_error("Pipe is already opened"); - } - } - -#ifndef __APPLE__ - if (0 != pipe2(fds_rw, O_CLOEXEC)) - throw std::runtime_error("Cannot create pipe"); -#else - if (0 != pipe(fds_rw)) - throw std::runtime_error("Cannot create pipe"); - if (0 != fcntl(fds_rw[0], F_SETFD, FD_CLOEXEC)) - throw std::runtime_error("Cannot setup auto-close on exec for read end of pipe"); - if (0 != fcntl(fds_rw[1], F_SETFD, FD_CLOEXEC)) - throw std::runtime_error("Cannot setup auto-close on exec for write end of pipe"); -#endif - } - - virtual void close() { - for (int fd : fds_rw) - { - if (fd >= 0) - { - ::close(fd); - } - } - } + void open(); + + void close(); virtual ~LazyPipe() = default; }; -struct Pipe : public LazyPipe { - Pipe() - { - open(); - } +/** + * Struct which opens new pipe on creation and closes it on destruction. + * Use `fds_rw` field to access pipe's file descriptors. + */ +struct Pipe : public LazyPipe +{ + Pipe(); - ~Pipe() - { - close(); - } + ~Pipe(); }; diff --git a/libs/libcommon/include/common/Sleep.h b/libs/libcommon/include/common/Sleep.h deleted file mode 100644 index 7c5c955f72..0000000000 --- a/libs/libcommon/include/common/Sleep.h +++ /dev/null @@ -1,11 +0,0 @@ -#pragma once - -#include - -void SleepForNanoseconds(uint64_t nanoseconds); - -void SleepForMicroseconds(uint64_t microseconds); - -void SleepForMilliseconds(uint64_t milliseconds); - -void SleepForSeconds(uint64_t seconds); diff --git a/libs/libcommon/include/common/sleep.h b/libs/libcommon/include/common/sleep.h new file mode 100644 index 0000000000..6ae99a4a57 --- /dev/null +++ b/libs/libcommon/include/common/sleep.h @@ -0,0 +1,16 @@ +#pragma once + +#include + +/** + * Sleep functions tolerant to signal interruptions (which can happen + * when query profiler is turned on for example) + */ + +void sleepForNanoseconds(uint64_t nanoseconds); + +void sleepForMicroseconds(uint64_t microseconds); + +void sleepForMilliseconds(uint64_t milliseconds); + +void sleepForSeconds(uint64_t seconds); diff --git a/libs/libcommon/src/Pipe.cpp b/libs/libcommon/src/Pipe.cpp new file mode 100644 index 0000000000..83268b76ea --- /dev/null +++ b/libs/libcommon/src/Pipe.cpp @@ -0,0 +1,45 @@ +#include "common/Pipe.h" + +void LazyPipe::open() +{ + for (int & fd : fds_rw) + { + if (fd >= 0) + { + throw std::logic_error("Pipe is already opened"); + } + } + +#ifndef __APPLE__ + if (0 != pipe2(fds_rw, O_CLOEXEC)) + throw std::runtime_error("Cannot create pipe"); +#else + if (0 != pipe(fds_rw)) + throw std::runtime_error("Cannot create pipe"); + if (0 != fcntl(fds_rw[0], F_SETFD, FD_CLOEXEC)) + throw std::runtime_error("Cannot setup auto-close on exec for read end of pipe"); + if (0 != fcntl(fds_rw[1], F_SETFD, FD_CLOEXEC)) + throw std::runtime_error("Cannot setup auto-close on exec for write end of pipe"); +#endif +} + +void LazyPipe::close() +{ + for (int fd : fds_rw) + { + if (fd >= 0) + { + ::close(fd); + } + } +} + +Pipe::Pipe() +{ + open(); +} + +Pipe::~Pipe() +{ + close(); +} diff --git a/libs/libcommon/src/Sleep.cpp b/libs/libcommon/src/sleep.cpp similarity index 62% rename from libs/libcommon/src/Sleep.cpp rename to libs/libcommon/src/sleep.cpp index 13b03f2b95..710b387d62 100644 --- a/libs/libcommon/src/Sleep.cpp +++ b/libs/libcommon/src/sleep.cpp @@ -1,10 +1,10 @@ -#include "common/Sleep.h" +#include "common/sleep.h" #include #include /** - * Sleep with nanoseconds precision + * Sleep with nanoseconds precision. Tolerant to signal interruptions * * In case query profiler is turned on, all threads spawned for * query execution are repeatedly interrupted by signals from timer. @@ -12,14 +12,14 @@ * problems in this setup and man page for nanosleep(2) suggests * using absolute deadlines, for instance clock_nanosleep(2). */ -void SleepForNanoseconds(uint64_t nanoseconds) +void sleepForNanoseconds(uint64_t nanoseconds) { - const auto clock_type = CLOCK_REALTIME; + constexpr auto clock_type = CLOCK_MONOTONIC; struct timespec current_time; clock_gettime(clock_type, ¤t_time); - const uint64_t resolution = 1'000'000'000; + constexpr uint64_t resolution = 1'000'000'000; struct timespec finish_time = current_time; finish_time.tv_nsec += nanoseconds % resolution; @@ -31,17 +31,17 @@ void SleepForNanoseconds(uint64_t nanoseconds) while (clock_nanosleep(clock_type, TIMER_ABSTIME, &finish_time, nullptr) == EINTR); } -void SleepForMicroseconds(uint64_t microseconds) +void sleepForMicroseconds(uint64_t microseconds) { - SleepForNanoseconds(microseconds * 1000); + sleepForNanoseconds(microseconds * 1000); } -void SleepForMilliseconds(uint64_t milliseconds) +void sleepForMilliseconds(uint64_t milliseconds) { - SleepForMicroseconds(milliseconds * 1000); + sleepForMicroseconds(milliseconds * 1000); } -void SleepForSeconds(uint64_t seconds) +void sleepForSeconds(uint64_t seconds) { - SleepForMilliseconds(seconds * 1000); + sleepForMilliseconds(seconds * 1000); } diff --git a/libs/libmysqlxx/src/Pool.cpp b/libs/libmysqlxx/src/Pool.cpp index 6e35a311b4..a17246e5d6 100644 --- a/libs/libmysqlxx/src/Pool.cpp +++ b/libs/libmysqlxx/src/Pool.cpp @@ -8,7 +8,7 @@ #include -#include +#include #include #include @@ -135,7 +135,7 @@ Pool::Entry Pool::Get() } lock.unlock(); - SleepForSeconds(MYSQLXX_POOL_SLEEP_ON_CONNECT_FAIL); + sleepForSeconds(MYSQLXX_POOL_SLEEP_ON_CONNECT_FAIL); lock.lock(); } } @@ -195,7 +195,7 @@ void Pool::Entry::forceConnected() const if (first) first = false; else - SleepForSeconds(MYSQLXX_POOL_SLEEP_ON_CONNECT_FAIL); + sleepForSeconds(MYSQLXX_POOL_SLEEP_ON_CONNECT_FAIL); app.logger().information("MYSQL: Reconnecting to " + pool->description); data->conn.connect( -- GitLab