diff --git a/dbms/include/DB/Interpreters/InterpreterOptimizeQuery.h b/dbms/include/DB/Interpreters/InterpreterOptimizeQuery.h
index f36ecb1b05e444461afbe04c11c8808a37e5f116..216a43910cbb588e89286e9e67923c6c92e214f4 100644
--- a/dbms/include/DB/Interpreters/InterpreterOptimizeQuery.h
+++ b/dbms/include/DB/Interpreters/InterpreterOptimizeQuery.h
@@ -23,9 +23,13 @@ public:
     BlockIO execute() override
     {
         const ASTOptimizeQuery & ast = typeid_cast<const ASTOptimizeQuery &>(*query_ptr);
+
+        if (ast.final && ast.partition.empty())
+            throw Exception("FINAL flag for OPTIMIZE query is meaningful only with specified PARTITION", ErrorCodes::BAD_ARGUMENTS);
+
         StoragePtr table = context.getTable(ast.database, ast.table);
         auto table_lock = table->lockStructure(true);
-        table->optimize(context.getSettings());
+        table->optimize(ast.partition, ast.final, context.getSettings());
         return {};
     }
diff --git a/dbms/include/DB/Parsers/ASTOptimizeQuery.h b/dbms/include/DB/Parsers/ASTOptimizeQuery.h
index 2fb6921d2ed59945c3d5f7adef44f9077dad7704..ed856c3dcdac1714188c5a0a61859c76bf0695ac 100644
--- a/dbms/include/DB/Parsers/ASTOptimizeQuery.h
+++ b/dbms/include/DB/Parsers/ASTOptimizeQuery.h
@@ -15,11 +15,16 @@ public:
     String database;
     String table;
 
+    /// A partition may be specified, within which the optimization is performed.
+    String partition;
+    /// A flag may be specified to optimize "to the end" instead of performing a single step.
+    bool final;
+
     ASTOptimizeQuery() = default;
     ASTOptimizeQuery(const StringRange range_) : IAST(range_) {}
-
+
     /** Get the text that identifies this element. */
-    String getID() const override { return "OptimizeQuery_" + database + "_" + table; };
+    String getID() const override { return "OptimizeQuery_" + database + "_" + table + "_" + partition + "_" + toString(final); };
 
     ASTPtr clone() const override { return new ASTOptimizeQuery(*this); }
 
@@ -28,6 +33,13 @@ protected:
     {
         settings.ostr << (settings.hilite ? hilite_keyword : "") << "OPTIMIZE TABLE " << (settings.hilite ? hilite_none : "")
             << (!database.empty() ? backQuoteIfNeed(database) + "." : "") << backQuoteIfNeed(table);
+
+        if (!partition.empty())
+            settings.ostr << (settings.hilite ? hilite_keyword : "") << " PARTITION " << (settings.hilite ? hilite_none : "")
+                << partition;
+
+        if (final)
+            settings.ostr << (settings.hilite ? hilite_keyword : "") << " FINAL" << (settings.hilite ? hilite_none : "");
     }
 };
diff --git a/dbms/include/DB/Parsers/ParserOptimizeQuery.h b/dbms/include/DB/Parsers/ParserOptimizeQuery.h
index c41ba5ead1dc1a4c7483b08d309ffc2674ca6d3f..dfe3f263ae5253b9ef2132ff43ff176670763b76 100644
--- a/dbms/include/DB/Parsers/ParserOptimizeQuery.h
+++ b/dbms/include/DB/Parsers/ParserOptimizeQuery.h
@@ -7,7 +7,7 @@
 namespace DB
 {
 
-/** The query OPTIMIZE TABLE [db.]name
+/** The query OPTIMIZE TABLE [db.]name [PARTITION partition] [FINAL]
  */
 class ParserOptimizeQuery : public IParserBase
 {
diff --git a/dbms/include/DB/Storages/IStorage.h b/dbms/include/DB/Storages/IStorage.h
index e31f6fc09eacaa36e383cca21976c8900174d6c2..a0c6f647a0766d1ecd1094d951c7825ba75f98d6 100644
--- a/dbms/include/DB/Storages/IStorage.h
+++ b/dbms/include/DB/Storages/IStorage.h
@@ -257,7 +257,7 @@ public:
     /** Perform some background work. For example, merging parts in a MergeTree table.
      * Returns whether any work was done.
      */
-    virtual bool optimize(const Settings & settings)
+    virtual bool optimize(const String & partition, bool final, const Settings & settings)
     {
         throw Exception("Method optimize is not supported by storage " + getName(), ErrorCodes::NOT_IMPLEMENTED);
     }
@@ -285,7 +285,7 @@ public:
     bool is_dropped{false};
 
     /// Is the index supported in the IN section?
-    virtual bool supportsIndexForIn() const { return false; };
+    virtual bool supportsIndexForIn() const { return false; }
 
     /// Checks the validity of the data.
     virtual bool checkData() const { throw DB::Exception("Check query is not supported for " + getName() + " storage"); }
diff --git a/dbms/include/DB/Storages/MergeTree/ActiveDataPartSet.h b/dbms/include/DB/Storages/MergeTree/ActiveDataPartSet.h
index f43e7148c6fe58122156f0aae66880d8aec5641f..e788f6f028ff95e56cc9461cda4805c2857c0d77 100644
--- a/dbms/include/DB/Storages/MergeTree/ActiveDataPartSet.h
+++ b/dbms/include/DB/Storages/MergeTree/ActiveDataPartSet.h
@@ -39,6 +39,9 @@ public:
             if (right != rhs.right)
                 return right < rhs.right;
 
+            if (level != rhs.level)
+                return level < rhs.level;
+
             return false;
         }
 
@@ -49,7 +52,8 @@ public:
                 && left_date <= rhs.left_date
                 && right_date >= rhs.right_date
                 && left <= rhs.left
-                && right >= rhs.right;
+                && right >= rhs.right
+                && level >= rhs.level;
         }
     };
diff --git a/dbms/include/DB/Storages/MergeTree/MergeTreeBlockOutputStream.h b/dbms/include/DB/Storages/MergeTree/MergeTreeBlockOutputStream.h
index a079de307800ee0c43345ad17cc0ad9122d47f88..750b824ebd86b74a924d2af4092aedc069cbdc43 100644
--- a/dbms/include/DB/Storages/MergeTree/MergeTreeBlockOutputStream.h
+++ b/dbms/include/DB/Storages/MergeTree/MergeTreeBlockOutputStream.h
@@ -24,7 +24,7 @@ public:
                 break;
 
             ProfileEvents::increment(ProfileEvents::SynchronousMergeOnInsert);
-            storage.merge(0);
+            storage.merge(0, {}, {}, {}, {});
         }
     }
diff --git a/dbms/include/DB/Storages/MergeTree/MergeTreeData.h b/dbms/include/DB/Storages/MergeTree/MergeTreeData.h
index 75873bead0ec44e9393a71b340ca1c7d6773a268..d705f2a2b658890fc1cc0cdba9561b31e6558aad 100644
--- a/dbms/include/DB/Storages/MergeTree/MergeTreeData.h
+++ b/dbms/include/DB/Storages/MergeTree/MergeTreeData.h
@@ -345,7 +345,8 @@ public:
     /** Same as renameTempPartAndAdd, but the part may cover existing parts.
      * Removes and returns all parts covered by the added one (in ascending order).
      */
-    DataPartsVector renameTempPartAndReplace(MutableDataPartPtr & part, SimpleIncrement * increment = nullptr, Transaction * out_transaction = nullptr);
+    DataPartsVector renameTempPartAndReplace(
+        MutableDataPartPtr & part, SimpleIncrement * increment = nullptr, Transaction * out_transaction = nullptr);
 
     /** Removes the 'remove' parts from the working set and adds the 'add' parts. 'add' must already be in all_data_parts.
      * If clear_without_timeout, the data will be deleted immediately or at the next clearOldParts, ignoring old_parts_lifetime.
diff --git a/dbms/include/DB/Storages/MergeTree/MergeTreeDataMerger.h b/dbms/include/DB/Storages/MergeTree/MergeTreeDataMerger.h
index d9788aefab6cace0bd93bf858e75e89d2ce8b7df..509f7856af5557e5916d0dee493347af01bcef88 100644
--- a/dbms/include/DB/Storages/MergeTree/MergeTreeDataMerger.h
+++ b/dbms/include/DB/Storages/MergeTree/MergeTreeDataMerger.h
@@ -45,6 +45,17 @@ public:
         bool only_small,
         const AllowedMergingPredicate & can_merge);
 
+    /** Select all parts in the given partition for merging, if possible.
+     * final - select even a single part for merging, that is, allow merging one part "with itself".
+     */
+    bool selectAllPartsToMergeWithinPartition(
+        MergeTreeData::DataPartsVector & what,
+        String & merged_name,
+        size_t available_disk_space,
+        const AllowedMergingPredicate & can_merge,
+        DayNum_t partition,
+        bool final);
+
     /** Merges parts.
      * If reservation != nullptr, it periodically reduces the size of the reserved space
      * approximately in proportion to the amount of data already written out.
diff --git a/dbms/include/DB/Storages/MergeTree/ReplicatedMergeTreeLogEntry.h b/dbms/include/DB/Storages/MergeTree/ReplicatedMergeTreeLogEntry.h
index 1c632a7317f1bac5ae239a3da6ba885831cc216b..4d26226f41e53254d8f4b43a30b2125115677381 100644
--- a/dbms/include/DB/Storages/MergeTree/ReplicatedMergeTreeLogEntry.h
+++ b/dbms/include/DB/Storages/MergeTree/ReplicatedMergeTreeLogEntry.h
@@ -51,6 +51,10 @@ struct ReplicatedMergeTreeLogEntryData
         }
     }
 
+    void writeText(WriteBuffer & out) const;
+    void readText(ReadBuffer & in);
+    String toString() const;
+
     String znode_name;
 
     Type type = EMPTY;
@@ -96,10 +100,6 @@ struct ReplicatedMergeTreeLogEntry : ReplicatedMergeTreeLogEntryData
     std::condition_variable execution_complete; /// Woken up when currently_executing becomes false.
 
-    void writeText(WriteBuffer & out) const;
-    void readText(ReadBuffer & in);
-
-    String toString() const;
     static Ptr parse(const String & s, const Stat & stat);
 };
diff --git a/dbms/include/DB/Storages/StorageBuffer.h b/dbms/include/DB/Storages/StorageBuffer.h
index b37352d076f093c20f31d568f1f8d92314081587..c6bdbca75816fe7f5300847116bac87a070eca47 100644
--- a/dbms/include/DB/Storages/StorageBuffer.h
+++ b/dbms/include/DB/Storages/StorageBuffer.h
@@ -74,7 +74,7 @@ public:
     /// Flushes all buffers to the subordinate table.
     void shutdown() override;
-    bool optimize(const Settings & settings) override;
+    bool optimize(const String & partition, bool final, const Settings & settings) override;
 
     void rename(const String & new_path_to_db, const String & new_database_name, const String & new_table_name) override { name = new_table_name; }
diff --git a/dbms/include/DB/Storages/StorageMaterializedView.h b/dbms/include/DB/Storages/StorageMaterializedView.h
index 533d20f1e2cad59163288ccdd536272f2eeb3032..17c7683ada4ea0fe8be4e9c1648f4be42aae8ff6 100644
--- a/dbms/include/DB/Storages/StorageMaterializedView.h
+++ b/dbms/include/DB/Storages/StorageMaterializedView.h
@@ -30,12 +30,13 @@ public:
     bool supportsSampling() const override { return getInnerTable()->supportsSampling(); }
     bool supportsPrewhere() const override { return getInnerTable()->supportsPrewhere(); }
     bool supportsFinal() const override { return getInnerTable()->supportsFinal(); }
-    bool supportsIndexForIn() const override { return getInnerTable()->supportsIndexForIn(); }
     bool supportsParallelReplicas() const override { return getInnerTable()->supportsParallelReplicas(); }
+    bool supportsIndexForIn() const override { return getInnerTable()->supportsIndexForIn(); }
+    Block getIndexSampleBlock() const override { return getInnerTable()->getIndexSampleBlock(); }
 
     BlockOutputStreamPtr write(ASTPtr query, const Settings & settings) override;
     void drop() override;
-    bool optimize(const Settings & settings) override;
+    bool optimize(const String & partition, bool final, const Settings & settings) override;
 
     BlockInputStreams read(
         const Names & column_names,
diff --git a/dbms/include/DB/Storages/StorageMergeTree.h b/dbms/include/DB/Storages/StorageMergeTree.h
index d96876f4d1e5a1267c3eb691efb2fe8879ec3656..c2d69bcd5c3e73f745328d1aca85036f1bb9fd76 100644
--- a/dbms/include/DB/Storages/StorageMergeTree.h
+++ b/dbms/include/DB/Storages/StorageMergeTree.h
@@ -82,9 +82,9 @@ public:
     /** Perform the next step of merging parts.
      */
-    bool optimize(const Settings & settings) override
+    bool optimize(const String & partition, bool final, const Settings & settings) override
     {
-        return merge(settings.min_bytes_to_use_direct_io, true);
+        return merge(settings.min_bytes_to_use_direct_io, true, nullptr, partition, final);
     }
 
     void dropPartition(ASTPtr query, const Field & partition, bool detach, bool unreplicated, const Settings & settings) override;
@@ -189,7 +189,7 @@ private:
     * If aggressive - select parts without regard to the ratio of their sizes and their freshness (for the OPTIMIZE query).
     * Returns whether anything was merged.
     */
-    bool merge(size_t aio_threshold, bool aggressive = false, BackgroundProcessingPool::Context * context = nullptr);
+    bool merge(size_t aio_threshold, bool aggressive, BackgroundProcessingPool::Context * context, const String & partition, bool final);
 
     bool mergeTask(BackgroundProcessingPool::Context & context);
diff --git a/dbms/include/DB/Storages/StorageReplicatedMergeTree.h b/dbms/include/DB/Storages/StorageReplicatedMergeTree.h
index 1857079ce4e0882b4ae9a31bfb6958a4117edd9a..a226510278929d890ce3b808dc32c11344524d29 100644
--- a/dbms/include/DB/Storages/StorageReplicatedMergeTree.h
+++ b/dbms/include/DB/Storages/StorageReplicatedMergeTree.h
@@ -128,7 +128,7 @@ public:
     BlockOutputStreamPtr write(ASTPtr query, const Settings & settings) override;
 
-    bool optimize(const Settings & settings) override;
+    bool optimize(const String & partition, bool final, const Settings & settings) override;
 
     void alter(const AlterCommands & params, const String & database_name, const String & table_name, const Context & context) override;
 
@@ -396,6 +396,22 @@ private:
     */
     void mergeSelectingThread();
 
+    using MemoizedPartsThatCouldBeMerged = std::set<std::pair<std::string, std::string>>;
+    /// Can the parts in the specified range be merged? memo is an optional parameter.
+    bool canMergeParts(
+        const MergeTreeData::DataPartPtr & left,
+        const MergeTreeData::DataPartPtr & right,
+        MemoizedPartsThatCouldBeMerged * memo);
+
+    /** Write the parts selected for merging to the log.
+     * Call with merge_selecting_mutex locked.
+     * Returns false if some part is not in ZK.
+     */
+    bool createLogEntryToMergeParts(
+        const MergeTreeData::DataPartsVector & parts,
+        const String & merged_name,
+        ReplicatedMergeTreeLogEntryData * out_log_entry = nullptr);
+
     /// Exchanging parts.
 
     /** Returns an empty string if nobody has the part.
@@ -417,11 +433,11 @@ private:
     /** Wait until all replicas, including this one, have executed the specified log action.
     * If replicas are added at the same time, it may not wait for the added replica.
     */
-    void waitForAllReplicasToProcessLogEntry(const LogEntry & entry);
+    void waitForAllReplicasToProcessLogEntry(const ReplicatedMergeTreeLogEntryData & entry);
 
     /** Wait until the specified replica executes the specified log action.
     */
-    void waitForReplicaToProcessLogEntry(const String & replica_name, const LogEntry & entry);
+    void waitForReplicaToProcessLogEntry(const String & replica_name, const ReplicatedMergeTreeLogEntryData & entry);
 
     /// Throw an exception if the table is read-only.
     void assertNotReadonly() const;
diff --git a/dbms/src/Parsers/ParserOptimizeQuery.cpp b/dbms/src/Parsers/ParserOptimizeQuery.cpp
index 481fc286cce655bddf776c518290c0dc948c0b17..c19d6732ee2829a716507abbc4a9d9bd11f28090 100644
--- a/dbms/src/Parsers/ParserOptimizeQuery.cpp
+++ b/dbms/src/Parsers/ParserOptimizeQuery.cpp
@@ -3,6 +3,7 @@
 #include
 #include
+#include
 
 namespace DB
 
@@ -16,11 +17,16 @@ bool ParserOptimizeQuery::parseImpl(Pos & pos, Pos end, ASTPtr & node, Pos & max
     ParserWhiteSpaceOrComments ws;
     ParserString s_optimize("OPTIMIZE", true, true);
     ParserString s_table("TABLE", true, true);
+    ParserString s_partition("PARTITION", true, true);
+    ParserString s_final("FINAL", true, true);
     ParserString s_dot(".");
     ParserIdentifier name_p;
+    ParserLiteral partition_p;
 
     ASTPtr database;
     ASTPtr table;
+    ASTPtr partition;
+    bool final = false;
 
     ws.ignore(pos, end);
 
@@ -50,13 +56,29 @@ bool ParserOptimizeQuery::parseImpl(Pos & pos, Pos end, ASTPtr & node, Pos & max
 
     ws.ignore(pos, end);
 
+    if (s_partition.ignore(pos, end, max_parsed_pos, expected))
+    {
+        ws.ignore(pos, end);
+
+        if (!partition_p.parse(pos, end, partition, max_parsed_pos, expected))
+            return false;
+    }
+
+    ws.ignore(pos, end);
+
+    if (s_final.ignore(pos, end, max_parsed_pos, expected))
+        final = true;
+
     ASTOptimizeQuery * query = new ASTOptimizeQuery(StringRange(begin, pos));
     node = query;
 
     if (database)
-        query->database = typeid_cast(*database).name;
+        query->database = typeid_cast(*database).name;
     if (table)
-        query->table = typeid_cast(*table).name;
+        query->table = typeid_cast(*table).name;
+    if (partition)
+        query->partition = apply_visitor(FieldVisitorToString(), typeid_cast(*partition).value);
+    query->final = final;
 
     return true;
 }
diff --git a/dbms/src/Storages/MergeTree/MergeTreeDataMerger.cpp b/dbms/src/Storages/MergeTree/MergeTreeDataMerger.cpp
index cfe5bc65b0b4b11d2ecb9832230fb7633cb2b303..620865b1c2a549ea5f0124455c53e5e11dcc17b7 100644
--- a/dbms/src/Storages/MergeTree/MergeTreeDataMerger.cpp
+++ b/dbms/src/Storages/MergeTree/MergeTreeDataMerger.cpp
@@ -80,8 +80,14 @@ void MergeTreeDataMerger::setCancellationHook(CancellationHook cancellation_hook
 /// 4) If one of the threads is merging large parts, the other should merge only small parts.
 /// 5) As the logarithm of the total size of the parts in a merge grows, the balance requirement is increased.
-bool MergeTreeDataMerger::selectPartsToMerge(MergeTreeData::DataPartsVector & parts, String & merged_name, size_t available_disk_space,
-    bool merge_anything_for_old_months, bool aggressive, bool only_small, const AllowedMergingPredicate & can_merge_callback)
+bool MergeTreeDataMerger::selectPartsToMerge(
+    MergeTreeData::DataPartsVector & parts,
+    String & merged_name,
+    size_t available_disk_space,
+    bool merge_anything_for_old_months,
+    bool aggressive,
+    bool only_small,
+    const AllowedMergingPredicate & can_merge_callback)
 {
     MergeTreeData::DataParts data_parts = data.getDataParts();
 
@@ -175,7 +181,8 @@ bool MergeTreeDataMerger::selectPartsToMerge(MergeTreeData::DataPartsVector & pa
         MergeTreeData::DataParts::iterator jt = it;
         while (cur_len < static_cast<int>(data.settings.max_parts_to_merge_at_once)
             || (cur_len < static_cast<int>(data.settings.max_parts_to_merge_at_once_if_small)
-                && cur_sum < data.settings.merge_more_parts_if_sum_bytes_is_less_than))
+                && cur_sum < data.settings.merge_more_parts_if_sum_bytes_is_less_than)
+            || aggressive)
         {
             const MergeTreeData::DataPartPtr & prev_part = *jt;
             ++jt;
@@ -200,7 +207,7 @@ bool MergeTreeDataMerger::selectPartsToMerge(MergeTreeData::DataPartsVector & pa
             /// The part is to the right of the previous one.
             if (last_part->left < cur_id)
             {
-                LOG_WARNING(log, "Part " << last_part->name << " intersects previous part");
+                LOG_ERROR(log, "Part " << last_part->name << " intersects previous part");
                 break;
             }
 
@@ -219,7 +226,7 @@ bool MergeTreeDataMerger::selectPartsToMerge(MergeTreeData::DataPartsVector & pa
             int cur_age_in_sec = time(0) - newest_modification_time;
 
             /// If the parts are larger than 1 GB and were created less than 6 hours ago, merge no fewer than 3 at a time.
-            if (cur_max > 1024 * 1024 * 1024 && cur_age_in_sec < 6 * 3600)
+            if (cur_max > 1024 * 1024 * 1024 && cur_age_in_sec < 6 * 3600 && !aggressive)
                 min_len = 3;
 
             /// The size of the parts after the current ones, divided by the largest of the current parts. The smaller it is, the newer the current parts are.
@@ -316,6 +323,74 @@ bool MergeTreeDataMerger::selectPartsToMerge(MergeTreeData::DataPartsVector & pa
 }
 
 
+bool MergeTreeDataMerger::selectAllPartsToMergeWithinPartition(
+    MergeTreeData::DataPartsVector & what,
+    String & merged_name,
+    size_t available_disk_space,
+    const AllowedMergingPredicate & can_merge,
+    DayNum_t partition,
+    bool final)
+{
+    MergeTreeData::DataPartsVector parts = selectAllPartsFromPartition(partition);
+
+    if (parts.empty())
+        return false;
+
+    if (!final && parts.size() == 1)
+        return false;
+
+    MergeTreeData::DataPartsVector::const_iterator it = parts.begin();
+    MergeTreeData::DataPartsVector::const_iterator prev_it = it;
+
+    size_t sum_bytes = 0;
+    DayNum_t left_date = DayNum_t(std::numeric_limits<UInt16>::max());
+    DayNum_t right_date = DayNum_t(std::numeric_limits<UInt16>::min());
+    UInt32 level = 0;
+
+    while (it != parts.end())
+    {
+        if ((it != parts.begin() || parts.size() == 1)    /// For the case of a single part, check that it can be merged "with itself".
+            && !can_merge(*prev_it, *it))
+            return false;
+
+        level = std::max(level, (*it)->level);
+        left_date = std::min(left_date, (*it)->left_date);
+        right_date = std::max(right_date, (*it)->right_date);
+
+        sum_bytes += (*it)->size_in_bytes;
+
+        prev_it = it;
+        ++it;
+    }
+
+    /// There must be enough disk space to cover the new merge with a margin.
+    if (available_disk_space <= sum_bytes * DISK_USAGE_COEFFICIENT_TO_SELECT)
+    {
+        time_t now = time(0);
+        if (now - disk_space_warning_time > 3600)
+        {
+            disk_space_warning_time = now;
+            LOG_WARNING(log, "Won't merge parts from " << parts.front()->name << " to " << (*prev_it)->name
+                << " because not enough free space: "
+                << formatReadableSizeWithBinarySuffix(available_disk_space) << " free and unreserved "
+                << "(" << formatReadableSizeWithBinarySuffix(DiskSpaceMonitor::getReservedSpace()) << " reserved in "
+                << DiskSpaceMonitor::getReservationCount() << " chunks), "
+                << formatReadableSizeWithBinarySuffix(sum_bytes)
+                << " required now (+" << static_cast<int>((DISK_USAGE_COEFFICIENT_TO_SELECT - 1.0) * 100)
+                << "% on overhead); suppressing similar warnings for the next hour");
+        }
+        return false;
+    }
+
+    what = parts;
+    merged_name = ActiveDataPartSet::getPartName(
+        left_date, right_date, parts.front()->left, parts.back()->right, level + 1);
+
+    LOG_DEBUG(log, "Selected " << parts.size() << " parts from " << parts.front()->name << " to " << parts.back()->name);
+    return true;
+}
+
+
 MergeTreeData::DataPartsVector MergeTreeDataMerger::selectAllPartsFromPartition(DayNum_t partition)
 {
     MergeTreeData::DataPartsVector parts_from_partition;
diff --git a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeLogEntry.cpp b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeLogEntry.cpp
index 91275d52ec4ef1a1ed0a7d8c5474f9f52d5a0c62..2d87dd05a3e59ade3d547123e85e61da45d3bff4 100644
--- a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeLogEntry.cpp
+++ b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeLogEntry.cpp
@@ -9,7 +9,7 @@
 namespace DB
 {
 
-void ReplicatedMergeTreeLogEntry::writeText(WriteBuffer & out) const
+void ReplicatedMergeTreeLogEntryData::writeText(WriteBuffer & out) const
 {
     out << "format version: 3\n"
         << "create_time: " << LocalDateTime(create_time ? create_time : time(0)) << "\n"
@@ -56,7 +56,7 @@ void ReplicatedMergeTreeLogEntry::writeText(WriteBuffer & out) const
         out << "quorum: " << quorum << '\n';
 }
 
-void ReplicatedMergeTreeLogEntry::readText(ReadBuffer & in)
+void ReplicatedMergeTreeLogEntryData::readText(ReadBuffer & in)
 {
     UInt8 format_version = 0;
     String type_str;
@@ -127,7 +127,7 @@ void ReplicatedMergeTreeLogEntry::readText(ReadBuffer & in)
         in >> "quorum: " >> quorum >> "\n";
 }
 
-String ReplicatedMergeTreeLogEntry::toString() const
+String ReplicatedMergeTreeLogEntryData::toString() const
 {
     String s;
     {
diff --git a/dbms/src/Storages/StorageBuffer.cpp b/dbms/src/Storages/StorageBuffer.cpp
index 6df274972da6c06b29f95f1ff47e7453663d29d2..8fce6f8ea065883f78868fc68f98aa288ac29d8a 100644
--- a/dbms/src/Storages/StorageBuffer.cpp
+++ b/dbms/src/Storages/StorageBuffer.cpp
@@ -294,7 +294,7 @@ void StorageBuffer::shutdown()
 
     try
     {
-        optimize(context.getSettings());
+        optimize({}, {}, context.getSettings());
     }
     catch (...)
     {
@@ -303,10 +303,15 @@ void StorageBuffer::shutdown()
 }
 
 
-bool StorageBuffer::optimize(const Settings & settings)
+bool StorageBuffer::optimize(const String & partition, bool final, const Settings & settings)
 {
-    flushAllBuffers(false);
+    if (!partition.empty())
+        throw Exception("Partition cannot be specified when optimizing table of type Buffer", ErrorCodes::NOT_IMPLEMENTED);
+
+    if (final)
+        throw Exception("FINAL cannot be specified when optimizing table of type Buffer", ErrorCodes::NOT_IMPLEMENTED);
+    flushAllBuffers(false);
     return true;
 }
 
@@ -505,7 +510,7 @@ void StorageBuffer::alter(const AlterCommands & params, const String & database_
     auto lock = lockStructureForAlter();
 
     /// So that no blocks of the old structure remain.
-    optimize(context.getSettings());
+    optimize({}, {}, context.getSettings());
 
     params.apply(*columns, materialized_columns, alias_columns, column_defaults);
     InterpreterAlterQuery::updateMetadata(database_name, table_name,
diff --git a/dbms/src/Storages/StorageMaterializedView.cpp b/dbms/src/Storages/StorageMaterializedView.cpp
index 82781dcfb6e49147ef6b24a48b35c9fe2ee54fd5..ae24b2b4bcf6d7e9dd3027fc6c016b6e49ab6ebf 100644
--- a/dbms/src/Storages/StorageMaterializedView.cpp
+++ b/dbms/src/Storages/StorageMaterializedView.cpp
@@ -132,9 +132,9 @@ void StorageMaterializedView::drop()
     }
 }
 
-bool StorageMaterializedView::optimize(const Settings & settings)
+bool StorageMaterializedView::optimize(const String & partition, bool final, const Settings & settings)
 {
-    return getInnerTable()->optimize(settings);
+    return getInnerTable()->optimize(partition, final, settings);
 }
 
diff --git a/dbms/src/Storages/StorageMergeTree.cpp b/dbms/src/Storages/StorageMergeTree.cpp
index b75b476e4511c0752c92f80a4f1a0e7d6cbaeb62..1e2066e0a29ec936677f31cfde57ef47b0769ece 100644
--- a/dbms/src/Storages/StorageMergeTree.cpp
+++ b/dbms/src/Storages/StorageMergeTree.cpp
@@ -206,7 +206,12 @@ void StorageMergeTree::alter(const AlterCommands & params, const String & databa
     }
 }
 
-bool StorageMergeTree::merge(size_t aio_threshold, bool aggressive, BackgroundProcessingPool::Context * pool_context)
+bool StorageMergeTree::merge(
+    size_t aio_threshold,
+    bool aggressive,
+    BackgroundProcessingPool::Context * pool_context,
+    const String & partition,
+    bool final)
 {
     /// Remove old parts.
     data.clearOldParts();
@@ -230,11 +235,21 @@ bool StorageMergeTree::merge(size_t aio_threshold, bool aggressive, BackgroundPr
     size_t big_merges = background_pool.getCounter("big merges");
     bool only_small = pool_context && big_merges * 2 >= background_pool.getNumberOfThreads();
 
-    if (!merger.selectPartsToMerge(parts, merged_name, disk_space, false, aggressive, only_small, can_merge) &&
-        !merger.selectPartsToMerge(parts, merged_name, disk_space, true, aggressive, only_small, can_merge))
+    bool selected = false;
+
+    if (partition.empty())
     {
-        return false;
+        selected = merger.selectPartsToMerge(parts, merged_name, disk_space, false, aggressive, only_small, can_merge)
+            || merger.selectPartsToMerge(parts, merged_name, disk_space, true, aggressive, only_small, can_merge);
     }
+    else
+    {
+        DayNum_t month = MergeTreeData::getMonthFromName(partition);
+        selected = merger.selectAllPartsToMergeWithinPartition(parts, merged_name, disk_space, can_merge, month, final);
+    }
+
+    if (!selected)
+        return false;
 
     merging_tagger = new CurrentlyMergingPartsTagger(parts, MergeTreeDataMerger::estimateDiskSpaceForMerge(parts), *this);
 
@@ -270,7 +285,7 @@ bool StorageMergeTree::mergeTask(BackgroundProcessingPool::Context & background_
     try
     {
         size_t aio_threshold = context.getSettings().min_bytes_to_use_direct_io;
-        return merge(aio_threshold, false, &background_processing_pool_context);
+        return merge(aio_threshold, false, &background_processing_pool_context, {}, {});
     }
     catch (Exception & e)
     {
diff --git a/dbms/src/Storages/StorageReplicatedMergeTree.cpp b/dbms/src/Storages/StorageReplicatedMergeTree.cpp
index bb2c0596ad746a2b4c0349edc5653dc7d58ed745..980fed9aa7a98da9459360a474df137d9df0a7b5 100644
--- a/dbms/src/Storages/StorageReplicatedMergeTree.cpp
+++ b/dbms/src/Storages/StorageReplicatedMergeTree.cpp
@@ -521,27 +521,27 @@ void StorageReplicatedMergeTree::createTableIfNotExists()
     zkutil::Ops ops;
     ops.push_back(new zkutil::Op::Create(zookeeper_path, "",
-                  acl, zkutil::CreateMode::Persistent));
+        acl, zkutil::CreateMode::Persistent));
     ops.push_back(new zkutil::Op::Create(zookeeper_path + "/metadata", metadata,
-                  acl, zkutil::CreateMode::Persistent));
+        acl, zkutil::CreateMode::Persistent));
     ops.push_back(new zkutil::Op::Create(zookeeper_path + "/columns", ColumnsDescription{
-                  data.getColumnsListNonMaterialized(), data.materialized_columns,
-                  data.alias_columns, data.column_defaults}.toString(),
-                  acl, zkutil::CreateMode::Persistent));
+            data.getColumnsListNonMaterialized(), data.materialized_columns,
+            data.alias_columns, data.column_defaults}.toString(),
+        acl, zkutil::CreateMode::Persistent));
     ops.push_back(new zkutil::Op::Create(zookeeper_path + "/log", "",
-                  acl, zkutil::CreateMode::Persistent));
+        acl, zkutil::CreateMode::Persistent));
     ops.push_back(new zkutil::Op::Create(zookeeper_path + "/blocks", "",
-                  acl, zkutil::CreateMode::Persistent));
+        acl, zkutil::CreateMode::Persistent));
     ops.push_back(new zkutil::Op::Create(zookeeper_path + "/block_numbers", "",
-                  acl, zkutil::CreateMode::Persistent));
+        acl, zkutil::CreateMode::Persistent));
     ops.push_back(new zkutil::Op::Create(zookeeper_path + "/nonincrement_block_numbers", "",
-                  acl, zkutil::CreateMode::Persistent));
+        acl, zkutil::CreateMode::Persistent));
     ops.push_back(new zkutil::Op::Create(zookeeper_path + "/leader_election", "",
-                  acl, zkutil::CreateMode::Persistent));
+        acl, zkutil::CreateMode::Persistent));
     ops.push_back(new zkutil::Op::Create(zookeeper_path + "/temp", "",
-                  acl, zkutil::CreateMode::Persistent));
+        acl, zkutil::CreateMode::Persistent));
     ops.push_back(new zkutil::Op::Create(zookeeper_path + "/replicas", "",
-                  acl, zkutil::CreateMode::Persistent));
+        acl, zkutil::CreateMode::Persistent));
 
     auto code = zookeeper->tryMulti(ops);
     if (code != ZOK && code != ZNODEEXISTS)
@@ -1557,12 +1557,11 @@ bool StorageReplicatedMergeTree::queueTask(BackgroundProcessingPool::Context & p
 }
 
 
-void StorageReplicatedMergeTree::mergeSelectingThread()
+bool StorageReplicatedMergeTree::canMergeParts(
+    const MergeTreeData::DataPartPtr & left,
+    const MergeTreeData::DataPartPtr & right,
+    MemoizedPartsThatCouldBeMerged * memo)
 {
-    setThreadName("ReplMTMergeSel");
-
-    bool need_pull = true;
-
     /** A lot of time can be spent determining whether two adjacent parts can be merged.
      * Two adjacent parts can be merged if all block numbers between their numbers are not used ("abandoned").
      * This means that no other part can be inserted between those parts.
@@ -1571,73 +1570,88 @@ void StorageReplicatedMergeTree::mergeSelectingThread()
      * too many reads from ZooKeeper would be needed to find out whether they can be merged.
      *
      * We use the assertion that if a pair of parts could be merged, and their merge is not yet scheduled,
-     * then they can be merged now as well, and we remember this state to avoid sending the same ZooKeeper requests many times.
+     * then they can be merged now as well, and we remember this state (if the memo parameter is given),
+     * to avoid sending the same ZooKeeper requests many times.
      *
      * TODO It is interesting how this interacts with DROP PARTITION and then ATTACH PARTITION.
      */
 
-    std::set<std::pair<std::string, std::string>> memoized_parts_that_could_be_merged;
-
-    auto can_merge = [&memoized_parts_that_could_be_merged, this]
-        (const MergeTreeData::DataPartPtr & left, const MergeTreeData::DataPartPtr & right) -> bool
-    {
-        /// If one of the parts is already going to be merged into a bigger one, do not agree to merge it.
-        if (queue.partWillBeMergedOrMergesDisabled(left->name) || queue.partWillBeMergedOrMergesDisabled(right->name))
-            return false;
-
-        auto key = std::make_pair(left->name, right->name);
-        if (memoized_parts_that_could_be_merged.count(key))
-            return true;
-
-        String month_name = left->name.substr(0, 6);
-        auto zookeeper = getZooKeeper();
-
-        /// Parts among which there is a part written with an unsatisfied quorum must not be merged.
-        /// Note: in theory this could be allowed, but it would make the logic more complex.
-        String quorum_node_value;
-        if (zookeeper->tryGet(zookeeper_path + "/quorum/status", quorum_node_value))
-        {
-            ReplicatedMergeTreeQuorumEntry quorum_entry;
-            quorum_entry.fromString(quorum_node_value);
-
-            ActiveDataPartSet::Part part_info;
-            ActiveDataPartSet::parsePartName(quorum_entry.part_name, part_info);
-
-            if (part_info.left != part_info.right)
-                throw Exception("Logical error: part written with quorum covers more than one block numbers", ErrorCodes::LOGICAL_ERROR);
-
-            if (left->right <= part_info.left && right->left >= part_info.right)
-                return false;
-        }
-
-        /// Parts can be merged if all the numbers between them are abandoned, i.e. do not correspond to any blocks.
-        for (Int64 number = left->right + 1; number <= right->left - 1; ++number)
-        {
-            /** For numbers before RESERVED_BLOCK_NUMBERS, AbandonableLock is not used
-             * - such numbers cannot be "abandoned", that is, left unused for parts.
-             * These are the numbers of parts that were added with ALTER ... ATTACH.
-             * They must go without gaps (there must be a part for every number).
-             * Check that there are parts for all such numbers,
-             * otherwise merging through the "holes" - the missing parts - is impossible.
-             */
-
-            if (number < RESERVED_BLOCK_NUMBERS)
-            {
-                if (!data.hasBlockNumberInMonth(number, left->month))
-                    return false;
-            }
-            else
-            {
-                String path1 = zookeeper_path + "/block_numbers/" + month_name + "/block-" + padIndex(number);
-                String path2 = zookeeper_path + "/nonincrement_block_numbers/" + month_name + "/block-" + padIndex(number);
-
-                if (AbandonableLockInZooKeeper::check(path1, *zookeeper) != AbandonableLockInZooKeeper::ABANDONED &&
-                    AbandonableLockInZooKeeper::check(path2, *zookeeper) != AbandonableLockInZooKeeper::ABANDONED)
-                    return false;
-            }
-        }
-
-        memoized_parts_that_could_be_merged.insert(key);
-        return true;
+    /// If one of the parts is already going to be merged into a bigger one, do not agree to merge it.
+    if (queue.partWillBeMergedOrMergesDisabled(left->name)
+        || (left.get() != right.get() && queue.partWillBeMergedOrMergesDisabled(right->name)))
+        return false;
+
+    auto key = std::make_pair(left->name, right->name);
+    if (memo && memo->count(key))
+        return true;
+
+    String month_name = left->name.substr(0, 6);
+    auto zookeeper = getZooKeeper();
+
+    /// Parts among which there is a part written with an unsatisfied quorum must not be merged.
+    /// Note: in theory this could be allowed, but it would make the logic more complex.
+    String quorum_node_value;
+    if (zookeeper->tryGet(zookeeper_path + "/quorum/status", quorum_node_value))
+    {
+        ReplicatedMergeTreeQuorumEntry quorum_entry;
+        quorum_entry.fromString(quorum_node_value);
+
+        ActiveDataPartSet::Part part_info;
+        ActiveDataPartSet::parsePartName(quorum_entry.part_name, part_info);
+
+        if (part_info.left != part_info.right)
+            throw Exception("Logical error: part written with quorum covers more than one block numbers", ErrorCodes::LOGICAL_ERROR);
+
+        if (left->right <= part_info.left && right->left >= part_info.right)
+            return false;
+    }
+
+    /// Parts can be merged if all the numbers between them are abandoned, i.e. do not correspond to any blocks.
+    for (Int64 number = left->right + 1; number <= right->left - 1; ++number)
+    {
+        /** For numbers before RESERVED_BLOCK_NUMBERS, AbandonableLock is not used
+         * - such numbers cannot be "abandoned", that is, left unused for parts.
+         * These are the numbers of parts that were added with ALTER ... ATTACH.
+         * They must go without gaps (there must be a part for every number).
+         * Check that there are parts for all such numbers,
+         * otherwise merging through the "holes" - the missing parts - is impossible.
+         */
+
+        if (number < RESERVED_BLOCK_NUMBERS)
+        {
+            if (!data.hasBlockNumberInMonth(number, left->month))
+                return false;
+        }
+        else
+        {
+            String path1 = zookeeper_path + "/block_numbers/" + month_name + "/block-" + padIndex(number);
+            String path2 = zookeeper_path + "/nonincrement_block_numbers/" + month_name + "/block-" + padIndex(number);
+
+            if (AbandonableLockInZooKeeper::check(path1, *zookeeper) != AbandonableLockInZooKeeper::ABANDONED &&
+                AbandonableLockInZooKeeper::check(path2, *zookeeper) != AbandonableLockInZooKeeper::ABANDONED)
+                return false;
+        }
+    }
+
+    if (memo)
+        memo->insert(key);
+
+    return true;
+}
+
+
+void StorageReplicatedMergeTree::mergeSelectingThread()
+{
+    setThreadName("ReplMTMergeSel");
+
+    bool need_pull = true;
+
+    MemoizedPartsThatCouldBeMerged memoized_parts_that_could_be_merged;
+
+    auto can_merge = [&memoized_parts_that_could_be_merged, this]
+        (const MergeTreeData::DataPartPtr & left, const MergeTreeData::DataPartPtr & right) -> bool
+    {
+        return canMergeParts(left, right, &memoized_parts_that_could_be_merged);
     };
 
     while (!shutdown_called && is_leader_node)
@@ -1688,78 +1702,26 @@ void StorageReplicatedMergeTree::mergeSelectingThread()
                     << ". Max number of big merges: " << max_number_of_big_merges
                     << (only_small ? ". So, will select only small parts to merge." : "."));
 
-            do
+            if (merges_queued >= data.settings.max_replicated_merges_in_queue)
             {
-                if (merges_queued >= data.settings.max_replicated_merges_in_queue)
-                {
-                    LOG_TRACE(log, "Number of queued merges (" << merges_queued
-                        << ") is greater than max_replicated_merges_in_queue ("
-                        << data.settings.max_replicated_merges_in_queue << "), so won't select new parts to merge.");
-                    break;
-                }
-
-                MergeTreeData::DataPartsVector parts;
-
-                String merged_name;
-
-                size_t disk_space = DiskSpaceMonitor::getUnreservedFreeSpace(full_path);
-
-                if (   !merger.selectPartsToMerge(parts, merged_name, disk_space, false, false, only_small, can_merge)
-                    && !merger.selectPartsToMerge(parts, merged_name, disk_space, true, false, only_small, can_merge))
-                {
-                    break;
-                }
-
-                auto zookeeper = getZooKeeper();
-
-                bool all_in_zk = true;
-                for (const auto & part : parts)
-                {
-                    /// If there is no information about one of the parts in ZK, we will not merge.
-                    if (!zookeeper->exists(replica_path + "/parts/" + part->name))
-                    {
-                        all_in_zk = false;
-
-                        if (part->modification_time + MAX_AGE_OF_LOCAL_PART_THAT_WASNT_ADDED_TO_ZOOKEEPER < time(0))
-                        {
-                            LOG_WARNING(log, "Part " << part->name << " (that was selected for merge)"
-                                << " with age " << (time(0) - part->modification_time)
-                                << " seconds exists locally but not in ZooKeeper."
-                                << " Won't do merge with that part and will check it.");
-                            enqueuePartForCheck(part->name);
-                        }
-                    }
-                }
-                if (!all_in_zk)
-                    break;
-
-                LogEntry entry;
-                entry.type = LogEntry::MERGE_PARTS;
-                entry.source_replica = replica_name;
-                entry.new_part_name = merged_name;
-                entry.create_time = time(0);
-
-                for (const auto & part : parts)
-                    entry.parts_to_merge.push_back(part->name);
-
-                need_pull = true;
+                LOG_TRACE(log, "Number of queued merges (" << merges_queued
+                    << ") is greater than max_replicated_merges_in_queue ("
+                    << data.settings.max_replicated_merges_in_queue << "), so won't select new parts to merge.");
+                break;
+            }
 
-                zookeeper->create(zookeeper_path + "/log/log-", entry.toString(), zkutil::CreateMode::PersistentSequential);
+            MergeTreeData::DataPartsVector parts;
+            String merged_name;
 
-                String month_name = parts[0]->name.substr(0, 6);
-                for (size_t i = 0; i + 1 < parts.size(); ++i)
-                {
-                    /// Remove the no-longer-needed markers for nonexistent blocks.
-                    for (Int64 number = std::max(RESERVED_BLOCK_NUMBERS, parts[i]->right + 1); number <= parts[i + 1]->left - 1; ++number)
-                    {
-                        zookeeper->tryRemove(zookeeper_path + "/block_numbers/" + month_name + "/block-" + padIndex(number));
-                        zookeeper->tryRemove(zookeeper_path + "/nonincrement_block_numbers/" + month_name + "/block-" + padIndex(number));
-                    }
-                }
+            size_t disk_space = DiskSpaceMonitor::getUnreservedFreeSpace(full_path);
 
+            if ((   merger.selectPartsToMerge(parts, merged_name, disk_space, false, false, only_small, can_merge)
+                 || merger.selectPartsToMerge(parts, merged_name, disk_space, true, false, only_small, can_merge))
+                && createLogEntryToMergeParts(parts, merged_name))
+            {
                 success = true;
+                need_pull = true;
             }
-            while (false);
         }
         catch (...)
         {
@@ -1777,6 +1739,61 @@ void StorageReplicatedMergeTree::mergeSelectingThread()
 }
 
 
+bool StorageReplicatedMergeTree::createLogEntryToMergeParts(
+    const MergeTreeData::DataPartsVector & parts, const String & merged_name, ReplicatedMergeTreeLogEntryData * out_log_entry)
+{
+    auto zookeeper = getZooKeeper();
+
+    bool all_in_zk = true;
+    for (const auto & part : parts)
+    {
+        /// If there is no information about one of the parts in ZK, we will not merge.
+        if (!zookeeper->exists(replica_path + "/parts/" + part->name))
+        {
+            all_in_zk = false;
+
+            if (part->modification_time + MAX_AGE_OF_LOCAL_PART_THAT_WASNT_ADDED_TO_ZOOKEEPER < time(0))
+            {
+                LOG_WARNING(log, "Part " << part->name << " (that was selected for merge)"
+                    << " with age " << (time(0) - part->modification_time)
+                    << " seconds exists locally but not in ZooKeeper."
+                    << " Won't do merge with that part and will check it.");
+                enqueuePartForCheck(part->name);
+            }
+        }
+    }
+    if (!all_in_zk)
+        return false;
+
+    LogEntry entry;
+    entry.type = LogEntry::MERGE_PARTS;
+    entry.source_replica = replica_name;
+    entry.new_part_name = merged_name;
+    entry.create_time = time(0);
+
+    for (const auto & part : parts)
+        entry.parts_to_merge.push_back(part->name);
+
+    zookeeper->create(zookeeper_path + "/log/log-", entry.toString(), zkutil::CreateMode::PersistentSequential);
+
+    String month_name = parts[0]->name.substr(0, 6);
+    for (size_t i = 0; i + 1 < parts.size(); ++i)
+    {
+        /// Remove the no-longer-needed markers for nonexistent blocks.
+        for (Int64 number = std::max(RESERVED_BLOCK_NUMBERS, parts[i]->right + 1); number <= parts[i + 1]->left - 1; ++number)
+        {
+            zookeeper->tryRemove(zookeeper_path + "/block_numbers/" + month_name + "/block-" + padIndex(number));
+            zookeeper->tryRemove(zookeeper_path + "/nonincrement_block_numbers/" + month_name + "/block-" + padIndex(number));
+        }
+    }
+
+    if (out_log_entry)
+        *out_log_entry = entry;
+
+    return true;
+}
+
+
 void StorageReplicatedMergeTree::removePartFromZooKeeper(const String & part_name, zkutil::Ops & ops)
 {
     String part_path = replica_path + "/parts/" + part_name;
@@ -2180,31 +2197,75 @@ BlockOutputStreamPtr StorageReplicatedMergeTree::write(ASTPtr query, const Setti
 }
 
 
-bool StorageReplicatedMergeTree::optimize(const Settings & settings)
+bool StorageReplicatedMergeTree::optimize(const String & partition, bool final, const Settings & settings)
 {
-    /// Merge some parts from the unreplicated directory.
-    /// TODO: Merge replicated parts too.
+    /// If there is unreplicated data, merge it first.
+    if (unreplicated_data)
+    {
+        std::lock_guard<std::mutex> lock(unreplicated_mutex);
+        unreplicated_data->clearOldParts();
 
-    if (!unreplicated_data)
-        return false;
+        MergeTreeData::DataPartsVector parts;
+        String merged_name;
+        auto always_can_merge = [](const MergeTreeData::DataPartPtr & a, const MergeTreeData::DataPartPtr & b) { return true; };
 
-    std::lock_guard<std::mutex> lock(unreplicated_mutex);
+        if (unreplicated_merger->selectPartsToMerge(parts, merged_name, 0, true, true, false, always_can_merge))
+        {
+            const auto & merge_entry = context.getMergeList().insert(database_name, table_name, merged_name);
 
-    unreplicated_data->clearOldParts();
+            auto new_part = unreplicated_merger->mergePartsToTemporaryPart(
+                parts, merged_name, *merge_entry, settings.min_bytes_to_use_direct_io, time(0));
 
-    MergeTreeData::DataPartsVector parts;
-    String merged_name;
-    auto always_can_merge = [](const MergeTreeData::DataPartPtr & a, const MergeTreeData::DataPartPtr & b) { return true; };
-    if (!unreplicated_merger->selectPartsToMerge(parts, merged_name, 0, true, true, false, always_can_merge))
-        return false;
+            unreplicated_merger->renameMergedTemporaryPart(parts, new_part, merged_name, nullptr);
+            return true;
+        }
+    }
+
+    assertNotReadonly();
+
+    auto zookeeper = getZooKeeper();
+
+    if (!is_leader_node)
+        throw Exception("Method OPTIMIZE for ReplicatedMergeTree could be called only on leader replica", ErrorCodes::NOT_IMPLEMENTED);
+
+    auto can_merge = [this]
+        (const MergeTreeData::DataPartPtr & left, const MergeTreeData::DataPartPtr & right)
+    {
+        return canMergeParts(left, right, nullptr);
+    };
+
+    pullLogsToQueue();
+
+    ReplicatedMergeTreeLogEntryData merge_entry;
+    {
+        std::lock_guard<std::mutex> merge_selecting_lock(merge_selecting_mutex);
+
+        MergeTreeData::DataPartsVector parts;
+        String merged_name;
+
+        size_t disk_space = DiskSpaceMonitor::getUnreservedFreeSpace(full_path);
 
-    const auto & merge_entry = context.getMergeList().insert(database_name, table_name, merged_name);
+        bool selected = false;
 
-    auto new_part = unreplicated_merger->mergePartsToTemporaryPart(
-        parts, merged_name, *merge_entry, settings.min_bytes_to_use_direct_io, time(0));
+        if (partition.empty())
+        {
+            selected = merger.selectPartsToMerge(parts, merged_name, disk_space, false, true, false, can_merge)
+                || merger.selectPartsToMerge(parts, merged_name, disk_space, true, true, false, can_merge);
+        }
+        else
+        {
+            DayNum_t month = MergeTreeData::getMonthFromName(partition);
+            selected = merger.selectAllPartsToMergeWithinPartition(parts, merged_name, disk_space, can_merge, month, final);
+        }
 
-    unreplicated_merger->renameMergedTemporaryPart(parts, new_part, merged_name, nullptr);
+        if (!selected)
+            return false;
+
+        if (!createLogEntryToMergeParts(parts, merged_name, &merge_entry))
+            return false;
+    }
 
+    waitForAllReplicasToProcessLogEntry(merge_entry);
     return true;
 }
 
@@ -2430,7 +2491,8 @@ void StorageReplicatedMergeTree::dropUnreplicatedPartition(const Field & partiti
 }
 
 
-void StorageReplicatedMergeTree::dropPartition(ASTPtr query, const Field & field, bool detach, bool unreplicated, const Settings & settings)
+void StorageReplicatedMergeTree::dropPartition(
+    ASTPtr query, const Field & field, bool detach, bool unreplicated, const Settings & settings)
 {
     if (unreplicated)
     {
@@ -2746,7 +2808,7 @@ AbandonableLockInZooKeeper StorageReplicatedMergeTree::allocateBlockNumber(const
 }
 
 
-void StorageReplicatedMergeTree::waitForAllReplicasToProcessLogEntry(const LogEntry & entry)
+void StorageReplicatedMergeTree::waitForAllReplicasToProcessLogEntry(const ReplicatedMergeTreeLogEntryData & entry)
 {
     auto zookeeper = getZooKeeper();
     LOG_DEBUG(log, "Waiting for all replicas to process " << entry.znode_name);
@@ -2759,7 +2821,7 @@ void StorageReplicatedMergeTree::waitForAllReplicasToProcessLogEn
 }
 
 
-void StorageReplicatedMergeTree::waitForReplicaToProcessLogEntry(const String & replica, const LogEntry & entry)
+void StorageReplicatedMergeTree::waitForReplicaToProcessLogEntry(const String & replica, const ReplicatedMergeTreeLogEntryData & entry)
 {
     auto zookeeper = getZooKeeper();
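
Usage sketch (not part of the patch): the syntax implemented by this change is OPTIMIZE TABLE [db.]name [PARTITION partition] [FINAL]. The table name below is hypothetical; it assumes a month-partitioned MergeTree table, with the partition given as a month literal as handled by MergeTreeData::getMonthFromName.

    OPTIMIZE TABLE hits;                         -- one merge step over automatically selected parts, as before
    OPTIMIZE TABLE hits PARTITION 201403;        -- merge all parts of the given partition into one, if possible
    OPTIMIZE TABLE hits PARTITION 201403 FINAL;  -- also merge when the partition already consists of a single part
    OPTIMIZE TABLE hits FINAL;                   -- rejected by InterpreterOptimizeQuery: FINAL is only meaningful with PARTITION (BAD_ARGUMENTS)

Per the changes above, StorageBuffer rejects both PARTITION and FINAL with NOT_IMPLEMENTED, and for ReplicatedMergeTree the query must be run on the leader replica, which then waits for all replicas to process the scheduled merge.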