未验证 提交 2442e653 编写于 作者: A alexey-milovidov 提交者: GitHub

Merge pull request #6124 from yandex/merge-profiler

Fixes for query profiler.
Subproject commit 17a48fbfa7913ee889960a698516bd3ba51d63ee Subproject commit 5afe6d87ae9e66485c7fcb106d2f7c2c0359c8f6
...@@ -20,6 +20,9 @@ ...@@ -20,6 +20,9 @@
#include <Common/StringUtils/StringUtils.h> #include <Common/StringUtils/StringUtils.h>
#include <common/phdr_cache.h>
/// Universal executable for various clickhouse applications /// Universal executable for various clickhouse applications
#if ENABLE_CLICKHOUSE_SERVER || !defined(ENABLE_CLICKHOUSE_SERVER) #if ENABLE_CLICKHOUSE_SERVER || !defined(ENABLE_CLICKHOUSE_SERVER)
int mainEntryClickHouseServer(int argc, char ** argv); int mainEntryClickHouseServer(int argc, char ** argv);
...@@ -144,6 +147,10 @@ int main(int argc_, char ** argv_) ...@@ -144,6 +147,10 @@ int main(int argc_, char ** argv_)
/// It is needed because LLVM library clobbers it. /// It is needed because LLVM library clobbers it.
std::set_new_handler(nullptr); std::set_new_handler(nullptr);
/// PHDR cache is required for query profiler to work reliably
/// It also speed up exception handling, but exceptions from dynamically loaded libraries (dlopen) won't work.
updatePHDRCache();
#if USE_EMBEDDED_COMPILER #if USE_EMBEDDED_COMPILER
if (argc_ >= 2 && 0 == strcmp(argv_[1], "-cc1")) if (argc_ >= 2 && 0 == strcmp(argv_[1], "-cc1"))
return mainEntryClickHouseClang(argc_, argv_); return mainEntryClickHouseClang(argc_, argv_);
......
...@@ -14,6 +14,7 @@ ...@@ -14,6 +14,7 @@
#include <Poco/Util/HelpFormatter.h> #include <Poco/Util/HelpFormatter.h>
#include <ext/scope_guard.h> #include <ext/scope_guard.h>
#include <common/logger_useful.h> #include <common/logger_useful.h>
#include <common/phdr_cache.h>
#include <common/ErrorHandlers.h> #include <common/ErrorHandlers.h>
#include <common/getMemoryAmount.h> #include <common/getMemoryAmount.h>
#include <Common/ClickHouseRevision.h> #include <Common/ClickHouseRevision.h>
...@@ -509,7 +510,8 @@ int Server::main(const std::vector<std::string> & /*args*/) ...@@ -509,7 +510,8 @@ int Server::main(const std::vector<std::string> & /*args*/)
LOG_DEBUG(log, "Loaded metadata."); LOG_DEBUG(log, "Loaded metadata.");
/// Init trace collector only after trace_log system table was created /// Init trace collector only after trace_log system table was created
global_context->initializeTraceCollector(); if (hasPHDRCache())
global_context->initializeTraceCollector();
global_context->setCurrentDatabase(default_database); global_context->setCurrentDatabase(default_database);
......
...@@ -437,6 +437,7 @@ namespace ErrorCodes ...@@ -437,6 +437,7 @@ namespace ErrorCodes
extern const int CANNOT_CREATE_TIMER = 460; extern const int CANNOT_CREATE_TIMER = 460;
extern const int CANNOT_SET_TIMER_PERIOD = 461; extern const int CANNOT_SET_TIMER_PERIOD = 461;
extern const int CANNOT_DELETE_TIMER = 462; extern const int CANNOT_DELETE_TIMER = 462;
extern const int CANNOT_FCNTL = 463;
extern const int KEEPER_EXCEPTION = 999; extern const int KEEPER_EXCEPTION = 999;
extern const int POCO_EXCEPTION = 1000; extern const int POCO_EXCEPTION = 1000;
......
...@@ -171,6 +171,9 @@ ...@@ -171,6 +171,9 @@
M(OSReadChars, "Number of bytes read from filesystem, including page cache.") \ M(OSReadChars, "Number of bytes read from filesystem, including page cache.") \
M(OSWriteChars, "Number of bytes written to filesystem, including page cache.") \ M(OSWriteChars, "Number of bytes written to filesystem, including page cache.") \
M(CreatedHTTPConnections, "Total amount of created HTTP connections (closed or opened).") \ M(CreatedHTTPConnections, "Total amount of created HTTP connections (closed or opened).") \
\
M(QueryProfilerCannotWriteTrace, "Number of stack traces dropped by query profiler because pipe is full or cannot write to pipe.") \
M(QueryProfilerSignalOverruns, "Number of times we drop processing of a signal due to overrun plus the number of signals that OS has not delivered due to overrun.") \
namespace ProfileEvents namespace ProfileEvents
{ {
......
#include "QueryProfiler.h" #include "QueryProfiler.h"
#include <random>
#include <pcg_random.hpp>
#include <common/Pipe.h> #include <common/Pipe.h>
#include <common/phdr_cache.h>
#include <common/config_common.h>
#include <common/StackTrace.h> #include <common/StackTrace.h>
#include <common/StringRef.h> #include <common/StringRef.h>
#include <common/logger_useful.h> #include <common/logger_useful.h>
#include <Common/CurrentThread.h> #include <Common/CurrentThread.h>
#include <Common/Exception.h> #include <Common/Exception.h>
#include <Common/randomSeed.h>
#include <IO/WriteHelpers.h> #include <IO/WriteHelpers.h>
#include <IO/WriteBufferFromFileDescriptor.h> #include <IO/WriteBufferFromFileDescriptor.h>
namespace ProfileEvents
{
extern const Event QueryProfilerCannotWriteTrace;
extern const Event QueryProfilerSignalOverruns;
}
namespace DB namespace DB
{ {
...@@ -16,23 +28,76 @@ extern LazyPipe trace_pipe; ...@@ -16,23 +28,76 @@ extern LazyPipe trace_pipe;
namespace namespace
{ {
/** Write to file descriptor but drop the data if write would block or fail.
* To use within signal handler. Motivating example: a signal handler invoked during execution of malloc
* should not block because some mutex (or even worse - a spinlock) may be held.
*/
class WriteBufferDiscardOnFailure : public WriteBufferFromFileDescriptor
{
protected:
void nextImpl() override
{
size_t bytes_written = 0;
while (bytes_written != offset())
{
ssize_t res = ::write(fd, working_buffer.begin() + bytes_written, offset() - bytes_written);
if ((-1 == res || 0 == res) && errno != EINTR)
{
ProfileEvents::increment(ProfileEvents::QueryProfilerCannotWriteTrace);
break; /// Discard
}
if (res > 0)
bytes_written += res;
}
}
public:
using WriteBufferFromFileDescriptor::WriteBufferFromFileDescriptor;
~WriteBufferDiscardOnFailure() override {}
};
/// Normally query_id is a UUID (string with a fixed length) but user can provide custom query_id. /// Normally query_id is a UUID (string with a fixed length) but user can provide custom query_id.
/// Thus upper bound on query_id length should be introduced to avoid buffer overflow in signal handler. /// Thus upper bound on query_id length should be introduced to avoid buffer overflow in signal handler.
constexpr size_t QUERY_ID_MAX_LEN = 1024; constexpr size_t QUERY_ID_MAX_LEN = 1024;
void writeTraceInfo(TimerType timer_type, int /* sig */, siginfo_t * /* info */, void * context) thread_local size_t write_trace_iteration = 0;
thread_local pcg64 rng{randomSeed()};
void writeTraceInfo(TimerType timer_type, int /* sig */, siginfo_t * info, void * context)
{ {
/// Quickly drop if signal handler is called too frequently.
/// Otherwise we may end up infinitelly processing signals instead of doing any useful work.
++write_trace_iteration;
if (info && info->si_overrun > 0)
{
/// But pass with some frequency to avoid drop of all traces.
if (write_trace_iteration % info->si_overrun == 0)
{
ProfileEvents::increment(ProfileEvents::QueryProfilerSignalOverruns, info->si_overrun);
}
else
{
ProfileEvents::increment(ProfileEvents::QueryProfilerSignalOverruns, info->si_overrun + 1);
return;
}
}
constexpr size_t buf_size = sizeof(char) + // TraceCollector stop flag constexpr size_t buf_size = sizeof(char) + // TraceCollector stop flag
8 * sizeof(char) + // maximum VarUInt length for string size 8 * sizeof(char) + // maximum VarUInt length for string size
QUERY_ID_MAX_LEN * sizeof(char) + // maximum query_id length QUERY_ID_MAX_LEN * sizeof(char) + // maximum query_id length
sizeof(StackTrace) + // collected stack trace sizeof(StackTrace) + // collected stack trace
sizeof(TimerType); // timer type sizeof(TimerType) + // timer type
sizeof(UInt32); // thread_number
char buffer[buf_size]; char buffer[buf_size];
WriteBufferFromFileDescriptor out(trace_pipe.fds_rw[1], buf_size, buffer); WriteBufferDiscardOnFailure out(trace_pipe.fds_rw[1], buf_size, buffer);
StringRef query_id = CurrentThread::getQueryId(); StringRef query_id = CurrentThread::getQueryId();
query_id.size = std::min(query_id.size, QUERY_ID_MAX_LEN); query_id.size = std::min(query_id.size, QUERY_ID_MAX_LEN);
UInt32 thread_number = CurrentThread::get().thread_number;
const auto signal_context = *reinterpret_cast<ucontext_t *>(context); const auto signal_context = *reinterpret_cast<ucontext_t *>(context);
const StackTrace stack_trace(signal_context); const StackTrace stack_trace(signal_context);
...@@ -40,6 +105,7 @@ namespace ...@@ -40,6 +105,7 @@ namespace
writeStringBinary(query_id, out); writeStringBinary(query_id, out);
writePODBinary(stack_trace, out); writePODBinary(stack_trace, out);
writePODBinary(timer_type, out); writePODBinary(timer_type, out);
writePODBinary(thread_number, out);
out.next(); out.next();
} }
...@@ -56,10 +122,19 @@ namespace ErrorCodes ...@@ -56,10 +122,19 @@ namespace ErrorCodes
} }
template <typename ProfilerImpl> template <typename ProfilerImpl>
QueryProfilerBase<ProfilerImpl>::QueryProfilerBase(const Int32 thread_id, const int clock_type, const UInt32 period, const int pause_signal) QueryProfilerBase<ProfilerImpl>::QueryProfilerBase(const Int32 thread_id, const int clock_type, UInt32 period, const int pause_signal)
: log(&Logger::get("QueryProfiler")) : log(&Logger::get("QueryProfiler"))
, pause_signal(pause_signal) , pause_signal(pause_signal)
{ {
#if USE_INTERNAL_UNWIND_LIBRARY
/// Sanity check.
if (!hasPHDRCache())
throw Exception("QueryProfiler cannot be used without PHDR cache, that is not available for TSan build", ErrorCodes::NOT_IMPLEMENTED);
/// Too high frequency can introduce infinite busy loop of signal handlers. We will limit maximum frequency (with 1000 signals per second).
if (period < 1000000)
period = 1000000;
struct sigaction sa{}; struct sigaction sa{};
sa.sa_sigaction = ProfilerImpl::signalHandler; sa.sa_sigaction = ProfilerImpl::signalHandler;
sa.sa_flags = SA_SIGINFO | SA_RESTART; sa.sa_flags = SA_SIGINFO | SA_RESTART;
...@@ -82,8 +157,16 @@ QueryProfilerBase<ProfilerImpl>::QueryProfilerBase(const Int32 thread_id, const ...@@ -82,8 +157,16 @@ QueryProfilerBase<ProfilerImpl>::QueryProfilerBase(const Int32 thread_id, const
if (timer_create(clock_type, &sev, &timer_id)) if (timer_create(clock_type, &sev, &timer_id))
throwFromErrno("Failed to create thread timer", ErrorCodes::CANNOT_CREATE_TIMER); throwFromErrno("Failed to create thread timer", ErrorCodes::CANNOT_CREATE_TIMER);
/// Randomize offset as uniform random value from 0 to period - 1.
/// It will allow to sample short queries even if timer period is large.
/// (For example, with period of 1 second, query with 50 ms duration will be sampled with 1 / 20 probability).
/// It also helps to avoid interference (moire).
UInt32 period_rand = std::uniform_int_distribution<UInt32>(0, period)(rng);
struct timespec interval{.tv_sec = period / TIMER_PRECISION, .tv_nsec = period % TIMER_PRECISION}; struct timespec interval{.tv_sec = period / TIMER_PRECISION, .tv_nsec = period % TIMER_PRECISION};
struct itimerspec timer_spec = {.it_interval = interval, .it_value = interval}; struct timespec offset{.tv_sec = period_rand / TIMER_PRECISION, .tv_nsec = period_rand % TIMER_PRECISION};
struct itimerspec timer_spec = {.it_interval = interval, .it_value = offset};
if (timer_settime(timer_id, 0, &timer_spec, nullptr)) if (timer_settime(timer_id, 0, &timer_spec, nullptr))
throwFromErrno("Failed to set thread timer period", ErrorCodes::CANNOT_SET_TIMER_PERIOD); throwFromErrno("Failed to set thread timer period", ErrorCodes::CANNOT_SET_TIMER_PERIOD);
} }
...@@ -92,6 +175,9 @@ QueryProfilerBase<ProfilerImpl>::QueryProfilerBase(const Int32 thread_id, const ...@@ -92,6 +175,9 @@ QueryProfilerBase<ProfilerImpl>::QueryProfilerBase(const Int32 thread_id, const
tryCleanup(); tryCleanup();
throw; throw;
} }
#else
throw Exception("QueryProfiler cannot work with stock libunwind", ErrorCodes::NOT_IMPLEMENTED);
#endif
} }
template <typename ProfilerImpl> template <typename ProfilerImpl>
......
...@@ -5,6 +5,7 @@ ...@@ -5,6 +5,7 @@
#include <signal.h> #include <signal.h>
#include <time.h> #include <time.h>
namespace Poco namespace Poco
{ {
class Logger; class Logger;
...@@ -34,8 +35,7 @@ template <typename ProfilerImpl> ...@@ -34,8 +35,7 @@ template <typename ProfilerImpl>
class QueryProfilerBase class QueryProfilerBase
{ {
public: public:
QueryProfilerBase(const Int32 thread_id, const int clock_type, const UInt32 period, const int pause_signal = SIGALRM); QueryProfilerBase(const Int32 thread_id, const int clock_type, UInt32 period, const int pause_signal);
~QueryProfilerBase(); ~QueryProfilerBase();
private: private:
......
#include "SharedLibrary.h" #include "SharedLibrary.h"
#include <string> #include <string>
#include <boost/core/noncopyable.hpp> #include <boost/core/noncopyable.hpp>
#include <common/phdr_cache.h>
#include "Exception.h" #include "Exception.h"
...@@ -17,6 +18,8 @@ SharedLibrary::SharedLibrary(const std::string & path, int flags) ...@@ -17,6 +18,8 @@ SharedLibrary::SharedLibrary(const std::string & path, int flags)
handle = dlopen(path.c_str(), flags); handle = dlopen(path.c_str(), flags);
if (!handle) if (!handle)
throw Exception(std::string("Cannot dlopen: ") + dlerror(), ErrorCodes::CANNOT_DLOPEN); throw Exception(std::string("Cannot dlopen: ") + dlerror(), ErrorCodes::CANNOT_DLOPEN);
updatePHDRCache();
} }
SharedLibrary::~SharedLibrary() SharedLibrary::~SharedLibrary()
......
...@@ -12,6 +12,10 @@ ...@@ -12,6 +12,10 @@
#include <Common/Exception.h> #include <Common/Exception.h>
#include <Interpreters/TraceLog.h> #include <Interpreters/TraceLog.h>
#include <unistd.h>
#include <fcntl.h>
namespace DB namespace DB
{ {
...@@ -21,6 +25,7 @@ namespace ErrorCodes ...@@ -21,6 +25,7 @@ namespace ErrorCodes
{ {
extern const int NULL_POINTER_DEREFERENCE; extern const int NULL_POINTER_DEREFERENCE;
extern const int THREAD_IS_NOT_JOINABLE; extern const int THREAD_IS_NOT_JOINABLE;
extern const int CANNOT_FCNTL;
} }
TraceCollector::TraceCollector(std::shared_ptr<TraceLog> & trace_log) TraceCollector::TraceCollector(std::shared_ptr<TraceLog> & trace_log)
...@@ -31,6 +36,28 @@ TraceCollector::TraceCollector(std::shared_ptr<TraceLog> & trace_log) ...@@ -31,6 +36,28 @@ TraceCollector::TraceCollector(std::shared_ptr<TraceLog> & trace_log)
throw Exception("Invalid trace log pointer passed", ErrorCodes::NULL_POINTER_DEREFERENCE); throw Exception("Invalid trace log pointer passed", ErrorCodes::NULL_POINTER_DEREFERENCE);
trace_pipe.open(); trace_pipe.open();
/** Turn write end of pipe to non-blocking mode to avoid deadlocks
* when QueryProfiler is invoked under locks and TraceCollector cannot pull data from pipe.
*/
int flags = fcntl(trace_pipe.fds_rw[1], F_GETFL, 0);
if (-1 == flags)
throwFromErrno("Cannot get file status flags of pipe", ErrorCodes::CANNOT_FCNTL);
if (-1 == fcntl(trace_pipe.fds_rw[1], F_SETFL, flags | O_NONBLOCK))
throwFromErrno("Cannot set non-blocking mode of pipe", ErrorCodes::CANNOT_FCNTL);
/** Increase pipe size to avoid slowdown during fine-grained trace collection.
*/
constexpr int max_pipe_capacity_to_set = 1048576;
int pipe_size = fcntl(trace_pipe.fds_rw[1], F_GETPIPE_SZ);
if (-1 == pipe_size)
throwFromErrno("Cannot get pipe capacity", ErrorCodes::CANNOT_FCNTL);
for (errno = 0; errno != EPERM && pipe_size < max_pipe_capacity_to_set; pipe_size *= 2)
if (-1 == fcntl(trace_pipe.fds_rw[1], F_SETPIPE_SZ, pipe_size * 2) && errno != EPERM)
throwFromErrno("Cannot increase pipe capacity to " + toString(pipe_size * 2), ErrorCodes::CANNOT_FCNTL);
LOG_TRACE(log, "Pipe capacity is " << formatReadableSizeWithBinarySuffix(std::min(pipe_size, max_pipe_capacity_to_set)));
thread = ThreadFromGlobalPool(&TraceCollector::run, this); thread = ThreadFromGlobalPool(&TraceCollector::run, this);
} }
...@@ -78,10 +105,12 @@ void TraceCollector::run() ...@@ -78,10 +105,12 @@ void TraceCollector::run()
std::string query_id; std::string query_id;
StackTrace stack_trace(NoCapture{}); StackTrace stack_trace(NoCapture{});
TimerType timer_type; TimerType timer_type;
UInt32 thread_number;
readStringBinary(query_id, in); readStringBinary(query_id, in);
readPODBinary(stack_trace, in); readPODBinary(stack_trace, in);
readPODBinary(timer_type, in); readPODBinary(timer_type, in);
readPODBinary(thread_number, in);
const auto size = stack_trace.getSize(); const auto size = stack_trace.getSize();
const auto & frames = stack_trace.getFrames(); const auto & frames = stack_trace.getFrames();
...@@ -91,7 +120,7 @@ void TraceCollector::run() ...@@ -91,7 +120,7 @@ void TraceCollector::run()
for (size_t i = 0; i < size; i++) for (size_t i = 0; i < size; i++)
trace.emplace_back(UInt64(reinterpret_cast<uintptr_t>(frames[i]))); trace.emplace_back(UInt64(reinterpret_cast<uintptr_t>(frames[i])));
TraceLogElement element{std::time(nullptr), timer_type, query_id, trace}; TraceLogElement element{std::time(nullptr), timer_type, thread_number, query_id, trace};
trace_log->add(element); trace_log->add(element);
} }
......
...@@ -221,8 +221,8 @@ struct Settings : public SettingsCollection<Settings> ...@@ -221,8 +221,8 @@ struct Settings : public SettingsCollection<Settings>
M(SettingBool, empty_result_for_aggregation_by_empty_set, false, "Return empty result when aggregating without keys on empty set.") \ M(SettingBool, empty_result_for_aggregation_by_empty_set, false, "Return empty result when aggregating without keys on empty set.") \
M(SettingBool, allow_distributed_ddl, true, "If it is set to true, then a user is allowed to executed distributed DDL queries.") \ M(SettingBool, allow_distributed_ddl, true, "If it is set to true, then a user is allowed to executed distributed DDL queries.") \
M(SettingUInt64, odbc_max_field_size, 1024, "Max size of filed can be read from ODBC dictionary. Long strings are truncated.") \ M(SettingUInt64, odbc_max_field_size, 1024, "Max size of filed can be read from ODBC dictionary. Long strings are truncated.") \
M(SettingUInt64, query_profiler_real_time_period_ns, 0, "Period for real clock timer of query profiler (in nanoseconds). Set 0 value to turn off real clock query profiler") \ M(SettingUInt64, query_profiler_real_time_period_ns, 0, "Highly experimental. Period for real clock timer of query profiler (in nanoseconds). Set 0 value to turn off real clock query profiler. Recommended value is at least 10000000 (100 times a second) for single queries or 1000000000 (once a second) for cluster-wide profiling.") \
M(SettingUInt64, query_profiler_cpu_time_period_ns, 0, "Period for CPU clock timer of query profiler (in nanoseconds). Set 0 value to turn off CPU clock query profiler") \ M(SettingUInt64, query_profiler_cpu_time_period_ns, 0, "Highly experimental. Period for CPU clock timer of query profiler (in nanoseconds). Set 0 value to turn off CPU clock query profiler. Recommended value is at least 10000000 (100 times a second) for single queries or 1000000000 (once a second) for cluster-wide profiling.") \
\ \
\ \
/** Limits during query execution are part of the settings. \ /** Limits during query execution are part of the settings. \
......
#include <Functions/FunctionFactory.h>
#include <Functions/FunctionsIntrospection.h>
namespace DB
{
void registerFunctionsIntrospection(FunctionFactory & factory)
{
factory.registerFunction<FunctionSymbolizeTrace>();
}
}
...@@ -40,6 +40,7 @@ void registerFunctionsIntrospection(FunctionFactory &); ...@@ -40,6 +40,7 @@ void registerFunctionsIntrospection(FunctionFactory &);
void registerFunctionsNull(FunctionFactory &); void registerFunctionsNull(FunctionFactory &);
void registerFunctionsFindCluster(FunctionFactory &); void registerFunctionsFindCluster(FunctionFactory &);
void registerFunctionsJSON(FunctionFactory &); void registerFunctionsJSON(FunctionFactory &);
void registerFunctionSymbolizeAddress(FunctionFactory &);
void registerFunctions() void registerFunctions()
{ {
...@@ -75,10 +76,10 @@ void registerFunctions() ...@@ -75,10 +76,10 @@ void registerFunctions()
registerFunctionsVisitParam(factory); registerFunctionsVisitParam(factory);
registerFunctionsMath(factory); registerFunctionsMath(factory);
registerFunctionsGeo(factory); registerFunctionsGeo(factory);
registerFunctionsIntrospection(factory);
registerFunctionsNull(factory); registerFunctionsNull(factory);
registerFunctionsFindCluster(factory); registerFunctionsFindCluster(factory);
registerFunctionsJSON(factory); registerFunctionsJSON(factory);
registerFunctionSymbolizeAddress(factory);
} }
} }
#pragma once #include <dlfcn.h>
#include <unordered_map>
#include <common/StackTrace.h> #include <optional>
#include <common/unaligned.h>
#include <common/demangle.h>
#include <common/SimpleCache.h>
#include <Columns/ColumnString.h> #include <Columns/ColumnString.h>
#include <Columns/ColumnVector.h> #include <Columns/ColumnsNumber.h>
#include <Columns/ColumnArray.h>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypeString.h> #include <DataTypes/DataTypeString.h>
#include <Functions/IFunction.h> #include <Functions/IFunction.h>
#include <Functions/FunctionHelpers.h> #include <Functions/FunctionHelpers.h>
#include <Functions/FunctionFactory.h>
#include <IO/WriteHelpers.h> #include <IO/WriteHelpers.h>
namespace DB namespace DB
{ {
...@@ -21,13 +24,13 @@ namespace ErrorCodes ...@@ -21,13 +24,13 @@ namespace ErrorCodes
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH; extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
} }
class FunctionSymbolizeTrace : public IFunction class FunctionSymbolizeAddress : public IFunction
{ {
public: public:
static constexpr auto name = "symbolizeTrace"; static constexpr auto name = "symbolizeAddress";
static FunctionPtr create(const Context &) static FunctionPtr create(const Context &)
{ {
return std::make_shared<FunctionSymbolizeTrace>(); return std::make_shared<FunctionSymbolizeAddress>();
} }
String getName() const override String getName() const override
...@@ -44,20 +47,13 @@ public: ...@@ -44,20 +47,13 @@ public:
{ {
if (arguments.size() != 1) if (arguments.size() != 1)
throw Exception("Function " + getName() + " needs exactly one argument; passed " throw Exception("Function " + getName() + " needs exactly one argument; passed "
+ toString(arguments.size()) + ".", + toString(arguments.size()) + ".", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
const auto array_type = checkAndGetDataType<DataTypeArray>(arguments[0].type.get());
if (!array_type) const auto & type = arguments[0].type;
throw Exception("The only argument for function " + getName() + " must be array. Found "
+ arguments[0].type->getName() + " instead.", ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
DataTypePtr nested_type = array_type->getNestedType(); if (!WhichDataType(type.get()).isUInt64())
throw Exception("The only argument for function " + getName() + " must be UInt64. Found "
if (!WhichDataType(nested_type).isUInt64()) + type->getName() + " instead.", ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
throw Exception("The only argument for function " + getName() + " must be array of UInt64. Found "
+ arguments[0].type->getName() + " instead.", ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
return std::make_shared<DataTypeString>(); return std::make_shared<DataTypeString>();
} }
...@@ -67,41 +63,53 @@ public: ...@@ -67,41 +63,53 @@ public:
return true; return true;
} }
void executeImpl(Block & block, const ColumnNumbers & arguments, size_t result, size_t /*input_rows_count*/) override static std::string addressToSymbol(UInt64 uint_address)
{ {
const ColumnPtr column = block.getByPosition(arguments[0]).column; void * addr = unalignedLoad<void *>(&uint_address);
const ColumnArray * column_array = checkAndGetColumn<ColumnArray>(column.get());
if (!column_array) /// This is extremely slow.
throw Exception("Illegal column " + block.getByPosition(arguments[0]).column->getName() + " of argument of function " + getName(), Dl_info info;
ErrorCodes::ILLEGAL_COLUMN); if (dladdr(addr, &info) && info.dli_sname)
{
int demangling_status = 0;
return demangle(info.dli_sname, demangling_status);
}
else
{
return {};
}
}
const ColumnPtr data_ptr = column_array->getDataPtr(); void executeImpl(Block & block, const ColumnNumbers & arguments, size_t result, size_t input_rows_count) override
const ColumnVector<UInt64> * data_vector = checkAndGetColumn<ColumnVector<UInt64>>(&*data_ptr); {
const ColumnPtr & column = block.getByPosition(arguments[0]).column;
const ColumnUInt64 * column_concrete = checkAndGetColumn<ColumnUInt64>(column.get());
const typename ColumnVector<UInt64>::Container & data = data_vector->getData(); if (!column_concrete)
const ColumnArray::Offsets & offsets = column_array->getOffsets(); throw Exception("Illegal column " + column->getName() + " of argument of function " + getName(), ErrorCodes::ILLEGAL_COLUMN);
const typename ColumnVector<UInt64>::Container & data = column_concrete->getData();
auto result_column = ColumnString::create(); auto result_column = ColumnString::create();
StackTrace::Frames frames; static SimpleCache<decltype(addressToSymbol), &addressToSymbol> func_cached;
size_t current_offset = 0;
for (size_t i = 0; i < offsets.size(); ++i)
{
size_t current_size = 0;
for (; current_size < frames.size() && current_offset + current_size < offsets[i]; ++current_size)
{
frames[current_size] = reinterpret_cast<void *>(data[current_offset + current_size]);
}
std::string backtrace = StackTrace(frames.begin(), frames.begin() + current_size).toString();
result_column->insertDataWithTerminatingZero(backtrace.c_str(), backtrace.length() + 1);
current_offset = offsets[i]; for (size_t i = 0; i < input_rows_count; ++i)
{
std::string symbol = func_cached(data[i]);
result_column->insertDataWithTerminatingZero(symbol.data(), symbol.size() + 1);
} }
block.getByPosition(result).column = std::move(result_column); block.getByPosition(result).column = std::move(result_column);
/// Do not let our cache to grow indefinitely (simply drop it)
if (func_cached.size() > 1000000)
func_cached.drop();
} }
}; };
void registerFunctionSymbolizeAddress(FunctionFactory & factory)
{
factory.registerFunction<FunctionSymbolizeAddress>();
}
} }
...@@ -41,8 +41,6 @@ void ThreadStatus::attachQueryContext(Context & query_context_) ...@@ -41,8 +41,6 @@ void ThreadStatus::attachQueryContext(Context & query_context_)
if (!thread_group->global_context) if (!thread_group->global_context)
thread_group->global_context = global_context; thread_group->global_context = global_context;
} }
initQueryProfiler();
} }
void CurrentThread::defaultThreadDeleter() void CurrentThread::defaultThreadDeleter()
...@@ -124,6 +122,7 @@ void ThreadStatus::attachQuery(const ThreadGroupStatusPtr & thread_group_, bool ...@@ -124,6 +122,7 @@ void ThreadStatus::attachQuery(const ThreadGroupStatusPtr & thread_group_, bool
#endif #endif
initPerformanceCounters(); initPerformanceCounters();
initQueryProfiler();
thread_state = ThreadState::AttachedToQuery; thread_state = ThreadState::AttachedToQuery;
} }
...@@ -155,7 +154,7 @@ void ThreadStatus::finalizePerformanceCounters() ...@@ -155,7 +154,7 @@ void ThreadStatus::finalizePerformanceCounters()
void ThreadStatus::initQueryProfiler() void ThreadStatus::initQueryProfiler()
{ {
/// query profilers are useless without trace collector /// query profilers are useless without trace collector
if (!global_context->hasTraceCollector()) if (!global_context || !global_context->hasTraceCollector())
return; return;
const auto & settings = query_context->getSettingsRef(); const auto & settings = query_context->getSettingsRef();
...@@ -163,14 +162,12 @@ void ThreadStatus::initQueryProfiler() ...@@ -163,14 +162,12 @@ void ThreadStatus::initQueryProfiler()
if (settings.query_profiler_real_time_period_ns > 0) if (settings.query_profiler_real_time_period_ns > 0)
query_profiler_real = std::make_unique<QueryProfilerReal>( query_profiler_real = std::make_unique<QueryProfilerReal>(
/* thread_id */ os_thread_id, /* thread_id */ os_thread_id,
/* period */ static_cast<UInt32>(settings.query_profiler_real_time_period_ns) /* period */ static_cast<UInt32>(settings.query_profiler_real_time_period_ns));
);
if (settings.query_profiler_cpu_time_period_ns > 0) if (settings.query_profiler_cpu_time_period_ns > 0)
query_profiler_cpu = std::make_unique<QueryProfilerCpu>( query_profiler_cpu = std::make_unique<QueryProfilerCpu>(
/* thread_id */ os_thread_id, /* thread_id */ os_thread_id,
/* period */ static_cast<UInt32>(settings.query_profiler_cpu_time_period_ns) /* period */ static_cast<UInt32>(settings.query_profiler_cpu_time_period_ns));
);
} }
void ThreadStatus::finalizeQueryProfiler() void ThreadStatus::finalizeQueryProfiler()
......
...@@ -4,6 +4,8 @@ ...@@ -4,6 +4,8 @@
#include <DataTypes/DataTypesNumber.h> #include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeDate.h> #include <DataTypes/DataTypeDate.h>
#include <DataTypes/DataTypeDateTime.h> #include <DataTypes/DataTypeDateTime.h>
#include <Common/ClickHouseRevision.h>
using namespace DB; using namespace DB;
...@@ -20,13 +22,15 @@ Block TraceLogElement::createBlock() ...@@ -20,13 +22,15 @@ Block TraceLogElement::createBlock()
{ {
{std::make_shared<DataTypeDate>(), "event_date"}, {std::make_shared<DataTypeDate>(), "event_date"},
{std::make_shared<DataTypeDateTime>(), "event_time"}, {std::make_shared<DataTypeDateTime>(), "event_time"},
{std::make_shared<DataTypeUInt32>(), "revision"},
{std::make_shared<TimerDataType>(timer_values), "timer_type"}, {std::make_shared<TimerDataType>(timer_values), "timer_type"},
{std::make_shared<DataTypeUInt32>(), "thread_number"},
{std::make_shared<DataTypeString>(), "query_id"}, {std::make_shared<DataTypeString>(), "query_id"},
{std::make_shared<DataTypeArray>(std::make_shared<DataTypeUInt64>()), "trace"} {std::make_shared<DataTypeArray>(std::make_shared<DataTypeUInt64>()), "trace"}
}; };
} }
void TraceLogElement::appendToBlock(Block &block) const void TraceLogElement::appendToBlock(Block & block) const
{ {
MutableColumns columns = block.mutateColumns(); MutableColumns columns = block.mutateColumns();
...@@ -34,7 +38,9 @@ void TraceLogElement::appendToBlock(Block &block) const ...@@ -34,7 +38,9 @@ void TraceLogElement::appendToBlock(Block &block) const
columns[i++]->insert(DateLUT::instance().toDayNum(event_time)); columns[i++]->insert(DateLUT::instance().toDayNum(event_time));
columns[i++]->insert(event_time); columns[i++]->insert(event_time);
columns[i++]->insert(ClickHouseRevision::get());
columns[i++]->insert(static_cast<UInt8>(timer_type)); columns[i++]->insert(static_cast<UInt8>(timer_type));
columns[i++]->insert(thread_number);
columns[i++]->insertData(query_id.data(), query_id.size()); columns[i++]->insertData(query_id.data(), query_id.size());
columns[i++]->insert(trace); columns[i++]->insert(trace);
......
...@@ -15,7 +15,8 @@ struct TraceLogElement ...@@ -15,7 +15,8 @@ struct TraceLogElement
static const TimerDataType::Values timer_values; static const TimerDataType::Values timer_values;
time_t event_time{}; time_t event_time{};
TimerType timer_type; TimerType timer_type{};
UInt32 thread_number{};
String query_id{}; String query_id{};
Array trace{}; Array trace{};
......
SET query_profiler_real_time_period_ns = 100000000;
SET log_queries = 1;
SELECT sleep(0.5), ignore('test real time query profiler');
SET log_queries = 0;
SYSTEM FLUSH LOGS;
WITH symbolizeAddress(arrayJoin(trace)) AS symbol SELECT count() > 0 FROM system.trace_log t WHERE event_date >= yesterday() AND query_id = (SELECT query_id FROM system.query_log WHERE event_date >= yesterday() AND query LIKE '%test real time query profiler%' ORDER BY event_time DESC LIMIT 1) AND symbol LIKE '%FunctionSleep%';
SET query_profiler_real_time_period_ns = 0;
SET query_profiler_cpu_time_period_ns = 100000000;
SET log_queries = 1;
SELECT count(), ignore('test cpu time query profiler') FROM numbers(1000000000);
SET log_queries = 0;
SYSTEM FLUSH LOGS;
WITH symbolizeAddress(arrayJoin(trace)) AS symbol SELECT count() > 0 FROM system.trace_log t WHERE event_date >= yesterday() AND query_id = (SELECT query_id FROM system.query_log WHERE event_date >= yesterday() AND query LIKE '%test cpu time query profiler%' ORDER BY event_time DESC LIMIT 1) AND symbol LIKE '%Numbers%';
...@@ -25,6 +25,7 @@ add_library (common ...@@ -25,6 +25,7 @@ add_library (common
src/argsToConfig.cpp src/argsToConfig.cpp
src/StackTrace.cpp src/StackTrace.cpp
src/Pipe.cpp src/Pipe.cpp
src/phdr_cache.cpp
include/common/SimpleCache.h include/common/SimpleCache.h
include/common/StackTrace.h include/common/StackTrace.h
...@@ -51,6 +52,7 @@ add_library (common ...@@ -51,6 +52,7 @@ add_library (common
include/common/getThreadNumber.h include/common/getThreadNumber.h
include/common/sleep.h include/common/sleep.h
include/common/SimpleCache.h include/common/SimpleCache.h
include/common/phdr_cache.h
include/ext/bit_cast.h include/ext/bit_cast.h
include/ext/collection_cast.h include/ext/collection_cast.h
......
...@@ -26,7 +26,7 @@ private: ...@@ -26,7 +26,7 @@ private:
using Result = typename function_traits<F>::result; using Result = typename function_traits<F>::result;
std::map<Key, Result> cache; std::map<Key, Result> cache;
std::mutex mutex; mutable std::mutex mutex;
public: public:
template <typename... Args> template <typename... Args>
...@@ -66,6 +66,12 @@ public: ...@@ -66,6 +66,12 @@ public:
} }
} }
size_t size() const
{
std::lock_guard lock(mutex);
return cache.size();
}
void drop() void drop()
{ {
std::lock_guard lock(mutex); std::lock_guard lock(mutex);
......
...@@ -17,7 +17,7 @@ struct NoCapture ...@@ -17,7 +17,7 @@ struct NoCapture
}; };
/// Tries to capture current stack trace using libunwind or signal context /// Tries to capture current stack trace using libunwind or signal context
/// NOTE: StackTrace calculation is signal safe only if enablePHDRCache() was called beforehand. /// NOTE: StackTrace calculation is signal safe only if updatePHDRCache() was called beforehand.
class StackTrace class StackTrace
{ {
public: public:
......
...@@ -8,3 +8,4 @@ ...@@ -8,3 +8,4 @@
#cmakedefine01 USE_LIBEDIT #cmakedefine01 USE_LIBEDIT
#cmakedefine01 HAVE_READLINE_HISTORY #cmakedefine01 HAVE_READLINE_HISTORY
#cmakedefine01 UNBUNDLED #cmakedefine01 UNBUNDLED
#cmakedefine01 USE_INTERNAL_UNWIND_LIBRARY
#pragma once
/// This code was based on the code by Fedor Korotkiy (prime@yandex-team.ru) for YT product in Yandex.
/** Collects all dl_phdr_info items and caches them in a static array.
* Also rewrites dl_iterate_phdr with a lock-free version which consults the above cache
* thus eliminating scalability bottleneck in C++ exception unwinding.
* As a drawback, this only works if no dynamic object unloading happens after this point.
* This function is thread-safe. You should call it to update cache after loading new shared libraries.
* Otherwise exception handling from dlopened libraries won't work (will call std::terminate immediately).
*
* NOTE: It is disabled with Thread Sanitizer because TSan can only use original "dl_iterate_phdr" function.
*/
void updatePHDRCache();
/** Check if "dl_iterate_phdr" will be lock-free
* to determine if some features like Query Profiler can be used.
*/
bool hasPHDRCache();
/// This code was based on the code by Fedor Korotkiy (prime@yandex-team.ru) for YT product in Yandex.
#if defined(__has_feature)
#if __has_feature(address_sanitizer)
#define ADDRESS_SANITIZER 1
#endif
#if __has_feature(thread_sanitizer)
#define THREAD_SANITIZER 1
#endif
#else
#if defined(__SANITIZE_ADDRESS__)
#define ADDRESS_SANITIZER 1
#endif
#if defined(__SANITIZE_THREAD__)
#define THREAD_SANITIZER 1
#endif
#endif
#if defined(__linux__) && !defined(THREAD_SANITIZER)
#define USE_PHDR_CACHE 1
#endif
/// Thread Sanitizer uses dl_iterate_phdr function on initialization and fails if we provide our own.
#ifdef USE_PHDR_CACHE
#include <link.h>
#include <dlfcn.h>
#include <vector>
#include <atomic>
#include <cstddef>
#include <stdexcept>
namespace
{
// This is adapted from
// https://github.com/scylladb/seastar/blob/master/core/exception_hacks.hh
// https://github.com/scylladb/seastar/blob/master/core/exception_hacks.cc
using DLIterateFunction = int (*) (int (*callback) (dl_phdr_info * info, size_t size, void * data), void * data);
DLIterateFunction getOriginalDLIteratePHDR()
{
void * func = dlsym(RTLD_NEXT, "dl_iterate_phdr");
if (!func)
throw std::runtime_error("Cannot find dl_iterate_phdr function with dlsym");
return reinterpret_cast<DLIterateFunction>(func);
}
using PHDRCache = std::vector<dl_phdr_info>;
std::atomic<PHDRCache *> phdr_cache {};
}
extern "C"
#ifndef __clang__
[[gnu::visibility("default")]]
[[gnu::externally_visible]]
#endif
int dl_iterate_phdr(int (*callback) (dl_phdr_info * info, size_t size, void * data), void * data)
{
auto current_phdr_cache = phdr_cache.load();
if (!current_phdr_cache)
{
// Cache is not yet populated, pass through to the original function.
return getOriginalDLIteratePHDR()(callback, data);
}
int result = 0;
for (auto & entry : *current_phdr_cache)
{
result = callback(&entry, offsetof(dl_phdr_info, dlpi_adds), data);
if (result != 0)
break;
}
return result;
}
extern "C"
{
#ifdef ADDRESS_SANITIZER
void __lsan_ignore_object(const void *);
#else
void __lsan_ignore_object(const void *) {}
#endif
}
void updatePHDRCache()
{
// Fill out ELF header cache for access without locking.
// This assumes no dynamic object loading/unloading after this point
PHDRCache * new_phdr_cache = new PHDRCache;
getOriginalDLIteratePHDR()([] (dl_phdr_info * info, size_t /*size*/, void * data)
{
reinterpret_cast<PHDRCache *>(data)->push_back(*info);
return 0;
}, new_phdr_cache);
phdr_cache.store(new_phdr_cache);
/// Memory is intentionally leaked.
__lsan_ignore_object(new_phdr_cache);
}
bool hasPHDRCache()
{
return phdr_cache.load() != nullptr;
}
#else
void updatePHDRCache() {}
bool hasPHDRCache() { return false; }
#endif
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册