提交 5ed6a1d4 编写于 作者: A Alexey Milovidov

dbms: QueryLog: development [#METR-16946].

上级 719f0ee8
......@@ -39,6 +39,8 @@ struct BlockIO
{
if (exception_callback)
exception_callback();
else
tryLogCurrentException(__PRETTY_FUNCTION__);
}
BlockIO & operator= (const BlockIO & rhs)
......
......@@ -34,7 +34,8 @@ struct QueryLogElement
SHUTDOWN = 0, /// Эта запись имеет служебное значение.
QUERY_START = 1,
QUERY_FINISH = 2,
EXCEPTION = 3,
EXCEPTION_BEFORE_START = 3,
EXCEPTION_WHILE_PROCESSING = 4,
};
Type type = QUERY_START;
......@@ -51,6 +52,8 @@ struct QueryLogElement
UInt64 result_rows{};
UInt64 result_bytes{};
UInt64 memory_usage{};
String query;
String exception;
......
......@@ -83,7 +83,7 @@ struct QuotaForInterval
/// Increment the corresponding value.
void addQuery(time_t current_time, const String & quota_name);
void addError(time_t current_time, const String & quota_name);
void addError(time_t current_time, const String & quota_name) noexcept;
/// Check whether the quota has already been exceeded. Throws an exception if it has.
void checkExceeded(time_t current_time, const String & quota_name);
......@@ -139,7 +139,7 @@ public:
void setMax(const QuotaForIntervals & quota);
void addQuery(time_t current_time);
void addError(time_t current_time);
void addError(time_t current_time) noexcept;
void checkExceeded(time_t current_time);
......
......@@ -189,6 +189,8 @@ Block QueryLog::createBlock()
{new ColumnUInt64, new DataTypeUInt64, "result_rows"},
{new ColumnUInt64, new DataTypeUInt64, "result_bytes"},
{new ColumnUInt64, new DataTypeUInt64, "memory_usage"},
{new ColumnString, new DataTypeString, "query"},
{new ColumnString, new DataTypeString, "exception"},
{new ColumnString, new DataTypeString, "stack_trace"},
......@@ -214,25 +216,6 @@ void QueryLog::flush()
for (const QueryLogElement & elem : data)
{
block.unsafeGetByPosition(0).column.get()->insert(static_cast<UInt64>(elem.type));
block.unsafeGetByPosition(1).column.get()->insert(static_cast<UInt64>(date_lut.toDayNum(elem.event_time)));
block.unsafeGetByPosition(2).column.get()->insert(static_cast<UInt64>(elem.event_time));
block.unsafeGetByPosition(3).column.get()->insert(static_cast<UInt64>(elem.query_start_time));
block.unsafeGetByPosition(4).column.get()->insert(static_cast<UInt64>(elem.query_duration_ms));
block.unsafeGetByPosition(5).column.get()->insert(static_cast<UInt64>(elem.read_rows));
block.unsafeGetByPosition(6).column.get()->insert(static_cast<UInt64>(elem.read_bytes));
block.unsafeGetByPosition(7).column.get()->insert(static_cast<UInt64>(elem.result_rows));
block.unsafeGetByPosition(8).column.get()->insert(static_cast<UInt64>(elem.result_bytes));
block.unsafeGetByPosition(9).column.get()->insertData(elem.query.data(), elem.query.size());
block.unsafeGetByPosition(10).column.get()->insertData(elem.exception.data(), elem.exception.size());
block.unsafeGetByPosition(11).column.get()->insertData(elem.stack_trace.data(), elem.stack_trace.size());
block.unsafeGetByPosition(12).column.get()->insert(static_cast<UInt64>(elem.interface));
block.unsafeGetByPosition(13).column.get()->insert(static_cast<UInt64>(elem.http_method));
char ipv6_binary[16];
if (Poco::Net::IPAddress::IPv6 == elem.ip_address.family())
{
......@@ -249,10 +232,33 @@ void QueryLog::flush()
else
memset(ipv6_binary, 0, 16);
block.unsafeGetByPosition(14).column.get()->insertData(ipv6_binary, 16);
size_t i = 0;
block.unsafeGetByPosition(i++).column.get()->insert(static_cast<UInt64>(elem.type));
block.unsafeGetByPosition(i++).column.get()->insert(static_cast<UInt64>(date_lut.toDayNum(elem.event_time)));
block.unsafeGetByPosition(i++).column.get()->insert(static_cast<UInt64>(elem.event_time));
block.unsafeGetByPosition(i++).column.get()->insert(static_cast<UInt64>(elem.query_start_time));
block.unsafeGetByPosition(i++).column.get()->insert(static_cast<UInt64>(elem.query_duration_ms));
block.unsafeGetByPosition(i++).column.get()->insert(static_cast<UInt64>(elem.read_rows));
block.unsafeGetByPosition(i++).column.get()->insert(static_cast<UInt64>(elem.read_bytes));
block.unsafeGetByPosition(i++).column.get()->insert(static_cast<UInt64>(elem.result_rows));
block.unsafeGetByPosition(i++).column.get()->insert(static_cast<UInt64>(elem.result_bytes));
block.unsafeGetByPosition(i++).column.get()->insert(static_cast<UInt64>(elem.memory_usage));
block.unsafeGetByPosition(i++).column.get()->insertData(elem.query.data(), elem.query.size());
block.unsafeGetByPosition(i++).column.get()->insertData(elem.exception.data(), elem.exception.size());
block.unsafeGetByPosition(i++).column.get()->insertData(elem.stack_trace.data(), elem.stack_trace.size());
block.unsafeGetByPosition(i++).column.get()->insert(static_cast<UInt64>(elem.interface));
block.unsafeGetByPosition(i++).column.get()->insert(static_cast<UInt64>(elem.http_method));
block.unsafeGetByPosition(i++).column.get()->insertData(ipv6_binary, 16);
block.unsafeGetByPosition(15).column.get()->insertData(elem.user.data(), elem.user.size());
block.unsafeGetByPosition(16).column.get()->insertData(elem.query_id.data(), elem.query_id.size());
block.unsafeGetByPosition(i++).column.get()->insertData(elem.user.data(), elem.user.size());
block.unsafeGetByPosition(i++).column.get()->insertData(elem.query_id.data(), elem.query_id.size());
}
BlockOutputStreamPtr stream = table->write(nullptr);
......
......@@ -66,7 +66,7 @@ void QuotaForInterval::addQuery(time_t current_time, const String & quota_name)
__sync_fetch_and_add(&used.queries, 1);
}
/// Adds one to the `used.errors` counter using the atomic `__sync_fetch_and_add` builtin.
/// Neither `current_time` nor `quota_name` is read in this body.
/// NOTE(review): the two signature lines below look like the before/after pair of this
/// diff hunk (the second one adds `noexcept`) - confirm against the real file.
void QuotaForInterval::addError(time_t current_time, const String & quota_name)
void QuotaForInterval::addError(time_t current_time, const String & quota_name) noexcept
{
__sync_fetch_and_add(&used.errors, 1);
}
......@@ -181,7 +181,7 @@ void QuotaForIntervals::addQuery(time_t current_time)
it->second.addQuery(current_time, name);
}
void QuotaForIntervals::addError(time_t current_time)
void QuotaForIntervals::addError(time_t current_time) noexcept
{
for (Container::reverse_iterator it = cont.rbegin(); it != cont.rend(); ++it)
it->second.addError(current_time, name);
......
......@@ -43,6 +43,53 @@ static void logQuery(const String & query, const Context & context)
}
/** Fills the client-related fields of a query log element from the query context:
  * interface, HTTP method, IP address, user and current query id.
  * Each field is copied from the context getter of the matching name.
  */
static void setClientInfo(QueryLogElement & elem, Context & context)
{
    elem.interface = context.getInterface();
    elem.http_method = context.getHTTPMethod();

    elem.ip_address = context.getIPAddress();
    elem.user = context.getUser();

    elem.query_id = context.getCurrentQueryId();
}
static void onExceptionBeforeStart(const String & query, Context & context, time_t current_time)
{
/// Эксепшен до начала выполнения запроса.
context.getQuota().addError(current_time);
bool log_queries = context.getSettingsRef().log_queries;
/// Логгируем в таблицу начало выполнения запроса, если нужно.
if (log_queries)
{
QueryLogElement elem;
elem.type = QueryLogElement::EXCEPTION_BEFORE_START;
elem.event_time = current_time;
elem.query_start_time = current_time;
elem.query = query;
elem.exception = getCurrentExceptionMessage(false);
setClientInfo(elem, context);
try
{
throw;
}
catch (const Exception & e)
{
elem.stack_trace = e.getStackTrace().toString();
}
catch (...) {}
context.getQueryLog().add(elem);
}
}
static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
IParser::Pos begin,
IParser::Pos end,
......@@ -50,9 +97,8 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
bool internal,
QueryProcessingStage::Enum stage)
{
/// TODO Логгировать здесь эксепшены, возникающие до начала выполнения запроса.
ProfileEvents::increment(ProfileEvents::Query);
time_t current_time = time(0);
ParserQuery parser;
ASTPtr ast;
......@@ -73,100 +119,103 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
catch (...)
{
/// Всё равно логгируем запрос.
logQuery(String(begin, begin + std::min(end - begin, static_cast<ptrdiff_t>(max_query_size))), context);
if (!internal)
{
String query = String(begin, begin + std::min(end - begin, static_cast<ptrdiff_t>(max_query_size)));
logQuery(query, context);
tryLogCurrentException(__PRETTY_FUNCTION__);
onExceptionBeforeStart(query, context, current_time);
}
throw;
}
String query(begin, query_size);
BlockIO res;
if (!internal)
logQuery(query, context);
/// Проверка ограничений.
checkLimits(*ast, context.getSettingsRef().limits);
try
{
if (!internal)
logQuery(query, context);
QuotaForIntervals & quota = context.getQuota();
time_t current_time = time(0);
/// Проверка ограничений.
checkLimits(*ast, context.getSettingsRef().limits);
quota.checkExceeded(current_time);
QuotaForIntervals & quota = context.getQuota();
const Settings & settings = context.getSettingsRef();
quota.checkExceeded(current_time);
/// Положим запрос в список процессов. Но запрос SHOW PROCESSLIST класть не будем.
ProcessList::EntryPtr process_list_entry;
if (!internal && nullptr == typeid_cast<const ASTShowProcesslistQuery *>(&*ast))
{
process_list_entry = context.getProcessList().insert(
query, context.getUser(), context.getCurrentQueryId(), context.getIPAddress(),
settings.limits.max_memory_usage,
settings.queue_max_wait_ms.totalMilliseconds(),
settings.replace_running_query,
settings.priority);
context.setProcessListElement(&process_list_entry->get());
}
const Settings & settings = context.getSettingsRef();
BlockIO res;
/// Положим запрос в список процессов. Но запрос SHOW PROCESSLIST класть не будем.
ProcessList::EntryPtr process_list_entry;
if (!internal && nullptr == typeid_cast<const ASTShowProcesslistQuery *>(&*ast))
{
process_list_entry = context.getProcessList().insert(
query, context.getUser(), context.getCurrentQueryId(), context.getIPAddress(),
settings.limits.max_memory_usage,
settings.queue_max_wait_ms.totalMilliseconds(),
settings.replace_running_query,
settings.priority);
context.setProcessListElement(&process_list_entry->get());
}
try
{
auto interpreter = InterpreterFactory::get(ast, context, stage);
res = interpreter->execute();
/// Держим элемент списка процессов до конца обработки запроса.
res.process_list_entry = process_list_entry;
}
catch (...)
{
quota.addError(current_time); /// TODO Было бы лучше добавить ещё в exception_callback
throw;
}
quota.addQuery(current_time);
quota.addQuery(current_time);
/// Всё, что связано с логом запросов.
{
QueryLogElement elem;
/// Всё, что связано с логом запросов.
{
QueryLogElement elem;
elem.type = QueryLogElement::QUERY_START;
elem.type = QueryLogElement::QUERY_START;
elem.event_time = current_time;
elem.query_start_time = current_time;
elem.event_time = current_time;
elem.query_start_time = current_time;
elem.query = query;
elem.query = query;
elem.interface = context.getInterface();
elem.http_method = context.getHTTPMethod();
elem.ip_address = context.getIPAddress();
elem.user = context.getUser();
elem.query_id = context.getCurrentQueryId();
setClientInfo(elem, context);
bool log_queries = settings.log_queries;
bool log_queries = settings.log_queries;
/// Логгируем в таблицу начало выполнения запроса, если нужно.
if (log_queries)
context.getQueryLog().add(elem);
/// Логгируем в таблицу начало выполнения запроса, если нужно.
if (log_queries)
context.getQueryLog().add(elem);
/// Также дадим вызывающему коду в дальнейшем логгировать завершение запроса и эксепшен.
res.finish_callback = [elem, &context, log_queries] (IBlockInputStream & stream) mutable
{
elem.type = QueryLogElement::QUERY_FINISH;
/// Также дадим вызывающему коду в дальнейшем логгировать завершение запроса и эксепшен.
res.finish_callback = [elem, &context, log_queries] (IBlockInputStream & stream) mutable
{
ProcessListElement * process_list_elem = context.getProcessListElement();
elem.event_time = time(0);
elem.query_duration_ms = 1000 * (elem.event_time - elem.query_start_time); /// Грубое время для запросов без profiling_stream;
if (!process_list_elem)
return;
if (IProfilingBlockInputStream * profiling_stream = dynamic_cast<IProfilingBlockInputStream *>(&stream))
{
const BlockStreamProfileInfo & info = profiling_stream->getInfo();
double elapsed_seconds = process_list_elem->watch.elapsedSeconds();
elem.type = QueryLogElement::QUERY_FINISH;
double elapsed_seconds = info.total_stopwatch.elapsedSeconds(); /// TODO этот Stopwatch - coarse, использовать другой
elem.event_time = time(0);
elem.query_duration_ms = elapsed_seconds * 1000;
stream.getLeafRowsBytes(elem.read_rows, elem.read_bytes); /// TODO неверно для распределённых запросов?
elem.read_rows = process_list_elem->progress.rows;
elem.read_bytes = process_list_elem->progress.bytes;
elem.result_rows = info.rows;
elem.result_bytes = info.bytes;
auto memory_usage = process_list_elem->memory_tracker.getPeak();
elem.memory_usage = memory_usage > 0 ? memory_usage : 0;
if (IProfilingBlockInputStream * profiling_stream = dynamic_cast<IProfilingBlockInputStream *>(&stream))
{
const BlockStreamProfileInfo & info = profiling_stream->getInfo();
elem.result_rows = info.rows;
elem.result_bytes = info.bytes;
}
if (elem.read_rows != 0)
{
......@@ -176,48 +225,70 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
<< static_cast<size_t>(elem.read_rows / elapsed_seconds) << " rows/sec., "
<< formatReadableSizeWithBinarySuffix(elem.read_bytes / elapsed_seconds) << "/sec.");
}
}
if (log_queries)
context.getQueryLog().add(elem);
};
if (log_queries)
context.getQueryLog().add(elem);
};
res.exception_callback = [elem, &context, log_queries] () mutable
{
elem.type = QueryLogElement::EXCEPTION;
res.exception_callback = [elem, &context, log_queries, current_time] () mutable
{
context.getQuota().addError(current_time);
elem.event_time = time(0);
elem.query_duration_ms = 1000 * (elem.event_time - elem.query_start_time); /// Низкая точность. Можно сделать лучше.
elem.exception = getCurrentExceptionMessage(false);
elem.type = QueryLogElement::EXCEPTION_WHILE_PROCESSING;
/// Достаём стек трейс, если возможно.
try
{
throw;
}
catch (const Exception & e)
{
elem.stack_trace = e.getStackTrace().toString();
elem.event_time = time(0);
elem.query_duration_ms = 1000 * (elem.event_time - elem.query_start_time);
elem.exception = getCurrentExceptionMessage(false);
LOG_ERROR(&Logger::get("executeQuery"), elem.exception << ", Stack trace:\n\n" << elem.stack_trace);
}
catch (...)
{
LOG_ERROR(&Logger::get("executeQuery"), elem.exception);
}
ProcessListElement * process_list_elem = context.getProcessListElement();
if (log_queries)
context.getQueryLog().add(elem);
};
if (process_list_elem)
{
double elapsed_seconds = process_list_elem->watch.elapsedSeconds();
if (!internal && res.in)
{
std::stringstream log_str;
log_str << "Query pipeline:\n";
res.in->dumpTree(log_str);
LOG_DEBUG(&Logger::get("executeQuery"), log_str.str());
elem.query_duration_ms = elapsed_seconds * 1000;
elem.read_rows = process_list_elem->progress.rows;
elem.read_bytes = process_list_elem->progress.bytes;
auto memory_usage = process_list_elem->memory_tracker.getPeak();
elem.memory_usage = memory_usage > 0 ? memory_usage : 0;
}
/// Достаём стек трейс, если возможно.
try
{
throw;
}
catch (const Exception & e)
{
elem.stack_trace = e.getStackTrace().toString();
LOG_ERROR(&Logger::get("executeQuery"), elem.exception << ", Stack trace:\n\n" << elem.stack_trace);
}
catch (...)
{
LOG_ERROR(&Logger::get("executeQuery"), elem.exception);
}
if (log_queries)
context.getQueryLog().add(elem);
};
if (!internal && res.in)
{
std::stringstream log_str;
log_str << "Query pipeline:\n";
res.in->dumpTree(log_str);
LOG_DEBUG(&Logger::get("executeQuery"), log_str.str());
}
}
}
catch (...)
{
onExceptionBeforeStart(query, context, current_time);
throw;
}
return std::make_tuple(ast, res);
}
......@@ -274,7 +345,6 @@ void executeQuery(
std::tie(ast, streams) = executeQueryImpl(begin, end, context, internal, stage);
bool exception = false;
try
{
if (streams.out)
......@@ -325,12 +395,11 @@ void executeQuery(
}
catch (...)
{
exception = true;
streams.onException();
throw;
}
if (!exception)
streams.onFinish();
streams.onFinish();
}
}
......@@ -146,9 +146,7 @@ void HTTPHandler::processQuery(Poco::Net::HTTPServerRequest & request, Poco::Net
context.setHTTPMethod(http_method);
Stopwatch watch;
executeQuery(*in, *used_output.out_maybe_compressed, context, query_plan);
watch.stop();
/// Если не было эксепшена и данные ещё не отправлены - отправляются HTTP заголовки с кодом 200.
used_output.out->finalize();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册