Commit dffdece3 authored by alesapin

getColumns in StorageInMemoryMetadata (only compilable)

Parent: ef8781cc
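For orientation: this commit moves column metadata reads from IStorage::getColumns() to an immutable StorageInMemoryMetadata snapshot, obtained once per operation via getInMemoryMetadataPtr(). A minimal, self-contained sketch of the two calling conventions follows; the types here (ColumnsDescription, StorageInMemoryMetadata, the mock storage) are reduced stand-ins, not the real ClickHouse definitions.

#include <memory>
#include <string>
#include <vector>

// Reduced stand-ins for the real ClickHouse types, for illustration only.
struct ColumnsDescription
{
    std::vector<std::string> names;
};

struct StorageInMemoryMetadata
{
    ColumnsDescription columns;
    const ColumnsDescription & getColumns() const { return columns; }
};
using StorageMetadataPtr = std::shared_ptr<StorageInMemoryMetadata>;

class MockStorage
{
public:
    // New style: callers take one shared snapshot up front and read all
    // schema information from it, so the view stays consistent even if a
    // concurrent ALTER swaps the metadata pointer underneath.
    StorageMetadataPtr getInMemoryMetadataPtr() const { return metadata; }

    // Old style (deleted by this commit): each call re-read the live
    // metadata, so two reads within one query could disagree.
    // const ColumnsDescription & getColumns() const { return metadata->columns; }

private:
    StorageMetadataPtr metadata = std::make_shared<StorageInMemoryMetadata>();
};

int main()
{
    MockStorage storage;
    auto metadata_snapshot = storage.getInMemoryMetadataPtr();
    const auto & columns = metadata_snapshot->getColumns();
    return columns.names.empty() ? 0 : 1;
}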
......@@ -49,7 +49,7 @@ std::ostream & operator<<(std::ostream & stream, const IStorage & what)
{
auto table_id = what.getStorageID();
stream << "IStorage(name = " << what.getName() << ", tableName = " << table_id.table_name << ") {"
<< what.getColumns().getAllPhysical().toString() << "}";
<< what.getInMemoryMetadataPtr()->getColumns().getAllPhysical().toString() << "}";
return stream;
}
......
......@@ -21,7 +21,11 @@ namespace ErrorCodes
InputStreamFromASTInsertQuery::InputStreamFromASTInsertQuery(
const ASTPtr & ast, ReadBuffer * input_buffer_tail_part, const Block & header, const Context & context, const ASTPtr & input_function)
const ASTPtr & ast,
ReadBuffer * input_buffer_tail_part,
const Block & header,
const Context & context,
const ASTPtr & input_function)
{
const auto * ast_insert_query = ast->as<ASTInsertQuery>();
......@@ -59,7 +63,8 @@ InputStreamFromASTInsertQuery::InputStreamFromASTInsertQuery(
if (context.getSettingsRef().input_format_defaults_for_omitted_fields && ast_insert_query->table_id && !input_function)
{
StoragePtr storage = DatabaseCatalog::instance().getTable(ast_insert_query->table_id, context);
auto column_defaults = storage->getColumns().getDefaults();
auto metadata_snapshot = storage->getInMemoryMetadataPtr();
auto column_defaults = metadata_snapshot->getColumns().getDefaults();
if (!column_defaults.empty())
res_stream = std::make_shared<AddingDefaultsBlockInputStream>(res_stream, column_defaults, context);
}
......
......@@ -11,6 +11,8 @@ namespace DB
struct BlockIO;
class Context;
struct StorageInMemoryMetadata;
using StorageMetadataPtr = std::shared_ptr<StorageInMemoryMetadata>;
/** Prepares an input stream that produces the data contained in an INSERT query.
  * The head of the inserted data may be stored directly in the INSERT AST.
......@@ -19,7 +21,8 @@ class Context;
class InputStreamFromASTInsertQuery : public IBlockInputStream
{
public:
InputStreamFromASTInsertQuery(const ASTPtr & ast,
InputStreamFromASTInsertQuery(
const ASTPtr & ast,
ReadBuffer * input_buffer_tail_part,
const Block & header,
const Context & context,
......
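The header change above relies on a forward declaration plus the StorageMetadataPtr alias rather than including StorageInMemoryMetadata.h. A compilable sketch of that pattern, with a hypothetical SnapshotHolder standing in for the stream and interpreter classes touched in this commit:

#include <memory>
#include <utility>

struct StorageInMemoryMetadata;  // forward declaration keeps the header light
using StorageMetadataPtr = std::shared_ptr<StorageInMemoryMetadata>;

// Hypothetical consumer standing in for the classes that now hold a snapshot.
class SnapshotHolder
{
public:
    explicit SnapshotHolder(StorageMetadataPtr snapshot_) : snapshot(std::move(snapshot_)) {}

private:
    // std::shared_ptr tolerates an incomplete element type as a member, so
    // only the .cpp files that dereference it need the full definition.
    StorageMetadataPtr snapshot;
};

// In the real tree this definition lives in StorageInMemoryMetadata.h and is
// included only where the snapshot is actually dereferenced.
struct StorageInMemoryMetadata {};

int main()
{
    SnapshotHolder holder(std::make_shared<StorageInMemoryMetadata>());
    (void)holder;
    return 0;
}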
......@@ -79,6 +79,7 @@ PushingToViewsBlockOutputStream::PushingToViewsBlockOutputStream(
StoragePtr inner_table = materialized_view->getTargetTable();
auto inner_table_id = inner_table->getStorageID();
auto inner_metadata_snapshot = inner_table->getInMemoryMetadataPtr();
query = dependent_metadata_snapshot->getSelectQuery().inner_query;
std::unique_ptr<ASTInsertQuery> insert = std::make_unique<ASTInsertQuery>();
......@@ -90,7 +91,7 @@ PushingToViewsBlockOutputStream::PushingToViewsBlockOutputStream(
/// Insert only columns returned by select.
auto list = std::make_shared<ASTExpressionList>();
const auto & inner_table_columns = inner_table->getColumns();
const auto & inner_table_columns = inner_metadata_snapshot->getColumns();
for (auto & column : header)
/// But skip columns which storage doesn't have.
if (inner_table_columns.hasPhysical(column.name))
......@@ -323,7 +324,7 @@ void PushingToViewsBlockOutputStream::process(const Block & block, size_t view_n
Context local_context = *select_context;
local_context.addViewSource(
StorageValues::create(
storage->getStorageID(), storage->getColumns(), block, storage->getVirtuals()));
storage->getStorageID(), metadata_snapshot->getColumns(), block, storage->getVirtuals()));
select.emplace(view.query, local_context, SelectQueryOptions());
in = std::make_shared<MaterializingBlockInputStream>(select->execute().getInputStream());
......
......@@ -325,7 +325,7 @@ void RemoteQueryExecutor::sendExternalTables()
Pipes pipes;
pipes = cur->read(
cur->getColumns().getNamesOfPhysical(),
metadata_snapshot->getColumns().getNamesOfPhysical(),
metadata_snapshot, {}, context,
read_from_table_stage, DEFAULT_BLOCK_SIZE, 1);
......
......@@ -139,7 +139,8 @@ static ASTPtr getCreateQueryFromStorage(const StoragePtr & storage, const ASTPtr
create_table_query->table = table_id.table_name;
create_table_query->database = table_id.database_name;
for (const auto & column_type_and_name : storage->getColumns().getOrdinary())
auto metadata_snapshot = storage->getInMemoryMetadataPtr();
for (const auto & column_type_and_name : metadata_snapshot->getColumns().getOrdinary())
{
const auto & column_declaration = std::make_shared<ASTColumnDeclaration>();
column_declaration->name = column_type_and_name.name;
......
......@@ -114,7 +114,8 @@ void FunctionHasColumnInTable::executeImpl(Block & block, const ColumnNumbers &
if (host_name.empty())
{
const StoragePtr & table = DatabaseCatalog::instance().getTable({database_name, table_name}, global_context);
has_column = table->getColumns().hasPhysical(column_name);
auto table_metadata = table->getInMemoryMetadataPtr();
has_column = table_metadata->getColumns().hasPhysical(column_name);
}
else
{
......
......@@ -91,7 +91,8 @@ BlockInputStreamPtr InterpreterDescribeQuery::executeImpl()
auto table_lock = table->lockStructureForShare(
false, context.getInitialQueryId(), context.getSettingsRef().lock_acquire_timeout);
columns = table->getColumns();
auto metadata_snapshot = table->getInMemoryMetadataPtr();
columns = metadata_snapshot->getColumns();
}
Block sample_block = getSampleBlock();
......
......@@ -244,7 +244,7 @@ BlockIO InterpreterInsertQuery::execute()
/// Actually we don't know the structure of input blocks from the query/table,
/// because some clients break the insertion protocol (columns != header)
out = std::make_shared<AddingDefaultBlockOutputStream>(
out, query_sample_block, out->getHeader(), table->getColumns().getDefaults(), context);
out, query_sample_block, out->getHeader(), metadata_snapshot->getColumns().getDefaults(), context);
/// It's important to squash blocks as early as possible (before other transforms),
/// because other transforms may work inefficiently if the block size is small.
......@@ -295,7 +295,7 @@ BlockIO InterpreterInsertQuery::execute()
if (!allow_materialized)
{
for (const auto & column : table->getColumns())
for (const auto & column : metadata_snapshot->getColumns())
if (column.default_desc.kind == ColumnDefaultKind::Materialized && header.has(column.name))
throw Exception("Cannot insert column " + column.name + ", because it is MATERIALIZED column.", ErrorCodes::ILLEGAL_COLUMN);
}
......
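The check above now iterates the snapshot's columns instead of table->getColumns(). A simplified, hedged sketch of that MATERIALIZED-column rejection; ColumnDefaultKind and the column list are reduced stand-ins for the ColumnsDescription entries:

#include <stdexcept>
#include <string>
#include <vector>

// Reduced stand-ins for ColumnsDescription entries.
enum class ColumnDefaultKind { Default, Materialized, Alias };
struct Column
{
    std::string name;
    ColumnDefaultKind default_kind = ColumnDefaultKind::Default;
};

// Mirrors the loop above: reject an INSERT whose header mentions a
// MATERIALIZED column, reading the schema from the metadata snapshot.
void throwIfMaterializedInserted(const std::vector<Column> & snapshot_columns,
                                 const std::vector<std::string> & header_names)
{
    for (const auto & column : snapshot_columns)
        for (const auto & name : header_names)
            if (column.default_kind == ColumnDefaultKind::Materialized && column.name == name)
                throw std::runtime_error(
                    "Cannot insert column " + column.name + ", because it is MATERIALIZED column.");
}

int main()
{
    std::vector<Column> columns{{"x", ColumnDefaultKind::Default},
                                {"m", ColumnDefaultKind::Materialized}};
    throwIfMaterializedInserted(columns, {"x"});  // fine: "m" is not inserted
    return 0;
}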
......@@ -129,7 +129,7 @@ String InterpreterSelectQuery::generateFilterActions(
table_expr->children.push_back(table_expr->database_and_table_name);
/// Using separate expression analyzer to prevent any possible alias injection
auto syntax_result = SyntaxAnalyzer(*context).analyzeSelect(query_ast, SyntaxAnalyzerResult({}, storage));
auto syntax_result = SyntaxAnalyzer(*context).analyzeSelect(query_ast, SyntaxAnalyzerResult({}, storage, metadata_snapshot));
SelectQueryExpressionAnalyzer analyzer(query_ast, syntax_result, *context, metadata_snapshot);
actions = analyzer.simpleSelectActions();
......@@ -263,7 +263,7 @@ InterpreterSelectQuery::InterpreterSelectQuery(
}
if (has_input || !joined_tables.resolveTables())
joined_tables.makeFakeTable(storage, source_header);
joined_tables.makeFakeTable(storage, metadata_snapshot, source_header);
/// Rewrite JOINs
if (!has_input && joined_tables.tablesCount() > 1)
......@@ -311,8 +311,9 @@ InterpreterSelectQuery::InterpreterSelectQuery(
view->replaceWithSubquery(getSelectQuery(), view_table, metadata_snapshot);
syntax_analyzer_result = SyntaxAnalyzer(*context).analyzeSelect(
query_ptr, SyntaxAnalyzerResult(source_header.getNamesAndTypesList(), storage),
options, joined_tables.tablesWithColumns(), required_result_column_names, table_join);
query_ptr,
SyntaxAnalyzerResult(source_header.getNamesAndTypesList(), storage, metadata_snapshot),
options, joined_tables.tablesWithColumns(), required_result_column_names, table_join);
if (view)
{
......@@ -1087,7 +1088,7 @@ void InterpreterSelectQuery::executeFetchColumns(
/// Detect if ALIAS columns are required for query execution
auto alias_columns_required = false;
const ColumnsDescription & storage_columns = storage->getColumns();
const ColumnsDescription & storage_columns = metadata_snapshot->getColumns();
for (const auto & column_name : required_columns)
{
auto column_default = storage_columns.getDefault(column_name);
......@@ -1210,7 +1211,7 @@ void InterpreterSelectQuery::executeFetchColumns(
prewhere_info->prewhere_actions = std::move(new_actions);
auto analyzed_result
= SyntaxAnalyzer(*context).analyze(required_columns_from_prewhere_expr, storage->getColumns().getAllPhysical());
= SyntaxAnalyzer(*context).analyze(required_columns_from_prewhere_expr, metadata_snapshot->getColumns().getAllPhysical());
prewhere_info->alias_actions
= ExpressionAnalyzer(required_columns_from_prewhere_expr, analyzed_result, *context).getActions(true, false);
......
......@@ -47,7 +47,7 @@ BlockIO InterpreterWatchQuery::execute()
ErrorCodes::UNKNOWN_TABLE);
/// List of columns to read to execute the query.
Names required_columns = storage->getColumns().getNamesOfPhysical();
Names required_columns = storage->getInMemoryMetadataPtr()->getColumns().getNamesOfPhysical();
context.checkAccess(AccessType::SELECT, table_id, required_columns);
/// Get context settings for this query
......
......@@ -207,11 +207,11 @@ bool JoinedTables::resolveTables()
return !tables_with_columns.empty();
}
void JoinedTables::makeFakeTable(StoragePtr storage, const Block & source_header)
void JoinedTables::makeFakeTable(StoragePtr storage, const StorageMetadataPtr & metadata_snapshot, const Block & source_header)
{
if (storage)
{
const ColumnsDescription & storage_columns = storage->getColumns();
const ColumnsDescription & storage_columns = metadata_snapshot->getColumns();
tables_with_columns.emplace_back(DatabaseAndTableWithAlias{}, storage_columns.getOrdinary());
auto & table = tables_with_columns.back();
......
......@@ -13,6 +13,8 @@ namespace DB
class ASTSelectQuery;
class TableJoin;
struct SelectQueryOptions;
struct StorageInMemoryMetadata;
using StorageMetadataPtr = std::shared_ptr<StorageInMemoryMetadata>;
/// Joined tables' columns resolver.
/// We want to get each table's structure at most once per table occurrence. Or even better, once per table.
......@@ -31,7 +33,7 @@ public:
bool resolveTables();
/// Make fake tables_with_columns[0] in case we have predefined input in InterpreterSelectQuery
void makeFakeTable(StoragePtr storage, const Block & source_header);
void makeFakeTable(StoragePtr storage, const StorageMetadataPtr & metadata_snapshot, const Block & source_header);
std::shared_ptr<TableJoin> makeTableJoin(const ASTSelectQuery & select_query);
const TablesWithColumns & tablesWithColumns() const { return tables_with_columns; }
......
......@@ -419,7 +419,7 @@ ASTPtr MutationsInterpreter::prepare(bool dry_run)
else
{
NameSet new_updated_columns;
auto column_ttls = storage->getColumns().getColumnTTLs();
auto column_ttls = metadata_snapshot->getColumns().getColumnTTLs();
for (const auto & elem : column_ttls)
{
dependencies.emplace(elem.first, ColumnDependency::TTL_TARGET);
......@@ -528,7 +528,7 @@ ASTPtr MutationsInterpreter::prepare(bool dry_run)
ASTPtr MutationsInterpreter::prepareInterpreterSelectQuery(std::vector<Stage> & prepared_stages, bool dry_run)
{
NamesAndTypesList all_columns = storage->getColumns().getAllPhysical();
NamesAndTypesList all_columns = metadata_snapshot->getColumns().getAllPhysical();
/// Next, for each stage calculate columns changed by this and previous stages.
......
......@@ -681,7 +681,7 @@ void SyntaxAnalyzerResult::collectSourceColumns(bool add_special)
{
if (storage)
{
const ColumnsDescription & columns = storage->getColumns();
const ColumnsDescription & columns = metadata_snapshot->getColumns();
auto columns_from_storage = add_special ? columns.getAll() : columns.getAllPhysical();
if (source_columns.empty())
......@@ -962,14 +962,19 @@ SyntaxAnalyzerResultPtr SyntaxAnalyzer::analyzeSelect(
return std::make_shared<const SyntaxAnalyzerResult>(result);
}
SyntaxAnalyzerResultPtr SyntaxAnalyzer::analyze(ASTPtr & query, const NamesAndTypesList & source_columns, ConstStoragePtr storage, bool allow_aggregations) const
SyntaxAnalyzerResultPtr SyntaxAnalyzer::analyze(
ASTPtr & query,
const NamesAndTypesList & source_columns,
ConstStoragePtr storage,
const StorageMetadataPtr & metadata_snapshot,
bool allow_aggregations) const
{
if (query->as<ASTSelectQuery>())
throw Exception("Not select analyze for select asts.", ErrorCodes::LOGICAL_ERROR);
const auto & settings = context.getSettingsRef();
SyntaxAnalyzerResult result(source_columns, storage, false);
SyntaxAnalyzerResult result(source_columns, storage, metadata_snapshot, false);
normalize(query, result.aliases, settings);
......
......@@ -16,10 +16,13 @@ class Context;
struct Settings;
struct SelectQueryOptions;
using Scalars = std::map<String, Block>;
struct StorageInMemoryMetadata;
using StorageMetadataPtr = std::shared_ptr<StorageInMemoryMetadata>;
struct SyntaxAnalyzerResult
{
ConstStoragePtr storage;
StorageMetadataPtr metadata_snapshot;
std::shared_ptr<TableJoin> analyzed_join;
NamesAndTypesList source_columns;
......@@ -51,8 +54,13 @@ struct SyntaxAnalyzerResult
/// Results of scalar sub queries
Scalars scalars;
SyntaxAnalyzerResult(const NamesAndTypesList & source_columns_, ConstStoragePtr storage_ = {}, bool add_special = true)
SyntaxAnalyzerResult(
const NamesAndTypesList & source_columns_,
ConstStoragePtr storage_ = {},
const StorageMetadataPtr & metadata_snapshot_ = {},
bool add_special = true)
: storage(storage_)
, metadata_snapshot(metadata_snapshot_)
, source_columns(source_columns_)
{
collectSourceColumns(add_special);
......@@ -86,7 +94,12 @@ public:
{}
/// Analyze and rewrite not select query
SyntaxAnalyzerResultPtr analyze(ASTPtr & query, const NamesAndTypesList & source_columns_, ConstStoragePtr storage = {}, bool allow_aggregations = false) const;
SyntaxAnalyzerResultPtr analyze(
ASTPtr & query,
const NamesAndTypesList & source_columns_,
ConstStoragePtr storage = {},
const StorageMetadataPtr & metadata_snapshot = {},
bool allow_aggregations = false) const;
/// Analyze and rewrite select query
SyntaxAnalyzerResultPtr analyzeSelect(
......
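Note how metadata_snapshot is inserted before allow_aggregations with a {} default: call sites that stop at the storage argument keep compiling unchanged, and only callers that passed allow_aggregations positionally must be touched. A small compilable illustration with a hypothetical analyze() free function mirroring the signature change:

#include <memory>

struct StorageInMemoryMetadata {};
using StorageMetadataPtr = std::shared_ptr<StorageInMemoryMetadata>;

// Hypothetical free function mirroring the analyze() signature change above.
bool analyze(int /*query*/,
             const StorageMetadataPtr & metadata_snapshot = {},
             bool allow_aggregations = false)
{
    return metadata_snapshot != nullptr || allow_aggregations;
}

int main()
{
    analyze(1);                                              // old call site: unchanged
    analyze(2, std::make_shared<StorageInMemoryMetadata>()); // new call site
    // analyze(3, true);  // no longer compiles: bool does not convert to
    //                    // StorageMetadataPtr, so positional callers of
    //                    // allow_aggregations must add the snapshot argument.
    return 0;
}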
......@@ -87,7 +87,8 @@ static NamesAndTypesList getColumnsFromTableExpression(const ASTTableExpression
const auto table_function = table_expression.table_function;
auto * query_context = const_cast<Context *>(&context.getQueryContext());
const auto & function_storage = query_context->executeTableFunction(table_function);
const auto & columns = function_storage->getColumns();
auto function_metadata_snapshot = function_storage->getInMemoryMetadataPtr();
const auto & columns = function_metadata_snapshot->getColumns();
names_and_type_list = columns.getOrdinary();
materialized = columns.getMaterialized();
aliases = columns.getAliases();
......@@ -97,7 +98,8 @@ static NamesAndTypesList getColumnsFromTableExpression(const ASTTableExpression
{
auto table_id = context.resolveStorageID(table_expression.database_and_table_name);
const auto & table = DatabaseCatalog::instance().getTable(table_id, context);
const auto & columns = table->getColumns();
auto table_metadata_snapshot = table->getInMemoryMetadataPtr();
const auto & columns = table_metadata_snapshot->getColumns();
names_and_type_list = columns.getOrdinary();
materialized = columns.getMaterialized();
aliases = columns.getAliases();
......
......@@ -90,14 +90,14 @@ std::shared_ptr<InterpreterSelectWithUnionQuery> interpretSubquery(
{
auto * query_context = const_cast<Context *>(&context.getQueryContext());
const auto & storage = query_context->executeTableFunction(table_expression);
columns = storage->getColumns().getOrdinary();
columns = storage->getInMemoryMetadataPtr()->getColumns().getOrdinary();
select_query->addTableFunction(*const_cast<ASTPtr *>(&table_expression)); // XXX: const_cast should be avoided!
}
else
{
auto table_id = context.resolveStorageID(table_expression);
const auto & storage = DatabaseCatalog::instance().getTable(table_id, context);
columns = storage->getColumns().getOrdinary();
columns = storage->getInMemoryMetadataPtr()->getColumns().getOrdinary();
select_query->replaceDatabaseAndTable(table_id);
}
......
......@@ -254,7 +254,8 @@ void MySQLHandler::comFieldList(ReadBuffer & payload)
packet.readPayload(payload);
String database = connection_context.getCurrentDatabase();
StoragePtr table_ptr = DatabaseCatalog::instance().getTable({database, packet.table}, connection_context);
for (const NameAndTypePair & column: table_ptr->getColumns().getAll())
auto metadata_snapshot = table_ptr->getInMemoryMetadataPtr();
for (const NameAndTypePair & column : metadata_snapshot->getColumns().getAll())
{
ColumnDefinition column_definition(
database, packet.table, packet.table, column.name, column.name, CharacterSet::binary, 100, ColumnType::MYSQL_TYPE_STRING, 0, 0
......
......@@ -213,17 +213,18 @@ void TCPHandler::runImpl()
if (&context != &query_context.value())
throw Exception("Unexpected context in Input initializer", ErrorCodes::LOGICAL_ERROR);
auto metadata_snapshot = input_storage->getInMemoryMetadataPtr();
state.need_receive_data_for_input = true;
/// Send ColumnsDescription for input storage.
if (client_revision >= DBMS_MIN_REVISION_WITH_COLUMN_DEFAULTS_METADATA
&& query_context->getSettingsRef().input_format_defaults_for_omitted_fields)
{
sendTableColumns(input_storage->getColumns());
sendTableColumns(metadata_snapshot->getColumns());
}
/// Send block to the client - input storage structure.
state.input_header = input_storage->getInMemoryMetadataPtr()->getSampleBlock();
state.input_header = metadata_snapshot->getSampleBlock();
sendData(state.input_header);
});
......@@ -474,7 +475,10 @@ void TCPHandler::processInsertQuery(const Settings & connection_settings)
if (query_context->getSettingsRef().input_format_defaults_for_omitted_fields)
{
if (!table_id.empty())
sendTableColumns(DatabaseCatalog::instance().getTable(table_id, *query_context)->getColumns());
{
auto storage_ptr = DatabaseCatalog::instance().getTable(table_id, *query_context);
sendTableColumns(storage_ptr->getInMemoryMetadataPtr()->getColumns());
}
}
}
......
......@@ -32,12 +32,6 @@ namespace ErrorCodes
extern const int DEADLOCK_AVOIDED;
}
const ColumnsDescription & IStorage::getColumns() const
{
return metadata->columns;
}
bool IStorage::isVirtualColumn(const String & column_name, const StorageMetadataPtr & metadata_snapshot) const
{
/// A virtual column may be overridden by a real column
......
......@@ -137,7 +137,6 @@ public:
public: /// thread-unsafe part. lockStructure must be acquired
const ColumnsDescription & getColumns() const; /// returns combined set of columns
StorageInMemoryMetadata getInMemoryMetadata() const { return *metadata; }
StorageMetadataPtr getInMemoryMetadataPtr() const { return metadata; }
void setInMemoryMetadata(const StorageInMemoryMetadata & metadata_) { metadata = std::make_shared<StorageInMemoryMetadata>(metadata_); }
......
......@@ -142,8 +142,10 @@ BlockInputStreamPtr StorageLiveView::completeQuery(Pipes pipes)
auto creator = [&](const StorageID & blocks_id_global)
{
return StorageBlocks::createStorage(blocks_id_global, getParentStorage()->getColumns(),
std::move(pipes), QueryProcessingStage::WithMergeableState);
auto parent_table_metadata = getParentStorage()->getInMemoryMetadataPtr();
return StorageBlocks::createStorage(
blocks_id_global, parent_table_metadata->getColumns(),
std::move(pipes), QueryProcessingStage::WithMergeableState);
};
block_context->addExternalTable(getBlocksTableName(), TemporaryTableHolder(global_context, creator));
......@@ -209,8 +211,10 @@ void StorageLiveView::writeIntoLiveView(
auto creator = [&](const StorageID & blocks_id_global)
{
return StorageBlocks::createStorage(blocks_id_global, live_view.getParentStorage()->getColumns(),
std::move(pipes), QueryProcessingStage::FetchColumns);
auto parent_metadata = live_view.getParentStorage()->getInMemoryMetadataPtr();
return StorageBlocks::createStorage(
blocks_id_global, parent_metadata->getColumns(),
std::move(pipes), QueryProcessingStage::FetchColumns);
};
TemporaryTableHolder blocks_storage(context, creator);
......
......@@ -352,9 +352,9 @@ size_t IMergeTreeDataPart::getFileSizeOrZero(const String & file_name) const
return checksum->second.file_size;
}
String IMergeTreeDataPart::getColumnNameWithMinumumCompressedSize() const
String IMergeTreeDataPart::getColumnNameWithMinumumCompressedSize(const StorageMetadataPtr & metadata_snapshot) const
{
const auto & storage_columns = storage.getColumns().getAllPhysical();
const auto & storage_columns = metadata_snapshot->getColumns().getAllPhysical();
auto alter_conversions = storage.getAlterConversionsForPart(shared_from_this());
std::optional<std::string> minimum_size_column;
......@@ -613,6 +613,7 @@ void IMergeTreeDataPart::loadTTLInfos()
void IMergeTreeDataPart::loadColumns(bool require)
{
String path = getFullRelativePath() + "columns.txt";
auto metadata_snapshot = storage.getInMemoryMetadataPtr();
if (!volume->getDisk()->exists(path))
{
/// We can get list of columns only from columns.txt in compact parts.
......@@ -620,7 +621,7 @@ void IMergeTreeDataPart::loadColumns(bool require)
throw Exception("No columns.txt in part " + name, ErrorCodes::NO_FILE_IN_DATA_PART);
/// If there is no file with a list of columns, write it down.
for (const NameAndTypePair & column : storage.getColumns().getAllPhysical())
for (const NameAndTypePair & column : metadata_snapshot->getColumns().getAllPhysical())
if (volume->getDisk()->exists(getFullRelativePath() + getFileNameForColumn(column) + ".bin"))
columns.push_back(column);
......
......@@ -77,6 +77,7 @@ public:
virtual MergeTreeReaderPtr getReader(
const NamesAndTypesList & columns_,
const StorageMetadataPtr & metadata_snapshot,
const MarkRanges & mark_ranges,
UncompressedCache * uncompressed_cache,
MarkCache * mark_cache,
......@@ -143,7 +144,7 @@ public:
/// Returns the name of a column with minimum compressed size (as returned by getColumnSize()).
/// If no checksums are present, returns the name of the first physically existing column.
String getColumnNameWithMinumumCompressedSize() const;
String getColumnNameWithMinumumCompressedSize(const StorageMetadataPtr & metadata_snapshot) const;
bool contains(const IMergeTreeDataPart & other) const { return info.contains(other.info); }
......
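getReader() now receives the snapshot explicitly, so every reader created for one query observes a single schema instead of reaching back into the storage. A minimal sketch of that threading, with Part and Reader as hypothetical stand-ins for IMergeTreeDataPart and IMergeTreeReader:

#include <memory>
#include <utility>

struct StorageInMemoryMetadata
{
    int columns_version = 0;
};
using StorageMetadataPtr = std::shared_ptr<StorageInMemoryMetadata>;

// Hypothetical reader: it keeps the snapshot it was constructed with and
// resolves all schema questions (defaults, physical columns) against it.
class Reader
{
public:
    explicit Reader(StorageMetadataPtr snapshot_) : snapshot(std::move(snapshot_)) {}
    int columnsVersion() const { return snapshot->columns_version; }

private:
    StorageMetadataPtr snapshot;
};

// Hypothetical part: the factory threads the caller's snapshot through
// instead of re-fetching live metadata from the storage.
struct Part
{
    std::unique_ptr<Reader> getReader(const StorageMetadataPtr & metadata_snapshot) const
    {
        return std::make_unique<Reader>(metadata_snapshot);
    }
};

int main()
{
    auto snapshot = std::make_shared<StorageInMemoryMetadata>();
    Part part;
    auto reader = part.getReader(snapshot);  // same schema for main and PREWHERE readers
    return reader->columnsVersion();
}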
......@@ -22,13 +22,23 @@ namespace ErrorCodes
}
IMergeTreeReader::IMergeTreeReader(const MergeTreeData::DataPartPtr & data_part_,
const NamesAndTypesList & columns_, UncompressedCache * uncompressed_cache_, MarkCache * mark_cache_,
const MarkRanges & all_mark_ranges_, const MergeTreeReaderSettings & settings_,
IMergeTreeReader::IMergeTreeReader(
const MergeTreeData::DataPartPtr & data_part_,
const NamesAndTypesList & columns_,
const StorageMetadataPtr & metadata_snapshot_,
UncompressedCache * uncompressed_cache_,
MarkCache * mark_cache_,
const MarkRanges & all_mark_ranges_,
const MergeTreeReaderSettings & settings_,
const ValueSizeMap & avg_value_size_hints_)
: data_part(data_part_), avg_value_size_hints(avg_value_size_hints_)
, columns(columns_), uncompressed_cache(uncompressed_cache_), mark_cache(mark_cache_)
, settings(settings_), storage(data_part_->storage)
: data_part(data_part_)
, avg_value_size_hints(avg_value_size_hints_)
, columns(columns_)
, uncompressed_cache(uncompressed_cache_)
, mark_cache(mark_cache_)
, settings(settings_)
, storage(data_part_->storage)
, metadata_snapshot(metadata_snapshot_)
, all_mark_ranges(all_mark_ranges_)
, alter_conversions(storage.getAlterConversionsForPart(data_part))
{
......@@ -112,7 +122,7 @@ void IMergeTreeReader::fillMissingColumns(Columns & res_columns, bool & should_e
if (res_columns[i] == nullptr)
{
if (storage.getColumns().hasDefault(name))
if (metadata_snapshot->getColumns().hasDefault(name))
{
should_evaluate_missing_defaults = true;
continue;
......@@ -170,7 +180,7 @@ void IMergeTreeReader::evaluateMissingDefaults(Block additional_columns, Columns
additional_columns.insert({res_columns[pos], name_and_type->type, name_and_type->name});
}
DB::evaluateMissingDefaults(additional_columns, columns, storage.getColumns().getDefaults(), storage.global_context);
DB::evaluateMissingDefaults(additional_columns, columns, metadata_snapshot->getColumns().getDefaults(), storage.global_context);
/// Move columns from block.
name_and_type = columns.begin();
......
......@@ -18,8 +18,10 @@ public:
using ValueSizeMap = std::map<std::string, double>;
using DeserializeBinaryBulkStateMap = std::map<std::string, IDataType::DeserializeBinaryBulkStatePtr>;
IMergeTreeReader(const MergeTreeData::DataPartPtr & data_part_,
IMergeTreeReader(
const MergeTreeData::DataPartPtr & data_part_,
const NamesAndTypesList & columns_,
const StorageMetadataPtr & metadata_snapshot_,
UncompressedCache * uncompressed_cache_,
MarkCache * mark_cache_,
const MarkRanges & all_mark_ranges_,
......@@ -75,6 +77,7 @@ protected:
MergeTreeReaderSettings settings;
const MergeTreeData & storage;
StorageMetadataPtr metadata_snapshot;
MarkRanges all_mark_ranges;
friend class MergeTreeRangeReader::DelayedStream;
......
......@@ -13,14 +13,14 @@ namespace ErrorCodes
}
NameSet injectRequiredColumns(const MergeTreeData & storage, const MergeTreeData::DataPartPtr & part, Names & columns)
NameSet injectRequiredColumns(const MergeTreeData & storage, const StorageMetadataPtr & metadata_snapshot, const MergeTreeData::DataPartPtr & part, Names & columns)
{
NameSet required_columns{std::begin(columns), std::end(columns)};
NameSet injected_columns;
auto all_column_files_missing = true;
const auto & storage_columns = storage.getColumns();
const auto & storage_columns = metadata_snapshot->getColumns();
auto alter_conversions = storage.getAlterConversionsForPart(part);
for (size_t i = 0; i < columns.size(); ++i)
{
......@@ -66,7 +66,7 @@ NameSet injectRequiredColumns(const MergeTreeData & storage, const MergeTreeData
*/
if (all_column_files_missing)
{
const auto minimum_size_column_name = part->getColumnNameWithMinumumCompressedSize();
const auto minimum_size_column_name = part->getColumnNameWithMinumumCompressedSize(metadata_snapshot);
columns.push_back(minimum_size_column_name);
/// correctly report added column
injected_columns.insert(columns.back());
......@@ -214,14 +214,19 @@ void MergeTreeBlockSizePredictor::update(const Block & sample_block, const Colum
}
MergeTreeReadTaskColumns getReadTaskColumns(const MergeTreeData & storage, const MergeTreeData::DataPartPtr & data_part,
const Names & required_columns, const PrewhereInfoPtr & prewhere_info, bool check_columns)
MergeTreeReadTaskColumns getReadTaskColumns(
const MergeTreeData & storage,
const StorageMetadataPtr & metadata_snapshot,
const MergeTreeData::DataPartPtr & data_part,
const Names & required_columns,
const PrewhereInfoPtr & prewhere_info,
bool check_columns)
{
Names column_names = required_columns;
Names pre_column_names;
/// inject columns required for defaults evaluation
bool should_reorder = !injectRequiredColumns(storage, data_part, column_names).empty();
bool should_reorder = !injectRequiredColumns(storage, metadata_snapshot, data_part, column_names).empty();
if (prewhere_info)
{
......@@ -233,7 +238,7 @@ MergeTreeReadTaskColumns getReadTaskColumns(const MergeTreeData & storage, const
if (pre_column_names.empty())
pre_column_names.push_back(column_names[0]);
const auto injected_pre_columns = injectRequiredColumns(storage, data_part, pre_column_names);
const auto injected_pre_columns = injectRequiredColumns(storage, metadata_snapshot, data_part, pre_column_names);
if (!injected_pre_columns.empty())
should_reorder = true;
......@@ -251,7 +256,7 @@ MergeTreeReadTaskColumns getReadTaskColumns(const MergeTreeData & storage, const
if (check_columns)
{
const NamesAndTypesList & physical_columns = storage.getColumns().getAllPhysical();
const NamesAndTypesList & physical_columns = metadata_snapshot->getColumns().getAllPhysical();
result.pre_columns = physical_columns.addTypes(pre_column_names);
result.columns = physical_columns.addTypes(column_names);
}
......
......@@ -22,7 +22,7 @@ using MergeTreeBlockSizePredictorPtr = std::unique_ptr<MergeTreeBlockSizePredict
* so that you can calculate the DEFAULT expression for these columns.
* Adds them to the `columns`.
*/
NameSet injectRequiredColumns(const MergeTreeData & storage, const MergeTreeData::DataPartPtr & part, Names & columns);
NameSet injectRequiredColumns(const MergeTreeData & storage, const StorageMetadataPtr & metadata_snapshot, const MergeTreeData::DataPartPtr & part, Names & columns);
/// A batch of work for MergeTreeThreadSelectBlockInputStream
......@@ -73,8 +73,13 @@ struct MergeTreeReadTaskColumns
bool should_reorder;
};
MergeTreeReadTaskColumns getReadTaskColumns(const MergeTreeData & storage, const MergeTreeData::DataPartPtr & data_part,
const Names & required_columns, const PrewhereInfoPtr & prewhere_info, bool check_columns);
MergeTreeReadTaskColumns getReadTaskColumns(
const MergeTreeData & storage,
const StorageMetadataPtr & metadata_snapshot,
const MergeTreeData::DataPartPtr & data_part,
const Names & required_columns,
const PrewhereInfoPtr & prewhere_info,
bool check_columns);
struct MergeTreeBlockSizePredictor
{
......
......@@ -604,8 +604,8 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor
MergeTreeData::DataPart::ColumnToSize merged_column_to_size;
Names all_column_names = data.getColumns().getNamesOfPhysical();
NamesAndTypesList storage_columns = data.getColumns().getAllPhysical();
Names all_column_names = metadata_snapshot->getColumns().getNamesOfPhysical();
NamesAndTypesList storage_columns = metadata_snapshot->getColumns().getAllPhysical();
const auto data_settings = data.getSettings();
NamesAndTypesList gathering_columns;
......@@ -1041,7 +1041,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor
UInt64 watch_prev_elapsed = 0;
MergeStageProgress stage_progress(1.0);
NamesAndTypesList storage_columns = data.getColumns().getAllPhysical();
NamesAndTypesList storage_columns = metadata_snapshot->getColumns().getAllPhysical();
if (!for_interpreter.empty())
{
......
......@@ -38,6 +38,7 @@ MergeTreeDataPartCompact::MergeTreeDataPartCompact(
IMergeTreeDataPart::MergeTreeReaderPtr MergeTreeDataPartCompact::getReader(
const NamesAndTypesList & columns_to_read,
const StorageMetadataPtr & metadata_snapshot,
const MarkRanges & mark_ranges,
UncompressedCache * uncompressed_cache,
MarkCache * mark_cache,
......@@ -47,7 +48,7 @@ IMergeTreeDataPart::MergeTreeReaderPtr MergeTreeDataPartCompact::getReader(
{
auto ptr = std::static_pointer_cast<const MergeTreeDataPartCompact>(shared_from_this());
return std::make_unique<MergeTreeReaderCompact>(
ptr, columns_to_read, uncompressed_cache,
ptr, columns_to_read, metadata_snapshot, uncompressed_cache,
mark_cache, mark_ranges, reader_settings,
avg_value_size_hints, profile_callback);
}
......
......@@ -37,6 +37,7 @@ public:
MergeTreeReaderPtr getReader(
const NamesAndTypesList & columns,
const StorageMetadataPtr & metadata_snapshot,
const MarkRanges & mark_ranges,
UncompressedCache * uncompressed_cache,
MarkCache * mark_cache,
......
......@@ -37,6 +37,7 @@ MergeTreeDataPartWide::MergeTreeDataPartWide(
IMergeTreeDataPart::MergeTreeReaderPtr MergeTreeDataPartWide::getReader(
const NamesAndTypesList & columns_to_read,
const StorageMetadataPtr & metadata_snapshot,
const MarkRanges & mark_ranges,
UncompressedCache * uncompressed_cache,
MarkCache * mark_cache,
......@@ -46,7 +47,7 @@ IMergeTreeDataPart::MergeTreeReaderPtr MergeTreeDataPartWide::getReader(
{
auto ptr = std::static_pointer_cast<const MergeTreeDataPartWide>(shared_from_this());
return std::make_unique<MergeTreeReaderWide>(
ptr, columns_to_read, uncompressed_cache,
ptr, columns_to_read, metadata_snapshot, uncompressed_cache,
mark_cache, mark_ranges, reader_settings,
avg_value_size_hints, profile_callback);
}
......
......@@ -30,6 +30,7 @@ public:
MergeTreeReaderPtr getReader(
const NamesAndTypesList & columns,
const StorageMetadataPtr & metadata_snapshot,
const MarkRanges & mark_ranges,
UncompressedCache * uncompressed_cache,
MarkCache * mark_cache,
......
......@@ -650,7 +650,7 @@ Pipes MergeTreeDataSelectExecutor::readFromParts(
auto order_key_prefix_ast = metadata_snapshot->getSortingKey().expression_list_ast->clone();
order_key_prefix_ast->children.resize(prefix_size);
auto syntax_result = SyntaxAnalyzer(context).analyze(order_key_prefix_ast, data.getColumns().getAllPhysical());
auto syntax_result = SyntaxAnalyzer(context).analyze(order_key_prefix_ast, metadata_snapshot->getColumns().getAllPhysical());
auto sorting_key_prefix_expr = ExpressionAnalyzer(order_key_prefix_ast, syntax_result, context).getActions(false);
res = spreadMarkRangesAmongStreamsWithOrder(
......@@ -1274,29 +1274,6 @@ Pipes MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsFinal(
return pipes;
}
void MergeTreeDataSelectExecutor::createPositiveSignCondition(
ExpressionActionsPtr & out_expression, String & out_column, const Context & context) const
{
auto function = std::make_shared<ASTFunction>();
auto arguments = std::make_shared<ASTExpressionList>();
auto sign = std::make_shared<ASTIdentifier>(data.merging_params.sign_column);
auto one = std::make_shared<ASTLiteral>(1);
function->name = "equals";
function->arguments = arguments;
function->children.push_back(arguments);
arguments->children.push_back(sign);
arguments->children.push_back(one);
ASTPtr query = function;
auto syntax_result = SyntaxAnalyzer(context).analyze(query, data.getColumns().getAllPhysical());
out_expression = ExpressionAnalyzer(query, syntax_result, context).getActions(false);
out_column = function->getColumnName();
}
/// Calculates the set of mark ranges that could possibly contain the keys required by the condition.
/// In other words, it removes subranges from the whole range that definitely cannot contain the required keys.
MarkRanges MergeTreeDataSelectExecutor::markRangesFromPKRange(
......
......@@ -95,12 +95,6 @@ private:
const KeyCondition & key_condition,
const Settings & settings) const;
/// Create the expression "Sign == 1".
void createPositiveSignCondition(
ExpressionActionsPtr & out_expression,
String & out_column,
const Context & context) const;
MarkRanges markRangesFromPKRange(
const MergeTreeData::DataPartPtr & part,
const StorageMetadataPtr & metadata_snapshot,
......
......@@ -211,7 +211,7 @@ std::vector<size_t> MergeTreeReadPool::fillPerPartInfo(
per_part_sum_marks.push_back(sum_marks);
auto [required_columns, required_pre_columns, should_reorder] =
getReadTaskColumns(data, part.data_part, column_names, prewhere_info, check_columns);
getReadTaskColumns(data, metadata_snapshot, part.data_part, column_names, prewhere_info, check_columns);
/// will be used to distinguish between PREWHERE and WHERE columns when applying filter
const auto & required_column_names = required_columns.getNames();
......
......@@ -17,6 +17,7 @@ namespace ErrorCodes
MergeTreeReaderCompact::MergeTreeReaderCompact(
DataPartCompactPtr data_part_,
NamesAndTypesList columns_,
const StorageMetadataPtr & metadata_snapshot_,
UncompressedCache * uncompressed_cache_,
MarkCache * mark_cache_,
MarkRanges mark_ranges_,
......@@ -24,15 +25,23 @@ MergeTreeReaderCompact::MergeTreeReaderCompact(
ValueSizeMap avg_value_size_hints_,
const ReadBufferFromFileBase::ProfileCallback & profile_callback_,
clockid_t clock_type_)
: IMergeTreeReader(std::move(data_part_), std::move(columns_),
uncompressed_cache_, mark_cache_, std::move(mark_ranges_),
std::move(settings_), std::move(avg_value_size_hints_))
: IMergeTreeReader(
std::move(data_part_),
std::move(columns_),
metadata_snapshot_,
uncompressed_cache_,
mark_cache_,
std::move(mark_ranges_),
std::move(settings_),
std::move(avg_value_size_hints_))
, marks_loader(
data_part->volume->getDisk(),
mark_cache,
data_part->index_granularity_info.getMarksFilePath(data_part->getFullRelativePath() + MergeTreeDataPartCompact::DATA_FILE_NAME),
data_part->getMarksCount(), data_part->index_granularity_info,
settings.save_marks_in_cache, data_part->getColumns().size())
data_part->volume->getDisk(),
mark_cache,
data_part->index_granularity_info.getMarksFilePath(data_part->getFullRelativePath() + MergeTreeDataPartCompact::DATA_FILE_NAME),
data_part->getMarksCount(),
data_part->index_granularity_info,
settings.save_marks_in_cache,
data_part->getColumns().size())
{
size_t buffer_size = settings.max_read_buffer_size;
const String full_data_path = data_part->getFullRelativePath() + MergeTreeDataPartCompact::DATA_FILE_NAME_WITH_EXTENSION;
......
......@@ -17,6 +17,7 @@ public:
MergeTreeReaderCompact(
DataPartCompactPtr data_part_,
NamesAndTypesList columns_,
const StorageMetadataPtr & metadata_snapshot_,
UncompressedCache * uncompressed_cache_,
MarkCache * mark_cache_,
MarkRanges mark_ranges_,
......
......@@ -28,6 +28,7 @@ namespace ErrorCodes
MergeTreeReaderWide::MergeTreeReaderWide(
DataPartWidePtr data_part_,
NamesAndTypesList columns_,
const StorageMetadataPtr & metadata_snapshot_,
UncompressedCache * uncompressed_cache_,
MarkCache * mark_cache_,
MarkRanges mark_ranges_,
......@@ -36,8 +37,14 @@ MergeTreeReaderWide::MergeTreeReaderWide(
const ReadBufferFromFileBase::ProfileCallback & profile_callback_,
clockid_t clock_type_)
: IMergeTreeReader(
std::move(data_part_), std::move(columns_), uncompressed_cache_, std::move(mark_cache_),
std::move(mark_ranges_), std::move(settings_), std::move(avg_value_size_hints_))
std::move(data_part_),
std::move(columns_),
metadata_snapshot_,
uncompressed_cache_,
std::move(mark_cache_),
std::move(mark_ranges_),
std::move(settings_),
std::move(avg_value_size_hints_))
{
try
{
......
......@@ -17,6 +17,7 @@ public:
MergeTreeReaderWide(
DataPartWidePtr data_part_,
NamesAndTypesList columns_,
const StorageMetadataPtr & metadata_snapshot_,
UncompressedCache * uncompressed_cache_,
MarkCache * mark_cache_,
MarkRanges mark_ranges_,
......
......@@ -76,7 +76,7 @@ MergeTreeReverseSelectProcessor::MergeTreeReverseSelectProcessor(
ordered_names = header_without_virtual_columns.getNames();
task_columns = getReadTaskColumns(storage, data_part, required_columns, prewhere_info, check_columns);
task_columns = getReadTaskColumns(storage, metadata_snapshot, data_part, required_columns, prewhere_info, check_columns);
/// will be used to distinguish between PREWHERE and WHERE columns when applying filter
const auto & column_names = task_columns.columns.getNames();
......@@ -87,11 +87,12 @@ MergeTreeReverseSelectProcessor::MergeTreeReverseSelectProcessor(
owned_mark_cache = storage.global_context.getMarkCache();
reader = data_part->getReader(task_columns.columns, all_mark_ranges,
owned_uncompressed_cache.get(), owned_mark_cache.get(), reader_settings);
reader = data_part->getReader(task_columns.columns, metadata_snapshot,
all_mark_ranges, owned_uncompressed_cache.get(),
owned_mark_cache.get(), reader_settings);
if (prewhere_info)
pre_reader = data_part->getReader(task_columns.pre_columns, all_mark_ranges,
pre_reader = data_part->getReader(task_columns.pre_columns, metadata_snapshot, all_mark_ranges,
owned_uncompressed_cache.get(), owned_mark_cache.get(), reader_settings);
}
......
......@@ -68,7 +68,9 @@ try
}
is_first_task = false;
task_columns = getReadTaskColumns(storage, data_part, required_columns, prewhere_info, check_columns);
task_columns = getReadTaskColumns(
storage, metadata_snapshot, data_part,
required_columns, prewhere_info, check_columns);
auto size_predictor = (preferred_block_size_bytes == 0)
? nullptr
......@@ -90,11 +92,11 @@ try
owned_mark_cache = storage.global_context.getMarkCache();
reader = data_part->getReader(task_columns.columns, all_mark_ranges,
reader = data_part->getReader(task_columns.columns, metadata_snapshot, all_mark_ranges,
owned_uncompressed_cache.get(), owned_mark_cache.get(), reader_settings);
if (prewhere_info)
pre_reader = data_part->getReader(task_columns.pre_columns, all_mark_ranges,
pre_reader = data_part->getReader(task_columns.pre_columns, metadata_snapshot, all_mark_ranges,
owned_uncompressed_cache.get(), owned_mark_cache.get(), reader_settings);
}
......
......@@ -39,7 +39,7 @@ MergeTreeSequentialSource::MergeTreeSequentialSource(
addTotalRowsApprox(data_part->rows_count);
/// Add columns because we don't want to read empty blocks
injectRequiredColumns(storage, data_part, columns_to_read);
injectRequiredColumns(storage, metadata_snapshot, data_part, columns_to_read);
NamesAndTypesList columns_for_reader;
if (take_column_types_from_storage)
{
......@@ -60,7 +60,7 @@ MergeTreeSequentialSource::MergeTreeSequentialSource(
.save_marks_in_cache = false
};
reader = data_part->getReader(columns_for_reader,
reader = data_part->getReader(columns_for_reader, metadata_snapshot,
MarkRanges{MarkRange(0, data_part->getMarksCount())},
/* uncompressed_cache = */ nullptr, mark_cache.get(), reader_settings);
}
......
......@@ -74,12 +74,12 @@ bool MergeTreeThreadSelectBlockInputProcessor::getNewTask()
owned_uncompressed_cache = storage.global_context.getUncompressedCache();
owned_mark_cache = storage.global_context.getMarkCache();
reader = task->data_part->getReader(task->columns, rest_mark_ranges,
reader = task->data_part->getReader(task->columns, metadata_snapshot, rest_mark_ranges,
owned_uncompressed_cache.get(), owned_mark_cache.get(), reader_settings,
IMergeTreeReader::ValueSizeMap{}, profile_callback);
if (prewhere_info)
pre_reader = task->data_part->getReader(task->pre_columns, rest_mark_ranges,
pre_reader = task->data_part->getReader(task->pre_columns, metadata_snapshot, rest_mark_ranges,
owned_uncompressed_cache.get(), owned_mark_cache.get(), reader_settings,
IMergeTreeReader::ValueSizeMap{}, profile_callback);
}
......@@ -90,12 +90,12 @@ bool MergeTreeThreadSelectBlockInputProcessor::getNewTask()
{
auto rest_mark_ranges = pool->getRestMarks(*task->data_part, task->mark_ranges[0]);
/// retain avg_value_size_hints
reader = task->data_part->getReader(task->columns, rest_mark_ranges,
reader = task->data_part->getReader(task->columns, metadata_snapshot, rest_mark_ranges,
owned_uncompressed_cache.get(), owned_mark_cache.get(), reader_settings,
reader->getAvgValueSizeHints(), profile_callback);
if (prewhere_info)
pre_reader = task->data_part->getReader(task->pre_columns, rest_mark_ranges,
pre_reader = task->data_part->getReader(task->pre_columns, metadata_snapshot, rest_mark_ranges,
owned_uncompressed_cache.get(), owned_mark_cache.get(), reader_settings,
reader->getAvgValueSizeHints(), profile_callback);
}
......
......@@ -168,9 +168,9 @@ Pipes StorageBuffer::read(
auto destination_metadata_snapshot = destination->getInMemoryMetadataPtr();
const bool dst_has_same_structure = std::all_of(column_names.begin(), column_names.end(), [metadata_snapshot, destination](const String& column_name)
const bool dst_has_same_structure = std::all_of(column_names.begin(), column_names.end(), [metadata_snapshot, destination_metadata_snapshot](const String& column_name)
{
const auto & dest_columns = destination->getColumns();
const auto & dest_columns = destination_metadata_snapshot->getColumns();
const auto & our_columns = metadata_snapshot->getColumns();
return dest_columns.hasPhysical(column_name) &&
dest_columns.get(column_name).type->equals(*our_columns.get(column_name).type);
......@@ -192,8 +192,8 @@ Pipes StorageBuffer::read(
const Block header = metadata_snapshot->getSampleBlock();
Names columns_intersection = column_names;
Block header_after_adding_defaults = header;
const auto & dest_columns = destination->getColumns();
const auto & our_columns = getColumns();
const auto & dest_columns = destination_metadata_snapshot->getColumns();
const auto & our_columns = metadata_snapshot->getColumns();
for (const String & column_name : column_names)
{
if (!dest_columns.hasPhysical(column_name))
......@@ -224,7 +224,7 @@ Pipes StorageBuffer::read(
for (auto & pipe : pipes_from_dst)
{
pipe.addSimpleTransform(std::make_shared<AddingMissedTransform>(
pipe.getHeader(), header_after_adding_defaults, getColumns().getDefaults(), context));
pipe.getHeader(), header_after_adding_defaults, metadata_snapshot->getColumns().getDefaults(), context));
pipe.addSimpleTransform(std::make_shared<ConvertingTransform>(
pipe.getHeader(), header, ConvertingTransform::MatchColumnsMode::Name));
......
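The lambda above now captures destination_metadata_snapshot (a shared_ptr) instead of the storage, so the structure comparison runs against one frozen view on each side. A reduced sketch of that capture pattern, where Columns/Metadata are stand-ins for ColumnsDescription/StorageInMemoryMetadata and only names are compared (the real check compares types too):

#include <algorithm>
#include <memory>
#include <string>
#include <vector>

// Reduced stand-ins for ColumnsDescription / StorageInMemoryMetadata.
struct Columns
{
    std::vector<std::string> names;
    bool hasPhysical(const std::string & name) const
    {
        return std::find(names.begin(), names.end(), name) != names.end();
    }
};
struct Metadata
{
    Columns columns;
};
using MetadataPtr = std::shared_ptr<Metadata>;

int main()
{
    auto our_snapshot = std::make_shared<Metadata>(Metadata{{{"a", "b"}}});
    auto dest_snapshot = std::make_shared<Metadata>(Metadata{{{"a", "b", "c"}}});
    std::vector<std::string> column_names{"a", "b"};

    // Capturing the shared_ptr snapshots by value pins both schemas for the
    // duration of the check; a concurrent ALTER cannot invalidate them.
    const bool same_structure = std::all_of(
        column_names.begin(), column_names.end(),
        [our_snapshot, dest_snapshot](const std::string & name)
        {
            return dest_snapshot->columns.hasPhysical(name)
                && our_snapshot->columns.hasPhysical(name);
        });
    return same_structure ? 0 : 1;
}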
......@@ -290,7 +290,7 @@ StorageDistributed::StorageDistributed(
if (sharding_key_)
{
sharding_key_expr = buildShardingKeyExpression(sharding_key_, *global_context, getColumns().getAllPhysical(), false);
sharding_key_expr = buildShardingKeyExpression(sharding_key_, *global_context, metadata_.getColumns().getAllPhysical(), false);
sharding_key_column_name = sharding_key_->getColumnName();
}
......@@ -447,6 +447,7 @@ bool StorageDistributed::canForceGroupByNoMerge(const Context &context, QueryPro
QueryProcessingStage::Enum StorageDistributed::getQueryProcessingStage(const Context &context, QueryProcessingStage::Enum to_stage, const ASTPtr & query_ptr) const
{
const auto & settings = context.getSettingsRef();
auto metadata_snapshot = getInMemoryMetadataPtr();
if (canForceGroupByNoMerge(context, to_stage, query_ptr))
return QueryProcessingStage::Complete;
......@@ -454,7 +455,7 @@ QueryProcessingStage::Enum StorageDistributed::getQueryProcessingStage(const Con
ClusterPtr cluster = getCluster();
if (settings.optimize_skip_unused_shards)
{
ClusterPtr optimized_cluster = getOptimizedCluster(context, query_ptr);
ClusterPtr optimized_cluster = getOptimizedCluster(context, metadata_snapshot, query_ptr);
if (optimized_cluster)
cluster = optimized_cluster;
}
......@@ -476,7 +477,7 @@ Pipes StorageDistributed::read(
ClusterPtr cluster = getCluster();
if (settings.optimize_skip_unused_shards)
{
ClusterPtr optimized_cluster = getOptimizedCluster(context, query_info.query);
ClusterPtr optimized_cluster = getOptimizedCluster(context, metadata_snapshot, query_info.query);
if (optimized_cluster)
{
LOG_DEBUG(log, "Skipping irrelevant shards - the query will be sent to the following shards of the cluster (shard numbers): {}", makeFormattedListOfShards(optimized_cluster));
......@@ -683,14 +684,14 @@ ClusterPtr StorageDistributed::getCluster() const
return owned_cluster ? owned_cluster : global_context->getCluster(cluster_name);
}
ClusterPtr StorageDistributed::getOptimizedCluster(const Context & context, const ASTPtr & query_ptr) const
ClusterPtr StorageDistributed::getOptimizedCluster(const Context & context, const StorageMetadataPtr & metadata_snapshot, const ASTPtr & query_ptr) const
{
ClusterPtr cluster = getCluster();
const Settings & settings = context.getSettingsRef();
if (has_sharding_key)
{
ClusterPtr optimized = skipUnusedShards(cluster, query_ptr, context);
ClusterPtr optimized = skipUnusedShards(cluster, query_ptr, metadata_snapshot, context);
if (optimized)
return optimized;
}
......@@ -751,7 +752,11 @@ IColumn::Selector StorageDistributed::createSelector(const ClusterPtr cluster, c
/// Returns a new cluster with fewer shards if constant folding for `sharding_key_expr` is possible
/// using constraints from "PREWHERE" and "WHERE" conditions, otherwise returns `nullptr`
ClusterPtr StorageDistributed::skipUnusedShards(ClusterPtr cluster, const ASTPtr & query_ptr, const Context & context) const
ClusterPtr StorageDistributed::skipUnusedShards(
ClusterPtr cluster,
const ASTPtr & query_ptr,
const StorageMetadataPtr & metadata_snapshot,
const Context & context) const
{
const auto & select = query_ptr->as<ASTSelectQuery &>();
......@@ -770,7 +775,7 @@ ClusterPtr StorageDistributed::skipUnusedShards(ClusterPtr cluster, const ASTPtr
condition_ast = select.prewhere() ? select.prewhere()->clone() : select.where()->clone();
}
replaceConstantExpressions(condition_ast, context, getColumns().getAll(), shared_from_this());
replaceConstantExpressions(condition_ast, context, metadata_snapshot->getColumns().getAll(), shared_from_this());
const auto blocks = evaluateExpressionOverConstantCondition(condition_ast, sharding_key_expr);
// Can't get definite answer if we can skip any shards
......
......@@ -120,8 +120,8 @@ public:
/// Apply the following settings:
/// - optimize_skip_unused_shards
/// - force_optimize_skip_unused_shards
ClusterPtr getOptimizedCluster(const Context &, const ASTPtr & query_ptr) const;
ClusterPtr skipUnusedShards(ClusterPtr cluster, const ASTPtr & query_ptr, const Context & context) const;
ClusterPtr getOptimizedCluster(const Context &, const StorageMetadataPtr & metadata_snapshot, const ASTPtr & query_ptr) const;
ClusterPtr skipUnusedShards(ClusterPtr cluster, const ASTPtr & query_ptr, const StorageMetadataPtr & metadata_snapshot, const Context & context) const;
ActionLock getActionLock(StorageActionBlockType type) override;
......
......@@ -417,7 +417,7 @@ Pipes StorageFile::read(
for (size_t i = 0; i < num_streams; ++i)
pipes.emplace_back(std::make_shared<StorageFileSource>(
this_ptr, metadata_snapshot, context, max_block_size, files_info, getColumns().getDefaults()));
this_ptr, metadata_snapshot, context, max_block_size, files_info, metadata_snapshot->getColumns().getDefaults()));
return pipes;
}
......
......@@ -441,7 +441,7 @@ Pipes StorageGenerateRandom::read(
Pipes pipes;
pipes.reserve(num_streams);
const ColumnsDescription & our_columns = getColumns();
const ColumnsDescription & our_columns = metadata_snapshot->getColumns();
Block block_header;
for (const auto & name : column_names)
{
......
......@@ -358,7 +358,7 @@ void LogBlockOutputStream::writeData(const String & name, const IDataType & type
if (written_streams.count(stream_name))
return;
const auto & columns = storage.getColumns();
const auto & columns = metadata_snapshot->getColumns();
streams.try_emplace(
stream_name,
storage.disk,
......@@ -445,7 +445,7 @@ StorageLog::StorageLog(
/// create directories if they do not exist
disk->createDirectories(table_path);
for (const auto & column : getColumns().getAllPhysical())
for (const auto & column : metadata_.getColumns().getAllPhysical())
addFiles(column.name, *column.type);
marks_file_path = table_path + DBMS_STORAGE_LOG_MARKS_FILE_NAME;
......@@ -539,13 +539,14 @@ void StorageLog::truncate(const ASTPtr &, const Context &, TableStructureWriteLo
{
std::shared_lock<std::shared_mutex> lock(rwlock);
auto metadata_snapshot = getInMemoryMetadataPtr();
files.clear();
file_count = 0;
loaded_marks = false;
disk->clearDirectory(table_path);
for (const auto & column : getColumns().getAllPhysical())
for (const auto & column : metadata_snapshot->getColumns().getAllPhysical())
addFiles(column.name, *column.type);
file_checker = FileChecker{disk, table_path + "sizes.json"};
......@@ -553,11 +554,11 @@ void StorageLog::truncate(const ASTPtr &, const Context &, TableStructureWriteLo
}
const StorageLog::Marks & StorageLog::getMarksWithRealRowCount() const
const StorageLog::Marks & StorageLog::getMarksWithRealRowCount(const StorageMetadataPtr & metadata_snapshot) const
{
/// There should be at least one physical column
const String column_name = getColumns().getAllPhysical().begin()->name;
const auto column_type = getColumns().getAllPhysical().begin()->type;
const String column_name = metadata_snapshot->getColumns().getAllPhysical().begin()->name;
const auto column_type = metadata_snapshot->getColumns().getAllPhysical().begin()->type;
String filename;
/** We take the marks from the first column.
......@@ -590,13 +591,13 @@ Pipes StorageLog::read(
metadata_snapshot->check(column_names, getVirtuals());
loadMarks();
NamesAndTypesList all_columns = Nested::collect(getColumns().getAllPhysical().addTypes(column_names));
NamesAndTypesList all_columns = Nested::collect(metadata_snapshot->getColumns().getAllPhysical().addTypes(column_names));
std::shared_lock<std::shared_mutex> lock(rwlock);
Pipes pipes;
const Marks & marks = getMarksWithRealRowCount();
const Marks & marks = getMarksWithRealRowCount(metadata_snapshot);
size_t marks_size = marks.size();
if (num_streams > marks_size)
......
......@@ -26,7 +26,7 @@ public:
Pipes read(
const Names & column_names,
const StorageMetadataPtr & /*metadata_snapshot*/,
const StorageMetadataPtr & metadata_snapshot,
const SelectQueryInfo & query_info,
const Context & context,
QueryProcessingStage::Enum processed_stage,
......@@ -112,7 +112,7 @@ private:
*
* Return the first group of marks that contain the number of rows, but not the internals of the arrays.
*/
const Marks & getMarksWithRealRowCount() const;
const Marks & getMarksWithRealRowCount(const StorageMetadataPtr & metadata_snapshot) const;
};
}
......@@ -250,9 +250,11 @@ Pipes StorageMerge::createSources(
if (!storage)
{
auto pipe = InterpreterSelectQuery(modified_query_info.query, *modified_context,
std::make_shared<OneBlockInputStream>(header),
SelectQueryOptions(processed_stage).analyze()).execute().pipeline.getPipe();
auto pipe = InterpreterSelectQuery(
modified_query_info.query, *modified_context,
std::make_shared<OneBlockInputStream>(header),
SelectQueryOptions(processed_stage).analyze()).execute().pipeline.getPipe();
pipe.addInterpreterContext(modified_context);
pipes.emplace_back(std::move(pipe));
return pipes;
......@@ -263,7 +265,7 @@ Pipes StorageMerge::createSources(
{
/// If there are only virtual columns in query, you must request at least one other column.
if (real_column_names.empty())
real_column_names.push_back(ExpressionActions::getSmallestColumn(storage->getColumns().getAllPhysical()));
real_column_names.push_back(ExpressionActions::getSmallestColumn(metadata_snapshot->getColumns().getAllPhysical()));
pipes = storage->read(real_column_names, metadata_snapshot, modified_query_info, *modified_context, processed_stage, max_block_size, UInt32(streams_num));
}
......
......@@ -500,7 +500,7 @@ bool StorageReplicatedMergeTree::createTableIfNotExists(const StorageMetadataPtr
ops.emplace_back(zkutil::makeCreateRequest(zookeeper_path + "/metadata", metadata_str,
zkutil::CreateMode::Persistent));
ops.emplace_back(zkutil::makeCreateRequest(zookeeper_path + "/columns", getColumns().toString(),
ops.emplace_back(zkutil::makeCreateRequest(zookeeper_path + "/columns", metadata_snapshot->getColumns().toString(),
zkutil::CreateMode::Persistent));
ops.emplace_back(zkutil::makeCreateRequest(zookeeper_path + "/log", "",
zkutil::CreateMode::Persistent));
......@@ -535,7 +535,7 @@ bool StorageReplicatedMergeTree::createTableIfNotExists(const StorageMetadataPtr
zkutil::CreateMode::Persistent));
ops.emplace_back(zkutil::makeCreateRequest(replica_path + "/metadata", metadata_str,
zkutil::CreateMode::Persistent));
ops.emplace_back(zkutil::makeCreateRequest(replica_path + "/columns", getColumns().toString(),
ops.emplace_back(zkutil::makeCreateRequest(replica_path + "/columns", metadata_snapshot->getColumns().toString(),
zkutil::CreateMode::Persistent));
ops.emplace_back(zkutil::makeCreateRequest(replica_path + "/metadata_version", std::to_string(metadata_version),
zkutil::CreateMode::Persistent));
......@@ -596,7 +596,7 @@ void StorageReplicatedMergeTree::createReplica(const StorageMetadataPtr & metada
zkutil::CreateMode::Persistent));
ops.emplace_back(zkutil::makeCreateRequest(replica_path + "/metadata", ReplicatedMergeTreeTableMetadata(*this, metadata_snapshot).toString(),
zkutil::CreateMode::Persistent));
ops.emplace_back(zkutil::makeCreateRequest(replica_path + "/columns", getColumns().toString(),
ops.emplace_back(zkutil::makeCreateRequest(replica_path + "/columns", metadata_snapshot->getColumns().toString(),
zkutil::CreateMode::Persistent));
ops.emplace_back(zkutil::makeCreateRequest(replica_path + "/metadata_version", std::to_string(metadata_version),
zkutil::CreateMode::Persistent));
......@@ -748,7 +748,7 @@ void StorageReplicatedMergeTree::checkTableStructure(const String & zookeeper_pr
Coordination::Stat columns_stat;
auto columns_from_zk = ColumnsDescription::parse(zookeeper->get(zookeeper_prefix + "/columns", &columns_stat));
const ColumnsDescription & old_columns = getColumns();
const ColumnsDescription & old_columns = metadata_snapshot->getColumns();
if (columns_from_zk != old_columns)
{
throw Exception("Table columns structure in ZooKeeper is different from local table structure", ErrorCodes::INCOMPATIBLE_COLUMNS);
......
......@@ -237,8 +237,9 @@ void TinyLogSource::readData(const String & name, const IDataType & type, IColum
}
IDataType::OutputStreamGetter TinyLogBlockOutputStream::createStreamGetter(const String & name,
WrittenStreams & written_streams)
IDataType::OutputStreamGetter TinyLogBlockOutputStream::createStreamGetter(
const String & name,
WrittenStreams & written_streams)
{
return [&] (const IDataType::SubstreamPath & path) -> WriteBuffer *
{
......@@ -247,12 +248,13 @@ IDataType::OutputStreamGetter TinyLogBlockOutputStream::createStreamGetter(const
if (!written_streams.insert(stream_name).second)
return nullptr;
const auto & columns = storage.getColumns();
const auto & columns = metadata_snapshot->getColumns();
if (!streams.count(stream_name))
streams[stream_name] = std::make_unique<Stream>(storage.disk,
storage.files[stream_name].data_file_path,
columns.getCodecOrDefault(name),
storage.max_compress_block_size);
streams[stream_name] = std::make_unique<Stream>(
storage.disk,
storage.files[stream_name].data_file_path,
columns.getCodecOrDefault(name),
storage.max_compress_block_size);
return &streams[stream_name]->compressed;
};
......@@ -351,7 +353,7 @@ StorageTinyLog::StorageTinyLog(
disk->createDirectories(table_path);
}
for (const auto & col : getColumns().getAllPhysical())
for (const auto & col : metadata_.getColumns().getAllPhysical())
addFiles(col.name, *col.type);
}
......@@ -430,13 +432,14 @@ CheckResults StorageTinyLog::checkData(const ASTPtr & /* query */, const Context
void StorageTinyLog::truncate(const ASTPtr &, const Context &, TableStructureWriteLockHolder &)
{
std::unique_lock<std::shared_mutex> lock(rwlock);
auto metadata_snapshot = getInMemoryMetadataPtr();
disk->clearDirectory(table_path);
files.clear();
file_checker = FileChecker{disk, table_path + "sizes.json"};
for (const auto &column : getColumns().getAllPhysical())
for (const auto & column : metadata_snapshot->getColumns().getAllPhysical())
addFiles(column.name, *column.type);
}
......
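truncate() has no metadata_snapshot parameter yet, so it pins its own snapshot via getInMemoryMetadataPtr() before rebuilding the per-column files; the constructor, which already receives the metadata by value, reads from metadata_ directly. A reduced sketch of the truncate shape, with the storage internals collapsed to a file-name set (stand-in types, not the real StorageTinyLog):

#include <memory>
#include <set>
#include <string>
#include <vector>

struct Metadata { std::vector<std::string> physical_columns; };
using MetadataPtr = std::shared_ptr<const Metadata>;

class TinyLogLike
{
public:
    MetadataPtr getInMemoryMetadataPtr() const { return metadata; }

    void truncate()
    {
        /// Pin the snapshot once; the rest of the function sees one column set
        /// even if an ALTER publishes a new snapshot concurrently.
        auto metadata_snapshot = getInMemoryMetadataPtr();
        files.clear();
        for (const auto & column : metadata_snapshot->physical_columns)
            files.insert(column + ".bin"); /// stand-in for addFiles()
    }

private:
    MetadataPtr metadata = std::make_shared<const Metadata>();
    std::set<std::string> files;
};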
......@@ -136,7 +136,9 @@ std::string IStorageURLBase::getReadMethod() const
return Poco::Net::HTTPRequest::HTTP_GET;
}
std::vector<std::pair<std::string, std::string>> IStorageURLBase::getReadURIParams(const Names & /*column_names*/,
std::vector<std::pair<std::string, std::string>> IStorageURLBase::getReadURIParams(
const Names & /*column_names*/,
const StorageMetadataPtr & /*metadata_snapshot*/,
const SelectQueryInfo & /*query_info*/,
const Context & /*context*/,
QueryProcessingStage::Enum & /*processed_stage*/,
......@@ -145,7 +147,9 @@ std::vector<std::pair<std::string, std::string>> IStorageURLBase::getReadURIPara
return {};
}
std::function<void(std::ostream &)> IStorageURLBase::getReadPOSTDataCallback(const Names & /*column_names*/,
std::function<void(std::ostream &)> IStorageURLBase::getReadPOSTDataCallback(
const Names & /*column_names*/,
const StorageMetadataPtr & /*metadata_snapshot*/,
const SelectQueryInfo & /*query_info*/,
const Context & /*context*/,
QueryProcessingStage::Enum & /*processed_stage*/,
......@@ -165,7 +169,7 @@ Pipes IStorageURLBase::read(
unsigned /*num_streams*/)
{
auto request_uri = uri;
auto params = getReadURIParams(column_names, query_info, context, processed_stage, max_block_size);
auto params = getReadURIParams(column_names, metadata_snapshot, query_info, context, processed_stage, max_block_size);
for (const auto & [param, value] : params)
request_uri.addQueryParameter(param, value);
......@@ -173,7 +177,9 @@ Pipes IStorageURLBase::read(
pipes.emplace_back(std::make_shared<StorageURLSource>(
request_uri,
getReadMethod(),
getReadPOSTDataCallback(column_names, query_info, context, processed_stage, max_block_size),
getReadPOSTDataCallback(
column_names, metadata_snapshot, query_info,
context, processed_stage, max_block_size),
format_name,
getName(),
getHeaderBlock(column_names, metadata_snapshot),
......
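IStorageURLBase::read() now forwards the snapshot it received into both virtual hooks, so subclasses resolve columns from the caller's snapshot instead of reaching back into the storage object. A reduced sketch of that shape; the names mirror the diff but the types are stand-ins:

#include <functional>
#include <memory>
#include <ostream>
#include <string>
#include <utility>
#include <vector>

struct Metadata { /* column info lives here */ };
using MetadataPtr = std::shared_ptr<const Metadata>;
using Names = std::vector<std::string>;
using URIParams = std::vector<std::pair<std::string, std::string>>;

class URLBaseLike
{
public:
    virtual ~URLBaseLike() = default;

    void read(const Names & column_names, const MetadataPtr & metadata_snapshot)
    {
        /// One snapshot flows through both customization points.
        URIParams params = getReadURIParams(column_names, metadata_snapshot);
        auto callback = getReadPOSTDataCallback(column_names, metadata_snapshot);
        (void)params;
        (void)callback;
    }

protected:
    virtual URIParams getReadURIParams(const Names &, const MetadataPtr &) const { return {}; }
    virtual std::function<void(std::ostream &)> getReadPOSTDataCallback(const Names &, const MetadataPtr &) const { return {}; }
};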
......@@ -50,6 +50,7 @@ private:
virtual std::vector<std::pair<std::string, std::string>> getReadURIParams(
const Names & column_names,
const StorageMetadataPtr & metadata_snapshot,
const SelectQueryInfo & query_info,
const Context & context,
QueryProcessingStage::Enum & processed_stage,
......@@ -57,6 +58,7 @@ private:
virtual std::function<void(std::ostream &)> getReadPOSTDataCallback(
const Names & column_names,
const StorageMetadataPtr & /*metadata_snapshot*/,
const SelectQueryInfo & query_info,
const Context & context,
QueryProcessingStage::Enum & processed_stage,
......@@ -68,12 +70,13 @@ private:
class StorageURLBlockOutputStream : public IBlockOutputStream
{
public:
StorageURLBlockOutputStream(const Poco::URI & uri,
const String & format,
const Block & sample_block_,
const Context & context,
const ConnectionTimeouts & timeouts,
const CompressionMethod compression_method);
StorageURLBlockOutputStream(
const Poco::URI & uri,
const String & format,
const Block & sample_block_,
const Context & context,
const ConnectionTimeouts & timeouts,
const CompressionMethod compression_method);
Block getHeader() const override
{
......
......@@ -50,7 +50,9 @@ std::string StorageXDBC::getReadMethod() const
return Poco::Net::HTTPRequest::HTTP_POST;
}
std::vector<std::pair<std::string, std::string>> StorageXDBC::getReadURIParams(const Names & column_names,
std::vector<std::pair<std::string, std::string>> StorageXDBC::getReadURIParams(
const Names & column_names,
const StorageMetadataPtr & metadata_snapshot,
const SelectQueryInfo & /*query_info*/,
const Context & /*context*/,
QueryProcessingStage::Enum & /*processed_stage*/,
......@@ -59,20 +61,22 @@ std::vector<std::pair<std::string, std::string>> StorageXDBC::getReadURIParams(c
NamesAndTypesList cols;
for (const String & name : column_names)
{
auto column_data = getColumns().getPhysical(name);
auto column_data = metadata_snapshot->getColumns().getPhysical(name);
cols.emplace_back(column_data.name, column_data.type);
}
return bridge_helper->getURLParams(cols.toString(), max_block_size);
}
std::function<void(std::ostream &)> StorageXDBC::getReadPOSTDataCallback(const Names & /*column_names*/,
std::function<void(std::ostream &)> StorageXDBC::getReadPOSTDataCallback(
const Names & /*column_names*/,
const StorageMetadataPtr & metadata_snapshot,
const SelectQueryInfo & query_info,
const Context & context,
QueryProcessingStage::Enum & /*processed_stage*/,
size_t /*max_block_size*/) const
{
String query = transformQueryForExternalDatabase(query_info,
getColumns().getOrdinary(),
metadata_snapshot->getColumns().getOrdinary(),
bridge_helper->getIdentifierQuotingStyle(),
remote_database_name,
remote_table_name,
......
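StorageXDBC is the reason the snapshot must reach those hooks: getReadURIParams() has to map each requested column name to its type for the bridge URL, and getReadPOSTDataCallback() hands the snapshot's ordinary columns to the external-query rewrite. A small sketch of the name-to-type lookup, with the snapshot's ColumnsDescription reduced to a map (stand-in types):

#include <map>
#include <string>
#include <utility>
#include <vector>

/// Stand-in for the snapshot's ColumnsDescription: column name -> type name.
struct SnapshotColumns
{
    std::map<std::string, std::string> name_to_type;

    std::pair<std::string, std::string> getPhysical(const std::string & name) const
    {
        return {name, name_to_type.at(name)}; /// throws std::out_of_range for unknown columns
    }
};

std::vector<std::pair<std::string, std::string>>
collectRequestedColumns(const SnapshotColumns & columns, const std::vector<std::string> & column_names)
{
    std::vector<std::pair<std::string, std::string>> cols;
    for (const auto & name : column_names)
        cols.push_back(columns.getPhysical(name));
    return cols;
}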
......@@ -29,7 +29,8 @@ public:
const std::string & remote_database_name,
const std::string & remote_table_name,
const ColumnsDescription & columns_,
const Context & context_, BridgeHelperPtr bridge_helper_);
const Context & context_,
BridgeHelperPtr bridge_helper_);
BlockOutputStreamPtr write(const ASTPtr & query, const StorageMetadataPtr & /*metadata_snapshot*/, const Context & context) override;
......@@ -45,6 +46,7 @@ private:
std::vector<std::pair<std::string, std::string>> getReadURIParams(
const Names & column_names,
const StorageMetadataPtr & metadata_snapshot,
const SelectQueryInfo & query_info,
const Context & context,
QueryProcessingStage::Enum & processed_stage,
......@@ -52,6 +54,7 @@ private:
std::function<void(std::ostream &)> getReadPOSTDataCallback(
const Names & column_names,
const StorageMetadataPtr & metadata_snapshot,
const SelectQueryInfo & query_info,
const Context & context,
QueryProcessingStage::Enum & processed_stage,
......
......@@ -71,7 +71,7 @@ void StorageSystemPartsColumns::processNextStorage(MutableColumns & columns_, co
};
std::unordered_map<String, ColumnInfo> columns_info;
for (const auto & column : info.storage->getColumns())
for (const auto & column : info.storage->getInMemoryMetadataPtr()->getColumns())
{
ColumnInfo column_info;
if (column.default_desc.expression)
......
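The loop now iterates the snapshot's ColumnsDescription directly, which yields full per-column descriptions (name, type, default expression), not just names. A sketch of collecting default expressions the way this hunk does, modeling the iterable description with a plain vector:

#include <optional>
#include <string>
#include <unordered_map>
#include <vector>

struct ColumnDesc
{
    std::string name;
    std::optional<std::string> default_expression; /// stand-in for default_desc.expression
};

/// ColumnsDescription is iterable; a vector models that here.
using Columns = std::vector<ColumnDesc>;

std::unordered_map<std::string, std::string> collectDefaults(const Columns & columns)
{
    std::unordered_map<std::string, std::string> info;
    for (const auto & column : columns)
        if (column.default_expression)
            info.emplace(column.name, *column.default_expression);
    return info;
}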
......@@ -220,7 +220,7 @@ TTLDescription TTLDescription::getTTLFromAST(
if (value->as<ASTFunction>())
{
auto syntax_result = SyntaxAnalyzer(context).analyze(value, columns.getAllPhysical(), {}, true);
auto syntax_result = SyntaxAnalyzer(context).analyze(value, columns.getAllPhysical(), {}, {}, true);
auto expr_actions = ExpressionAnalyzer(value, syntax_result, context).getActions(false);
for (const auto & column : expr_actions->getRequiredColumns())
{
......@@ -249,7 +249,7 @@ TTLDescription TTLDescription::getTTLFromAST(
for (auto [name, value] : aggregations)
{
auto syntax_result = SyntaxAnalyzer(context).analyze(value, columns.getAllPhysical(), {}, true);
auto syntax_result = SyntaxAnalyzer(context).analyze(value, columns.getAllPhysical(), {}, {}, true);
auto expr_analyzer = ExpressionAnalyzer(value, syntax_result, context);
TTLAggregateDescription set_part;
......
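The two TTL call sites pass one extra empty positional argument because SyntaxAnalyzer::analyze() gained a parameter elsewhere in this commit; `{}` simply value-initializes the inserted parameter so the trailing `true` keeps its old meaning. A generic, hypothetical illustration of that call-site mechanics (analyze() here is a stand-in, not the real signature):

#include <string>
#include <vector>

struct Extra {}; /// stand-in for the type of the newly inserted parameter

/// Hypothetical shape: a parameter was inserted before the trailing flag,
/// so every existing call site adds one empty positional argument.
int analyze(const std::string & /*expr*/, const std::vector<std::string> & /*columns*/,
            const Extra & /*extra*/, bool /*top_level*/)
{
    return 0;
}

int caller()
{
    /// Before: analyze("ttl_expr", {"a", "b"}, true);
    /// After : the extra `{}` value-initializes the new parameter.
    return analyze("ttl_expr", {"a", "b"}, {}, true);
}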
......@@ -75,7 +75,8 @@ ColumnsDescription getStructureOfRemoteTableInShard(
{
const auto * table_function = table_func_ptr->as<ASTFunction>();
TableFunctionPtr table_function_ptr = TableFunctionFactory::instance().get(table_function->name, context);
return table_function_ptr->execute(table_func_ptr, context, table_function_ptr->getName())->getColumns();
auto storage_ptr = table_function_ptr->execute(table_func_ptr, context, table_function_ptr->getName());
return storage_ptr->getInMemoryMetadataPtr()->getColumns();
}
auto table_func_name = queryToString(table_func_ptr);
......@@ -84,7 +85,10 @@ ColumnsDescription getStructureOfRemoteTableInShard(
else
{
if (shard_info.isLocal())
return DatabaseCatalog::instance().getTable(table_id, context)->getColumns();
{
auto storage_ptr = DatabaseCatalog::instance().getTable(table_id, context);
return storage_ptr->getInMemoryMetadataPtr()->getColumns();
}
/// Request for a table description
query = "DESC TABLE " + table_id.getFullTableName();
......
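Both branches now bind the StoragePtr to a named local before walking to the snapshot's columns. Beyond readability, naming the owners avoids a classic C++ hazard: a reference taken from the end of a chained shared_ptr expression dangles once the temporaries die at the end of the statement. A small illustration with stand-in types (the original one-liners returned by value and were safe, so this is the defensive reading of the change):

#include <memory>
#include <vector>

struct Metadata { std::vector<int> columns; };

struct Storage
{
    std::shared_ptr<Metadata> meta = std::make_shared<Metadata>();
    std::shared_ptr<Metadata> getInMemoryMetadataPtr() const { return meta; }
};

std::shared_ptr<Storage> makeStorage() { return std::make_shared<Storage>(); }

void example()
{
    /// Hazard: both temporary shared_ptrs die at the end of the statement,
    /// leaving `cols` dangling into the destroyed Metadata:
    /// const auto & cols = makeStorage()->getInMemoryMetadataPtr()->columns;

    /// Named owners keep Storage and Metadata alive for as long as needed.
    auto storage_ptr = makeStorage();
    auto metadata_snapshot = storage_ptr->getInMemoryMetadataPtr();
    const auto & cols = metadata_snapshot->columns;
    (void)cols;
}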
......@@ -78,7 +78,7 @@ std::string writeData(int rows, DB::StoragePtr & table, const DB::Context & cont
Block block;
{
const auto & storage_columns = table->getColumns();
const auto & storage_columns = metadata_snapshot->getColumns();
ColumnWithTypeAndName column;
column.name = "a";
column.type = storage_columns.getPhysical("a").type;
......
......@@ -42,7 +42,7 @@ static NamesAndTypesList chooseColumns(const String & source_database, const Str
throw Exception("Error while executing table function merge. In database " + source_database + " no one matches regular expression: "
+ table_name_regexp_, ErrorCodes::UNKNOWN_TABLE);
return any_table->getColumns().getAllPhysical();
return any_table->getInMemoryMetadataPtr()->getColumns().getAllPhysical();
}
......
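The merge table function derives the merged structure from the first table whose name matches the regular expression, again through that table's snapshot. A last stand-in sketch of that selection:

#include <memory>
#include <stdexcept>
#include <string>
#include <vector>

struct Metadata { std::vector<std::string> physical_columns; };

struct Table
{
    std::shared_ptr<const Metadata> meta = std::make_shared<const Metadata>();
    std::shared_ptr<const Metadata> getInMemoryMetadataPtr() const { return meta; }
};

/// Stand-in for chooseColumns(): the merged table takes its structure from
/// the first matching table, read through that table's snapshot.
std::vector<std::string> chooseColumns(const std::vector<std::shared_ptr<Table>> & matching_tables)
{
    if (matching_tables.empty())
        throw std::runtime_error("no table matches the regular expression");
    return matching_tables.front()->getInMemoryMetadataPtr()->physical_columns;
}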