提交 093f69c4 编写于 作者: A Alexey Milovidov

Added support for FINAL and PARTITION specification for OPTIMIZE query [#METR-21326].

上级 a078cfa6
......@@ -23,9 +23,13 @@ public:
BlockIO execute() override
{
const ASTOptimizeQuery & ast = typeid_cast<const ASTOptimizeQuery &>(*query_ptr);
if (ast.final && ast.partition.empty())
throw Exception("FINAL flag for OPTIMIZE query is meaningful only with specified PARTITION", ErrorCodes::BAD_ARGUMENTS);
StoragePtr table = context.getTable(ast.database, ast.table);
auto table_lock = table->lockStructure(true);
table->optimize(context.getSettings());
table->optimize(ast.partition, ast.final, context.getSettings());
return {};
}
......
......@@ -15,11 +15,16 @@ public:
String database;
String table;
/// Может быть указана партиция, в которой производить оптимизацию.
String partition;
/// Может быть указан флаг - производить оптимизацию "до конца" вместо одного шага.
bool final;
ASTOptimizeQuery() = default;
ASTOptimizeQuery(const StringRange range_) : IAST(range_) {}
/** Получить текст, который идентифицирует этот элемент. */
String getID() const override { return "OptimizeQuery_" + database + "_" + table; };
String getID() const override { return "OptimizeQuery_" + database + "_" + table + "_" + partition + "_" + toString(final); };
ASTPtr clone() const override { return new ASTOptimizeQuery(*this); }
......@@ -28,6 +33,13 @@ protected:
{
settings.ostr << (settings.hilite ? hilite_keyword : "") << "OPTIMIZE TABLE " << (settings.hilite ? hilite_none : "")
<< (!database.empty() ? backQuoteIfNeed(database) + "." : "") << backQuoteIfNeed(table);
if (!partition.empty())
settings.ostr << (settings.hilite ? hilite_keyword : "") << " PARTITION " << (settings.hilite ? hilite_none : "")
<< partition;
if (final)
settings.ostr << (settings.hilite ? hilite_keyword : "") << " FINAL" << (settings.hilite ? hilite_none : "");
}
};
......
......@@ -7,7 +7,7 @@
namespace DB
{
/** Запрос OPTIMIZE TABLE [db.]name
/** Запрос OPTIMIZE TABLE [db.]name [PARTITION partition] [FINAL]
*/
class ParserOptimizeQuery : public IParserBase
{
......
......@@ -257,7 +257,7 @@ public:
/** Выполнить какую-либо фоновую работу. Например, объединение кусков в таблице типа MergeTree.
* Возвращает - была ли выполнена какая-либо работа.
*/
virtual bool optimize(const Settings & settings)
virtual bool optimize(const String & partition, bool final, const Settings & settings)
{
throw Exception("Method optimize is not supported by storage " + getName(), ErrorCodes::NOT_IMPLEMENTED);
}
......@@ -285,7 +285,7 @@ public:
bool is_dropped{false};
/// Поддерживается ли индекс в секции IN
virtual bool supportsIndexForIn() const { return false; };
virtual bool supportsIndexForIn() const { return false; }
/// проверяет валидность данных
virtual bool checkData() const { throw DB::Exception("Check query is not supported for " + getName() + " storage"); }
......
......@@ -39,6 +39,9 @@ public:
if (right != rhs.right)
return right < rhs.right;
if (level != rhs.level)
return level < rhs.level;
return false;
}
......@@ -49,7 +52,8 @@ public:
&& left_date <= rhs.left_date
&& right_date >= rhs.right_date
&& left <= rhs.left
&& right >= rhs.right;
&& right >= rhs.right
&& level >= rhs.level;
}
};
......
......@@ -24,7 +24,7 @@ public:
break;
ProfileEvents::increment(ProfileEvents::SynchronousMergeOnInsert);
storage.merge(0);
storage.merge(0, {}, {}, {}, {});
}
}
......
......@@ -345,7 +345,8 @@ public:
/** То же, что renameTempPartAndAdd, но кусок может покрывать существующие куски.
* Удаляет и возвращает все куски, покрытые добавляемым (в возрастающем порядке).
*/
DataPartsVector renameTempPartAndReplace(MutableDataPartPtr & part, SimpleIncrement * increment = nullptr, Transaction * out_transaction = nullptr);
DataPartsVector renameTempPartAndReplace(
MutableDataPartPtr & part, SimpleIncrement * increment = nullptr, Transaction * out_transaction = nullptr);
/** Убирает из рабочего набора куски remove и добавляет куски add. add должны уже быть в all_data_parts.
* Если clear_without_timeout, данные будут удалены сразу, либо при следующем clearOldParts, игнорируя old_parts_lifetime.
......
......@@ -45,6 +45,17 @@ public:
bool only_small,
const AllowedMergingPredicate & can_merge);
/** Выбрать для слияния все куски в заданной партиции, если возможно.
* final - выбирать для слияния даже единственный кусок - то есть, позволять мерджить один кусок "сам с собой".
*/
bool selectAllPartsToMergeWithinPartition(
MergeTreeData::DataPartsVector & what,
String & merged_name,
size_t available_disk_space,
const AllowedMergingPredicate & can_merge,
DayNum_t partition,
bool final);
/** Сливает куски.
* Если reservation != nullptr, то и дело уменьшает размер зарезервированного места
* приблизительно пропорционально количеству уже выписанных данных.
......
......@@ -51,6 +51,10 @@ struct ReplicatedMergeTreeLogEntryData
}
}
void writeText(WriteBuffer & out) const;
void readText(ReadBuffer & in);
String toString() const;
String znode_name;
Type type = EMPTY;
......@@ -96,10 +100,6 @@ struct ReplicatedMergeTreeLogEntry : ReplicatedMergeTreeLogEntryData
std::condition_variable execution_complete; /// Пробуждается когда currently_executing становится false.
void writeText(WriteBuffer & out) const;
void readText(ReadBuffer & in);
String toString() const;
static Ptr parse(const String & s, const Stat & stat);
};
......
......@@ -74,7 +74,7 @@ public:
/// Сбрасывает все буферы в подчинённую таблицу.
void shutdown() override;
bool optimize(const Settings & settings) override;
bool optimize(const String & partition, bool final, const Settings & settings) override;
void rename(const String & new_path_to_db, const String & new_database_name, const String & new_table_name) override { name = new_table_name; }
......
......@@ -30,12 +30,13 @@ public:
bool supportsSampling() const override { return getInnerTable()->supportsSampling(); }
bool supportsPrewhere() const override { return getInnerTable()->supportsPrewhere(); }
bool supportsFinal() const override { return getInnerTable()->supportsFinal(); }
bool supportsIndexForIn() const override { return getInnerTable()->supportsIndexForIn(); }
bool supportsParallelReplicas() const override { return getInnerTable()->supportsParallelReplicas(); }
bool supportsIndexForIn() const override { return getInnerTable()->supportsIndexForIn(); }
Block getIndexSampleBlock() const override { return getInnerTable()->getIndexSampleBlock(); }
BlockOutputStreamPtr write(ASTPtr query, const Settings & settings) override;
void drop() override;
bool optimize(const Settings & settings) override;
bool optimize(const String & partition, bool final, const Settings & settings) override;
BlockInputStreams read(
const Names & column_names,
......
......@@ -82,9 +82,9 @@ public:
/** Выполнить очередной шаг объединения кусков.
*/
bool optimize(const Settings & settings) override
bool optimize(const String & partition, bool final, const Settings & settings) override
{
return merge(settings.min_bytes_to_use_direct_io, true);
return merge(settings.min_bytes_to_use_direct_io, true, nullptr, partition, final);
}
void dropPartition(ASTPtr query, const Field & partition, bool detach, bool unreplicated, const Settings & settings) override;
......@@ -189,7 +189,7 @@ private:
* Если aggressive - выбрать куски, не обращая внимание на соотношение размеров и их новизну (для запроса OPTIMIZE).
* Возвращает, получилось ли что-нибудь объединить.
*/
bool merge(size_t aio_threshold, bool aggressive = false, BackgroundProcessingPool::Context * context = nullptr);
bool merge(size_t aio_threshold, bool aggressive, BackgroundProcessingPool::Context * context, const String & partition, bool final);
bool mergeTask(BackgroundProcessingPool::Context & context);
......
......@@ -128,7 +128,7 @@ public:
BlockOutputStreamPtr write(ASTPtr query, const Settings & settings) override;
bool optimize(const Settings & settings) override;
bool optimize(const String & partition, bool final, const Settings & settings) override;
void alter(const AlterCommands & params, const String & database_name, const String & table_name, const Context & context) override;
......@@ -396,6 +396,22 @@ private:
*/
void mergeSelectingThread();
using MemoizedPartsThatCouldBeMerged = std::set<std::pair<std::string, std::string>>;
/// Можно ли мерджить куски в указанном диапазоне? memo - необязательный параметр.
bool canMergeParts(
const MergeTreeData::DataPartPtr & left,
const MergeTreeData::DataPartPtr & right,
MemoizedPartsThatCouldBeMerged * memo);
/** Записать выбранные куски для слияния в лог,
* Вызывать при заблокированном merge_selecting_mutex.
* Возвращает false, если какого-то куска нет в ZK.
*/
bool createLogEntryToMergeParts(
const MergeTreeData::DataPartsVector & parts,
const String & merged_name,
ReplicatedMergeTreeLogEntryData * out_log_entry = nullptr);
/// Обмен кусками.
/** Возвращает пустую строку, если куска ни у кого нет.
......@@ -417,11 +433,11 @@ private:
/** Дождаться, пока все реплики, включая эту, выполнят указанное действие из лога.
* Если одновременно с этим добавляются реплики, может не дождаться добавленную реплику.
*/
void waitForAllReplicasToProcessLogEntry(const LogEntry & entry);
void waitForAllReplicasToProcessLogEntry(const ReplicatedMergeTreeLogEntryData & entry);
/** Дождаться, пока указанная реплика выполнит указанное действие из лога.
*/
void waitForReplicaToProcessLogEntry(const String & replica_name, const LogEntry & entry);
void waitForReplicaToProcessLogEntry(const String & replica_name, const ReplicatedMergeTreeLogEntryData & entry);
/// Кинуть исключение, если таблица readonly.
void assertNotReadonly() const;
......
......@@ -3,6 +3,7 @@
#include <DB/Parsers/CommonParsers.h>
#include <DB/Parsers/ParserOptimizeQuery.h>
#include <DB/Parsers/ASTLiteral.h>
namespace DB
......@@ -16,11 +17,16 @@ bool ParserOptimizeQuery::parseImpl(Pos & pos, Pos end, ASTPtr & node, Pos & max
ParserWhiteSpaceOrComments ws;
ParserString s_optimize("OPTIMIZE", true, true);
ParserString s_table("TABLE", true, true);
ParserString s_partition("PARTITION", true, true);
ParserString s_final("FINAL", true, true);
ParserString s_dot(".");
ParserIdentifier name_p;
ParserLiteral partition_p;
ASTPtr database;
ASTPtr table;
ASTPtr partition;
bool final = false;
ws.ignore(pos, end);
......@@ -50,13 +56,29 @@ bool ParserOptimizeQuery::parseImpl(Pos & pos, Pos end, ASTPtr & node, Pos & max
ws.ignore(pos, end);
if (s_partition.ignore(pos, end, max_parsed_pos, expected))
{
ws.ignore(pos, end);
if (!partition_p.parse(pos, end, partition, max_parsed_pos, expected))
return false;
}
ws.ignore(pos, end);
if (s_final.ignore(pos, end, max_parsed_pos, expected))
final = true;
ASTOptimizeQuery * query = new ASTOptimizeQuery(StringRange(begin, pos));
node = query;
if (database)
query->database = typeid_cast<ASTIdentifier &>(*database).name;
query->database = typeid_cast<const ASTIdentifier &>(*database).name;
if (table)
query->table = typeid_cast<ASTIdentifier &>(*table).name;
query->table = typeid_cast<const ASTIdentifier &>(*table).name;
if (partition)
query->partition = apply_visitor(FieldVisitorToString(), typeid_cast<const ASTLiteral &>(*partition).value);
query->final = final;
return true;
}
......
......@@ -80,8 +80,14 @@ void MergeTreeDataMerger::setCancellationHook(CancellationHook cancellation_hook
/// 4) Если в одном из потоков идет мердж крупных кусков, то во втором сливать только маленькие кусочки.
/// 5) С ростом логарифма суммарного размера кусочков в мердже увеличиваем требование сбалансированности.
bool MergeTreeDataMerger::selectPartsToMerge(MergeTreeData::DataPartsVector & parts, String & merged_name, size_t available_disk_space,
bool merge_anything_for_old_months, bool aggressive, bool only_small, const AllowedMergingPredicate & can_merge_callback)
bool MergeTreeDataMerger::selectPartsToMerge(
MergeTreeData::DataPartsVector & parts,
String & merged_name,
size_t available_disk_space,
bool merge_anything_for_old_months,
bool aggressive,
bool only_small,
const AllowedMergingPredicate & can_merge_callback)
{
MergeTreeData::DataParts data_parts = data.getDataParts();
......@@ -175,7 +181,8 @@ bool MergeTreeDataMerger::selectPartsToMerge(MergeTreeData::DataPartsVector & pa
MergeTreeData::DataParts::iterator jt = it;
while (cur_len < static_cast<int>(data.settings.max_parts_to_merge_at_once)
|| (cur_len < static_cast<int>(data.settings.max_parts_to_merge_at_once_if_small)
&& cur_sum < data.settings.merge_more_parts_if_sum_bytes_is_less_than))
&& cur_sum < data.settings.merge_more_parts_if_sum_bytes_is_less_than)
|| aggressive)
{
const MergeTreeData::DataPartPtr & prev_part = *jt;
++jt;
......@@ -200,7 +207,7 @@ bool MergeTreeDataMerger::selectPartsToMerge(MergeTreeData::DataPartsVector & pa
/// Кусок правее предыдущего.
if (last_part->left < cur_id)
{
LOG_WARNING(log, "Part " << last_part->name << " intersects previous part");
LOG_ERROR(log, "Part " << last_part->name << " intersects previous part");
break;
}
......@@ -219,7 +226,7 @@ bool MergeTreeDataMerger::selectPartsToMerge(MergeTreeData::DataPartsVector & pa
int cur_age_in_sec = time(0) - newest_modification_time;
/// Если куски больше 1 Gb и образовались меньше 6 часов назад, то мерджить не меньше чем по 3.
if (cur_max > 1024 * 1024 * 1024 && cur_age_in_sec < 6 * 3600)
if (cur_max > 1024 * 1024 * 1024 && cur_age_in_sec < 6 * 3600 && !aggressive)
min_len = 3;
/// Размер кусков после текущих, делить на максимальный из текущих кусков. Чем меньше, тем новее текущие куски.
......@@ -316,6 +323,74 @@ bool MergeTreeDataMerger::selectPartsToMerge(MergeTreeData::DataPartsVector & pa
}
bool MergeTreeDataMerger::selectAllPartsToMergeWithinPartition(
MergeTreeData::DataPartsVector & what,
String & merged_name,
size_t available_disk_space,
const AllowedMergingPredicate & can_merge,
DayNum_t partition,
bool final)
{
MergeTreeData::DataPartsVector parts = selectAllPartsFromPartition(partition);
if (parts.empty())
return false;
if (!final && parts.size() == 1)
return false;
MergeTreeData::DataPartsVector::const_iterator it = parts.begin();
MergeTreeData::DataPartsVector::const_iterator prev_it = it;
size_t sum_bytes = 0;
DayNum_t left_date = DayNum_t(std::numeric_limits<UInt16>::max());
DayNum_t right_date = DayNum_t(std::numeric_limits<UInt16>::min());
UInt32 level = 0;
while (it != parts.end())
{
if ((it != parts.begin() || parts.size() == 1) /// Для случая одного куска, проверяем, что его можно мерджить "самого с собой".
&& !can_merge(*prev_it, *it))
return false;
level = std::max(level, (*it)->level);
left_date = std::min(left_date, (*it)->left_date);
right_date = std::max(right_date, (*it)->right_date);
sum_bytes += (*it)->size_in_bytes;
prev_it = it;
++it;
}
/// Достаточно места на диске, чтобы покрыть новый мердж с запасом.
if (available_disk_space <= sum_bytes * DISK_USAGE_COEFFICIENT_TO_SELECT)
{
time_t now = time(0);
if (now - disk_space_warning_time > 3600)
{
disk_space_warning_time = now;
LOG_WARNING(log, "Won't merge parts from " << parts.front()->name << " to " << (*prev_it)->name
<< " because not enough free space: "
<< formatReadableSizeWithBinarySuffix(available_disk_space) << " free and unreserved "
<< "(" << formatReadableSizeWithBinarySuffix(DiskSpaceMonitor::getReservedSpace()) << " reserved in "
<< DiskSpaceMonitor::getReservationCount() << " chunks), "
<< formatReadableSizeWithBinarySuffix(sum_bytes)
<< " required now (+" << static_cast<int>((DISK_USAGE_COEFFICIENT_TO_SELECT - 1.0) * 100)
<< "% on overhead); suppressing similar warnings for the next hour");
}
return false;
}
what = parts;
merged_name = ActiveDataPartSet::getPartName(
left_date, right_date, parts.front()->left, parts.back()->right, level + 1);
LOG_DEBUG(log, "Selected " << parts.size() << " parts from " << parts.front()->name << " to " << parts.back()->name);
return true;
}
MergeTreeData::DataPartsVector MergeTreeDataMerger::selectAllPartsFromPartition(DayNum_t partition)
{
MergeTreeData::DataPartsVector parts_from_partition;
......
......@@ -9,7 +9,7 @@ namespace DB
{
void ReplicatedMergeTreeLogEntry::writeText(WriteBuffer & out) const
void ReplicatedMergeTreeLogEntryData::writeText(WriteBuffer & out) const
{
out << "format version: 3\n"
<< "create_time: " << LocalDateTime(create_time ? create_time : time(0)) << "\n"
......@@ -56,7 +56,7 @@ void ReplicatedMergeTreeLogEntry::writeText(WriteBuffer & out) const
out << "quorum: " << quorum << '\n';
}
void ReplicatedMergeTreeLogEntry::readText(ReadBuffer & in)
void ReplicatedMergeTreeLogEntryData::readText(ReadBuffer & in)
{
UInt8 format_version = 0;
String type_str;
......@@ -127,7 +127,7 @@ void ReplicatedMergeTreeLogEntry::readText(ReadBuffer & in)
in >> "quorum: " >> quorum >> "\n";
}
String ReplicatedMergeTreeLogEntry::toString() const
String ReplicatedMergeTreeLogEntryData::toString() const
{
String s;
{
......
......@@ -294,7 +294,7 @@ void StorageBuffer::shutdown()
try
{
optimize(context.getSettings());
optimize({}, {}, context.getSettings());
}
catch (...)
{
......@@ -303,10 +303,15 @@ void StorageBuffer::shutdown()
}
bool StorageBuffer::optimize(const Settings & settings)
bool StorageBuffer::optimize(const String & partition, bool final, const Settings & settings)
{
flushAllBuffers(false);
if (!partition.empty())
throw Exception("Partition cannot be specified when optimizing table of type Buffer", ErrorCodes::NOT_IMPLEMENTED);
if (final)
throw Exception("FINAL cannot be specified when optimizing table of type Buffer", ErrorCodes::NOT_IMPLEMENTED);
flushAllBuffers(false);
return true;
}
......@@ -505,7 +510,7 @@ void StorageBuffer::alter(const AlterCommands & params, const String & database_
auto lock = lockStructureForAlter();
/// Чтобы не осталось блоков старой структуры.
optimize(context.getSettings());
optimize({}, {}, context.getSettings());
params.apply(*columns, materialized_columns, alias_columns, column_defaults);
InterpreterAlterQuery::updateMetadata(database_name, table_name,
......
......@@ -132,9 +132,9 @@ void StorageMaterializedView::drop()
}
}
bool StorageMaterializedView::optimize(const Settings & settings)
bool StorageMaterializedView::optimize(const String & partition, bool final, const Settings & settings)
{
return getInnerTable()->optimize(settings);
return getInnerTable()->optimize(partition, final, settings);
}
......
......@@ -206,7 +206,12 @@ void StorageMergeTree::alter(const AlterCommands & params, const String & databa
}
}
bool StorageMergeTree::merge(size_t aio_threshold, bool aggressive, BackgroundProcessingPool::Context * pool_context)
bool StorageMergeTree::merge(
size_t aio_threshold,
bool aggressive,
BackgroundProcessingPool::Context * pool_context,
const String & partition,
bool final)
{
/// Удаляем старые куски.
data.clearOldParts();
......@@ -230,11 +235,21 @@ bool StorageMergeTree::merge(size_t aio_threshold, bool aggressive, BackgroundPr
size_t big_merges = background_pool.getCounter("big merges");
bool only_small = pool_context && big_merges * 2 >= background_pool.getNumberOfThreads();
if (!merger.selectPartsToMerge(parts, merged_name, disk_space, false, aggressive, only_small, can_merge) &&
!merger.selectPartsToMerge(parts, merged_name, disk_space, true, aggressive, only_small, can_merge))
bool selected = false;
if (partition.empty())
{
return false;
selected = merger.selectPartsToMerge(parts, merged_name, disk_space, false, aggressive, only_small, can_merge)
|| merger.selectPartsToMerge(parts, merged_name, disk_space, true, aggressive, only_small, can_merge);
}
else
{
DayNum_t month = MergeTreeData::getMonthFromName(partition);
selected = merger.selectAllPartsToMergeWithinPartition(parts, merged_name, disk_space, can_merge, month, final);
}
if (!selected)
return false;
merging_tagger = new CurrentlyMergingPartsTagger(parts, MergeTreeDataMerger::estimateDiskSpaceForMerge(parts), *this);
......@@ -270,7 +285,7 @@ bool StorageMergeTree::mergeTask(BackgroundProcessingPool::Context & background_
try
{
size_t aio_threshold = context.getSettings().min_bytes_to_use_direct_io;
return merge(aio_threshold, false, &background_processing_pool_context);
return merge(aio_threshold, false, &background_processing_pool_context, {}, {});
}
catch (Exception & e)
{
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册