提交 42073f42 编写于 作者: M Michael Kolupaev

clickhouse: added support for SAMPLE clause of SELECT query (works only in...

clickhouse: added support for SAMPLE clause of SELECT query (works only in merge tree storage) [#6201].
上级 143df8a3
......@@ -147,6 +147,7 @@ namespace ErrorCodes
EXCESSIVE_ELEMENT_IN_CONFIG,
NO_ELEMENTS_IN_CONFIG,
ALL_REQUESTED_COLUMNS_ARE_MISSING,
SAMPLING_NOT_SUPPORTED,
POCO_EXCEPTION = 1000,
STD_EXCEPTION,
......
......@@ -15,6 +15,7 @@ public:
ASTPtr select_expression_list;
ASTPtr database;
ASTPtr table; /// Идентификатор или подзапрос (рекурсивно ASTSelectQuery)
ASTPtr sample_size;
ASTPtr where_expression;
ASTPtr group_expression_list;
ASTPtr having_expression;
......@@ -37,6 +38,7 @@ public:
if (select_expression_list) { res->select_expression_list = select_expression_list->clone(); res->children.push_back(res->select_expression_list); }
if (database) { res->database = database->clone(); res->children.push_back(res->database); }
if (table) { res->table = table->clone(); res->children.push_back(res->table); }
if (sample_size) { res->sample_size = sample_size->clone(); res->children.push_back(res->sample_size); }
if (where_expression) { res->where_expression = where_expression->clone(); res->children.push_back(res->where_expression); }
if (group_expression_list) { res->group_expression_list = group_expression_list->clone(); res->children.push_back(res->group_expression_list); }
if (having_expression) { res->having_expression = having_expression->clone(); res->children.push_back(res->having_expression); }
......
......@@ -49,6 +49,10 @@ public:
*/
virtual bool isRemote() const { return false; }
/** Возвращает true, если хранилище поддерживает запросы с секцией SAMPLE.
*/
virtual bool supportsSampling() const { return false; }
/** Читать набор столбцов из таблицы.
* Принимает список столбцов, которых нужно прочитать, а также описание запроса,
* из которого может быть извлечена информация о том, каким способом извлекать данные
......
......@@ -154,6 +154,7 @@ struct Range
class PKCondition
{
public:
/// Не учитывает секцию SAMPLE.
PKCondition(ASTPtr query, const Context & context, const SortDescription & sort_descr);
/// Выполнимо ли условие в диапазоне ключей.
......@@ -169,6 +170,10 @@ public:
return rpn.size() == 1 && rpn[0].function == RPNElement::FUNCTION_UNKNOWN;
}
/// Наложить дополнительное условие: значение в столбце column должно быть в диапазоне range.
/// Возвращает, есть ли такой столбец в первичном ключе.
bool addCondition(const String & column, const Range & range);
String toString();
private:
/// Выражение хранится в виде обратной польской строки (Reverse Polish Notation).
......@@ -189,6 +194,8 @@ private:
RPNElement() {}
RPNElement(Function function_) : function(function_) {}
RPNElement(Function function_, size_t key_column_) : function(function_), key_column(key_column_) {}
RPNElement(Function function_, size_t key_column_, const Range & range_)
: function(function_), key_column(key_column_), range(range_){}
String toString()
{
......
......@@ -108,7 +108,9 @@ public:
*/
StorageMergeTree(const String & path_, const String & name_, NamesAndTypesListPtr columns_,
Context & context_,
ASTPtr & primary_expr_ast_, const String & date_column_name_,
ASTPtr & primary_expr_ast_,
const String & date_column_name_,
const String & sampling_column_name_, /// "", если семплирование не поддерживается.
size_t index_granularity_,
const String & sign_column_ = "",
const StorageMergeTreeSettings & settings_ = StorageMergeTreeSettings());
......@@ -117,6 +119,7 @@ public:
std::string getName() const { return "MergeTree"; }
std::string getTableName() const { return name; }
bool supportsSampling() const { return sampling_column_name != ""; }
const NamesAndTypesList & getColumnsList() const { return *columns; }
......@@ -155,6 +158,7 @@ private:
Context context;
ASTPtr primary_expr_ast;
String date_column_name;
String sampling_column_name; /// "", если семплирование не поддерживается.
size_t index_granularity;
size_t min_marks_for_seek;
......
......@@ -220,6 +220,9 @@ QueryProcessingStage::Enum InterpreterSelectQuery::executeFetchColumns(BlockInpu
else
interpreter_subquery = new InterpreterSelectQuery(query.table, context);
if (query.sample_size && (!table || !table->supportsSampling()))
throw Exception("Illegal SAMPLE: table doesn't support sampling", ErrorCodes::SAMPLING_NOT_SUPPORTED);
/** При распределённой обработке запроса, в потоках почти не делается вычислений,
* а делается ожидание и получение данных с удалённых серверов.
* Если у нас 20 удалённых серверов, а max_threads = 8, то было бы не очень хорошо
......
......@@ -22,6 +22,7 @@ bool ParserSelectQuery::parseImpl(Pos & pos, Pos end, ASTPtr & node, String & ex
ParserString s_select("SELECT", true, true);
ParserString s_from("FROM", true, true);
ParserString s_where("WHERE", true, true);
ParserString s_sample("SAMPLE", true, true);
ParserString s_group("GROUP", true, true);
ParserString s_by("BY", true, true);
ParserString s_having("HAVING", true, true);
......@@ -93,6 +94,19 @@ bool ParserSelectQuery::parseImpl(Pos & pos, Pos end, ASTPtr & node, String & ex
return false;
}
/// SAMPLE number
if (s_sample.ignore(pos, end, expected))
{
ws.ignore(pos, end);
ParserNumber num;
if (!num.parse(pos, end, select_query->sample_size, expected))
return false;
ws.ignore(pos, end);
}
/// WHERE expr
if (s_where.ignore(pos, end, expected))
{
......@@ -183,6 +197,8 @@ bool ParserSelectQuery::parseImpl(Pos & pos, Pos end, ASTPtr & node, String & ex
select_query->children.push_back(select_query->database);
if (select_query->table)
select_query->children.push_back(select_query->table);
if (select_query->sample_size)
select_query->children.push_back(select_query->sample_size);
if (select_query->where_expression)
select_query->children.push_back(select_query->where_expression);
if (select_query->group_expression_list)
......
......@@ -39,6 +39,15 @@ PKCondition::PKCondition(ASTPtr query, const Context & context_, const SortDescr
}
}
bool PKCondition::addCondition(const String & column, const Range & range)
{
if (!pk_columns.count(column))
return false;
rpn.push_back(RPNElement(RPNElement::FUNCTION_IN_RANGE, pk_columns[column], range));
rpn.push_back(RPNElement(RPNElement::FUNCTION_AND));
return true;
}
/** Получить значение константного выражения.
* Вернуть false, если выражение не константно.
*/
......
......@@ -158,9 +158,11 @@ StoragePtr StorageFactory::get(
{
/** В качестве аргумента для движка должно быть указано:
* - имя столбца с датой;
* - имя столбца для семплирования (запрос с SAMPLE x будет выбирать строки, у которых в этом столбце значение меньше, чем x*UINT32_MAX);
* - выражение для сортировки в скобках;
* - index_granularity.
* Например: ENGINE = MergeTree(EventDate, (CounterID, EventDate, intHash32(UniqID), EventTime), 8192).
* Например: ENGINE = MergeTree(EventDate, intHash32(UniqID), (CounterID, EventDate, intHash32(UniqID), EventTime), 8192).
*
*/
ASTs & args_func = dynamic_cast<ASTFunction &>(*dynamic_cast<ASTCreateQuery &>(*query).storage).children;
......@@ -171,14 +173,17 @@ StoragePtr StorageFactory::get(
ASTs & args = dynamic_cast<ASTExpressionList &>(*args_func.at(0)).children;
if (args.size() != 3)
throw Exception("Storage MergeTree requires exactly 3 parameters"
" - name of column with date, primary key expression, index granularity.",
if (args.size() != 3 && args.size() != 4)
throw Exception("Storage MergeTree requires 3 or 4 parameters"
" - name of column with date, [name of column for sampling], primary key expression, index granularity.",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
size_t arg_offset = args.size() - 3;
String date_column_name = dynamic_cast<ASTIdentifier &>(*args[0]).name;
UInt64 index_granularity = boost::get<UInt64>(dynamic_cast<ASTLiteral &>(*args[2]).value);
ASTFunction & primary_expr_func = dynamic_cast<ASTFunction &>(*args[1]);
String sampling_column_name = arg_offset == 0 ? "" : dynamic_cast<ASTIdentifier &>(*args[1]).name;
UInt64 index_granularity = boost::get<UInt64>(dynamic_cast<ASTLiteral &>(*args[arg_offset + 2]).value);
ASTFunction & primary_expr_func = dynamic_cast<ASTFunction &>(*args[arg_offset + 1]);
if (primary_expr_func.name != "tuple")
throw Exception("Primary expression for storage MergeTree must be in parentheses.",
......@@ -186,12 +191,13 @@ StoragePtr StorageFactory::get(
ASTPtr primary_expr = primary_expr_func.children.at(0);
return new StorageMergeTree(data_path, table_name, columns, context, primary_expr, date_column_name, index_granularity);
return new StorageMergeTree(data_path, table_name, columns, context, primary_expr, date_column_name, sampling_column_name, index_granularity);
}
else if (name == "CollapsingMergeTree")
{
/** В качестве аргумента для движка должно быть указано:
* - имя столбца с датой;
* - имя столбца для семплирования (запрос с SAMPLE x будет выбирать строки, у которых в этом столбце значение меньше, чем x*UINT32_MAX);
* - выражение для сортировки в скобках;
* - index_granularity;
* - имя столбца, содержащего тип строчки с изменением "визита" (принимающего значения 1 и -1).
......@@ -206,15 +212,18 @@ StoragePtr StorageFactory::get(
ASTs & args = dynamic_cast<ASTExpressionList &>(*args_func.at(0)).children;
if (args.size() != 4)
throw Exception("Storage CollapsingMergeTree requires exactly 4 parameters"
" - name of column with date, primary key expression, index granularity, sign_column.",
if (args.size() != 4 && args.size() != 5)
throw Exception("Storage CollapsingMergeTree requires 4 or 5 parameters"
" - name of column with date, [name of column for sampling], primary key expression, index granularity, sign_column.",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
size_t arg_offset = args.size() - 4;
String date_column_name = dynamic_cast<ASTIdentifier &>(*args[0]).name;
UInt64 index_granularity = boost::get<UInt64>(dynamic_cast<ASTLiteral &>(*args[2]).value);
String sign_column_name = dynamic_cast<ASTIdentifier &>(*args[3]).name;
ASTFunction & primary_expr_func = dynamic_cast<ASTFunction &>(*args[1]);
String sampling_column_name = arg_offset == 0 ? "" : dynamic_cast<ASTIdentifier &>(*args[1]).name;
UInt64 index_granularity = boost::get<UInt64>(dynamic_cast<ASTLiteral &>(*args[arg_offset + 2]).value);
String sign_column_name = dynamic_cast<ASTIdentifier &>(*args[arg_offset + 3]).name;
ASTFunction & primary_expr_func = dynamic_cast<ASTFunction &>(*args[arg_offset + 1]);
if (primary_expr_func.name != "tuple")
throw Exception("Primary expression for storage CollapsingMergeTree must be in parentheses.",
......@@ -222,7 +231,7 @@ StoragePtr StorageFactory::get(
ASTPtr primary_expr = primary_expr_func.children.at(0);
return new StorageMergeTree(data_path, table_name, columns, context, primary_expr, date_column_name, index_granularity, sign_column_name);
return new StorageMergeTree(data_path, table_name, columns, context, primary_expr, date_column_name, sampling_column_name, index_granularity, sign_column_name);
}
else if (name == "SystemNumbers")
{
......
......@@ -29,11 +29,13 @@
#include <DB/DataStreams/ConcatBlockInputStream.h>
#include <DB/DataStreams/narrowBlockInputStreams.h>
#include <DB/DataStreams/copyData.h>
#include <DB/DataStreams/FilterBlockInputStream.h>
#include <DB/Parsers/ASTExpressionList.h>
#include <DB/Parsers/ASTSelectQuery.h>
#include <DB/Parsers/ASTFunction.h>
#include <DB/Parsers/ASTLiteral.h>
#include <DB/Parsers/ASTIdentifier.h>
#include <DB/Interpreters/sortBlock.h>
......@@ -760,13 +762,15 @@ private:
StorageMergeTree::StorageMergeTree(
const String & path_, const String & name_, NamesAndTypesListPtr columns_,
Context & context_,
ASTPtr & primary_expr_ast_, const String & date_column_name_,
ASTPtr & primary_expr_ast_,
const String & date_column_name_, const String & sampling_column_name_,
size_t index_granularity_,
const String & sign_column_,
const StorageMergeTreeSettings & settings_)
: path(path_), name(name_), full_path(path + escapeForFileName(name) + '/'), columns(columns_),
context(context_), primary_expr_ast(primary_expr_ast_->clone()),
date_column_name(date_column_name_), index_granularity(index_granularity_),
date_column_name(date_column_name_), sampling_column_name(sampling_column_name_),
index_granularity(index_granularity_),
sign_column(sign_column_),
settings(settings_),
increment(full_path + "increment.txt"), log(&Logger::get("StorageMergeTree: " + name))
......@@ -819,6 +823,30 @@ BlockInputStreams StorageMergeTree::read(
PKCondition key_condition(query, context, sort_descr);
PKCondition date_condition(query, context, SortDescription(1, SortColumnDescription(date_column_name, 1)));
/// Interpret the SAMPLE clause of the query (if any).
/// A value greater than 1 is taken as a cap on the number of rows to read;
/// a value in [0, 1] is taken as a fraction and turned into an upper bound on the sampling column.

/// Row-count cap; stays "unlimited" unless SAMPLE specifies a value greater than 1.
size_t count_limit = std::numeric_limits<size_t>::max();
/// True when sampling is done by restricting the value of the sampling column.
bool sample_by_value = false;
/// Upper bound for the sampling column; assigned only when sample_by_value is set.
UInt64 sample_column_value_limit;
ASTSelectQuery & select = *dynamic_cast<ASTSelectQuery*>(&*query);
if (select.sample_size)
{
/// NOTE(review): assumes sample_size is an ASTLiteral (the parser reads it with ParserNumber); the cast result is not checked for null.
double size = boost::apply_visitor(FieldVisitorConvertToNumber<double>(), dynamic_cast<ASTLiteral*>(&*select.sample_size)->value);
if (size < 0)
throw Exception("Negative sample size", ErrorCodes::ARGUMENT_OUT_OF_BOUND);
if (size > 1)
{
/// Absolute sample size: re-read the literal as an integer.
count_limit = boost::apply_visitor(FieldVisitorConvertToNumber<UInt64>(), dynamic_cast<ASTLiteral*>(&*select.sample_size)->value);
}
else
{
/// Relative sample size: scale the fraction to the UInt32 value range.
sample_by_value = true;
sample_column_value_limit = static_cast<UInt64>(size * std::numeric_limits<UInt32>::max());
/// Narrow the primary key condition as well; addCondition returns false if the sampling column is not in the primary key.
if (!key_condition.addCondition(sampling_column_name,
Range::RightBounded(sample_column_value_limit, true)))
throw Exception("Invalid sampling column in storage parameters", ErrorCodes::ILLEGAL_COLUMN);
}
}
LOG_DEBUG(log, "key condition: " << key_condition.toString());
LOG_DEBUG(log, "date condition: " << date_condition.toString());
......@@ -855,14 +883,54 @@ BlockInputStreams StorageMergeTree::read(
for (size_t j = 0; j < ranges.ranges.size(); ++j)
{
sum_marks += ranges.ranges[j].end - ranges.ranges[j].begin;
/// If enough rows have been found.
if (sum_marks * index_granularity >= count_limit)
{
	/// NOTE(review): assumes parts_with_ranges.back() is the entry for the part being processed - confirm.
	MarkRanges & new_ranges = parts_with_ranges.back().ranges;

	/// Rows selected beyond the limit, converted to whole marks.
	/// Rounded down, so at least count_limit rows stay selected and the range never becomes empty.
	/// (The previous code subtracted `count_limit - rows`, which wraps around in unsigned
	///  arithmetic and is expressed in rows rather than marks.)
	size_t excess_marks = (sum_marks * index_granularity - count_limit) / index_granularity;

	/// Trim this range and keep the running total consistent with what will actually be read.
	new_ranges[j].end -= excess_marks;
	sum_marks -= excess_marks;

	/// Drop all subsequent ranges.
	new_ranges.erase(new_ranges.begin() + j + 1, new_ranges.end());

	/// Everything after index j has just been removed; do not keep iterating over it.
	break;
}
}
/// If enough rows have been found, there is no need to look any further.
if (sum_marks * index_granularity >= count_limit)
break;
}
}
LOG_DEBUG(log, "Selected " << parts.size() << " parts by date, " << parts_with_ranges.size() << " parts by key, "
<< sum_marks << " marks to read from " << sum_ranges << " ranges");
return spreadMarkRangesAmongThreads(parts_with_ranges, threads, column_names, max_block_size);
BlockInputStreams res = spreadMarkRangesAmongThreads(parts_with_ranges, threads, column_names, max_block_size);
if (sample_by_value)
{
	/// Add filtering on top of every stream: sampling_column_name <= sample_column_value_limit
	ASTPtr filter_function_args = new ASTExpressionList;
	filter_function_args->children.push_back(new ASTIdentifier(StringRange(), sampling_column_name));
	filter_function_args->children.push_back(new ASTLiteral(StringRange(), sample_column_value_limit));

	/// The node must actually be allocated: a default-constructed Poco::SharedPtr is null,
	/// and dereferencing it fails before any field can be assigned.
	Poco::SharedPtr<ASTFunction> filter_function = new ASTFunction;
	filter_function->name = "lessOrEquals";
	filter_function->arguments = filter_function_args;
	filter_function->children.push_back(filter_function->arguments);

	ExpressionPtr filter_expression = new Expression(filter_function, context);

	/// Wrap each source stream: first compute the comparison column, then filter rows by it.
	for (size_t i = 0; i < res.size(); ++i)
	{
		BlockInputStreamPtr original_stream = res[i];
		BlockInputStreamPtr expression_stream = new ExpressionBlockInputStream(original_stream, filter_expression);
		BlockInputStreamPtr filter_stream = new FilterBlockInputStream(expression_stream, filter_function->getColumnName());
		res[i] = filter_stream;
	}
}
return res;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册