提交 2cc0aae8 作者: Alexey Zatelepin

rearrange AST members in MergeTreeData [#CLICKHOUSE-3859]

上级 7167bfd7
......@@ -102,11 +102,11 @@ static NameSet getKeyColumns(const StoragePtr & storage)
for (const String & col : merge_tree_data->partition_key_expr->getRequiredColumns())
key_columns.insert(col);
auto sorting_key_expr = merge_tree_data->getSortingKeyExpression();
auto sorting_key_expr = merge_tree_data->sorting_key_expr;
if (sorting_key_expr)
for (const String & col : sorting_key_expr->getRequiredColumns())
key_columns.insert(col);
/// We don't process sampling_expression separately because it must be among the primary key columns.
/// We don't process sample_by_ast separately because it must be among the primary key columns.
if (!merge_tree_data->merging_params.sign_column.empty())
key_columns.insert(merge_tree_data->merging_params.sign_column);
......
......@@ -90,7 +90,7 @@ MergeTreeData::MergeTreeData(
const ASTPtr & partition_by_ast_,
const ASTPtr & order_by_ast_,
const ASTPtr & primary_key_ast_,
const ASTPtr & sampling_expression_,
const ASTPtr & sample_by_ast_,
const MergingParams & merging_params_,
const MergeTreeSettings & settings_,
bool require_part_metadata_,
......@@ -98,11 +98,11 @@ MergeTreeData::MergeTreeData(
BrokenPartCallback broken_part_callback_)
: ITableDeclaration{columns_},
context(context_),
sampling_expression(sampling_expression_),
index_granularity(settings_.index_granularity),
merging_params(merging_params_),
index_granularity(settings_.index_granularity),
settings(settings_),
partition_by_ast(partition_by_ast_),
sample_by_ast(sample_by_ast_),
require_part_metadata(require_part_metadata_),
database_name(database_), table_name(table_),
full_path(full_path_),
......@@ -116,14 +116,16 @@ MergeTreeData::MergeTreeData(
setPrimaryKey(order_by_ast_, primary_key_ast_);
if (sampling_expression)
if (sample_by_ast)
{
if (!primary_key_sample.has(sampling_expression->getColumnName())
sampling_expr_column_name = sample_by_ast->getColumnName();
if (!primary_key_sample.has(sampling_expr_column_name)
&& !attach && !settings.compatibility_allow_sampling_expression_not_in_primary_key) /// This is for backward compatibility.
throw Exception("Sampling expression must be present in the primary key", ErrorCodes::BAD_ARGUMENTS);
columns_required_for_sampling = ExpressionAnalyzer(
sampling_expression, context, nullptr, getColumns().getAllPhysical())
sample_by_ast, context, nullptr, getColumns().getAllPhysical())
.getRequiredSourceColumns();
}
......@@ -943,7 +945,7 @@ void MergeTreeData::checkAlter(const AlterCommands & commands)
for (const String & col : sorting_key_expr->getRequiredColumns())
columns_alter_metadata_only.insert(col);
/// We don't process sampling_expression separately because it must be among the primary key columns
/// We don't process sample_by_ast separately because it must be among the primary key columns
/// and we don't process primary_key_expr separately because it is a prefix of sorting_key_expr.
}
......
......@@ -305,7 +305,7 @@ public:
const ASTPtr & partition_by_ast_,
const ASTPtr & order_by_ast_,
const ASTPtr & primary_key_ast_,
const ASTPtr & sampling_expression_, /// nullptr, if sampling is not supported.
const ASTPtr & sample_by_ast_, /// nullptr, if sampling is not supported.
const MergingParams & merging_params_,
const MergeTreeSettings & settings_,
bool require_part_metadata_,
......@@ -315,7 +315,6 @@ public:
/// Load the set of data parts from disk. Call once - immediately after the object is created.
void loadDataParts(bool skip_sanity_checks);
bool supportsSampling() const { return sampling_expression != nullptr; }
bool supportsPrewhere() const { return true; }
bool supportsFinal() const
......@@ -502,16 +501,14 @@ public:
static ASTPtr extractKeyExpressionList(const ASTPtr & node);
bool hasPrimaryKey() const { return !primary_key_columns.empty(); }
ExpressionActionsPtr getPrimaryKeyExpression() const { return primary_key_expr; }
Names getPrimaryKeyColumns() const { return primary_key_columns; }
bool hasSortingKey() const { return !sorting_key_columns.empty(); }
ExpressionActionsPtr getSortingKeyExpression() const { return sorting_key_expr; }
Names getSortingKeyColumns() const { return sorting_key_columns; }
Names getColumnsRequiredForSampling() const { return columns_required_for_sampling; }
Names getColumnsRequiredForFinal() const { return sorting_key_expr->getRequiredColumns(); }
bool supportsSampling() const { return sample_by_ast != nullptr; }
ASTPtr getSamplingExpression() const { return sample_by_ast; }
Names getColumnsRequiredForSampling() const { return columns_required_for_sampling; }
/// Check that the part is not broken and calculate the checksums for it if they are not present.
MutableDataPartPtr loadPartAndFixMetadata(const String & relative_path);
......@@ -560,21 +557,10 @@ public:
MergeTreeDataFormatVersion format_version;
Context & context;
const ASTPtr sampling_expression;
const size_t index_granularity;
/// Merging params - what additional actions to perform during merge.
const MergingParams merging_params;
const MergeTreeSettings settings;
ASTPtr order_by_ast;
ASTPtr primary_key_ast;
Block primary_key_sample;
DataTypes primary_key_data_types;
ASTPtr partition_by_ast;
ExpressionActionsPtr partition_key_expr;
Block partition_key_sample;
......@@ -583,6 +569,22 @@ public:
DataTypes minmax_idx_column_types;
Int64 minmax_idx_date_column_pos = -1; /// In a common case minmax index includes a date column.
/// Names of columns for primary key + secondary sorting columns.
Names sorting_key_columns;
ExpressionActionsPtr sorting_key_expr;
/// Names of columns for primary key.
Names primary_key_columns;
ExpressionActionsPtr primary_key_expr;
Block primary_key_sample;
DataTypes primary_key_data_types;
String sampling_expr_column_name;
Names columns_required_for_sampling;
const size_t index_granularity;
const MergeTreeSettings settings;
/// Limiting parallel sends per one table, used in DataPartsExchange
std::atomic_uint current_table_sends {0};
......@@ -592,22 +594,17 @@ public:
private:
friend struct MergeTreeDataPart;
friend class StorageMergeTree;
friend class ReplicatedMergeTreeAlterThread;
friend class MergeTreeDataMergerMutator;
friend class StorageMergeTree;
friend class StorageReplicatedMergeTree;
friend class MergeTreeDataMergerMutator;
friend class ReplicatedMergeTreeAlterThread;
friend struct ReplicatedMergeTreeTableMetadata;
bool require_part_metadata;
ExpressionActionsPtr sorting_key_expr;
/// Names of columns for primary key + secondary sorting columns.
Names sorting_key_columns;
ExpressionActionsPtr primary_key_expr;
/// Names of columns for primary key.
Names primary_key_columns;
ASTPtr partition_by_ast;
ASTPtr order_by_ast;
ASTPtr primary_key_ast;
const ASTPtr sample_by_ast;
Names columns_required_for_sampling;
bool require_part_metadata;
String database_name;
String table_name;
......
......@@ -549,7 +549,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor
NamesAndTypesList gathering_columns, merging_columns;
Names gathering_column_names, merging_column_names;
extractMergingAndGatheringColumns(
all_columns, data.getSortingKeyExpression(),
all_columns, data.sorting_key_expr,
data.merging_params, gathering_columns, gathering_column_names, merging_columns, merging_column_names);
MergeTreeData::MutableDataPartPtr new_data_part = std::make_shared<MergeTreeData::DataPart>(
......@@ -632,12 +632,12 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor
if (data.hasPrimaryKey())
src_streams.emplace_back(std::make_shared<MaterializingBlockInputStream>(
std::make_shared<ExpressionBlockInputStream>(BlockInputStreamPtr(std::move(input)), data.getSortingKeyExpression())));
std::make_shared<ExpressionBlockInputStream>(BlockInputStreamPtr(std::move(input)), data.sorting_key_expr)));
else
src_streams.emplace_back(std::move(input));
}
Names sort_columns = data.getSortingKeyColumns();
Names sort_columns = data.sorting_key_columns;
SortDescription sort_description;
size_t sort_columns_size = sort_columns.size();
sort_description.reserve(sort_columns_size);
......@@ -905,7 +905,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor
if (data.hasPrimaryKey())
in = std::make_shared<MaterializingBlockInputStream>(
std::make_shared<ExpressionBlockInputStream>(in, data.getPrimaryKeyExpression()));
std::make_shared<ExpressionBlockInputStream>(in, data.primary_key_expr));
MergeTreeDataPart::MinMaxIndex minmax_idx;
......
......@@ -211,11 +211,11 @@ BlockInputStreams MergeTreeDataSelectExecutor::readFromParts(
data.check(real_column_names);
const Settings & settings = context.getSettingsRef();
Names primary_key_columns = data.getPrimaryKeyColumns();
Names primary_key_columns = data.primary_key_columns;
KeyCondition key_condition(
query_info, context, available_real_and_virtual_columns,
primary_key_columns, data.getPrimaryKeyExpression());
primary_key_columns, data.primary_key_expr);
if (settings.force_primary_key && key_condition.alwaysUnknownOrTrue())
{
......@@ -372,14 +372,14 @@ BlockInputStreams MergeTreeDataSelectExecutor::readFromParts(
if (use_sampling)
{
if (!data.sampling_expression)
if (!data.supportsSampling())
throw Exception("Illegal SAMPLE: table doesn't support sampling", ErrorCodes::SAMPLING_NOT_SUPPORTED);
if (sample_factor_column_queried && relative_sample_size != RelativeSize(0))
used_sample_factor = 1.0 / boost::rational_cast<Float64>(relative_sample_size);
RelativeSize size_of_universum = 0;
DataTypePtr type = data.primary_key_sample.getByName(data.sampling_expression->getColumnName()).type;
DataTypePtr type = data.primary_key_sample.getByName(data.sampling_expr_column_name).type;
if (typeid_cast<const DataTypeUInt64 *>(type.get()))
size_of_universum = RelativeSize(std::numeric_limits<UInt64>::max()) + RelativeSize(1);
......@@ -446,11 +446,11 @@ BlockInputStreams MergeTreeDataSelectExecutor::readFromParts(
if (has_lower_limit)
{
if (!key_condition.addCondition(data.sampling_expression->getColumnName(), Range::createLeftBounded(lower, true)))
if (!key_condition.addCondition(data.sampling_expr_column_name, Range::createLeftBounded(lower, true)))
throw Exception("Sampling column not in primary key", ErrorCodes::ILLEGAL_COLUMN);
ASTPtr args = std::make_shared<ASTExpressionList>();
args->children.push_back(data.sampling_expression);
args->children.push_back(data.getSamplingExpression());
args->children.push_back(std::make_shared<ASTLiteral>(lower));
lower_function = std::make_shared<ASTFunction>();
......@@ -463,11 +463,11 @@ BlockInputStreams MergeTreeDataSelectExecutor::readFromParts(
if (has_upper_limit)
{
if (!key_condition.addCondition(data.sampling_expression->getColumnName(), Range::createRightBounded(upper, false)))
if (!key_condition.addCondition(data.sampling_expr_column_name, Range::createRightBounded(upper, false)))
throw Exception("Sampling column not in primary key", ErrorCodes::ILLEGAL_COLUMN);
ASTPtr args = std::make_shared<ASTExpressionList>();
args->children.push_back(data.sampling_expression);
args->children.push_back(data.getSamplingExpression());
args->children.push_back(std::make_shared<ASTLiteral>(upper));
upper_function = std::make_shared<ASTFunction>();
......@@ -492,7 +492,7 @@ BlockInputStreams MergeTreeDataSelectExecutor::readFromParts(
filter_expression = ExpressionAnalyzer(filter_function, context, nullptr, available_real_columns).getActions(false);
/// Add columns needed for `sampling_expression` to `column_names_to_read`.
/// Add columns needed for `sample_by_ast` to `column_names_to_read`.
std::vector<String> add_columns = filter_expression->getRequiredColumns();
column_names_to_read.insert(column_names_to_read.end(), add_columns.begin(), add_columns.end());
std::sort(column_names_to_read.begin(), column_names_to_read.end());
......@@ -554,7 +554,7 @@ BlockInputStreams MergeTreeDataSelectExecutor::readFromParts(
if (select.final())
{
/// Add columns needed to calculate the sorting expression and the sign.
std::vector<String> add_columns = data.getSortingKeyExpression()->getRequiredColumns();
std::vector<String> add_columns = data.sorting_key_expr->getRequiredColumns();
column_names_to_read.insert(column_names_to_read.end(), add_columns.begin(), add_columns.end());
if (!data.merging_params.sign_column.empty())
......@@ -782,10 +782,10 @@ BlockInputStreams MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsFinal
prewhere_info, true, settings.min_bytes_to_use_direct_io, settings.max_read_buffer_size, true,
virt_columns, part.part_index_in_query);
to_merge.emplace_back(std::make_shared<ExpressionBlockInputStream>(source_stream, data.getSortingKeyExpression()));
to_merge.emplace_back(std::make_shared<ExpressionBlockInputStream>(source_stream, data.sorting_key_expr));
}
Names sort_columns = data.getSortingKeyColumns();
Names sort_columns = data.sorting_key_columns;
SortDescription sort_description;
size_t sort_columns_size = sort_columns.size();
sort_description.reserve(sort_columns_size);
......
......@@ -176,9 +176,9 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeTempPart(BlockWithPa
/// If we need to calculate some columns to sort.
if (data.hasSortingKey())
data.getSortingKeyExpression()->execute(block);
data.sorting_key_expr->execute(block);
Names sort_columns = data.getSortingKeyColumns();
Names sort_columns = data.sorting_key_columns;
SortDescription sort_description;
size_t sort_columns_size = sort_columns.size();
sort_description.reserve(sort_columns_size);
......
......@@ -40,7 +40,7 @@ MergeTreeWhereOptimizer::MergeTreeWhereOptimizer(
const MergeTreeData & data,
const Names & column_names,
Logger * log)
: primary_key_columns{ext::collection_cast<std::unordered_set>(data.getPrimaryKeyColumns())},
: primary_key_columns{ext::collection_cast<std::unordered_set>(data.primary_key_columns)},
table_columns{ext::map<std::unordered_set>(data.getColumns().getAllPhysical(),
[] (const NameAndTypePair & col) { return col.name; })},
block_with_constants{KeyCondition::getBlockWithConstants(query_info.query, context, data.getColumns().getAllPhysical())},
......
......@@ -398,7 +398,7 @@ void MergedBlockOutputStream::writeImpl(const Block & block, const IColumn::Perm
/// The set of written offset columns so that you do not write shared offsets of nested structures columns several times
WrittenOffsetColumns offset_columns;
auto primary_key_column_names = storage.getPrimaryKeyColumns();
auto primary_key_column_names = storage.primary_key_columns;
/// Here we will add the columns related to the Primary Key, then write the index.
std::vector<ColumnWithTypeAndName> primary_key_columns(primary_key_column_names.size());
......
......@@ -27,7 +27,7 @@ ReplicatedMergeTreeTableMetadata::ReplicatedMergeTreeTableMetadata(const MergeTr
if (data.format_version < MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING)
date_column = data.minmax_idx_columns[data.minmax_idx_date_column_pos];
sampling_expression = formattedAST(data.sampling_expression);
sampling_expression = formattedAST(data.sample_by_ast);
index_granularity = data.index_granularity;
merging_params_mode = static_cast<int>(data.merging_params.mode);
sign_column = data.merging_params.sign_column;
......
......@@ -549,7 +549,7 @@ static StoragePtr create(const StorageFactory::Arguments & args)
ASTPtr partition_by_ast;
ASTPtr order_by_ast;
ASTPtr primary_key_ast;
ASTPtr sampling_expression;
ASTPtr sample_by_ast;
MergeTreeSettings storage_settings = args.context.getMergeTreeSettings();
if (is_extended_storage_def)
......@@ -568,7 +568,7 @@ static StoragePtr create(const StorageFactory::Arguments & args)
primary_key_ast = args.storage_def->primary_key->ptr();
if (args.storage_def->sample_by)
sampling_expression = args.storage_def->sample_by->ptr();
sample_by_ast = args.storage_def->sample_by->ptr();
storage_settings.loadFromQuery(*args.storage_def);
}
......@@ -577,7 +577,7 @@ static StoragePtr create(const StorageFactory::Arguments & args)
/// If there is an expression for sampling. MergeTree(date, [sample_key], primary_key, index_granularity)
if (engine_args.size() == 4)
{
sampling_expression = engine_args[1];
sample_by_ast = engine_args[1];
engine_args.erase(engine_args.begin() + 1);
}
......@@ -606,13 +606,13 @@ static StoragePtr create(const StorageFactory::Arguments & args)
zookeeper_path, replica_name, args.attach, args.data_path, args.database_name, args.table_name,
args.columns,
args.context, date_column_name, partition_by_ast, order_by_ast, primary_key_ast,
sampling_expression, merging_params, storage_settings,
sample_by_ast, merging_params, storage_settings,
args.has_force_restore_data_flag);
else
return StorageMergeTree::create(
args.data_path, args.database_name, args.table_name, args.columns, args.attach,
args.context, date_column_name, partition_by_ast, order_by_ast, primary_key_ast,
sampling_expression, merging_params, storage_settings,
sample_by_ast, merging_params, storage_settings,
args.has_force_restore_data_flag);
}
......
......@@ -50,7 +50,7 @@ StorageMergeTree::StorageMergeTree(
const ASTPtr & partition_by_ast_,
const ASTPtr & order_by_ast_,
const ASTPtr & primary_key_ast_,
const ASTPtr & sampling_expression_, /// nullptr, if sampling is not supported.
const ASTPtr & sample_by_ast_, /// nullptr, if sampling is not supported.
const MergeTreeData::MergingParams & merging_params_,
const MergeTreeSettings & settings_,
bool has_force_restore_data_flag)
......@@ -59,7 +59,7 @@ StorageMergeTree::StorageMergeTree(
data(database_name, table_name,
full_path, columns_,
context_, date_column_name, partition_by_ast_, order_by_ast_, primary_key_ast_,
sampling_expression_, merging_params_,
sample_by_ast_, merging_params_,
settings_, false, attach),
reader(data), writer(data), merger_mutator(data, context.getBackgroundPool()),
log(&Logger::get(database_name_ + "." + table_name + " (StorageMergeTree)"))
......
......@@ -170,7 +170,7 @@ protected:
const ASTPtr & partition_by_ast_,
const ASTPtr & order_by_ast_,
const ASTPtr & primary_key_ast_,
const ASTPtr & sampling_expression_, /// nullptr, if sampling is not supported.
const ASTPtr & sample_by_ast_, /// nullptr, if sampling is not supported.
const MergeTreeData::MergingParams & merging_params_,
const MergeTreeSettings & settings_,
bool has_force_restore_data_flag);
......
......@@ -201,7 +201,7 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree(
const ASTPtr & partition_by_ast_,
const ASTPtr & order_by_ast_,
const ASTPtr & primary_key_ast_,
const ASTPtr & sampling_expression_,
const ASTPtr & sample_by_ast_,
const MergeTreeData::MergingParams & merging_params_,
const MergeTreeSettings & settings_,
bool has_force_restore_data_flag)
......@@ -213,7 +213,7 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree(
data(database_name, table_name,
full_path, columns_,
context_, date_column_name, partition_by_ast_, order_by_ast_, primary_key_ast_,
sampling_expression_, merging_params_,
sample_by_ast_, merging_params_,
settings_, true, attach,
[this] (const std::string & name) { enqueuePartForCheck(name); }),
reader(data), writer(data), merger_mutator(data, context.getBackgroundPool()), queue(*this),
......
......@@ -533,7 +533,7 @@ protected:
const ASTPtr & partition_by_ast_,
const ASTPtr & order_by_ast_,
const ASTPtr & primary_key_ast_,
const ASTPtr & sampling_expression_,
const ASTPtr & sample_by_ast_,
const MergeTreeData::MergingParams & merging_params_,
const MergeTreeSettings & settings_,
bool has_force_restore_data_flag);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册