diff --git a/dbms/CMakeLists.txt b/dbms/CMakeLists.txt index cfe078462f4405737c734aa90d14f324c2162af6..a02cd2966f93b7bdc247c76e22018b8a1d3bb457 100644 --- a/dbms/CMakeLists.txt +++ b/dbms/CMakeLists.txt @@ -203,6 +203,7 @@ add_library (dbms include/DB/DataStreams/MergingAggregatedMemoryEfficientBlockInputStream.h include/DB/DataStreams/SummingSortedBlockInputStream.h include/DB/DataStreams/ReplacingSortedBlockInputStream.h + include/DB/DataStreams/GraphiteRollupSortedBlockInputStream.h include/DB/DataStreams/AddingConstColumnBlockInputStream.h include/DB/DataStreams/DistinctBlockInputStream.h include/DB/DataStreams/BlockOutputStreamFromRowOutputStream.h @@ -716,6 +717,7 @@ add_library (dbms src/DataStreams/CSVRowInputStream.cpp src/DataStreams/SummingSortedBlockInputStream.cpp src/DataStreams/ReplacingSortedBlockInputStream.cpp + src/DataStreams/GraphiteRollupSortedBlockInputStream.cpp src/DataStreams/TotalsHavingBlockInputStream.cpp src/DataStreams/CreatingSetsBlockInputStream.cpp src/DataStreams/DistinctBlockInputStream.cpp diff --git a/dbms/include/DB/DataStreams/GraphiteRollupSortedBlockInputStream.h b/dbms/include/DB/DataStreams/GraphiteRollupSortedBlockInputStream.h new file mode 100644 index 0000000000000000000000000000000000000000..ff6133390c402d640434808aaa3993940c83cce8 --- /dev/null +++ b/dbms/include/DB/DataStreams/GraphiteRollupSortedBlockInputStream.h @@ -0,0 +1,211 @@ +#pragma once + +#include + +#include +#include +#include +#include +#include +#include + + +namespace DB +{ + +/** Предназначен для реализации "rollup" - загрубления старых данных + * для таблицы с данными Графита - системы количественного мониторинга. + * + * Таблица с данными Графита имеет по крайней мере следующие столбцы (с точностью до названия): + * Path, Time, Value, Version + * + * Path - имя метрики (сенсора); + * Time - время, к которому относится измерение; + * Value - значение; + * Version - число, такое что для одинаковых пар Path, Time, следует брать значение максимальной версии. + * + * Каждая строчка в таблице соответствует одному значению одного сенсора. + * + * Правила загрубления задаются в следующем виде: + * + * pattern + * regexp + * function + * age -> precision + * age -> precision + * ... + * pattern + * ... + * default + * function + * age -> precision + * ... + * + * regexp - шаблон на имя сенсора + * default - если ни один шаблон не сработал + * + * age - минимальный возраст данных (в секундах), после которого следует применять округление до величины precision. + * precision - величина округления (в секундах) + * + * function - имя агрегатной функции, которую следует применять к значениям, время которых округлилось до одинакового. + * + * Пример: + * + * + * + * click_cost + * any + * + * 0 + * 3600 + * + * + * 86400 + * 60 + * + * + * + * max + * + * 0 + * 60 + * + * + * 3600 + * 300 + * + * + * 86400 + * 3600 + * + * + * + */ + +namespace Graphite +{ + struct Retention + { + UInt32 age; + UInt32 precision; + }; + + using Retentions = std::vector; + + struct Pattern + { + std::shared_ptr regexp; + AggregateFunctionPtr function; + Retentions retentions; /// Упорядочены по убыванию age. + }; + + using Patterns = std::vector; + + struct Params + { + String path_column_name; + String time_column_name; + String value_column_name; + String version_column_name; + Graphite::Patterns patterns; + }; +} + +/** Соединяет несколько сортированных потоков в один. + * + * При этом, для каждой группы идущих подряд одинаковых значений столбца path, + * и одинаковых значений time с округлением до некоторой точности + * (где точность округления зависит от набора шаблонов на path + * и количества времени, прошедшего от time до заданного времени), + * оставляет одну строку, + * выполняя округление времени, + * слияние значений value, используя заданные агрегатные функции, + * а также оставляя максимальное значение столбца version. + */ +class GraphiteRollupSortedBlockInputStream : public MergingSortedBlockInputStream +{ +public: + GraphiteRollupSortedBlockInputStream( + BlockInputStreams inputs_, const SortDescription & description_, size_t max_block_size_, + const Graphite::Params & params, time_t time_of_merge) + : MergingSortedBlockInputStream(inputs_, description_, max_block_size_), + params(params), time_of_merge(time_of_merge) + { + } + + String getName() const override { return "GraphiteRollupSorted"; } + + String getID() const override + { + std::stringstream res; + res << "GraphiteRollupSorted(inputs"; + + for (size_t i = 0; i < children.size(); ++i) + res << ", " << children[i]->getID(); + + res << ", description"; + + for (size_t i = 0; i < description.size(); ++i) + res << ", " << description[i].getID(); + + res << ")"; + return res.str(); + } + + ~GraphiteRollupSortedBlockInputStream() + { + if (current_pattern) + current_pattern->function->destroy(place_for_aggregate_state.data()); + } + +protected: + Block readImpl() override; + +private: + Logger * log = &Logger::get("GraphiteRollupSortedBlockInputStream"); + + const Graphite::Params params; + + size_t path_column_num; + size_t time_column_num; + size_t value_column_num; + size_t version_column_num; + + /// Все столбцы кроме time, value, version. Они вставляются без модификации. + ColumnNumbers unmodified_column_numbers; + + time_t time_of_merge; + + /// Прочитали до конца. + bool finished = false; + + bool is_first = true; + StringRef current_path; + time_t current_time; + StringRef next_path; + time_t next_time; + UInt64 current_max_version = 0; + + const Graphite::Pattern * current_pattern = nullptr; + std::vector place_for_aggregate_state; + + const Graphite::Pattern * selectPatternForPath(StringRef path) const; + UInt32 selectPrecision(const Graphite::Retentions & retentions, time_t time) const; + + + template + void merge(ColumnPlainPtrs & merged_columns, std::priority_queue & queue); + + /// Вставить значения в результирующие столбцы, которые не будут меняться в дальнейшем. + template + void startNextRow(ColumnPlainPtrs & merged_columns, TSortCursor & cursor); + + /// Вставить в результирующие столбцы вычисленные значения time, value, version по последней группе строк. + void finishCurrentRow(ColumnPlainPtrs & merged_columns); + + /// Обновить состояние агрегатной функции новым значением value. + template + void accumulateRow(TSortCursor & cursor); +}; + +} diff --git a/dbms/include/DB/Storages/MergeTree/MergeTreeData.h b/dbms/include/DB/Storages/MergeTree/MergeTreeData.h index c7cf7667c0597eed4c139b618f0b2fc6e703390d..75873bead0ec44e9393a71b340ca1c7d6773a268 100644 --- a/dbms/include/DB/Storages/MergeTree/MergeTreeData.h +++ b/dbms/include/DB/Storages/MergeTree/MergeTreeData.h @@ -11,6 +11,7 @@ #include #include #include +#include #include @@ -70,6 +71,7 @@ namespace ErrorCodes * - Aggregating - при склейке кусков, при совпадении PK, делается слияние состояний столбцов-агрегатных функций. * - Unsorted - при склейке кусков, данные не упорядочиваются, а всего лишь конкатенируются; * - это позволяет читать данные ровно такими пачками, какими они были записаны. + * - Graphite - выполняет загрубление исторических данных для таблицы Графита - системы количественного мониторинга. */ /** Этот класс хранит список кусков и параметры структуры данных. @@ -199,6 +201,7 @@ public: Aggregating = 3, Unsorted = 4, Replacing = 5, + Graphite = 6, }; Mode mode; @@ -212,8 +215,13 @@ public: /// Для Replacing режима. Может быть пустым. String version_column; + /// Для Graphite режима. + Graphite::Params graphite_params; + /// Проверить наличие и корректность типов столбцов. void check(const NamesAndTypesList & columns) const; + + String getModeName() const; }; @@ -246,8 +254,6 @@ public: /// Загрузить множество кусков с данными с диска. Вызывается один раз - сразу после создания объекта. void loadDataParts(bool skip_sanity_checks); - std::string getModePrefix() const; - bool supportsSampling() const { return !!sampling_expression; } bool supportsPrewhere() const { return true; } diff --git a/dbms/include/DB/Storages/MergeTree/MergeTreeDataMerger.h b/dbms/include/DB/Storages/MergeTree/MergeTreeDataMerger.h index 0c6faa85a988eb2e075dc3d2a62193798d2edff0..d9788aefab6cace0bd93bf858e75e89d2ce8b7df 100644 --- a/dbms/include/DB/Storages/MergeTree/MergeTreeDataMerger.h +++ b/dbms/include/DB/Storages/MergeTree/MergeTreeDataMerger.h @@ -51,10 +51,13 @@ public: * * Создаёт и возвращает временный кусок. * Чтобы закончить мердж, вызовите функцию renameTemporaryMergedPart. + * + * time_of_merge - время, когда мердж был назначен. + * Важно при использовании ReplicatedGraphiteMergeTree для обеспечения одинакового мерджа на репликах. */ MergeTreeData::MutableDataPartPtr mergePartsToTemporaryPart( MergeTreeData::DataPartsVector & parts, const String & merged_name, MergeListEntry & merge_entry, - size_t aio_threshold, DiskSpaceMonitor::Reservation * disk_reservation = nullptr); + size_t aio_threshold, time_t time_of_merge, DiskSpaceMonitor::Reservation * disk_reservation = nullptr); MergeTreeData::DataPartPtr renameMergedTemporaryPart( MergeTreeData::DataPartsVector & parts, diff --git a/dbms/include/DB/Storages/StorageMergeTree.h b/dbms/include/DB/Storages/StorageMergeTree.h index 2cdf61143b5bac690616c57208bc790b6bb8473a..d96876f4d1e5a1267c3eb691efb2fe8879ec3656 100644 --- a/dbms/include/DB/Storages/StorageMergeTree.h +++ b/dbms/include/DB/Storages/StorageMergeTree.h @@ -48,7 +48,7 @@ public: std::string getName() const override { - return data.getModePrefix() + "MergeTree"; + return data.merging_params.getModeName() + "MergeTree"; } std::string getTableName() const override { return table_name; } diff --git a/dbms/include/DB/Storages/StorageReplicatedMergeTree.h b/dbms/include/DB/Storages/StorageReplicatedMergeTree.h index 82a38208231d3db4e91238a972fa6ea93ca16b53..1857079ce4e0882b4ae9a31bfb6958a4117edd9a 100644 --- a/dbms/include/DB/Storages/StorageReplicatedMergeTree.h +++ b/dbms/include/DB/Storages/StorageReplicatedMergeTree.h @@ -94,7 +94,7 @@ public: std::string getName() const override { - return "Replicated" + data.getModePrefix() + "MergeTree"; + return "Replicated" + data.merging_params.getModeName() + "MergeTree"; } std::string getTableName() const override { return table_name; } diff --git a/dbms/src/DataStreams/GraphiteRollupSortedBlockInputStream.cpp b/dbms/src/DataStreams/GraphiteRollupSortedBlockInputStream.cpp new file mode 100644 index 0000000000000000000000000000000000000000..1db8a9340c0848d3bbdcdc2edddd5cc9e8ebeb52 --- /dev/null +++ b/dbms/src/DataStreams/GraphiteRollupSortedBlockInputStream.cpp @@ -0,0 +1,226 @@ +#include + + +namespace DB +{ + +namespace ErrorCodes +{ + extern const int NO_SUCH_COLUMN_IN_TABLE; +} + + +const Graphite::Pattern * GraphiteRollupSortedBlockInputStream::selectPatternForPath(StringRef path) const +{ + for (const auto & pattern : params.patterns) + if (!pattern.regexp || pattern.regexp->match(path.data, path.size)) + return &pattern; + + return nullptr; +} + + +UInt32 GraphiteRollupSortedBlockInputStream::selectPrecision(const Graphite::Retentions & retentions, time_t time) const +{ + /// Рассчитываем на то, что time_t - знаковый тип. + for (const auto & retention : retentions) + if (current_time - time >= static_cast(retention.age)) + return retention.precision; + + /// Без огрубления. + return 1; +} + + +Block GraphiteRollupSortedBlockInputStream::readImpl() +{ + if (finished) + return Block(); + + if (children.size() == 1) + return children[0]->read(); + + Block merged_block; + ColumnPlainPtrs merged_columns; + + init(merged_block, merged_columns); + if (merged_columns.empty()) + return Block(); + + /// Дополнительная инициализация. + if (!current_path.data) + { + /// Определим максимальный размер состояния агрегатных функций. + size_t max_size_of_aggregate_state = 0; + for (const auto & pattern : params.patterns) + if (pattern.function->sizeOfData() > max_size_of_aggregate_state) + max_size_of_aggregate_state = pattern.function->sizeOfData(); + + place_for_aggregate_state.resize(max_size_of_aggregate_state); + + /// Запомним номера столбцов. + path_column_num = merged_block.getPositionByName(params.path_column_name); + time_column_num = merged_block.getPositionByName(params.time_column_name); + value_column_num = merged_block.getPositionByName(params.value_column_name); + version_column_num = merged_block.getPositionByName(params.version_column_name); + + for (size_t i = 0; i < num_columns; ++i) + if (i != time_column_num && i != value_column_num && i != version_column_num) + unmodified_column_numbers.push_back(i); + } + + if (has_collation) + merge(merged_columns, queue_with_collation); + else + merge(merged_columns, queue); + + return merged_block; +} + + +template +void GraphiteRollupSortedBlockInputStream::merge(ColumnPlainPtrs & merged_columns, std::priority_queue & queue) +{ + size_t merged_rows = 0; + + /// Вынимаем строки в нужном порядке и кладём в merged_block, пока строк не больше max_block_size + while (!queue.empty()) + { + TSortCursor current = queue.top(); + + next_path = current->sort_columns[path_column_num]->getDataAt(current->pos); + next_time = current->sort_columns[time_column_num]->get64(current->pos); + + bool is_new_key; + bool was_first = is_first; + auto prev_pattern = current_pattern; + + if (is_first) /// Первый встретившийся ключ. + { + is_first = false; + current_pattern = selectPatternForPath(next_path); + + if (current_pattern) + { + current_pattern->function->create(place_for_aggregate_state.data()); + UInt32 precision = selectPrecision(current_pattern->retentions, next_time); + next_time = next_time / precision * precision; + } + + current_path = next_path; + current_time = next_time; + + is_new_key = true; + } + else + { + bool path_differs = next_path != current_path; + + if (path_differs) + current_pattern = selectPatternForPath(next_path); + + if (current_pattern) + { + UInt32 precision = selectPrecision(current_pattern->retentions, next_time); + next_time = next_time / precision * precision; + } + + is_new_key = path_differs || next_time != current_time; + + if (is_new_key) + { + if (prev_pattern) + prev_pattern->function->destroy(place_for_aggregate_state.data()); + if (current_pattern) + current_pattern->function->create(place_for_aggregate_state.data()); + } + } + + /// если накопилось достаточно строк и последняя посчитана полностью + if (is_new_key && merged_rows >= max_block_size) + return; + + queue.pop(); + + if (is_new_key) + { + if (!was_first) + finishCurrentRow(merged_columns); + + startNextRow(merged_columns, current); + + std::swap(current_path, next_path); + std::swap(current_time, next_time); + current_max_version = 0; + + if (prev_pattern) + prev_pattern->function->destroy(place_for_aggregate_state.data()); + if (current_pattern) + current_pattern->function->create(place_for_aggregate_state.data()); + + ++merged_rows; + } + + accumulateRow(current); + current_max_version = std::max(current_max_version, current->sort_columns[version_column_num]->get64(current->pos)); + + if (!current->isLast()) + { + current->next(); + queue.push(current); + } + else + { + /// Достаём из соответствующего источника следующий блок, если есть. + fetchNextBlock(current, queue); + } + } + + /// Запишем данные для последней группы. + ++merged_rows; + finishCurrentRow(merged_columns); + + finished = true; + + if (current_pattern) + { + current_pattern->function->destroy(place_for_aggregate_state.data()); + current_pattern = nullptr; + } +} + + +template +void GraphiteRollupSortedBlockInputStream::startNextRow(ColumnPlainPtrs & merged_columns, TSortCursor & cursor) +{ + /// Копируем не модифицированные значения столбцов. + for (size_t i = 0, size = unmodified_column_numbers.size(); i < size; ++i) + { + size_t j = unmodified_column_numbers[i]; + merged_columns[j]->insertFrom(*cursor->all_columns[j], cursor->pos); + } + + if (!current_pattern) + merged_columns[value_column_num]->insertFrom(*cursor->all_columns[value_column_num], cursor->pos); +} + + +void GraphiteRollupSortedBlockInputStream::finishCurrentRow(ColumnPlainPtrs & merged_columns) +{ + /// Вставляем вычисленные значения столбцов time, value, version. + merged_columns[time_column_num]->insert(UInt64(current_time)); + merged_columns[version_column_num]->insert(current_max_version); + + if (current_pattern) + current_pattern->function->insertResultInto(place_for_aggregate_state.data(), *merged_columns[value_column_num]); +} + + +template +void GraphiteRollupSortedBlockInputStream::accumulateRow(TSortCursor & cursor) +{ + if (current_pattern) + current_pattern->function->add(place_for_aggregate_state.data(), &cursor->all_columns[value_column_num], cursor->pos); +} + +} diff --git a/dbms/src/Storages/MergeTree/MergeTreeData.cpp b/dbms/src/Storages/MergeTree/MergeTreeData.cpp index 40667cac5effd8b3ccd86dc4e0cca871b70f9bf3..dfb520c6dc6ad64cfe696200e4b6f8b6d6165855 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeData.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeData.cpp @@ -166,6 +166,26 @@ void MergeTreeData::MergingParams::check(const NamesAndTypesList & columns) cons } } } + + /// TODO Проверки для Graphite +} + + +String MergeTreeData::MergingParams::getModeName() const +{ + switch (mode) + { + case Ordinary: return ""; + case Collapsing: return "Collapsing"; + case Summing: return "Summing"; + case Aggregating: return "Aggregating"; + case Unsorted: return "Unsorted"; + case Replacing: return "Replacing"; + case Graphite: return "Graphite"; + + default: + throw Exception("Unknown mode of operation for MergeTreeData: " + toString(mode), ErrorCodes::LOGICAL_ERROR); + } } @@ -180,22 +200,6 @@ Int64 MergeTreeData::getMaxDataPartIndex() return max_part_id; } -std::string MergeTreeData::getModePrefix() const -{ - switch (merging_params.mode) - { - case MergingParams::Ordinary: return ""; - case MergingParams::Collapsing: return "Collapsing"; - case MergingParams::Summing: return "Summing"; - case MergingParams::Aggregating: return "Aggregating"; - case MergingParams::Unsorted: return "Unsorted"; - case MergingParams::Replacing: return "Replacing"; - - default: - throw Exception("Unknown mode of operation for MergeTreeData: " + toString(merging_params.mode), ErrorCodes::LOGICAL_ERROR); - } -} - void MergeTreeData::loadDataParts(bool skip_sanity_checks) { diff --git a/dbms/src/Storages/MergeTree/MergeTreeDataMerger.cpp b/dbms/src/Storages/MergeTree/MergeTreeDataMerger.cpp index 809f52090222fdfa333a41ba320a8fb80ce8104b..cfe5bc65b0b4b11d2ecb9832230fb7633cb2b303 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeDataMerger.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeDataMerger.cpp @@ -10,6 +10,7 @@ #include #include #include +#include #include #include #include @@ -338,7 +339,7 @@ MergeTreeData::DataPartsVector MergeTreeDataMerger::selectAllPartsFromPartition( /// parts должны быть отсортированы. MergeTreeData::MutableDataPartPtr MergeTreeDataMerger::mergePartsToTemporaryPart( MergeTreeData::DataPartsVector & parts, const String & merged_name, MergeList::Entry & merge_entry, - size_t aio_threshold, DiskSpaceMonitor::Reservation * disk_reservation) + size_t aio_threshold, time_t time_of_merge, DiskSpaceMonitor::Reservation * disk_reservation) { if (isCancelled()) throw Exception("Cancelled merging parts", ErrorCodes::ABORTED); @@ -447,6 +448,12 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMerger::mergePartsToTemporaryPart src_streams, data.getSortDescription(), data.merging_params.version_column, DEFAULT_MERGE_BLOCK_SIZE); break; + case MergeTreeData::MergingParams::Graphite: + merged_stream = std::make_unique( + src_streams, data.getSortDescription(), DEFAULT_MERGE_BLOCK_SIZE, + data.merging_params.graphite_params, time_of_merge); + break; + case MergeTreeData::MergingParams::Unsorted: merged_stream = std::make_unique(src_streams); break; @@ -707,6 +714,12 @@ MergeTreeData::PerShardDataParts MergeTreeDataMerger::reshardPartition( src_streams, data.getSortDescription(), data.merging_params.version_column, DEFAULT_MERGE_BLOCK_SIZE); break; + case MergeTreeData::MergingParams::Graphite: + merged_stream = std::make_unique( + src_streams, data.getSortDescription(), DEFAULT_MERGE_BLOCK_SIZE, + data.merging_params.graphite_params, time(0)); + break; + case MergeTreeData::MergingParams::Unsorted: merged_stream = std::make_unique(src_streams); break; diff --git a/dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp b/dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp index eda8075a5d27d79835428a54920887dc6b74c52f..e6e1c92246d626fcd977791285354ab7a623593d 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp @@ -811,6 +811,9 @@ BlockInputStreams MergeTreeDataSelectExecutor::spreadMarkRangesAmongThreadsFinal case MergeTreeData::MergingParams::Unsorted: throw Exception("UnsortedMergeTree doesn't support FINAL", ErrorCodes::LOGICAL_ERROR); + + case MergeTreeData::MergingParams::Graphite: + throw Exception("GraphiteMergeTree doesn't support FINAL", ErrorCodes::LOGICAL_ERROR); } res.emplace_back(merged); diff --git a/dbms/src/Storages/StorageFactory.cpp b/dbms/src/Storages/StorageFactory.cpp index 64b2cb1b712861eb03c1418cf27a845c4c46a345..153716df369b00cdb2d9a6ceba698914a339aa99 100644 --- a/dbms/src/Storages/StorageFactory.cpp +++ b/dbms/src/Storages/StorageFactory.cpp @@ -27,6 +27,7 @@ #include #include #include +#include namespace DB @@ -39,6 +40,8 @@ namespace ErrorCodes extern const int BAD_ARGUMENTS; extern const int UNKNOWN_STORAGE; extern const int NO_REPLICA_NAME_GIVEN; + extern const int NO_ELEMENTS_IN_CONFIG; + extern const int UNKNOWN_ELEMENT_IN_CONFIG; } @@ -100,6 +103,114 @@ static Names extractColumnNames(const ASTPtr & node) } +/** Прочитать настройки прореживания старых данных Графита из конфига. + * Пример: + * + * + * Path + * + * click_cost + * any + * + * 0 + * 3600 + * + * + * 86400 + * 60 + * + * + * + * max + * + * 0 + * 60 + * + * + * 3600 + * 300 + * + * + * 86400 + * 3600 + * + * + * + */ +static void appendGraphitePattern(const Context & context, + const Poco::Util::AbstractConfiguration & config, const String & config_element, Graphite::Patterns & patterns) +{ + Graphite::Pattern pattern; + + Poco::Util::AbstractConfiguration::Keys keys; + config.keys(config_element, keys); + + for (const auto & key : keys) + { + if (key == "regexp") + { + pattern.regexp = std::make_shared(config.getString(config_element + ".regexp")); + } + else if (key == "function") + { + /// TODO Не только Float64 + pattern.function = context.getAggregateFunctionFactory().get( + config.getString(config_element + ".function"), { new DataTypeFloat64 }); + } + else if (startsWith(key, "retention")) + { + pattern.retentions.emplace_back( + Graphite::Retention{ + .age = config.getUInt(config_element + "." + key + ".age"), + .precision = config.getUInt(config_element + "." + key + ".precision")}); + } + else + throw Exception("Unknown element in config: " + key, ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG); + } + + if (!pattern.function) + throw Exception("Aggregate function is mandatory for retention patterns in GraphiteMergeTree", + ErrorCodes::NO_ELEMENTS_IN_CONFIG); + + /// retention-ы должны идти по убыванию возраста. + std::sort(pattern.retentions.begin(), pattern.retentions.end(), + [] (const Graphite::Retention & a, const Graphite::Retention & b) { return a.age > b.age; }); + + patterns.emplace_back(pattern); +} + +static void setGraphitePatternsFromConfig(const Context & context, + const String & config_element, Graphite::Params & params) +{ + const Poco::Util::AbstractConfiguration & config = Poco::Util::Application::instance().config(); + + params.path_column_name = config.getString(config_element + ".path_column_name", "Path"); + params.time_column_name = config.getString(config_element + ".time_column_name", "Time"); + params.value_column_name = config.getString(config_element + ".value_column_name", "Value"); + params.version_column_name = config.getString(config_element + ".version_column_name", "Timestamp"); + + Poco::Util::AbstractConfiguration::Keys keys; + config.keys(config_element, keys); + + for (const auto & key : keys) + { + if (startsWith(key, "pattern")) + { + appendGraphitePattern(context, config, config_element + "." + key, params.patterns); + } + else if (key == "default") + { + /// Ниже. + } + else + throw Exception("Unknown element in config: " + key, ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG); + } + + if (config.has(config_element + ".default")) + appendGraphitePattern(context, config, config_element + "." + ".default", params.patterns); +} + + StoragePtr StorageFactory::get( const String & name, const String & data_path, @@ -352,7 +463,7 @@ StoragePtr StorageFactory::get( } else if (endsWith(name, "MergeTree")) { - /** Движки [Replicated][|Summing|Collapsing|Aggregating|Unsorted|Replacing]MergeTree (2 * 6 комбинаций) + /** Движки [Replicated][|Summing|Collapsing|Aggregating|Unsorted|Replacing|Graphite]MergeTree (2 * 7 комбинаций) * В качестве аргумента для движка должно быть указано: * - (для Replicated) Путь к таблице в ZooKeeper * - (для Replicated) Имя реплики в ZooKeeper @@ -366,12 +477,14 @@ StoragePtr StorageFactory::get( * - (для Summing, не обязательно) кортеж столбцов, которых следует суммировать. Если не задано - используются все числовые столбцы, не входящие в первичный ключ. * - (для Replacing, не обязательно) имя столбца одного из UInt типов, обозначающего "версию" * Например: ENGINE = ReplicatedCollapsingMergeTree('/tables/mytable', 'rep02', EventDate, (CounterID, EventDate, intHash32(UniqID), VisitID), 8192, Sign). + * - (для Graphite) имена столбцов Path, Time, Value, Version, имеющих специальное значение. * * MergeTree(date, [sample_key], primary_key, index_granularity) * CollapsingMergeTree(date, [sample_key], primary_key, index_granularity, sign) * SummingMergeTree(date, [sample_key], primary_key, index_granularity, [columns_to_sum]) * AggregatingMergeTree(date, [sample_key], primary_key, index_granularity) * ReplacingMergeTree(date, [sample_key], primary_key, index_granularity, [version_column]) + * GraphiteMergeTree(date, [sample_key], primary_key, index_granularity, Path, Time, Value, Version) * UnsortedMergeTree(date, index_granularity) TODO Добавить описание ниже. */ @@ -383,9 +496,9 @@ MergeTrees is different in two ways: - it may be replicated and non-replicated; - it may do different actions on merge: nothing; sign collapse; sum; apply aggregete functions. -So we have 12 combinations: - MergeTree, CollapsingMergeTree, SummingMergeTree, AggregatingMergeTree, ReplacingMergeTree, UnsortedMergeTree - ReplicatedMergeTree, ReplicatedCollapsingMergeTree, ReplicatedSummingMergeTree, ReplicatedAggregatingMergeTree, ReplicatedReplacingMergeTree, ReplicatedUnsortedMergeTree +So we have 14 combinations: + MergeTree, CollapsingMergeTree, SummingMergeTree, AggregatingMergeTree, ReplacingMergeTree, UnsortedMergeTree, GraphiteMergeTree + ReplicatedMergeTree, ReplicatedCollapsingMergeTree, ReplicatedSummingMergeTree, ReplicatedAggregatingMergeTree, ReplicatedReplacingMergeTree, ReplicatedUnsortedMergeTree, ReplicatedGraphiteMergeTree In most of cases, you need MergeTree or ReplicatedMergeTree. @@ -469,7 +582,9 @@ For further info please read the documentation: https://clickhouse.yandex-team.r args = typeid_cast(*args_func.at(0)).children; /// NOTE Слегка запутанно. - size_t num_additional_params = (replicated ? 2 : 0) + (merging_params.mode == MergeTreeData::MergingParams::Collapsing); + size_t num_additional_params = (replicated ? 2 : 0) + + (merging_params.mode == MergeTreeData::MergingParams::Collapsing) + + (merging_params.mode == MergeTreeData::MergingParams::Graphite) * 4; if (merging_params.mode == MergeTreeData::MergingParams::Unsorted && args.size() != num_additional_params + 2) @@ -610,6 +725,17 @@ For further info please read the documentation: https://clickhouse.yandex-team.r args.pop_back(); } } + else if (merging_params.mode == MergeTreeData::MergingParams::Graphite) + { + String graphite_config_name; + + if (auto ast = typeid_cast(&*args.back())) + graphite_config_name = ast->value.get(); + else + throw Exception(String("Last parameter of GraphiteMergeTree must be name (in single quotes) of element in configuration file with Graphite options") + verbose_help, ErrorCodes::BAD_ARGUMENTS); + + setGraphitePatternsFromConfig(context, graphite_config_name, merging_params.graphite_params); + } /// Если присутствует выражение для сэмплирования. MergeTree(date, [sample_key], primary_key, index_granularity) if (args.size() == 4) diff --git a/dbms/src/Storages/StorageMergeTree.cpp b/dbms/src/Storages/StorageMergeTree.cpp index d21931a9d63911e57400ecd3cd5da4c32abf3e45..b75b476e4511c0752c92f80a4f1a0e7d6cbaeb62 100644 --- a/dbms/src/Storages/StorageMergeTree.cpp +++ b/dbms/src/Storages/StorageMergeTree.cpp @@ -255,7 +255,7 @@ bool StorageMergeTree::merge(size_t aio_threshold, bool aggressive, BackgroundPr const auto & merge_entry = context.getMergeList().insert(database_name, table_name, merged_name); auto new_part = merger.mergePartsToTemporaryPart( - merging_tagger->parts, merged_name, *merge_entry, aio_threshold, &*merging_tagger->reserved_space); + merging_tagger->parts, merged_name, *merge_entry, aio_threshold, time(0), &*merging_tagger->reserved_space); merger.renameMergedTemporaryPart(merging_tagger->parts, new_part, merged_name, nullptr); diff --git a/dbms/src/Storages/StorageReplicatedMergeTree.cpp b/dbms/src/Storages/StorageReplicatedMergeTree.cpp index 99aa0a971200a8ceb811df02d75e6df48aeb61e0..c6723727fb8b86b91de254d33b78ec3a4811b04f 100644 --- a/dbms/src/Storages/StorageReplicatedMergeTree.cpp +++ b/dbms/src/Storages/StorageReplicatedMergeTree.cpp @@ -1121,7 +1121,7 @@ bool StorageReplicatedMergeTree::executeLogEntry(const LogEntry & entry, Backgro size_t aio_threshold = context.getSettings().min_bytes_to_use_direct_io; auto part = merger.mergePartsToTemporaryPart( - parts, entry.new_part_name, *merge_entry, aio_threshold, reserved_space); + parts, entry.new_part_name, *merge_entry, aio_threshold, entry.create_time, reserved_space); zkutil::Ops ops; @@ -2200,7 +2200,7 @@ bool StorageReplicatedMergeTree::optimize(const Settings & settings) const auto & merge_entry = context.getMergeList().insert(database_name, table_name, merged_name); auto new_part = unreplicated_merger->mergePartsToTemporaryPart( - parts, merged_name, *merge_entry, settings.min_bytes_to_use_direct_io); + parts, merged_name, *merge_entry, settings.min_bytes_to_use_direct_io, time(0)); unreplicated_merger->renameMergedTemporaryPart(parts, new_part, merged_name, nullptr);