未验证 提交 f5b75234 编写于 作者: A alesapin 提交者: GitHub

Merge pull request #5899 from yandex/aku/perftest-mem

Report memory usage in performance tests.
......@@ -14,10 +14,15 @@ std::vector<XMLConfigurationPtr> ConfigPreprocessor::processConfig(
{
std::vector<XMLConfigurationPtr> result;
for (const auto & path : paths)
for (const auto & path_str : paths)
{
result.emplace_back(XMLConfigurationPtr(new XMLConfiguration(path)));
result.back()->setString("path", Poco::Path(path).absolute().toString());
auto test = XMLConfigurationPtr(new XMLConfiguration(path_str));
result.push_back(test);
const auto path = Poco::Path(path_str);
test->setString("path", path.absolute().toString());
if (test->getString("name", "") == "")
test->setString("name", path.getBaseName());
}
/// Leave tests:
......
......@@ -3,6 +3,7 @@
#include <Core/Types.h>
#include <Common/CpuId.h>
#include <common/getMemoryAmount.h>
#include <DataStreams/RemoteBlockInputStream.h>
#include <IO/ConnectionTimeouts.h>
#include <IO/ReadBufferFromFile.h>
#include <IO/ReadHelpers.h>
......@@ -296,6 +297,47 @@ void PerformanceTest::runQueries(
break;
}
}
if (got_SIGINT)
{
return;
}
// Pull memory usage data from query log. The log is normally filled in
// background, so we have to flush it synchronously here to see all the
// previous queries.
{
RemoteBlockInputStream flush_log(connection, "system flush logs",
{} /* header */, context);
flush_log.readPrefix();
while (flush_log.read());
flush_log.readSuffix();
}
for (auto & statistics : statistics_by_run)
{
RemoteBlockInputStream log_reader(connection,
"select memory_usage from system.query_log where type = 2 and query_id = '"
+ statistics.query_id + "'",
{} /* header */, context);
log_reader.readPrefix();
Block block = log_reader.read();
if (block.columns() == 0)
{
LOG_WARNING(log, "Query '" << statistics.query_id << "' is not found in query log.");
continue;
}
assert(block.columns() == 1);
assert(block.getDataTypes()[0]->getName() == "UInt64");
ColumnPtr column = block.getByPosition(0).column;
assert(column->size() == 1);
StringRef ref = column->getDataAt(0);
assert(ref.size == sizeof(UInt64));
statistics.memory_usage = *reinterpret_cast<const UInt64*>(ref.data);
log_reader.readSuffix();
}
}
......
......@@ -260,15 +260,12 @@ static std::vector<std::string> getInputFiles(const po::variables_map & options,
if (input_files.empty())
throw DB::Exception("Did not find any xml files", DB::ErrorCodes::BAD_ARGUMENTS);
else
LOG_INFO(log, "Found " << input_files.size() << " files");
}
else
{
input_files = options["input-files"].as<std::vector<std::string>>();
LOG_INFO(log, "Found " + std::to_string(input_files.size()) + " input files");
std::vector<std::string> collected_files;
std::vector<std::string> collected_files;
for (const std::string & filename : input_files)
{
fs::path file(filename);
......@@ -290,6 +287,8 @@ static std::vector<std::string> getInputFiles(const po::variables_map & options,
input_files = std::move(collected_files);
}
LOG_INFO(log, "Found " + std::to_string(input_files.size()) + " input files");
std::sort(input_files.begin(), input_files.end());
return input_files;
}
......
......@@ -157,6 +157,8 @@ std::string ReportBuilder::buildFullReport(
runJSON.set("avg_bytes_per_second", statistics.avg_bytes_speed_value);
}
runJSON.set("memory_usage", statistics.memory_usage);
run_infos.push_back(runJSON);
}
}
......
......@@ -19,6 +19,7 @@ struct TestStats
Stopwatch avg_bytes_speed_watch;
bool last_query_was_cancelled = false;
std::string query_id;
size_t queries = 0;
......@@ -49,6 +50,8 @@ struct TestStats
size_t number_of_rows_speed_info_batches = 0;
size_t number_of_bytes_speed_info_batches = 0;
UInt64 memory_usage = 0;
bool ready = false; // check if a query wasn't interrupted by SIGINT
std::string exception;
......
......@@ -2,9 +2,11 @@
#include <IO/Progress.h>
#include <DataStreams/RemoteBlockInputStream.h>
#include <Core/Block.h>
#include <Poco/UUIDGenerator.h>
namespace DB
{
namespace
{
......@@ -36,7 +38,7 @@ void checkFulfilledConditionsAndUpdate(
}
}
}
} // anonymous namespace
void executeQuery(
Connection & connection,
......@@ -47,12 +49,18 @@ void executeQuery(
Context & context,
const Settings & settings)
{
static const std::string query_id_prefix
= Poco::UUIDGenerator::defaultGenerator().create().toString() + "-";
static int next_query_id = 1;
statistics.watch_per_query.restart();
statistics.last_query_was_cancelled = false;
statistics.last_query_rows_read = 0;
statistics.last_query_bytes_read = 0;
statistics.query_id = query_id_prefix + std::to_string(next_query_id++);
RemoteBlockInputStream stream(connection, query, {}, context, &settings);
stream.setQueryId(statistics.query_id);
stream.setProgressCallback(
[&](const Progress & value)
......@@ -70,4 +78,5 @@ void executeQuery(
statistics.setTotalTime();
}
}
......@@ -292,7 +292,7 @@ void RemoteBlockInputStream::sendQuery()
established = true;
auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(settings);
multiplexed_connections->sendQuery(timeouts, query, "", stage, &context.getClientInfo(), true);
multiplexed_connections->sendQuery(timeouts, query, query_id, stage, &context.getClientInfo(), true);
established = false;
sent_query = true;
......
......@@ -46,6 +46,11 @@ public:
~RemoteBlockInputStream() override;
/// Set the query_id. For now, used by performance test to later find the query
/// in the server query_log. Must be called before sending the query to the
/// server.
void setQueryId(const std::string& query_id_) { assert(!sent_query); query_id = query_id_; }
/// Specify how we allocate connections on a shard.
void setPoolMode(PoolMode pool_mode_) { pool_mode = pool_mode_; }
......@@ -95,6 +100,7 @@ private:
std::unique_ptr<MultiplexedConnections> multiplexed_connections;
const String query;
String query_id = "";
Context context;
/// Temporary tables needed to be sent to remote servers
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册