提交 17b86dec 编写于 作者: A alesapin

Recursive defaults

上级 c0d1416b
......@@ -1555,9 +1555,8 @@ private:
BlockInputStreamPtr block_input = context.getInputFormat(
current_format, buf, sample, insert_format_max_block_size);
const auto & column_defaults = columns_description.getDefaults();
if (!column_defaults.empty())
block_input = std::make_shared<AddingDefaultsBlockInputStream>(block_input, column_defaults, context);
if (columns_description.hasDefaults())
block_input = std::make_shared<AddingDefaultsBlockInputStream>(block_input, columns_description, context);
BlockInputStreamPtr async_block_input = std::make_shared<AsynchronousBlockInputStream>(block_input);
......
......@@ -7,7 +7,7 @@ namespace DB
void AddingDefaultBlockOutputStream::write(const Block & block)
{
output->write(addMissingDefaults(block, output_block.getNamesAndTypesList(), column_defaults, context));
output->write(addMissingDefaults(block, output_block.getNamesAndTypesList(), columns, context));
}
void AddingDefaultBlockOutputStream::flush()
......
......@@ -2,7 +2,7 @@
#include <DataStreams/IBlockOutputStream.h>
#include <Columns/ColumnConst.h>
#include <Storages/ColumnDefault.h>
#include <Storages/ColumnsDescription.h>
namespace DB
......@@ -23,10 +23,10 @@ public:
const BlockOutputStreamPtr & output_,
const Block & header_,
const Block & output_block_,
const ColumnDefaults & column_defaults_,
const ColumnsDescription & columns_,
const Context & context_)
: output(output_), header(header_), output_block(output_block_),
column_defaults(column_defaults_), context(context_)
columns(columns_), context(context_)
{
}
......@@ -43,7 +43,7 @@ private:
const Block header;
/// Blocks after this stream should have this structure
const Block output_block;
const ColumnDefaults column_defaults;
const ColumnsDescription columns;
const Context & context;
};
......
......@@ -127,11 +127,13 @@ static MutableColumnPtr mixColumns(const ColumnWithTypeAndName & col_read,
}
AddingDefaultsBlockInputStream::AddingDefaultsBlockInputStream(const BlockInputStreamPtr & input,
const ColumnDefaults & column_defaults_,
const Context & context_)
: column_defaults(column_defaults_),
context(context_)
AddingDefaultsBlockInputStream::AddingDefaultsBlockInputStream(
const BlockInputStreamPtr & input,
const ColumnsDescription & columns_,
const Context & context_)
: columns(columns_)
, column_defaults(columns.getDefaults())
, context(context_)
{
children.push_back(input);
header = input->getHeader();
......@@ -169,7 +171,7 @@ Block AddingDefaultsBlockInputStream::readImpl()
if (!evaluate_block.columns())
evaluate_block.insert({ColumnConst::create(ColumnUInt8::create(1, 0), res.rows()), std::make_shared<DataTypeUInt8>(), "_dummy"});
evaluateMissingDefaults(evaluate_block, header.getNamesAndTypesList(), column_defaults, context, false);
evaluateMissingDefaults(evaluate_block, header.getNamesAndTypesList(), columns, context, false);
std::unordered_map<size_t, MutableColumnPtr> mixed_columns;
......
#pragma once
#include <DataStreams/IBlockInputStream.h>
#include <Storages/ColumnDefault.h>
#include <Storages/ColumnsDescription.h>
namespace DB
......@@ -15,7 +15,7 @@ class AddingDefaultsBlockInputStream : public IBlockInputStream
public:
AddingDefaultsBlockInputStream(
const BlockInputStreamPtr & input,
const ColumnDefaults & column_defaults_,
const ColumnsDescription & columns_,
const Context & context_);
String getName() const override { return "AddingDefaults"; }
......@@ -26,6 +26,7 @@ protected:
private:
Block header;
const ColumnsDescription columns;
const ColumnDefaults column_defaults;
const Context & context;
};
......
......@@ -64,9 +64,9 @@ InputStreamFromASTInsertQuery::InputStreamFromASTInsertQuery(
{
StoragePtr storage = DatabaseCatalog::instance().getTable(ast_insert_query->table_id, context);
auto metadata_snapshot = storage->getInMemoryMetadataPtr();
auto column_defaults = metadata_snapshot->getColumns().getDefaults();
if (!column_defaults.empty())
res_stream = std::make_shared<AddingDefaultsBlockInputStream>(res_stream, column_defaults, context);
const auto & columns = metadata_snapshot->getColumns();
if (columns.hasDefaults())
res_stream = std::make_shared<AddingDefaultsBlockInputStream>(res_stream, columns, context);
}
}
......
#include <DataStreams/AddingDefaultBlockOutputStream.h>
#include <DataStreams/ConvertingBlockInputStream.h>
#include <DataStreams/PushingToViewsBlockOutputStream.h>
#include <DataStreams/SquashingBlockInputStream.h>
......
......@@ -348,7 +348,7 @@ BlockIO InterpreterInsertQuery::execute()
/// Actually we don't know structure of input blocks from query/table,
/// because some clients break insertion protocol (columns != header)
out = std::make_shared<AddingDefaultBlockOutputStream>(
out, query_sample_block, out->getHeader(), metadata_snapshot->getColumns().getDefaults(), context);
out, query_sample_block, out->getHeader(), metadata_snapshot->getColumns(), context);
/// It's important to squash blocks as early as possible (before other transforms),
/// because other transforms may work inefficient if block size is small.
......
......@@ -6,16 +6,17 @@
#include <Columns/ColumnArray.h>
#include <Interpreters/inplaceBlockConversions.h>
#include <Core/Block.h>
#include <Storages/ColumnDefault.h>
#include <Storages/ColumnsDescription.h>
namespace DB
{
Block addMissingDefaults(const Block & block,
const NamesAndTypesList & required_columns,
const ColumnDefaults & column_defaults,
const Context & context)
Block addMissingDefaults(
const Block & block,
const NamesAndTypesList & required_columns,
const ColumnsDescription & columns,
const Context & context)
{
/// For missing columns of nested structure, you need to create not a column of empty arrays, but a column of arrays of correct lengths.
/// First, remember the offset columns for all arrays in the block.
......@@ -49,7 +50,7 @@ Block addMissingDefaults(const Block & block,
continue;
}
if (column_defaults.count(column.name))
if (columns.hasDefault(column.name))
continue;
String offsets_name = Nested::extractTableName(column.name);
......@@ -72,8 +73,8 @@ Block addMissingDefaults(const Block & block,
res.insert(ColumnWithTypeAndName(std::move(new_column), column.type, column.name));
}
/// Computes explicitly specified values (in column_defaults) by default and materialized columns.
evaluateMissingDefaults(res, required_columns, column_defaults, context);
/// Computes explicitly specified values by default and materialized columns.
evaluateMissingDefaults(res, required_columns, columns, context);
return res;
}
......
......@@ -10,7 +10,7 @@ namespace DB
class Block;
class Context;
class NamesAndTypesList;
struct ColumnDefault;
class ColumnsDescription;
/** Adds three types of columns into block
* 1. Columns, that are missed inside request, but present in table without defaults (missed columns)
......@@ -21,7 +21,7 @@ struct ColumnDefault;
Block addMissingDefaults(
const Block & block,
const NamesAndTypesList & required_columns,
const std::unordered_map<std::string, ColumnDefault> & column_defaults,
const ColumnsDescription & columns,
const Context & context);
}
......@@ -2,7 +2,6 @@
#include <Core/Block.h>
#include <Parsers/queryToString.h>
#include <Storages/ColumnDefault.h>
#include <Interpreters/TreeRewriter.h>
#include <Interpreters/ExpressionAnalyzer.h>
#include <Interpreters/ExpressionActions.h>
......@@ -14,6 +13,8 @@
#include <utility>
#include <DataTypes/DataTypesNumber.h>
#include <Interpreters/RequiredSourceColumnsVisitor.h>
#include <Common/checkStackSize.h>
#include <Storages/ColumnsDescription.h>
namespace DB
......@@ -22,46 +23,46 @@ namespace DB
namespace
{
ASTPtr defaultRequiredExpressions(Block & block, const NamesAndTypesList & required_columns, const ColumnDefaults & column_defaults)
void addDefaultRequiredExpressionsRecursively(Block & block, const String & required_column, const ColumnsDescription & columns, ASTPtr default_expr_list_accum, NameSet & added_columns)
{
ASTPtr default_expr_list = std::make_shared<ASTExpressionList>();
checkStackSize();
if (block.has(required_column) || added_columns.count(required_column))
return;
for (const auto & column : required_columns)
auto column_default = columns.getDefault(required_column);
if (column_default)
{
if (block.has(column.name))
continue;
/// expressions must be cloned to prevent modification by the ExpressionAnalyzer
auto column_default_expr = column_default->expression->clone();
const auto it = column_defaults.find(column.name);
/// Our default may depend on columns with ALIAS as default expr which not present in block
/// we can easily add them from column_defaults struct
RequiredSourceColumnsVisitor::Data columns_context;
RequiredSourceColumnsVisitor(columns_context).visit(column_default_expr);
NameSet required_columns_names = columns_context.requiredColumns();
if (it != column_defaults.end())
{
/// expressions must be cloned to prevent modification by the ExpressionAnalyzer
auto column_default_expr = it->second.expression->clone();
/// Our default may depend on columns with ALIAS as default expr which not present in block
/// we can easily add them from column_defaults struct
RequiredSourceColumnsVisitor::Data columns_context;
RequiredSourceColumnsVisitor(columns_context).visit(column_default_expr);
NameSet required_columns_names = columns_context.requiredColumns();
for (const auto & required_column_name : required_columns_names)
{
/// If we have such default column and it's alias than we should
/// add it into default_expression_list
if (auto rit = column_defaults.find(required_column_name);
rit != column_defaults.end() && rit->second.kind == ColumnDefaultKind::Alias)
{
default_expr_list->children.emplace_back(setAlias(rit->second.expression->clone(), required_column_name));
}
}
auto cast_func = makeASTFunction("CAST", column_default_expr, std::make_shared<ASTLiteral>(column.type->getName()));
default_expr_list->children.emplace_back(setAlias(cast_func, it->first));
}
auto cast_func = makeASTFunction("CAST", column_default_expr, std::make_shared<ASTLiteral>(columns.get(required_column).type->getName()));
default_expr_list_accum->children.emplace_back(setAlias(cast_func, required_column));
added_columns.emplace(required_column);
for (const auto & required_column_name : required_columns_names)
addDefaultRequiredExpressionsRecursively(block, required_column_name, columns, default_expr_list_accum, added_columns);
}
}
ASTPtr defaultRequiredExpressions(Block & block, const NamesAndTypesList & required_columns, const ColumnsDescription & columns)
{
ASTPtr default_expr_list = std::make_shared<ASTExpressionList>();
NameSet added_columns;
for (const auto & column : required_columns)
addDefaultRequiredExpressionsRecursively(block, column.name, columns, default_expr_list, added_columns);
if (default_expr_list->children.empty())
return nullptr;
return default_expr_list;
}
......@@ -161,13 +162,13 @@ void performRequiredConversions(Block & block, const NamesAndTypesList & require
void evaluateMissingDefaults(Block & block,
const NamesAndTypesList & required_columns,
const ColumnDefaults & column_defaults,
const ColumnsDescription & columns,
const Context & context, bool save_unneeded_columns)
{
if (column_defaults.empty())
if (!columns.hasDefaults())
return;
ASTPtr default_expr_list = defaultRequiredExpressions(block, required_columns, column_defaults);
ASTPtr default_expr_list = defaultRequiredExpressions(block, required_columns, columns);
executeExpressionsOnBlock(block, default_expr_list, save_unneeded_columns, required_columns, context);
}
......
......@@ -10,13 +10,13 @@ namespace DB
class Block;
class Context;
class NamesAndTypesList;
struct ColumnDefault;
class ColumnsDescription;
/// Adds missing defaults to block according to required_columns
/// using column_defaults map
/// using columns description
void evaluateMissingDefaults(Block & block,
const NamesAndTypesList & required_columns,
const std::unordered_map<std::string, ColumnDefault> & column_defaults,
const ColumnsDescription & columns,
const Context & context, bool save_unneeded_columns = true);
/// Tries to convert columns in block to required_columns
......
......@@ -8,10 +8,10 @@ namespace DB
AddingMissedTransform::AddingMissedTransform(
Block header_,
Block result_header_,
const ColumnDefaults & column_defaults_,
const ColumnsDescription & columns_,
const Context & context_)
: ISimpleTransform(std::move(header_), std::move(result_header_), false)
, column_defaults(column_defaults_), context(context_)
, columns(columns_), context(context_)
{
}
......@@ -20,7 +20,7 @@ void AddingMissedTransform::transform(Chunk & chunk)
auto num_rows = chunk.getNumRows();
Block src = getInputPort().getHeader().cloneWithColumns(chunk.detachColumns());
auto res = addMissingDefaults(src, getOutputPort().getHeader().getNamesAndTypesList(), column_defaults, context);
auto res = addMissingDefaults(src, getOutputPort().getHeader().getNamesAndTypesList(), columns, context);
chunk.setColumns(res.getColumns(), num_rows);
}
......
#pragma once
#include <Processors/ISimpleTransform.h>
#include <Storages/ColumnDefault.h>
#include <Storages/ColumnsDescription.h>
namespace DB
......@@ -20,7 +20,7 @@ public:
AddingMissedTransform(
Block header_,
Block result_header_,
const ColumnDefaults & column_defaults_,
const ColumnsDescription & columns_,
const Context & context_);
String getName() const override { return "AddingMissed"; }
......@@ -28,7 +28,7 @@ public:
private:
void transform(Chunk &) override;
const ColumnDefaults column_defaults;
const ColumnsDescription columns;
const Context & context;
};
......
......@@ -378,6 +378,14 @@ bool ColumnsDescription::hasPhysical(const String & column_name) const
}
bool ColumnsDescription::hasDefaults() const
{
for (const auto & column : columns)
if (column.default_desc.expression)
return true;
return false;
}
ColumnDefaults ColumnsDescription::getDefaults() const
{
ColumnDefaults ret;
......
......@@ -109,6 +109,7 @@ public:
ColumnDefaults getDefaults() const; /// TODO: remove
bool hasDefault(const String & column_name) const;
bool hasDefaults() const;
std::optional<ColumnDefault> getDefault(const String & column_name) const;
/// Does column has non default specified compression codec
......
......@@ -180,7 +180,7 @@ void IMergeTreeReader::evaluateMissingDefaults(Block additional_columns, Columns
additional_columns.insert({res_columns[pos], name_and_type->type, name_and_type->name});
}
DB::evaluateMissingDefaults(additional_columns, columns, metadata_snapshot->getColumns().getDefaults(), storage.global_context);
DB::evaluateMissingDefaults(additional_columns, columns, metadata_snapshot->getColumns(), storage.global_context);
/// Move columns from block.
name_and_type = columns.begin();
......
......@@ -224,7 +224,7 @@ Pipe StorageBuffer::read(
pipe_from_dst.addSimpleTransform([&](const Block & stream_header)
{
return std::make_shared<AddingMissedTransform>(stream_header, header_after_adding_defaults,
metadata_snapshot->getColumns().getDefaults(), context);
metadata_snapshot->getColumns(), context);
});
pipe_from_dst.addSimpleTransform([&](const Block & stream_header)
......
......@@ -234,12 +234,12 @@ public:
const Context & context_,
UInt64 max_block_size_,
FilesInfoPtr files_info_,
ColumnDefaults column_defaults_)
ColumnsDescription columns_description_)
: SourceWithProgress(getHeader(metadata_snapshot_, files_info_->need_path_column, files_info_->need_file_column))
, storage(std::move(storage_))
, metadata_snapshot(metadata_snapshot_)
, files_info(std::move(files_info_))
, column_defaults(std::move(column_defaults_))
, columns_description(std::move(columns_description_))
, context(context_)
, max_block_size(max_block_size_)
{
......@@ -314,8 +314,8 @@ public:
reader = FormatFactory::instance().getInput(
storage->format_name, *read_buf, metadata_snapshot->getSampleBlock(), context, max_block_size);
if (!column_defaults.empty())
reader = std::make_shared<AddingDefaultsBlockInputStream>(reader, column_defaults, context);
if (columns_description.hasDefaults())
reader = std::make_shared<AddingDefaultsBlockInputStream>(reader, columns_description, context);
reader->readPrefix();
}
......@@ -366,7 +366,7 @@ private:
std::unique_ptr<ReadBuffer> read_buf;
BlockInputStreamPtr reader;
ColumnDefaults column_defaults;
ColumnsDescription columns_description;
const Context & context; /// TODO Untangle potential issues with context lifetime.
UInt64 max_block_size;
......@@ -417,7 +417,7 @@ Pipe StorageFile::read(
for (size_t i = 0; i < num_streams; ++i)
pipes.emplace_back(std::make_shared<StorageFileSource>(
this_ptr, metadata_snapshot, context, max_block_size, files_info, metadata_snapshot->getColumns().getDefaults()));
this_ptr, metadata_snapshot, context, max_block_size, files_info, metadata_snapshot->getColumns()));
return Pipe::unitePipes(std::move(pipes));
}
......
......@@ -71,7 +71,7 @@ namespace
String name_,
const Block & sample_block,
const Context & context,
const ColumnDefaults & column_defaults,
const ColumnsDescription & columns,
UInt64 max_block_size,
const CompressionMethod compression_method,
const std::shared_ptr<Aws::S3::S3Client> & client,
......@@ -86,8 +86,8 @@ namespace
read_buf = wrapReadBufferWithCompressionMethod(std::make_unique<ReadBufferFromS3>(client, bucket, key), compression_method);
reader = FormatFactory::instance().getInput(format, *read_buf, sample_block, context, max_block_size);
if (!column_defaults.empty())
reader = std::make_shared<AddingDefaultsBlockInputStream>(reader, column_defaults, context);
if (columns.hasDefaults())
reader = std::make_shared<AddingDefaultsBlockInputStream>(reader, columns, context);
}
String getName() const override
......@@ -312,7 +312,7 @@ Pipe StorageS3::read(
getName(),
metadata_snapshot->getSampleBlock(),
context,
metadata_snapshot->getColumns().getDefaults(),
metadata_snapshot->getColumns(),
max_block_size,
chooseCompressionMethod(uri.endpoint, compression_method),
client,
......
......@@ -61,7 +61,7 @@ namespace
String name_,
const Block & sample_block,
const Context & context,
const ColumnDefaults & column_defaults,
const ColumnsDescription & columns,
UInt64 max_block_size,
const ConnectionTimeouts & timeouts,
const CompressionMethod compression_method)
......@@ -81,7 +81,7 @@ namespace
compression_method);
reader = FormatFactory::instance().getInput(format, *read_buf, sample_block, context, max_block_size);
reader = std::make_shared<AddingDefaultsBlockInputStream>(reader, column_defaults, context);
reader = std::make_shared<AddingDefaultsBlockInputStream>(reader, columns, context);
}
String getName() const override
......@@ -201,7 +201,7 @@ Pipe IStorageURLBase::read(
getName(),
getHeaderBlock(column_names, metadata_snapshot),
context,
metadata_snapshot->getColumns().getDefaults(),
metadata_snapshot->getColumns(),
max_block_size,
ConnectionTimeouts::getHTTPTimeouts(context),
chooseCompressionMethod(request_uri.getPath(), compression_method)));
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册