提交 108bfac1 编写于 作者: V Vitaliy Lyudvichenko

Added memory tracking events and columns for merges. [#METR-23911]

上级 220c7f2b
...@@ -14,6 +14,9 @@ callgrind.out.* ...@@ -14,6 +14,9 @@ callgrind.out.*
*.kdev4 *.kdev4
*.kdev_include_paths *.kdev_include_paths
# Qt Creator files
*.user
# ignore perf output # ignore perf output
*/perf.data */perf.data
......
...@@ -11,9 +11,9 @@ namespace CurrentMetrics ...@@ -11,9 +11,9 @@ namespace CurrentMetrics
} }
/** Отслеживает потребление памяти. /** Tracks memory consumption.
* Кидает исключение, если оно стало бы больше некоторого предельного значения. * It throws an exception if the amount of consumed memory would become greater than a certain limit.
* Один объект может использоваться одновременно в разных потоках. * The same memory tracker could be simultaneously used in different threads.
*/ */
class MemoryTracker class MemoryTracker
{ {
...@@ -21,16 +21,17 @@ class MemoryTracker ...@@ -21,16 +21,17 @@ class MemoryTracker
std::atomic<Int64> peak {0}; std::atomic<Int64> peak {0};
Int64 limit {0}; Int64 limit {0};
/// В целях тестирования exception safety - кидать исключение при каждом выделении памяти с указанной вероятностью. /// To test exception safety of calling code, memory tracker throws an exception on each memory allocation with specified probability.
double fault_probability = 0; double fault_probability = 0;
/// Односвязный список. Вся информация будет передаваться в следующие MemoryTracker-ы тоже. Они должны жить во время жизни данного MemoryTracker. /// Singly-linked list. All information will be passed to subsequent memory trackers also (it allows to implement trackers hierarchy).
/// In terms of tree nodes it is the list of parents. Lifetime of these trackers should "include" lifetime of current tracker.
MemoryTracker * next = nullptr; MemoryTracker * next = nullptr;
/// You could specify custom metric to track memory usage. /// You could specify custom metric to track memory usage.
CurrentMetrics::Metric metric = CurrentMetrics::MemoryTracking; CurrentMetrics::Metric metric = CurrentMetrics::MemoryTracking;
/// Если задано (например, "for user") - в сообщениях в логе будет указываться это описание. /// This description will be used as a prefix in log messages (if it isn't nullptr)
const char * description = nullptr; const char * description = nullptr;
public: public:
...@@ -39,7 +40,7 @@ public: ...@@ -39,7 +40,7 @@ public:
~MemoryTracker(); ~MemoryTracker();
/** Вызывайте эти функции перед соответствующими операциями с памятью. /** Call the following functions before calling of corresponding operations with memory allocators.
*/ */
void alloc(Int64 size); void alloc(Int64 size);
...@@ -48,7 +49,7 @@ public: ...@@ -48,7 +49,7 @@ public:
alloc(new_size - old_size); alloc(new_size - old_size);
} }
/** А эту функцию имеет смысл вызывать после освобождения памяти. /** This function should be called after memory deallocation.
*/ */
void free(Int64 size); void free(Int64 size);
...@@ -77,6 +78,7 @@ public: ...@@ -77,6 +78,7 @@ public:
next = elem; next = elem;
} }
/// The memory consumption could be shown in realtime via CurrentMetrics counter
void setMetric(CurrentMetrics::Metric metric_) void setMetric(CurrentMetrics::Metric metric_)
{ {
metric = metric_; metric = metric_;
...@@ -87,10 +89,10 @@ public: ...@@ -87,10 +89,10 @@ public:
description = description_; description = description_;
} }
/// Обнулить накопленные данные. /// Reset the accumulated data.
void reset(); void reset();
/// Вывести в лог информацию о пиковом потреблении памяти. /// Prints info about peak memory consumption into log.
void logPeakMemoryUsage() const; void logPeakMemoryUsage() const;
}; };
......
...@@ -235,6 +235,9 @@ public: ...@@ -235,6 +235,9 @@ public:
MergeList & getMergeList(); MergeList & getMergeList();
const MergeList & getMergeList() const; const MergeList & getMergeList() const;
/// Global memory tracker for merges
MemoryTracker * getMergesMemoryTracker() const;
/// Создать кэш разжатых блоков указанного размера. Это можно сделать только один раз. /// Создать кэш разжатых блоков указанного размера. Это можно сделать только один раз.
void setUncompressedCache(size_t max_size_in_bytes); void setUncompressedCache(size_t max_size_in_bytes);
std::shared_ptr<UncompressedCache> getUncompressedCache() const; std::shared_ptr<UncompressedCache> getUncompressedCache() const;
......
...@@ -2,6 +2,8 @@ ...@@ -2,6 +2,8 @@
#include <DB/Common/Stopwatch.h> #include <DB/Common/Stopwatch.h>
#include <DB/Common/CurrentMetrics.h> #include <DB/Common/CurrentMetrics.h>
#include <DB/Common/MemoryTracker.h>
#include <DB/Interpreters/Context.h>
#include <memory> #include <memory>
#include <list> #include <list>
#include <mutex> #include <mutex>
...@@ -22,6 +24,26 @@ namespace DB ...@@ -22,6 +24,26 @@ namespace DB
struct MergeInfo struct MergeInfo
{
std::string database;
std::string table;
std::string result_part_name;
Float64 elapsed;
Float64 progress;
UInt64 num_parts;
UInt64 total_size_bytes_compressed;
UInt64 total_size_marks;
UInt64 bytes_read_uncompressed;
UInt64 bytes_written_uncompressed;
UInt64 rows_read;
UInt64 rows_written;
UInt64 columns_written;
UInt64 memory_usage;
UInt64 thread_number;
};
struct MergeListElement : boost::noncopyable
{ {
const std::string database; const std::string database;
const std::string table; const std::string table;
...@@ -41,27 +63,19 @@ struct MergeInfo ...@@ -41,27 +63,19 @@ struct MergeInfo
/// Updated only for Vertical algorithm /// Updated only for Vertical algorithm
std::atomic<UInt64> columns_written{}; std::atomic<UInt64> columns_written{};
MergeInfo(const std::string & database, const std::string & table, const std::string & result_part_name) MemoryTracker memory_tracker;
: database{database}, table{table}, result_part_name{result_part_name} MemoryTracker * background_pool_task_memory_tracker;
{ MemoryTracker * global_merges_memory_tracker;
}
MergeInfo(const MergeInfo & other) /// Poco thread number used in logs
: database(other.database), UInt32 thread_number;
table(other.table),
result_part_name(other.result_part_name),
watch(other.watch), MergeListElement(const std::string & database, const std::string & table, const std::string & result_part_name, const Context & context);
progress(other.progress),
num_parts(other.num_parts), MergeInfo getInfo() const;
total_size_bytes_compressed(other.total_size_bytes_compressed),
total_size_marks(other.total_size_marks), ~MergeListElement();
bytes_read_uncompressed(other.bytes_read_uncompressed.load(std::memory_order_relaxed)),
bytes_written_uncompressed(other.bytes_written_uncompressed.load(std::memory_order_relaxed)),
rows_read(other.rows_read.load(std::memory_order_relaxed)),
rows_written(other.rows_written.load(std::memory_order_relaxed)),
columns_written(other.columns_written.load(std::memory_order_relaxed))
{
}
}; };
...@@ -71,7 +85,7 @@ class MergeListEntry ...@@ -71,7 +85,7 @@ class MergeListEntry
{ {
MergeList & list; MergeList & list;
using container_t = std::list<MergeInfo>; using container_t = std::list<MergeListElement>;
container_t::iterator it; container_t::iterator it;
CurrentMetrics::Increment num_merges {CurrentMetrics::Merge}; CurrentMetrics::Increment num_merges {CurrentMetrics::Merge};
...@@ -83,7 +97,7 @@ public: ...@@ -83,7 +97,7 @@ public:
MergeListEntry(MergeList & list, const container_t::iterator it) : list(list), it{it} {} MergeListEntry(MergeList & list, const container_t::iterator it) : list(list), it{it} {}
~MergeListEntry(); ~MergeListEntry();
MergeInfo * operator->() { return &*it; } MergeListElement * operator->() { return &*it; }
}; };
...@@ -91,7 +105,8 @@ class MergeList ...@@ -91,7 +105,8 @@ class MergeList
{ {
friend class MergeListEntry; friend class MergeListEntry;
using container_t = std::list<MergeInfo>; using container_t = std::list<MergeListElement>;
using info_container_t = std::list<MergeInfo>;
mutable std::mutex mutex; mutable std::mutex mutex;
container_t merges; container_t merges;
...@@ -107,10 +122,13 @@ public: ...@@ -107,10 +122,13 @@ public:
return std::make_unique<Entry>(*this, merges.emplace(merges.end(), std::forward<Args>(args)...)); return std::make_unique<Entry>(*this, merges.emplace(merges.end(), std::forward<Args>(args)...));
} }
container_t get() const info_container_t get() const
{ {
std::lock_guard<std::mutex> lock{mutex}; std::lock_guard<std::mutex> lock{mutex};
return merges; info_container_t res;
for (const auto & merge_element : merges)
res.emplace_back(merge_element.getInfo());
return res;
} }
}; };
......
...@@ -25,6 +25,7 @@ ...@@ -25,6 +25,7 @@
M(LeaderReplica) \ M(LeaderReplica) \
M(MemoryTracking) \ M(MemoryTracking) \
M(MemoryTrackingInBackgroundProcessingPool) \ M(MemoryTrackingInBackgroundProcessingPool) \
M(MemoryTrackingForMerges) \
M(LeaderElection) \ M(LeaderElection) \
M(EphemeralNode) \ M(EphemeralNode) \
M(ZooKeeperWatch) \ M(ZooKeeperWatch) \
......
...@@ -86,7 +86,7 @@ ...@@ -86,7 +86,7 @@
\ \
M(MergedRows) \ M(MergedRows) \
M(MergedUncompressedBytes) \ M(MergedUncompressedBytes) \
M(MergesTime)\ M(MergesTimeMilliseconds)\
\ \
M(MergeTreeDataWriterRows) \ M(MergeTreeDataWriterRows) \
M(MergeTreeDataWriterUncompressedBytes) \ M(MergeTreeDataWriterUncompressedBytes) \
......
...@@ -51,6 +51,7 @@ namespace ProfileEvents ...@@ -51,6 +51,7 @@ namespace ProfileEvents
namespace CurrentMetrics namespace CurrentMetrics
{ {
extern const Metric ContextLockWait; extern const Metric ContextLockWait;
extern const Metric MemoryTrackingForMerges;
} }
...@@ -127,6 +128,9 @@ struct ContextShared ...@@ -127,6 +128,9 @@ struct ContextShared
Poco::UUIDGenerator uuid_generator; Poco::UUIDGenerator uuid_generator;
/// Aggregates memory trackers from all merges
mutable std::unique_ptr<MemoryTracker> merges_memory_tracker;
bool shutdown_called = false; bool shutdown_called = false;
/// Позволяют запретить одновременное выполнение DDL запросов над одной и той же таблицей. /// Позволяют запретить одновременное выполнение DDL запросов над одной и той же таблицей.
...@@ -215,6 +219,18 @@ const ProcessList & Context::getProcessList() const { return shared->proc ...@@ -215,6 +219,18 @@ const ProcessList & Context::getProcessList() const { return shared->proc
MergeList & Context::getMergeList() { return shared->merge_list; } MergeList & Context::getMergeList() { return shared->merge_list; }
const MergeList & Context::getMergeList() const { return shared->merge_list; } const MergeList & Context::getMergeList() const { return shared->merge_list; }
/// Returns the memory tracker stored in the shared context state for merges.
/// The tracker is created on first call and bound to the
/// CurrentMetrics::MemoryTrackingForMerges metric; creation and lookup both
/// happen while the context lock is held.
MemoryTracker * Context::getMergesMemoryTracker() const
{
    auto lock = getLock();

    auto & tracker = shared->merges_memory_tracker;
    if (tracker)
        return tracker.get();

    /// First request: create the tracker and attach its metric before handing it out.
    tracker = std::make_unique<MemoryTracker>();
    tracker->setMetric(CurrentMetrics::MemoryTrackingForMerges);
    return tracker.get();
}
const Databases Context::getDatabases() const const Databases Context::getDatabases() const
{ {
......
#include <DB/Storages/MergeTree/MergeList.h>
#include <Poco/Ext/ThreadNumber.h>
namespace DB
{
/** Creates the bookkeeping element for one running merge and hooks its memory
  * accounting into the tracker chain.
  *
  * database, table, result_part_name - identify the merge; copied into the element.
  * context - used to obtain the tracker that aggregates memory of all merges.
  *
  * When the calling thread has a memory tracker (current_memory_tracker), the chain becomes
  *   thread tracker -> memory_tracker of this element -> global merges tracker.
  * When it has none, nothing is linked and both tracker pointers are left null.
  */
MergeListElement::MergeListElement(const std::string & database, const std::string & table, const std::string & result_part_name,
    const Context & context)
    : database{database}, table{table}, result_part_name{result_part_name}, thread_number{Poco::ThreadNumber::get()}
{
    /// Each merge is executed in a separate background processing pool thread
    background_pool_task_memory_tracker = current_memory_tracker;

    /// Never leave this raw pointer indeterminate, even on the path that does not link trackers.
    global_merges_memory_tracker = nullptr;

    if (background_pool_task_memory_tracker)
    {
        global_merges_memory_tracker = context.getMergesMemoryTracker();

        /// Complete the tail of the chain first, so that the thread tracker
        /// is only ever pointed at a fully linked memory_tracker.
        memory_tracker.setNext(global_merges_memory_tracker);
        background_pool_task_memory_tracker->setNext(&memory_tracker);
    }
}
/// Builds a plain-value MergeInfo snapshot of this element:
/// identification strings, elapsed time and progress, size statistics,
/// the running counters, current memory usage and the thread number.
MergeInfo MergeListElement::getInfo() const
{
    /// The running counters are atomics; they are read with relaxed memory ordering.
    const auto relaxed = [](const auto & counter) { return counter.load(std::memory_order_relaxed); };

    MergeInfo info;

    /// Identification.
    info.database = database;
    info.table = table;
    info.result_part_name = result_part_name;

    /// Timing and overall progress.
    info.elapsed = watch.elapsedSeconds();
    info.progress = progress;

    /// Size of the merge.
    info.num_parts = num_parts;
    info.total_size_bytes_compressed = total_size_bytes_compressed;
    info.total_size_marks = total_size_marks;

    /// Running counters.
    info.bytes_read_uncompressed = relaxed(bytes_read_uncompressed);
    info.bytes_written_uncompressed = relaxed(bytes_written_uncompressed);
    info.rows_read = relaxed(rows_read);
    info.rows_written = relaxed(rows_written);
    info.columns_written = relaxed(columns_written);

    /// Memory accounting and the executing thread.
    info.memory_usage = memory_tracker.get();
    info.thread_number = thread_number;

    return info;
}
/// Detaches this element's memory_tracker from the tracker of the thread
/// that created the element, if there was one.
MergeListElement::~MergeListElement()
{
    /// No thread tracker was captured at construction: nothing was linked, nothing to undo.
    if (!background_pool_task_memory_tracker)
        return;

    /// Unplug memory_tracker from current background processing pool thread.
    /// NOTE(review): the link is reset to nullptr; whatever the thread tracker
    /// pointed to before this element was constructed is not restored.
    background_pool_task_memory_tracker->setNext(nullptr);
}
}
...@@ -32,7 +32,7 @@ namespace ProfileEvents ...@@ -32,7 +32,7 @@ namespace ProfileEvents
{ {
extern const Event MergedRows; extern const Event MergedRows;
extern const Event MergedUncompressedBytes; extern const Event MergedUncompressedBytes;
extern const Event MergesTime; extern const Event MergesTimeMilliseconds;
} }
namespace CurrentMetrics namespace CurrentMetrics
...@@ -419,7 +419,7 @@ public: ...@@ -419,7 +419,7 @@ public:
void updateWatch() void updateWatch()
{ {
UInt64 watch_curr_elapsed = merge_entry->watch.elapsed(); UInt64 watch_curr_elapsed = merge_entry->watch.elapsed();
ProfileEvents::increment(ProfileEvents::MergesTime, watch_curr_elapsed - watch_prev_elapsed); ProfileEvents::increment(ProfileEvents::MergesTimeMilliseconds, (watch_curr_elapsed - watch_prev_elapsed) / 1000000);
watch_prev_elapsed = watch_curr_elapsed; watch_prev_elapsed = watch_curr_elapsed;
} }
...@@ -840,7 +840,7 @@ MergeTreeData::PerShardDataParts MergeTreeDataMerger::reshardPartition( ...@@ -840,7 +840,7 @@ MergeTreeData::PerShardDataParts MergeTreeDataMerger::reshardPartition(
std::string merged_name = createMergedPartName(parts); std::string merged_name = createMergedPartName(parts);
MergeList::EntryPtr merge_entry_ptr = data.context.getMergeList().insert(job.database_name, MergeList::EntryPtr merge_entry_ptr = data.context.getMergeList().insert(job.database_name,
job.table_name, merged_name); job.table_name, merged_name, data.context);
MergeList::Entry & merge_entry = *merge_entry_ptr; MergeList::Entry & merge_entry = *merge_entry_ptr;
merge_entry->num_parts = parts.size(); merge_entry->num_parts = parts.size();
......
...@@ -339,10 +339,10 @@ bool StorageMergeTree::merge( ...@@ -339,10 +339,10 @@ bool StorageMergeTree::merge(
merging_tagger.emplace(parts, MergeTreeDataMerger::estimateDiskSpaceForMerge(parts), *this); merging_tagger.emplace(parts, MergeTreeDataMerger::estimateDiskSpaceForMerge(parts), *this);
} }
const auto & merge_entry = context.getMergeList().insert(database_name, table_name, merged_name); MergeList::EntryPtr merge_entry_ptr = context.getMergeList().insert(database_name, table_name, merged_name, context);
auto new_part = merger.mergePartsToTemporaryPart( auto new_part = merger.mergePartsToTemporaryPart(
merging_tagger->parts, merged_name, *merge_entry, aio_threshold, time(0), merging_tagger->reserved_space.get()); merging_tagger->parts, merged_name, *merge_entry_ptr, aio_threshold, time(0), merging_tagger->reserved_space.get());
merger.renameMergedTemporaryPart(merging_tagger->parts, new_part, merged_name, nullptr); merger.renameMergedTemporaryPart(merging_tagger->parts, new_part, merged_name, nullptr);
......
...@@ -1145,7 +1145,7 @@ bool StorageReplicatedMergeTree::executeLogEntry(const LogEntry & entry) ...@@ -1145,7 +1145,7 @@ bool StorageReplicatedMergeTree::executeLogEntry(const LogEntry & entry)
auto table_lock = lockStructure(false); auto table_lock = lockStructure(false);
const auto & merge_entry = context.getMergeList().insert(database_name, table_name, entry.new_part_name); MergeList::EntryPtr merge_entry = context.getMergeList().insert(database_name, table_name, entry.new_part_name, context);
MergeTreeData::Transaction transaction; MergeTreeData::Transaction transaction;
size_t aio_threshold = context.getSettings().min_bytes_to_use_direct_io; size_t aio_threshold = context.getSettings().min_bytes_to_use_direct_io;
...@@ -2304,7 +2304,7 @@ bool StorageReplicatedMergeTree::optimize(const String & partition, bool final, ...@@ -2304,7 +2304,7 @@ bool StorageReplicatedMergeTree::optimize(const String & partition, bool final,
if (unreplicated_merger->selectPartsToMerge(parts, merged_name, true, 0, always_can_merge)) if (unreplicated_merger->selectPartsToMerge(parts, merged_name, true, 0, always_can_merge))
{ {
const auto & merge_entry = context.getMergeList().insert(database_name, table_name, merged_name); MergeList::EntryPtr merge_entry = context.getMergeList().insert(database_name, table_name, merged_name, context);
auto new_part = unreplicated_merger->mergePartsToTemporaryPart( auto new_part = unreplicated_merger->mergePartsToTemporaryPart(
parts, merged_name, *merge_entry, settings.min_bytes_to_use_direct_io, time(0)); parts, merged_name, *merge_entry, settings.min_bytes_to_use_direct_io, time(0));
......
...@@ -25,7 +25,9 @@ StorageSystemMerges::StorageSystemMerges(const std::string & name) ...@@ -25,7 +25,9 @@ StorageSystemMerges::StorageSystemMerges(const std::string & name)
{ "rows_read", std::make_shared<DataTypeUInt64>() }, { "rows_read", std::make_shared<DataTypeUInt64>() },
{ "bytes_written_uncompressed", std::make_shared<DataTypeUInt64>() }, { "bytes_written_uncompressed", std::make_shared<DataTypeUInt64>() },
{ "rows_written", std::make_shared<DataTypeUInt64>() }, { "rows_written", std::make_shared<DataTypeUInt64>() },
{ "columns_written", std::make_shared<DataTypeUInt64>() } { "columns_written", std::make_shared<DataTypeUInt64>() },
{ "memory_usage", std::make_shared<DataTypeUInt64>() },
{ "thread_number", std::make_shared<DataTypeUInt64>() },
} }
{ {
} }
...@@ -54,17 +56,19 @@ BlockInputStreams StorageSystemMerges::read( ...@@ -54,17 +56,19 @@ BlockInputStreams StorageSystemMerges::read(
size_t i = 0; size_t i = 0;
block.unsafeGetByPosition(i++).column->insert(merge.database); block.unsafeGetByPosition(i++).column->insert(merge.database);
block.unsafeGetByPosition(i++).column->insert(merge.table); block.unsafeGetByPosition(i++).column->insert(merge.table);
block.unsafeGetByPosition(i++).column->insert(merge.watch.elapsedSeconds()); block.unsafeGetByPosition(i++).column->insert(merge.elapsed);
block.unsafeGetByPosition(i++).column->insert(std::min(1., merge.progress)); /// little cheat block.unsafeGetByPosition(i++).column->insert(std::min(1., merge.progress)); /// little cheat
block.unsafeGetByPosition(i++).column->insert(merge.num_parts); block.unsafeGetByPosition(i++).column->insert(merge.num_parts);
block.unsafeGetByPosition(i++).column->insert(merge.result_part_name); block.unsafeGetByPosition(i++).column->insert(merge.result_part_name);
block.unsafeGetByPosition(i++).column->insert(merge.total_size_bytes_compressed); block.unsafeGetByPosition(i++).column->insert(merge.total_size_bytes_compressed);
block.unsafeGetByPosition(i++).column->insert(merge.total_size_marks); block.unsafeGetByPosition(i++).column->insert(merge.total_size_marks);
block.unsafeGetByPosition(i++).column->insert(merge.bytes_read_uncompressed.load(std::memory_order_relaxed)); block.unsafeGetByPosition(i++).column->insert(merge.bytes_read_uncompressed);
block.unsafeGetByPosition(i++).column->insert(merge.rows_read.load(std::memory_order_relaxed)); block.unsafeGetByPosition(i++).column->insert(merge.rows_read);
block.unsafeGetByPosition(i++).column->insert(merge.bytes_written_uncompressed.load(std::memory_order_relaxed)); block.unsafeGetByPosition(i++).column->insert(merge.bytes_written_uncompressed);
block.unsafeGetByPosition(i++).column->insert(merge.rows_written.load(std::memory_order_relaxed)); block.unsafeGetByPosition(i++).column->insert(merge.rows_written);
block.unsafeGetByPosition(i++).column->insert(merge.columns_written.load(std::memory_order_relaxed)); block.unsafeGetByPosition(i++).column->insert(merge.columns_written);
block.unsafeGetByPosition(i++).column->insert(merge.memory_usage);
block.unsafeGetByPosition(i++).column->insert(merge.thread_number);
} }
return BlockInputStreams{1, std::make_shared<OneBlockInputStream>(block)}; return BlockInputStreams{1, std::make_shared<OneBlockInputStream>(block)};
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册