未验证 提交 a8168870 编写于 作者: A alexey-milovidov 提交者: GitHub

Merge pull request #7907 from ClickHouse/sample_final

FINAL SAMPLE
......@@ -388,18 +388,18 @@ Pipes MergeTreeDataSelectExecutor::readFromParts(
used_sample_factor = 1.0 / boost::rational_cast<Float64>(relative_sample_size);
RelativeSize size_of_universum = 0;
DataTypePtr type = data.primary_key_sample.getByName(data.sampling_expr_column_name).type;
DataTypePtr sampling_column_type = data.primary_key_sample.getByName(data.sampling_expr_column_name).type;
if (typeid_cast<const DataTypeUInt64 *>(type.get()))
if (typeid_cast<const DataTypeUInt64 *>(sampling_column_type.get()))
size_of_universum = RelativeSize(std::numeric_limits<UInt64>::max()) + RelativeSize(1);
else if (typeid_cast<const DataTypeUInt32 *>(type.get()))
else if (typeid_cast<const DataTypeUInt32 *>(sampling_column_type.get()))
size_of_universum = RelativeSize(std::numeric_limits<UInt32>::max()) + RelativeSize(1);
else if (typeid_cast<const DataTypeUInt16 *>(type.get()))
else if (typeid_cast<const DataTypeUInt16 *>(sampling_column_type.get()))
size_of_universum = RelativeSize(std::numeric_limits<UInt16>::max()) + RelativeSize(1);
else if (typeid_cast<const DataTypeUInt8 *>(type.get()))
else if (typeid_cast<const DataTypeUInt8 *>(sampling_column_type.get()))
size_of_universum = RelativeSize(std::numeric_limits<UInt8>::max()) + RelativeSize(1);
else
throw Exception("Invalid sampling column type in storage parameters: " + type->getName() + ". Must be unsigned integer type.",
throw Exception("Invalid sampling column type in storage parameters: " + sampling_column_type->getName() + ". Must be unsigned integer type.",
ErrorCodes::ILLEGAL_TYPE_OF_COLUMN_FOR_FILTER);
if (settings.parallel_replicas_count > 1)
......@@ -453,13 +453,25 @@ Pipes MergeTreeDataSelectExecutor::readFromParts(
std::shared_ptr<ASTFunction> lower_function;
std::shared_ptr<ASTFunction> upper_function;
/// If sample and final are used together no need to calculate sampling expression twice.
/// The first time it was calculated for final, because sample key is a part of the PK.
/// So, assume that we already have calculated column.
ASTPtr sampling_key_ast = data.getSamplingKeyAST();
if (select.final())
{
sampling_key_ast = std::make_shared<ASTIdentifier>(data.sampling_expr_column_name);
/// We do spoil available_real_columns here, but it is not used later.
available_real_columns.emplace_back(data.sampling_expr_column_name, std::move(sampling_column_type));
}
if (has_lower_limit)
{
if (!key_condition.addCondition(data.sampling_expr_column_name, Range::createLeftBounded(lower, true)))
throw Exception("Sampling column not in primary key", ErrorCodes::ILLEGAL_COLUMN);
ASTPtr args = std::make_shared<ASTExpressionList>();
args->children.push_back(data.getSamplingKeyAST());
args->children.push_back(sampling_key_ast);
args->children.push_back(std::make_shared<ASTLiteral>(lower));
lower_function = std::make_shared<ASTFunction>();
......@@ -476,7 +488,7 @@ Pipes MergeTreeDataSelectExecutor::readFromParts(
throw Exception("Sampling column not in primary key", ErrorCodes::ILLEGAL_COLUMN);
ASTPtr args = std::make_shared<ASTExpressionList>();
args->children.push_back(data.getSamplingKeyAST());
args->children.push_back(sampling_key_ast);
args->children.push_back(std::make_shared<ASTLiteral>(upper));
upper_function = std::make_shared<ASTFunction>();
......@@ -503,11 +515,16 @@ Pipes MergeTreeDataSelectExecutor::readFromParts(
auto syntax_result = SyntaxAnalyzer(context).analyze(query, available_real_columns);
filter_expression = ExpressionAnalyzer(filter_function, syntax_result, context).getActions(false);
/// Add columns needed for `sample_by_ast` to `column_names_to_read`.
std::vector<String> add_columns = filter_expression->getRequiredColumns();
column_names_to_read.insert(column_names_to_read.end(), add_columns.begin(), add_columns.end());
std::sort(column_names_to_read.begin(), column_names_to_read.end());
column_names_to_read.erase(std::unique(column_names_to_read.begin(), column_names_to_read.end()), column_names_to_read.end());
if (!select.final())
{
/// Add columns needed for `sample_by_ast` to `column_names_to_read`.
/// Skip this if final was used, because such columns were already added from PK.
std::vector<String> add_columns = filter_expression->getRequiredColumns();
column_names_to_read.insert(column_names_to_read.end(), add_columns.begin(), add_columns.end());
std::sort(column_names_to_read.begin(), column_names_to_read.end());
column_names_to_read.erase(std::unique(column_names_to_read.begin(), column_names_to_read.end()),
column_names_to_read.end());
}
}
}
......
count
1000000
count final
666667
count sample
557632
count sample final
371758
count final max_parallel_replicas
666667
drop table if exists sample_final;
create table sample_final (CounterID UInt32, EventDate Date, EventTime DateTime, UserID UInt64, Sign Int8) engine = CollapsingMergeTree(Sign) order by (CounterID, EventDate, intHash32(UserID), EventTime) sample by intHash32(UserID);
insert into sample_final select number / (8192 * 4), toDate('2019-01-01'), toDateTime('2019-01-01 00:00:01') + number, number / (8192 * 2), number % 3 = 1 ? -1 : 1 from numbers(1000000);
select 'count';
select count() from sample_final;
select 'count final';
select count() from sample_final final;
select 'count sample';
select count() from sample_final sample 1/2;
select 'count sample final';
select count() from sample_final final sample 1/2;
select 'count final max_parallel_replicas';
set max_parallel_replicas=2;
select count() from remote('127.0.0.{2|3}', currentDatabase(), sample_final) final;
drop table if exists sample_final;
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册