提交 07f04d70 编写于 作者: A Alexey Milovidov

Merge

上级 73fd006e
......@@ -203,6 +203,7 @@ add_library (dbms
include/DB/DataStreams/MergingAggregatedMemoryEfficientBlockInputStream.h
include/DB/DataStreams/SummingSortedBlockInputStream.h
include/DB/DataStreams/ReplacingSortedBlockInputStream.h
include/DB/DataStreams/GraphiteRollupSortedBlockInputStream.h
include/DB/DataStreams/AddingConstColumnBlockInputStream.h
include/DB/DataStreams/DistinctBlockInputStream.h
include/DB/DataStreams/BlockOutputStreamFromRowOutputStream.h
......@@ -716,6 +717,7 @@ add_library (dbms
src/DataStreams/CSVRowInputStream.cpp
src/DataStreams/SummingSortedBlockInputStream.cpp
src/DataStreams/ReplacingSortedBlockInputStream.cpp
src/DataStreams/GraphiteRollupSortedBlockInputStream.cpp
src/DataStreams/TotalsHavingBlockInputStream.cpp
src/DataStreams/CreatingSetsBlockInputStream.cpp
src/DataStreams/DistinctBlockInputStream.cpp
......
#pragma once
#include <common/logger_useful.h>
#include <DB/Core/Row.h>
#include <DB/Core/ColumnNumbers.h>
#include <DB/DataStreams/MergingSortedBlockInputStream.h>
#include <DB/AggregateFunctions/IAggregateFunction.h>
#include <DB/Columns/ColumnAggregateFunction.h>
#include <DB/Common/OptimizedRegularExpression.h>
namespace DB
{
/** Предназначен для реализации "rollup" - загрубления старых данных
* для таблицы с данными Графита - системы количественного мониторинга.
*
* Таблица с данными Графита имеет по крайней мере следующие столбцы (с точностью до названия):
* Path, Time, Value, Version
*
* Path - имя метрики (сенсора);
* Time - время, к которому относится измерение;
* Value - значение;
* Version - число, такое что для одинаковых пар Path, Time, следует брать значение максимальной версии.
*
* Каждая строчка в таблице соответствует одному значению одного сенсора.
*
* Правила загрубления задаются в следующем виде:
*
* pattern
* regexp
* function
* age -> precision
* age -> precision
* ...
* pattern
* ...
* default
* function
* age -> precision
* ...
*
* regexp - шаблон на имя сенсора
* default - если ни один шаблон не сработал
*
* age - минимальный возраст данных (в секундах), после которого следует применять округление до величины precision.
* precision - величина округления (в секундах)
*
* function - имя агрегатной функции, которую следует применять к значениям, время которых округлилось до одинакового.
*
* Пример:
*
* <graphite_rollup>
* <pattern>
* <regexp>click_cost</regexp>
* <function>any</function>
* <retention>
* <age>0</age>
* <precision>3600</precision>
* </retention>
* <retention>
* <age>86400</age>
* <precision>60</precision>
* </retention>
* </pattern>
* <default>
* <function>max</function>
* <retention>
* <age>0</age>
* <precision>60</precision>
* </retention>
* <retention>
* <age>3600</age>
* <precision>300</precision>
* </retention>
* <retention>
* <age>86400</age>
* <precision>3600</precision>
* </retention>
* </default>
* </graphite_rollup>
*/
namespace Graphite
{
struct Retention
{
UInt32 age;
UInt32 precision;
};
using Retentions = std::vector<Retention>;
struct Pattern
{
std::shared_ptr<OptimizedRegularExpression> regexp;
AggregateFunctionPtr function;
Retentions retentions; /// Упорядочены по убыванию age.
};
using Patterns = std::vector<Pattern>;
struct Params
{
String path_column_name;
String time_column_name;
String value_column_name;
String version_column_name;
Graphite::Patterns patterns;
};
}
/** Соединяет несколько сортированных потоков в один.
*
* При этом, для каждой группы идущих подряд одинаковых значений столбца path,
* и одинаковых значений time с округлением до некоторой точности
* (где точность округления зависит от набора шаблонов на path
* и количества времени, прошедшего от time до заданного времени),
* оставляет одну строку,
* выполняя округление времени,
* слияние значений value, используя заданные агрегатные функции,
* а также оставляя максимальное значение столбца version.
*/
class GraphiteRollupSortedBlockInputStream : public MergingSortedBlockInputStream
{
public:
GraphiteRollupSortedBlockInputStream(
BlockInputStreams inputs_, const SortDescription & description_, size_t max_block_size_,
const Graphite::Params & params, time_t time_of_merge)
: MergingSortedBlockInputStream(inputs_, description_, max_block_size_),
params(params), time_of_merge(time_of_merge)
{
}
String getName() const override { return "GraphiteRollupSorted"; }
String getID() const override
{
std::stringstream res;
res << "GraphiteRollupSorted(inputs";
for (size_t i = 0; i < children.size(); ++i)
res << ", " << children[i]->getID();
res << ", description";
for (size_t i = 0; i < description.size(); ++i)
res << ", " << description[i].getID();
res << ")";
return res.str();
}
~GraphiteRollupSortedBlockInputStream()
{
if (current_pattern)
current_pattern->function->destroy(place_for_aggregate_state.data());
}
protected:
Block readImpl() override;
private:
Logger * log = &Logger::get("GraphiteRollupSortedBlockInputStream");
const Graphite::Params params;
size_t path_column_num;
size_t time_column_num;
size_t value_column_num;
size_t version_column_num;
/// Все столбцы кроме time, value, version. Они вставляются без модификации.
ColumnNumbers unmodified_column_numbers;
time_t time_of_merge;
/// Прочитали до конца.
bool finished = false;
bool is_first = true;
StringRef current_path;
time_t current_time;
StringRef next_path;
time_t next_time;
UInt64 current_max_version = 0;
const Graphite::Pattern * current_pattern = nullptr;
std::vector<char> place_for_aggregate_state;
const Graphite::Pattern * selectPatternForPath(StringRef path) const;
UInt32 selectPrecision(const Graphite::Retentions & retentions, time_t time) const;
template <typename TSortCursor>
void merge(ColumnPlainPtrs & merged_columns, std::priority_queue<TSortCursor> & queue);
/// Вставить значения в результирующие столбцы, которые не будут меняться в дальнейшем.
template <class TSortCursor>
void startNextRow(ColumnPlainPtrs & merged_columns, TSortCursor & cursor);
/// Вставить в результирующие столбцы вычисленные значения time, value, version по последней группе строк.
void finishCurrentRow(ColumnPlainPtrs & merged_columns);
/// Обновить состояние агрегатной функции новым значением value.
template <class TSortCursor>
void accumulateRow(TSortCursor & cursor);
};
}
......@@ -11,6 +11,7 @@
#include <DB/IO/ReadBufferFromFile.h>
#include <DB/DataTypes/DataTypeString.h>
#include <DB/DataTypes/DataTypesNumberFixed.h>
#include <DB/DataStreams/GraphiteRollupSortedBlockInputStream.h>
#include <DB/Storages/MergeTree/MergeTreeDataPart.h>
......@@ -70,6 +71,7 @@ namespace ErrorCodes
* - Aggregating - при склейке кусков, при совпадении PK, делается слияние состояний столбцов-агрегатных функций.
* - Unsorted - при склейке кусков, данные не упорядочиваются, а всего лишь конкатенируются;
* - это позволяет читать данные ровно такими пачками, какими они были записаны.
* - Graphite - выполняет загрубление исторических данных для таблицы Графита - системы количественного мониторинга.
*/
/** Этот класс хранит список кусков и параметры структуры данных.
......@@ -199,6 +201,7 @@ public:
Aggregating = 3,
Unsorted = 4,
Replacing = 5,
Graphite = 6,
};
Mode mode;
......@@ -212,8 +215,13 @@ public:
/// Для Replacing режима. Может быть пустым.
String version_column;
/// Для Graphite режима.
Graphite::Params graphite_params;
/// Проверить наличие и корректность типов столбцов.
void check(const NamesAndTypesList & columns) const;
String getModeName() const;
};
......@@ -246,8 +254,6 @@ public:
/// Загрузить множество кусков с данными с диска. Вызывается один раз - сразу после создания объекта.
void loadDataParts(bool skip_sanity_checks);
std::string getModePrefix() const;
bool supportsSampling() const { return !!sampling_expression; }
bool supportsPrewhere() const { return true; }
......
......@@ -51,10 +51,13 @@ public:
*
* Создаёт и возвращает временный кусок.
* Чтобы закончить мердж, вызовите функцию renameTemporaryMergedPart.
*
* time_of_merge - время, когда мердж был назначен.
* Важно при использовании ReplicatedGraphiteMergeTree для обеспечения одинакового мерджа на репликах.
*/
MergeTreeData::MutableDataPartPtr mergePartsToTemporaryPart(
MergeTreeData::DataPartsVector & parts, const String & merged_name, MergeListEntry & merge_entry,
size_t aio_threshold, DiskSpaceMonitor::Reservation * disk_reservation = nullptr);
size_t aio_threshold, time_t time_of_merge, DiskSpaceMonitor::Reservation * disk_reservation = nullptr);
MergeTreeData::DataPartPtr renameMergedTemporaryPart(
MergeTreeData::DataPartsVector & parts,
......
......@@ -48,7 +48,7 @@ public:
std::string getName() const override
{
return data.getModePrefix() + "MergeTree";
return data.merging_params.getModeName() + "MergeTree";
}
std::string getTableName() const override { return table_name; }
......
......@@ -94,7 +94,7 @@ public:
std::string getName() const override
{
return "Replicated" + data.getModePrefix() + "MergeTree";
return "Replicated" + data.merging_params.getModeName() + "MergeTree";
}
std::string getTableName() const override { return table_name; }
......
#include <DB/DataStreams/GraphiteRollupSortedBlockInputStream.h>
namespace DB
{
namespace ErrorCodes
{
extern const int NO_SUCH_COLUMN_IN_TABLE;
}
const Graphite::Pattern * GraphiteRollupSortedBlockInputStream::selectPatternForPath(StringRef path) const
{
for (const auto & pattern : params.patterns)
if (!pattern.regexp || pattern.regexp->match(path.data, path.size))
return &pattern;
return nullptr;
}
UInt32 GraphiteRollupSortedBlockInputStream::selectPrecision(const Graphite::Retentions & retentions, time_t time) const
{
/// Рассчитываем на то, что time_t - знаковый тип.
for (const auto & retention : retentions)
if (current_time - time >= static_cast<time_t>(retention.age))
return retention.precision;
/// Без огрубления.
return 1;
}
Block GraphiteRollupSortedBlockInputStream::readImpl()
{
if (finished)
return Block();
if (children.size() == 1)
return children[0]->read();
Block merged_block;
ColumnPlainPtrs merged_columns;
init(merged_block, merged_columns);
if (merged_columns.empty())
return Block();
/// Дополнительная инициализация.
if (!current_path.data)
{
/// Определим максимальный размер состояния агрегатных функций.
size_t max_size_of_aggregate_state = 0;
for (const auto & pattern : params.patterns)
if (pattern.function->sizeOfData() > max_size_of_aggregate_state)
max_size_of_aggregate_state = pattern.function->sizeOfData();
place_for_aggregate_state.resize(max_size_of_aggregate_state);
/// Запомним номера столбцов.
path_column_num = merged_block.getPositionByName(params.path_column_name);
time_column_num = merged_block.getPositionByName(params.time_column_name);
value_column_num = merged_block.getPositionByName(params.value_column_name);
version_column_num = merged_block.getPositionByName(params.version_column_name);
for (size_t i = 0; i < num_columns; ++i)
if (i != time_column_num && i != value_column_num && i != version_column_num)
unmodified_column_numbers.push_back(i);
}
if (has_collation)
merge(merged_columns, queue_with_collation);
else
merge(merged_columns, queue);
return merged_block;
}
template <typename TSortCursor>
void GraphiteRollupSortedBlockInputStream::merge(ColumnPlainPtrs & merged_columns, std::priority_queue<TSortCursor> & queue)
{
size_t merged_rows = 0;
/// Вынимаем строки в нужном порядке и кладём в merged_block, пока строк не больше max_block_size
while (!queue.empty())
{
TSortCursor current = queue.top();
next_path = current->sort_columns[path_column_num]->getDataAt(current->pos);
next_time = current->sort_columns[time_column_num]->get64(current->pos);
bool is_new_key;
bool was_first = is_first;
auto prev_pattern = current_pattern;
if (is_first) /// Первый встретившийся ключ.
{
is_first = false;
current_pattern = selectPatternForPath(next_path);
if (current_pattern)
{
current_pattern->function->create(place_for_aggregate_state.data());
UInt32 precision = selectPrecision(current_pattern->retentions, next_time);
next_time = next_time / precision * precision;
}
current_path = next_path;
current_time = next_time;
is_new_key = true;
}
else
{
bool path_differs = next_path != current_path;
if (path_differs)
current_pattern = selectPatternForPath(next_path);
if (current_pattern)
{
UInt32 precision = selectPrecision(current_pattern->retentions, next_time);
next_time = next_time / precision * precision;
}
is_new_key = path_differs || next_time != current_time;
if (is_new_key)
{
if (prev_pattern)
prev_pattern->function->destroy(place_for_aggregate_state.data());
if (current_pattern)
current_pattern->function->create(place_for_aggregate_state.data());
}
}
/// если накопилось достаточно строк и последняя посчитана полностью
if (is_new_key && merged_rows >= max_block_size)
return;
queue.pop();
if (is_new_key)
{
if (!was_first)
finishCurrentRow(merged_columns);
startNextRow(merged_columns, current);
std::swap(current_path, next_path);
std::swap(current_time, next_time);
current_max_version = 0;
if (prev_pattern)
prev_pattern->function->destroy(place_for_aggregate_state.data());
if (current_pattern)
current_pattern->function->create(place_for_aggregate_state.data());
++merged_rows;
}
accumulateRow(current);
current_max_version = std::max(current_max_version, current->sort_columns[version_column_num]->get64(current->pos));
if (!current->isLast())
{
current->next();
queue.push(current);
}
else
{
/// Достаём из соответствующего источника следующий блок, если есть.
fetchNextBlock(current, queue);
}
}
/// Запишем данные для последней группы.
++merged_rows;
finishCurrentRow(merged_columns);
finished = true;
if (current_pattern)
{
current_pattern->function->destroy(place_for_aggregate_state.data());
current_pattern = nullptr;
}
}
template <class TSortCursor>
void GraphiteRollupSortedBlockInputStream::startNextRow(ColumnPlainPtrs & merged_columns, TSortCursor & cursor)
{
/// Копируем не модифицированные значения столбцов.
for (size_t i = 0, size = unmodified_column_numbers.size(); i < size; ++i)
{
size_t j = unmodified_column_numbers[i];
merged_columns[j]->insertFrom(*cursor->all_columns[j], cursor->pos);
}
if (!current_pattern)
merged_columns[value_column_num]->insertFrom(*cursor->all_columns[value_column_num], cursor->pos);
}
void GraphiteRollupSortedBlockInputStream::finishCurrentRow(ColumnPlainPtrs & merged_columns)
{
/// Вставляем вычисленные значения столбцов time, value, version.
merged_columns[time_column_num]->insert(UInt64(current_time));
merged_columns[version_column_num]->insert(current_max_version);
if (current_pattern)
current_pattern->function->insertResultInto(place_for_aggregate_state.data(), *merged_columns[value_column_num]);
}
template <class TSortCursor>
void GraphiteRollupSortedBlockInputStream::accumulateRow(TSortCursor & cursor)
{
if (current_pattern)
current_pattern->function->add(place_for_aggregate_state.data(), &cursor->all_columns[value_column_num], cursor->pos);
}
}
......@@ -166,6 +166,26 @@ void MergeTreeData::MergingParams::check(const NamesAndTypesList & columns) cons
}
}
}
/// TODO Проверки для Graphite
}
String MergeTreeData::MergingParams::getModeName() const
{
switch (mode)
{
case Ordinary: return "";
case Collapsing: return "Collapsing";
case Summing: return "Summing";
case Aggregating: return "Aggregating";
case Unsorted: return "Unsorted";
case Replacing: return "Replacing";
case Graphite: return "Graphite";
default:
throw Exception("Unknown mode of operation for MergeTreeData: " + toString(mode), ErrorCodes::LOGICAL_ERROR);
}
}
......@@ -180,22 +200,6 @@ Int64 MergeTreeData::getMaxDataPartIndex()
return max_part_id;
}
std::string MergeTreeData::getModePrefix() const
{
switch (merging_params.mode)
{
case MergingParams::Ordinary: return "";
case MergingParams::Collapsing: return "Collapsing";
case MergingParams::Summing: return "Summing";
case MergingParams::Aggregating: return "Aggregating";
case MergingParams::Unsorted: return "Unsorted";
case MergingParams::Replacing: return "Replacing";
default:
throw Exception("Unknown mode of operation for MergeTreeData: " + toString(merging_params.mode), ErrorCodes::LOGICAL_ERROR);
}
}
void MergeTreeData::loadDataParts(bool skip_sanity_checks)
{
......
......@@ -10,6 +10,7 @@
#include <DB/DataStreams/CollapsingSortedBlockInputStream.h>
#include <DB/DataStreams/SummingSortedBlockInputStream.h>
#include <DB/DataStreams/ReplacingSortedBlockInputStream.h>
#include <DB/DataStreams/GraphiteRollupSortedBlockInputStream.h>
#include <DB/DataStreams/AggregatingSortedBlockInputStream.h>
#include <DB/DataStreams/MaterializingBlockInputStream.h>
#include <DB/DataStreams/ConcatBlockInputStream.h>
......@@ -338,7 +339,7 @@ MergeTreeData::DataPartsVector MergeTreeDataMerger::selectAllPartsFromPartition(
/// parts должны быть отсортированы.
MergeTreeData::MutableDataPartPtr MergeTreeDataMerger::mergePartsToTemporaryPart(
MergeTreeData::DataPartsVector & parts, const String & merged_name, MergeList::Entry & merge_entry,
size_t aio_threshold, DiskSpaceMonitor::Reservation * disk_reservation)
size_t aio_threshold, time_t time_of_merge, DiskSpaceMonitor::Reservation * disk_reservation)
{
if (isCancelled())
throw Exception("Cancelled merging parts", ErrorCodes::ABORTED);
......@@ -447,6 +448,12 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMerger::mergePartsToTemporaryPart
src_streams, data.getSortDescription(), data.merging_params.version_column, DEFAULT_MERGE_BLOCK_SIZE);
break;
case MergeTreeData::MergingParams::Graphite:
merged_stream = std::make_unique<GraphiteRollupSortedBlockInputStream>(
src_streams, data.getSortDescription(), DEFAULT_MERGE_BLOCK_SIZE,
data.merging_params.graphite_params, time_of_merge);
break;
case MergeTreeData::MergingParams::Unsorted:
merged_stream = std::make_unique<ConcatBlockInputStream>(src_streams);
break;
......@@ -707,6 +714,12 @@ MergeTreeData::PerShardDataParts MergeTreeDataMerger::reshardPartition(
src_streams, data.getSortDescription(), data.merging_params.version_column, DEFAULT_MERGE_BLOCK_SIZE);
break;
case MergeTreeData::MergingParams::Graphite:
merged_stream = std::make_unique<GraphiteRollupSortedBlockInputStream>(
src_streams, data.getSortDescription(), DEFAULT_MERGE_BLOCK_SIZE,
data.merging_params.graphite_params, time(0));
break;
case MergeTreeData::MergingParams::Unsorted:
merged_stream = std::make_unique<ConcatBlockInputStream>(src_streams);
break;
......
......@@ -811,6 +811,9 @@ BlockInputStreams MergeTreeDataSelectExecutor::spreadMarkRangesAmongThreadsFinal
case MergeTreeData::MergingParams::Unsorted:
throw Exception("UnsortedMergeTree doesn't support FINAL", ErrorCodes::LOGICAL_ERROR);
case MergeTreeData::MergingParams::Graphite:
throw Exception("GraphiteMergeTree doesn't support FINAL", ErrorCodes::LOGICAL_ERROR);
}
res.emplace_back(merged);
......
......@@ -27,6 +27,7 @@
#include <DB/Storages/StorageReplicatedMergeTree.h>
#include <DB/Storages/StorageSet.h>
#include <DB/Storages/StorageJoin.h>
#include <DB/AggregateFunctions/AggregateFunctionFactory.h>
namespace DB
......@@ -39,6 +40,8 @@ namespace ErrorCodes
extern const int BAD_ARGUMENTS;
extern const int UNKNOWN_STORAGE;
extern const int NO_REPLICA_NAME_GIVEN;
extern const int NO_ELEMENTS_IN_CONFIG;
extern const int UNKNOWN_ELEMENT_IN_CONFIG;
}
......@@ -100,6 +103,114 @@ static Names extractColumnNames(const ASTPtr & node)
}
/** Прочитать настройки прореживания старых данных Графита из конфига.
* Пример:
*
* <graphite_rollup>
* <path_column_name>Path</path_column_name>
* <pattern>
* <regexp>click_cost</regexp>
* <function>any</function>
* <retention>
* <age>0</age>
* <precision>3600</precision>
* </retention>
* <retention>
* <age>86400</age>
* <precision>60</precision>
* </retention>
* </pattern>
* <default>
* <function>max</function>
* <retention>
* <age>0</age>
* <precision>60</precision>
* </retention>
* <retention>
* <age>3600</age>
* <precision>300</precision>
* </retention>
* <retention>
* <age>86400</age>
* <precision>3600</precision>
* </retention>
* </default>
* </graphite_rollup>
*/
static void appendGraphitePattern(const Context & context,
const Poco::Util::AbstractConfiguration & config, const String & config_element, Graphite::Patterns & patterns)
{
Graphite::Pattern pattern;
Poco::Util::AbstractConfiguration::Keys keys;
config.keys(config_element, keys);
for (const auto & key : keys)
{
if (key == "regexp")
{
pattern.regexp = std::make_shared<OptimizedRegularExpression>(config.getString(config_element + ".regexp"));
}
else if (key == "function")
{
/// TODO Не только Float64
pattern.function = context.getAggregateFunctionFactory().get(
config.getString(config_element + ".function"), { new DataTypeFloat64 });
}
else if (startsWith(key, "retention"))
{
pattern.retentions.emplace_back(
Graphite::Retention{
.age = config.getUInt(config_element + "." + key + ".age"),
.precision = config.getUInt(config_element + "." + key + ".precision")});
}
else
throw Exception("Unknown element in config: " + key, ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG);
}
if (!pattern.function)
throw Exception("Aggregate function is mandatory for retention patterns in GraphiteMergeTree",
ErrorCodes::NO_ELEMENTS_IN_CONFIG);
/// retention-ы должны идти по убыванию возраста.
std::sort(pattern.retentions.begin(), pattern.retentions.end(),
[] (const Graphite::Retention & a, const Graphite::Retention & b) { return a.age > b.age; });
patterns.emplace_back(pattern);
}
static void setGraphitePatternsFromConfig(const Context & context,
const String & config_element, Graphite::Params & params)
{
const Poco::Util::AbstractConfiguration & config = Poco::Util::Application::instance().config();
params.path_column_name = config.getString(config_element + ".path_column_name", "Path");
params.time_column_name = config.getString(config_element + ".time_column_name", "Time");
params.value_column_name = config.getString(config_element + ".value_column_name", "Value");
params.version_column_name = config.getString(config_element + ".version_column_name", "Timestamp");
Poco::Util::AbstractConfiguration::Keys keys;
config.keys(config_element, keys);
for (const auto & key : keys)
{
if (startsWith(key, "pattern"))
{
appendGraphitePattern(context, config, config_element + "." + key, params.patterns);
}
else if (key == "default")
{
/// Ниже.
}
else
throw Exception("Unknown element in config: " + key, ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG);
}
if (config.has(config_element + ".default"))
appendGraphitePattern(context, config, config_element + "." + ".default", params.patterns);
}
StoragePtr StorageFactory::get(
const String & name,
const String & data_path,
......@@ -352,7 +463,7 @@ StoragePtr StorageFactory::get(
}
else if (endsWith(name, "MergeTree"))
{
/** Движки [Replicated][|Summing|Collapsing|Aggregating|Unsorted|Replacing]MergeTree (2 * 6 комбинаций)
/** Движки [Replicated][|Summing|Collapsing|Aggregating|Unsorted|Replacing|Graphite]MergeTree (2 * 7 комбинаций)
* В качестве аргумента для движка должно быть указано:
* - (для Replicated) Путь к таблице в ZooKeeper
* - (для Replicated) Имя реплики в ZooKeeper
......@@ -366,12 +477,14 @@ StoragePtr StorageFactory::get(
* - (для Summing, не обязательно) кортеж столбцов, которых следует суммировать. Если не задано - используются все числовые столбцы, не входящие в первичный ключ.
* - (для Replacing, не обязательно) имя столбца одного из UInt типов, обозначающего "версию"
* Например: ENGINE = ReplicatedCollapsingMergeTree('/tables/mytable', 'rep02', EventDate, (CounterID, EventDate, intHash32(UniqID), VisitID), 8192, Sign).
* - (для Graphite) имена столбцов Path, Time, Value, Version, имеющих специальное значение.
*
* MergeTree(date, [sample_key], primary_key, index_granularity)
* CollapsingMergeTree(date, [sample_key], primary_key, index_granularity, sign)
* SummingMergeTree(date, [sample_key], primary_key, index_granularity, [columns_to_sum])
* AggregatingMergeTree(date, [sample_key], primary_key, index_granularity)
* ReplacingMergeTree(date, [sample_key], primary_key, index_granularity, [version_column])
* GraphiteMergeTree(date, [sample_key], primary_key, index_granularity, Path, Time, Value, Version)
* UnsortedMergeTree(date, index_granularity) TODO Добавить описание ниже.
*/
......@@ -383,9 +496,9 @@ MergeTrees is different in two ways:
- it may be replicated and non-replicated;
- it may do different actions on merge: nothing; sign collapse; sum; apply aggregete functions.
So we have 12 combinations:
MergeTree, CollapsingMergeTree, SummingMergeTree, AggregatingMergeTree, ReplacingMergeTree, UnsortedMergeTree
ReplicatedMergeTree, ReplicatedCollapsingMergeTree, ReplicatedSummingMergeTree, ReplicatedAggregatingMergeTree, ReplicatedReplacingMergeTree, ReplicatedUnsortedMergeTree
So we have 14 combinations:
MergeTree, CollapsingMergeTree, SummingMergeTree, AggregatingMergeTree, ReplacingMergeTree, UnsortedMergeTree, GraphiteMergeTree
ReplicatedMergeTree, ReplicatedCollapsingMergeTree, ReplicatedSummingMergeTree, ReplicatedAggregatingMergeTree, ReplicatedReplacingMergeTree, ReplicatedUnsortedMergeTree, ReplicatedGraphiteMergeTree
In most of cases, you need MergeTree or ReplicatedMergeTree.
......@@ -469,7 +582,9 @@ For further info please read the documentation: https://clickhouse.yandex-team.r
args = typeid_cast<ASTExpressionList &>(*args_func.at(0)).children;
/// NOTE Слегка запутанно.
size_t num_additional_params = (replicated ? 2 : 0) + (merging_params.mode == MergeTreeData::MergingParams::Collapsing);
size_t num_additional_params = (replicated ? 2 : 0)
+ (merging_params.mode == MergeTreeData::MergingParams::Collapsing)
+ (merging_params.mode == MergeTreeData::MergingParams::Graphite) * 4;
if (merging_params.mode == MergeTreeData::MergingParams::Unsorted
&& args.size() != num_additional_params + 2)
......@@ -610,6 +725,17 @@ For further info please read the documentation: https://clickhouse.yandex-team.r
args.pop_back();
}
}
else if (merging_params.mode == MergeTreeData::MergingParams::Graphite)
{
String graphite_config_name;
if (auto ast = typeid_cast<ASTLiteral *>(&*args.back()))
graphite_config_name = ast->value.get<String>();
else
throw Exception(String("Last parameter of GraphiteMergeTree must be name (in single quotes) of element in configuration file with Graphite options") + verbose_help, ErrorCodes::BAD_ARGUMENTS);
setGraphitePatternsFromConfig(context, graphite_config_name, merging_params.graphite_params);
}
/// Если присутствует выражение для сэмплирования. MergeTree(date, [sample_key], primary_key, index_granularity)
if (args.size() == 4)
......
......@@ -255,7 +255,7 @@ bool StorageMergeTree::merge(size_t aio_threshold, bool aggressive, BackgroundPr
const auto & merge_entry = context.getMergeList().insert(database_name, table_name, merged_name);
auto new_part = merger.mergePartsToTemporaryPart(
merging_tagger->parts, merged_name, *merge_entry, aio_threshold, &*merging_tagger->reserved_space);
merging_tagger->parts, merged_name, *merge_entry, aio_threshold, time(0), &*merging_tagger->reserved_space);
merger.renameMergedTemporaryPart(merging_tagger->parts, new_part, merged_name, nullptr);
......
......@@ -1121,7 +1121,7 @@ bool StorageReplicatedMergeTree::executeLogEntry(const LogEntry & entry, Backgro
size_t aio_threshold = context.getSettings().min_bytes_to_use_direct_io;
auto part = merger.mergePartsToTemporaryPart(
parts, entry.new_part_name, *merge_entry, aio_threshold, reserved_space);
parts, entry.new_part_name, *merge_entry, aio_threshold, entry.create_time, reserved_space);
zkutil::Ops ops;
......@@ -2200,7 +2200,7 @@ bool StorageReplicatedMergeTree::optimize(const Settings & settings)
const auto & merge_entry = context.getMergeList().insert(database_name, table_name, merged_name);
auto new_part = unreplicated_merger->mergePartsToTemporaryPart(
parts, merged_name, *merge_entry, settings.min_bytes_to_use_direct_io);
parts, merged_name, *merge_entry, settings.min_bytes_to_use_direct_io, time(0));
unreplicated_merger->renameMergedTemporaryPart(parts, new_part, merged_name, nullptr);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册