提交 62f2c17a 编写于 作者: A alesapin

Secondary indices in StorageInMemoryMetadata

上级 2fac3290
......@@ -356,7 +356,7 @@ void SelectQueryExpressionAnalyzer::makeSetsForIndex(const ASTPtr & node)
const IAST & args = *func->arguments;
const ASTPtr & left_in_operand = args.children.at(0);
if (storage()->mayBenefitFromIndexForIn(left_in_operand, context))
if (storage()->mayBenefitFromIndexForIn(left_in_operand, context, metadata_snapshot))
{
const ASTPtr & arg = args.children.at(1);
if (arg->as<ASTSubquery>() || arg->as<ASTIdentifier>())
......
......@@ -11,7 +11,6 @@
#include <Storages/SelectQueryInfo.h>
#include <Interpreters/DatabaseCatalog.h>
namespace DB
{
......@@ -32,6 +31,9 @@ class ASTExpressionList;
class ASTSelectQuery;
struct ASTTablesInSelectQueryElement;
struct StorageInMemoryMetadata;
using StorageMetadataPtr = std::shared_ptr<StorageInMemoryMetadata>;
/// Create columns in block or return false if not possible
bool sanitizeBlock(Block & block);
......@@ -232,11 +234,14 @@ public:
const ASTPtr & query_,
const SyntaxAnalyzerResultPtr & syntax_analyzer_result_,
const Context & context_,
const StorageMetadataPtr & metadata_snapshot_,
const NameSet & required_result_columns_ = {},
bool do_global_ = false,
const SelectQueryOptions & options_ = {})
: ExpressionAnalyzer(query_, syntax_analyzer_result_, context_, options_.subquery_depth, do_global_)
, required_result_columns(required_result_columns_), query_options(options_)
: ExpressionAnalyzer(query_, syntax_analyzer_result_, context_, options_.subquery_depth, do_global_)
, metadata_snapshot(metadata_snapshot_)
, required_result_columns(required_result_columns_)
, query_options(options_)
{
}
......@@ -260,6 +265,7 @@ public:
void appendProjectResult(ExpressionActionsChain & chain) const;
private:
StorageMetadataPtr metadata_snapshot;
/// If non-empty, ignore all expressions not from this list.
NameSet required_result_columns;
SelectQueryOptions query_options;
......
......@@ -130,7 +130,7 @@ String InterpreterSelectQuery::generateFilterActions(
/// Using separate expression analyzer to prevent any possible alias injection
auto syntax_result = SyntaxAnalyzer(*context).analyzeSelect(query_ast, SyntaxAnalyzerResult({}, storage));
SelectQueryExpressionAnalyzer analyzer(query_ast, syntax_result, *context);
SelectQueryExpressionAnalyzer analyzer(query_ast, syntax_result, *context, metadata_snapshot);
actions = analyzer.simpleSelectActions();
return expr_list->children.at(0)->getColumnName();
......@@ -336,7 +336,7 @@ InterpreterSelectQuery::InterpreterSelectQuery(
context->getQueryContext().addScalar(it.first, it.second);
query_analyzer = std::make_unique<SelectQueryExpressionAnalyzer>(
query_ptr, syntax_analyzer_result, *context,
query_ptr, syntax_analyzer_result, *context, metadata_snapshot,
NameSet(required_result_column_names.begin(), required_result_column_names.end()),
!options.only_analyze, options);
......
......@@ -294,8 +294,8 @@ ASTPtr MutationsInterpreter::prepare(bool dry_run)
throw Exception("Empty mutation commands list", ErrorCodes::LOGICAL_ERROR);
const ColumnsDescription & columns_desc = storage->getColumns();
const IndicesDescription & indices_desc = storage->getSecondaryIndices();
const ColumnsDescription & columns_desc = metadata_snapshot->getColumns();
const IndicesDescription & indices_desc = metadata_snapshot->getSecondaryIndices();
NamesAndTypesList all_columns = columns_desc.getAllPhysical();
NameSet updated_columns;
......
......@@ -37,17 +37,6 @@ const ColumnsDescription & IStorage::getColumns() const
return metadata->columns;
}
/// Returns a reference to the secondary index descriptions held in the storage's `metadata` member.
const IndicesDescription & IStorage::getSecondaryIndices() const
{
return metadata->secondary_indices;
}
/// Returns true when the storage's `metadata` member lists at least one secondary index.
bool IStorage::hasSecondaryIndices() const
{
return !metadata->secondary_indices.empty();
}
namespace
{
#if !defined(ARCADIA_BUILD)
......
......@@ -140,10 +140,6 @@ public:
public: /// thread-unsafe part. lockStructure must be acquired
const ColumnsDescription & getColumns() const; /// returns combined set of columns
const IndicesDescription & getSecondaryIndices() const;
/// Has at least one non primary index
bool hasSecondaryIndices() const;
/// Storage settings
ASTPtr getSettingsChanges() const;
......@@ -413,7 +409,7 @@ public:
virtual bool supportsIndexForIn() const { return false; }
/// Provides a hint that the storage engine may evaluate the IN-condition by using an index.
virtual bool mayBenefitFromIndexForIn(const ASTPtr & /* left_in_operand */, const Context & /* query_context */) const { return false; }
virtual bool mayBenefitFromIndexForIn(const ASTPtr & /* left_in_operand */, const Context & /* query_context */, const StorageMetadataPtr & /* metadata_snapshot */) const { return false; }
/// Checks validity of the data
virtual CheckResults checkData(const ASTPtr & /* query */, const Context & /* context */) { throw Exception("Check query is not supported for " + getName() + " storage", ErrorCodes::NOT_IMPLEMENTED); }
......
......@@ -408,14 +408,14 @@ ExpressionActionsPtr getCombinedIndicesExpression(
}
/// Returns the result of getCombinedIndicesExpression for the primary key, the secondary indices,
/// the columns and the global context.
/// Two variants of the signature and of the call are shown: the parameterless one reads the secondary
/// indices and columns through the storage's own getters, the other reads both from `metadata_snapshot`.
ExpressionActionsPtr MergeTreeData::getPrimaryKeyAndSkipIndicesExpression() const
ExpressionActionsPtr MergeTreeData::getPrimaryKeyAndSkipIndicesExpression(const StorageMetadataPtr & metadata_snapshot) const
{
return getCombinedIndicesExpression(getPrimaryKey(), getSecondaryIndices(), getColumns(), global_context);
return getCombinedIndicesExpression(getPrimaryKey(), metadata_snapshot->getSecondaryIndices(), metadata_snapshot->getColumns(), global_context);
}
/// Returns the result of getCombinedIndicesExpression for the sorting key, the secondary indices,
/// the columns and the global context.
/// Two variants of the signature and of the call are shown: the parameterless one reads the secondary
/// indices and columns through the storage's own getters, the other reads both from `metadata_snapshot`.
/// In both variants the sorting key itself still comes from getSortingKey() on the storage.
ExpressionActionsPtr MergeTreeData::getSortingKeyAndSkipIndicesExpression() const
ExpressionActionsPtr MergeTreeData::getSortingKeyAndSkipIndicesExpression(const StorageMetadataPtr & metadata_snapshot) const
{
return getCombinedIndicesExpression(getSortingKey(), getSecondaryIndices(), getColumns(), global_context);
return getCombinedIndicesExpression(getSortingKey(), metadata_snapshot->getSecondaryIndices(), metadata_snapshot->getColumns(), global_context);
}
......@@ -1237,9 +1237,10 @@ void MergeTreeData::checkAlterIsPossible(const AlterCommands & commands, const S
{
/// Check that needed transformations can be applied to the list of columns without considering type conversions.
StorageInMemoryMetadata new_metadata = getInMemoryMetadata();
StorageInMemoryMetadata old_metadata = getInMemoryMetadata();
commands.apply(new_metadata, global_context);
if (getSecondaryIndices().empty() && !new_metadata.secondary_indices.empty() &&
!settings.allow_experimental_data_skipping_indices)
if (old_metadata.getSecondaryIndices().empty() && !new_metadata.secondary_indices.empty()
&& !settings.allow_experimental_data_skipping_indices)
throw Exception("You must set the setting `allow_experimental_data_skipping_indices` to 1 " \
"before using data skipping indices.", ErrorCodes::BAD_ARGUMENTS);
......@@ -1259,7 +1260,7 @@ void MergeTreeData::checkAlterIsPossible(const AlterCommands & commands, const S
columns_alter_type_forbidden.insert(col);
}
for (const auto & index : getSecondaryIndices())
for (const auto & index : old_metadata.getSecondaryIndices())
{
for (const String & col : index.expression->getRequiredColumns())
columns_alter_type_forbidden.insert(col);
......@@ -2932,7 +2933,8 @@ bool MergeTreeData::isPrimaryOrMinMaxKeyColumnPossiblyWrappedInFunctions(const A
return false;
}
bool MergeTreeData::mayBenefitFromIndexForIn(const ASTPtr & left_in_operand, const Context &) const
bool MergeTreeData::mayBenefitFromIndexForIn(
const ASTPtr & left_in_operand, const Context &, const StorageMetadataPtr & metadata_snapshot) const
{
/// Make sure that the left side of the IN operator contain part of the key.
/// If there is a tuple on the left side of the IN operator, at least one item of the tuple
......@@ -2945,7 +2947,7 @@ bool MergeTreeData::mayBenefitFromIndexForIn(const ASTPtr & left_in_operand, con
{
if (isPrimaryOrMinMaxKeyColumnPossiblyWrappedInFunctions(item))
return true;
for (const auto & index : getSecondaryIndices())
for (const auto & index : metadata_snapshot->getSecondaryIndices())
if (index_wrapper_factory.get(index)->mayBenefitFromIndexForIn(item))
return true;
}
......@@ -2954,7 +2956,7 @@ bool MergeTreeData::mayBenefitFromIndexForIn(const ASTPtr & left_in_operand, con
}
else
{
for (const auto & index : getSecondaryIndices())
for (const auto & index : metadata_snapshot->getSecondaryIndices())
if (index_wrapper_factory.get(index)->mayBenefitFromIndexForIn(left_in_operand))
return true;
......
......@@ -350,7 +350,8 @@ public:
bool supportsSettings() const override { return true; }
NamesAndTypesList getVirtuals() const override;
bool mayBenefitFromIndexForIn(const ASTPtr & left_in_operand, const Context &) const override;
bool
mayBenefitFromIndexForIn(const ASTPtr & left_in_operand, const Context &, const StorageMetadataPtr & metadata_snapshot) const override;
/// Load the set of data parts from disk. Call once - immediately after the object is created.
void loadDataParts(bool skip_sanity_checks);
......@@ -643,8 +644,8 @@ public:
Int64 minmax_idx_date_column_pos = -1; /// In a common case minmax index includes a date column.
Int64 minmax_idx_time_column_pos = -1; /// In other cases, minmax index often includes a dateTime column.
ExpressionActionsPtr getPrimaryKeyAndSkipIndicesExpression() const;
ExpressionActionsPtr getSortingKeyAndSkipIndicesExpression() const;
ExpressionActionsPtr getPrimaryKeyAndSkipIndicesExpression(const StorageMetadataPtr & metadata_snapshot) const;
ExpressionActionsPtr getSortingKeyAndSkipIndicesExpression(const StorageMetadataPtr & metadata_snapshot) const;
std::optional<TTLDescription> selectTTLEntryForTTLInfos(const IMergeTreeDataPart::TTLInfos & ttl_infos, time_t time_of_move) const;
......
......@@ -612,7 +612,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor
NamesAndTypesList merging_columns;
Names gathering_column_names, merging_column_names;
extractMergingAndGatheringColumns(
storage_columns, data.getSortingKey().expression, data.getSecondaryIndices(),
storage_columns, data.getSortingKey().expression, metadata_snapshot->getSecondaryIndices(),
data.merging_params, gathering_columns, gathering_column_names, merging_columns, merging_column_names);
auto single_disk_volume = std::make_shared<SingleDiskVolume>("volume_" + future_part.name, disk);
......@@ -798,10 +798,10 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor
merged_stream = std::make_shared<TTLBlockInputStream>(merged_stream, data, new_data_part, time_of_merge, force_ttl);
if (data.hasSecondaryIndices())
if (metadata_snapshot->hasSecondaryIndices())
{
const auto & indices = data.getSecondaryIndices();
merged_stream = std::make_shared<ExpressionBlockInputStream>(merged_stream, indices.getSingleExpressionForIndices(data.getColumns(), data.global_context));
const auto & indices = metadata_snapshot->getSecondaryIndices();
merged_stream = std::make_shared<ExpressionBlockInputStream>(merged_stream, indices.getSingleExpressionForIndices(metadata_snapshot->getColumns(), data.global_context));
merged_stream = std::make_shared<MaterializingBlockInputStream>(merged_stream);
}
......@@ -810,7 +810,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor
new_data_part,
metadata_snapshot,
merging_columns,
index_factory.getMany(data.getSecondaryIndices()),
index_factory.getMany(metadata_snapshot->getSecondaryIndices()),
compression_codec,
merged_column_to_size,
data_settings->min_merge_bytes_to_use_direct_io,
......@@ -1084,7 +1084,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor
/// All columns from part are changed and may be some more that were missing before in part
if (isCompactPart(source_part) || source_part->getColumns().isSubsetOf(updated_header.getNamesAndTypesList()))
{
auto part_indices = getIndicesForNewDataPart(data.getSecondaryIndices(), for_file_renames);
auto part_indices = getIndicesForNewDataPart(metadata_snapshot->getSecondaryIndices(), for_file_renames);
mutateAllPartColumns(
new_data_part,
metadata_snapshot,
......@@ -1101,7 +1101,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor
else /// TODO: check that we modify only non-key columns in this case.
{
/// We will modify only some of the columns. Other columns and key values can be copied as-is.
auto indices_to_recalc = getIndicesToRecalculate(in, updated_header.getNamesAndTypesList(), context);
auto indices_to_recalc = getIndicesToRecalculate(in, updated_header.getNamesAndTypesList(), metadata_snapshot, context);
NameSet files_to_skip = collectFilesToSkip(updated_header, indices_to_recalc, mrk_extension);
NameToNameVector files_to_rename = collectFilesForRenames(source_part, for_file_renames, mrk_extension);
......@@ -1524,6 +1524,7 @@ MergeTreeIndices MergeTreeDataMergerMutator::getIndicesForNewDataPart(
std::set<MergeTreeIndexPtr> MergeTreeDataMergerMutator::getIndicesToRecalculate(
BlockInputStreamPtr & input_stream,
const NamesAndTypesList & updated_columns,
const StorageMetadataPtr & metadata_snapshot,
const Context & context) const
{
/// Checks if columns used in skipping indexes modified.
......@@ -1532,7 +1533,7 @@ std::set<MergeTreeIndexPtr> MergeTreeDataMergerMutator::getIndicesToRecalculate(
ASTPtr indices_recalc_expr_list = std::make_shared<ASTExpressionList>();
for (const auto & col : updated_columns.getNames())
{
const auto & indices = data.getSecondaryIndices();
const auto & indices = metadata_snapshot->getSecondaryIndices();
for (size_t i = 0; i < indices.size(); ++i)
{
const auto & index = indices[i];
......@@ -1597,9 +1598,9 @@ void MergeTreeDataMergerMutator::mutateAllPartColumns(
if (mutating_stream == nullptr)
throw Exception("Cannot mutate part columns with uninitialized mutations stream. It's a bug", ErrorCodes::LOGICAL_ERROR);
if (data.hasPrimaryKey() || data.hasSecondaryIndices())
if (data.hasPrimaryKey() || metadata_snapshot->hasSecondaryIndices())
mutating_stream = std::make_shared<MaterializingBlockInputStream>(
std::make_shared<ExpressionBlockInputStream>(mutating_stream, data.getPrimaryKeyAndSkipIndicesExpression()));
std::make_shared<ExpressionBlockInputStream>(mutating_stream, data.getPrimaryKeyAndSkipIndicesExpression(metadata_snapshot)));
if (need_remove_expired_values)
mutating_stream = std::make_shared<TTLBlockInputStream>(mutating_stream, data, new_data_part, time_of_mutation, true);
......
......@@ -177,6 +177,7 @@ private:
std::set<MergeTreeIndexPtr> getIndicesToRecalculate(
BlockInputStreamPtr & input_stream,
const NamesAndTypesList & updated_columns,
const StorageMetadataPtr & metadata_snapshot,
const Context & context) const;
/// Override all columns of new part using mutating_stream
......
......@@ -550,7 +550,7 @@ Pipes MergeTreeDataSelectExecutor::readFromParts(
std::vector<std::pair<MergeTreeIndexPtr, MergeTreeIndexConditionPtr>> useful_indices;
for (const auto & index : data.getSecondaryIndices())
for (const auto & index : metadata_snapshot->getSecondaryIndices())
{
auto index_helper = MergeTreeIndexFactory::instance().get(index);
auto condition = index_helper->createIndexCondition(query_info, context);
......
......@@ -262,8 +262,8 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeTempPart(BlockWithPa
new_data_part->volume->getDisk()->createDirectories(full_path);
/// If we need to calculate some columns to sort.
if (data.hasSortingKey() || data.hasSecondaryIndices())
data.getSortingKeyAndSkipIndicesExpression()->execute(block);
if (data.hasSortingKey() || metadata_snapshot->hasSecondaryIndices())
data.getSortingKeyAndSkipIndicesExpression(metadata_snapshot)->execute(block);
Names sort_columns = data.getSortingKeyColumns();
SortDescription sort_description;
......@@ -302,7 +302,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeTempPart(BlockWithPa
auto compression_codec = data.global_context.chooseCompressionCodec(0, 0);
const auto & index_factory = MergeTreeIndexFactory::instance();
MergedBlockOutputStream out(new_data_part, metadata_snapshot, columns, index_factory.getMany(data.getSecondaryIndices()), compression_codec);
MergedBlockOutputStream out(new_data_part, metadata_snapshot, columns, index_factory.getMany(metadata_snapshot->getSecondaryIndices()), compression_codec);
out.writePrefix();
out.writeWithPermutation(block, perm_ptr);
......
......@@ -164,7 +164,7 @@ void MergedBlockOutputStream::writeImpl(const Block & block, const IColumn::Perm
return;
std::unordered_set<String> skip_indexes_column_names_set;
for (const auto & index : storage.getSecondaryIndices())
for (const auto & index : metadata_snapshot->getSecondaryIndices())
std::copy(index.column_names.cbegin(), index.column_names.cend(),
std::inserter(skip_indexes_column_names_set, skip_indexes_column_names_set.end()));
Names skip_indexes_column_names(skip_indexes_column_names_set.begin(), skip_indexes_column_names_set.end());
......
......@@ -35,9 +35,10 @@ public:
bool supportsIndexForIn() const override { return true; }
/// Forwards the call to the storage referenced by `part`, passing every argument through unchanged.
/// Two variants are shown: one without and one with the `metadata_snapshot` parameter.
bool mayBenefitFromIndexForIn(const ASTPtr & left_in_operand, const Context & query_context) const override
bool mayBenefitFromIndexForIn(
const ASTPtr & left_in_operand, const Context & query_context, const StorageMetadataPtr & metadata_snapshot) const override
{
return part->storage.mayBenefitFromIndexForIn(left_in_operand, query_context);
return part->storage.mayBenefitFromIndexForIn(left_in_operand, query_context, metadata_snapshot);
}
NamesAndTypesList getVirtuals() const override
......
......@@ -447,7 +447,8 @@ BlockOutputStreamPtr StorageBuffer::write(const ASTPtr & /*query*/, const Storag
}
bool StorageBuffer::mayBenefitFromIndexForIn(const ASTPtr & left_in_operand, const Context & query_context) const
bool StorageBuffer::mayBenefitFromIndexForIn(
const ASTPtr & left_in_operand, const Context & query_context, const StorageMetadataPtr & /*metadata_snapshot*/) const
{
if (!destination_id)
return false;
......@@ -457,7 +458,8 @@ bool StorageBuffer::mayBenefitFromIndexForIn(const ASTPtr & left_in_operand, con
if (destination.get() == this)
throw Exception("Destination table is myself. Read will cause infinite loop.", ErrorCodes::INFINITE_LOOP);
return destination->mayBenefitFromIndexForIn(left_in_operand, query_context);
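/// TODO alesap (check destination metadata): the metadata_snapshot argument is unused here;
/// the destination table's current in-memory metadata is passed instead.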
return destination->mayBenefitFromIndexForIn(left_in_operand, query_context, destination->getInMemoryMetadataPtr());
}
......
......@@ -84,7 +84,7 @@ public:
bool supportsFinal() const override { return true; }
bool supportsIndexForIn() const override { return true; }
bool mayBenefitFromIndexForIn(const ASTPtr & left_in_operand, const Context & query_context) const override;
bool mayBenefitFromIndexForIn(const ASTPtr & left_in_operand, const Context & query_context, const StorageMetadataPtr & metadata_snapshot) const override;
void checkAlterIsPossible(const AlterCommands & commands, const Settings & /* settings */) const override;
......
......@@ -26,9 +26,11 @@ public:
bool supportsFinal() const override { return getTargetTable()->supportsFinal(); }
bool supportsIndexForIn() const override { return getTargetTable()->supportsIndexForIn(); }
bool supportsParallelInsert() const override { return getTargetTable()->supportsParallelInsert(); }
/// Delegates the decision to the table returned by getTargetTable().
/// In the three-argument variant the caller-supplied snapshot is not used (its parameter name is commented out);
/// the target table's own in-memory metadata pointer is fetched and passed instead.
bool mayBenefitFromIndexForIn(const ASTPtr & left_in_operand, const Context & query_context) const override
bool mayBenefitFromIndexForIn(const ASTPtr & left_in_operand, const Context & query_context, const StorageMetadataPtr & /* metadata_snapshot */) const override
{
return getTargetTable()->mayBenefitFromIndexForIn(left_in_operand, query_context);
auto target_table = getTargetTable();
auto metadata_snapshot = target_table->getInMemoryMetadataPtr();
return target_table->mayBenefitFromIndexForIn(left_in_operand, query_context, metadata_snapshot);
}
BlockOutputStreamPtr write(const ASTPtr & query, const StorageMetadataPtr & /*metadata_snapshot*/, const Context & context) override;
......
......@@ -81,7 +81,7 @@ bool StorageMerge::isRemote() const
}
bool StorageMerge::mayBenefitFromIndexForIn(const ASTPtr & left_in_operand, const Context & query_context) const
bool StorageMerge::mayBenefitFromIndexForIn(const ASTPtr & left_in_operand, const Context & query_context, const StorageMetadataPtr & /*metadata_snapshot*/) const
{
/// It's beneficial if it is true for at least one table.
StorageListWithLocks selected_tables = getSelectedTables(
......@@ -90,7 +90,9 @@ bool StorageMerge::mayBenefitFromIndexForIn(const ASTPtr & left_in_operand, cons
size_t i = 0;
for (const auto & table : selected_tables)
{
if (std::get<0>(table)->mayBenefitFromIndexForIn(left_in_operand, query_context))
auto storage_ptr = std::get<0>(table);
auto metadata_snapshot = storage_ptr->getInMemoryMetadataPtr();
if (storage_ptr->mayBenefitFromIndexForIn(left_in_operand, query_context, metadata_snapshot))
return true;
++i;
......
......@@ -44,7 +44,8 @@ public:
/// the structure of sub-tables is not checked
void alter(const AlterCommands & params, const Context & context, TableStructureWriteLockHolder & table_lock_holder) override;
bool mayBenefitFromIndexForIn(const ASTPtr & left_in_operand, const Context & query_context) const override;
bool mayBenefitFromIndexForIn(
const ASTPtr & left_in_operand, const Context & query_context, const StorageMetadataPtr & metadata_snapshot) const override;
private:
String source_database;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册