提交 933c0551 编写于 作者: C chertus

CLICKHOUSE-3578 review proress

上级 b4b58b29
......@@ -51,17 +51,15 @@ Block AddingDefaultsBlockInputStream::readImpl()
if (column_defaults.empty())
return res;
const BlockMissingValues & delayed_defaults = children.back()->getMissingValues();
if (delayed_defaults.empty())
const BlockMissingValues & block_missing_values = children.back()->getMissingValues();
if (block_missing_values.empty())
return res;
Block evaluate_block{res};
/// remove columns for recalculation
for (const auto & column : column_defaults)
{
/// column_defaults contain aliases that could be ommited in evaluate_block
if (evaluate_block.has(column.first))
evaluate_block.erase(column.first);
}
evaluateMissingDefaults(evaluate_block, header.getNamesAndTypesList(), column_defaults, context, false);
......@@ -76,7 +74,7 @@ Block AddingDefaultsBlockInputStream::readImpl()
size_t block_column_position = res.getPositionByName(column_name);
ColumnWithTypeAndName & column_read = res.getByPosition(block_column_position);
const auto & defaults_mask = delayed_defaults.getDefaultsBitmask(block_column_position);
const auto & defaults_mask = block_missing_values.getDefaultsBitmask(block_column_position);
checkCalculated(column_read, column_def, defaults_mask.size());
......
......@@ -14,7 +14,7 @@ BinaryRowInputStream::BinaryRowInputStream(ReadBuffer & istr_, const Block & hea
}
bool BinaryRowInputStream::read(MutableColumns & columns)
bool BinaryRowInputStream::read(MutableColumns & columns, RowReadExtension &)
{
if (istr.eof())
return false;
......
......@@ -17,7 +17,7 @@ class BinaryRowInputStream : public IRowInputStream
public:
BinaryRowInputStream(ReadBuffer & istr_, const Block & header_);
bool read(MutableColumns & columns) override;
bool read(MutableColumns & columns, RowReadExtension &) override;
private:
ReadBuffer & istr;
......
......@@ -53,7 +53,7 @@ Block BlockInputStreamFromRowInputStream::readImpl()
{
size_t num_columns = sample.columns();
MutableColumns columns = sample.cloneEmptyColumns();
delayed_defaults.clear();
block_missing_values.clear();
try
{
......@@ -62,8 +62,8 @@ Block BlockInputStreamFromRowInputStream::readImpl()
try
{
++total_rows;
RowReadExtention info;
if (!row_input->extendedRead(columns, info))
RowReadExtension info;
if (!row_input->read(columns, info))
break;
for (size_t column_idx = 0; column_idx < info.read_columns.size(); ++column_idx)
......@@ -73,7 +73,7 @@ Block BlockInputStreamFromRowInputStream::readImpl()
size_t column_size = columns[column_idx]->size();
if (column_size == 0)
throw Exception("Unexpected empty column", ErrorCodes::INCORRECT_NUMBER_OF_COLUMNS);
delayed_defaults.setBit(column_idx, column_size - 1);
block_missing_values.setBit(column_idx, column_size - 1);
}
}
}
......
......@@ -33,7 +33,7 @@ public:
Block getHeader() const override { return sample; }
const BlockMissingValues & getMissingValues() const override { return delayed_defaults; }
const BlockMissingValues & getMissingValues() const override { return block_missing_values; }
protected:
Block readImpl() override;
......@@ -42,7 +42,7 @@ private:
RowInputStreamPtr row_input;
Block sample;
size_t max_block_size;
BlockMissingValues delayed_defaults;
BlockMissingValues block_missing_values;
UInt64 allow_errors_num;
Float64 allow_errors_ratio;
......
......@@ -111,7 +111,7 @@ void CSVRowInputStream::readPrefix()
}
bool CSVRowInputStream::read(MutableColumns & columns)
bool CSVRowInputStream::read(MutableColumns & columns, RowReadExtension &)
{
if (istr.eof())
return false;
......
......@@ -21,7 +21,7 @@ public:
*/
CSVRowInputStream(ReadBuffer & istr_, const Block & header_, bool with_names_, const FormatSettings & format_settings);
bool read(MutableColumns & columns) override;
bool read(MutableColumns & columns, RowReadExtension &) override;
void readPrefix() override;
bool allowSyncAfterError() const override { return true; }
void syncAfterError() override;
......
......@@ -193,7 +193,7 @@ CapnProtoRowInputStream::CapnProtoRowInputStream(ReadBuffer & istr_, const Block
}
bool CapnProtoRowInputStream::read(MutableColumns & columns)
bool CapnProtoRowInputStream::read(MutableColumns & columns, RowReadExtension &)
{
if (istr.eof())
return false;
......
......@@ -34,7 +34,7 @@ public:
*/
CapnProtoRowInputStream(ReadBuffer & istr_, const Block & header_, const String & schema_dir, const String & schema_file, const String & root_object);
bool read(MutableColumns & columns) override;
bool read(MutableColumns & columns, RowReadExtension &) override;
private:
// Build a traversal plan from a sorted list of fields
......
......@@ -11,7 +11,7 @@ namespace DB
{
/// A way to set some extentions to read and return extra information too. IRowInputStream.extendedRead() output.
struct RowReadExtention
struct RowReadExtension
{
/// IRowInputStream.extendedRead() output value.
/// Contains one bit per column in resently read row. IRowInputStream could leave it empty, or partialy set.
......@@ -27,8 +27,7 @@ public:
/** Read next row and append it to the columns.
* If no more rows - return false.
*/
virtual bool read(MutableColumns & columns) = 0;
virtual bool extendedRead(MutableColumns & columns, RowReadExtention & ) { return read(columns); }
virtual bool read(MutableColumns & columns, RowReadExtension & extra) = 0;
virtual void readPrefix() {} /// delimiter before begin of result
virtual void readSuffix() {} /// delimiter after end of result
......
......@@ -209,13 +209,8 @@ void JSONEachRowRowInputStream::readNestedData(const String & name, MutableColum
nested_prefix_length = 0;
}
bool JSONEachRowRowInputStream::read(MutableColumns & columns)
{
RowReadExtention tmp;
return extendedRead(columns, tmp);
}
bool JSONEachRowRowInputStream::extendedRead(MutableColumns & columns, RowReadExtention & ext)
bool JSONEachRowRowInputStream::read(MutableColumns & columns, RowReadExtension & ext)
{
skipWhitespaceIfAny(istr);
......
......@@ -22,8 +22,7 @@ class JSONEachRowRowInputStream : public IRowInputStream
public:
JSONEachRowRowInputStream(ReadBuffer & istr_, const Block & header_, const FormatSettings & format_settings);
bool read(MutableColumns & columns) override;
bool extendedRead(MutableColumns & columns, RowReadExtention & ext) override;
bool read(MutableColumns & columns, RowReadExtension & ext) override;
bool allowSyncAfterError() const override { return true; }
void syncAfterError() override;
......
......@@ -88,7 +88,7 @@ static bool readName(ReadBuffer & buf, StringRef & ref, String & tmp)
}
bool TSKVRowInputStream::read(MutableColumns & columns)
bool TSKVRowInputStream::read(MutableColumns & columns, RowReadExtension &)
{
if (istr.eof())
return false;
......
......@@ -25,7 +25,7 @@ class TSKVRowInputStream : public IRowInputStream
public:
TSKVRowInputStream(ReadBuffer & istr_, const Block & header_, const FormatSettings & format_settings);
bool read(MutableColumns & columns) override;
bool read(MutableColumns & columns, RowReadExtension &) override;
bool allowSyncAfterError() const override { return true; }
void syncAfterError() override;
......
......@@ -75,7 +75,7 @@ static void checkForCarriageReturn(ReadBuffer & istr)
}
bool TabSeparatedRowInputStream::read(MutableColumns & columns)
bool TabSeparatedRowInputStream::read(MutableColumns & columns, RowReadExtension &)
{
if (istr.eof())
return false;
......
......@@ -22,7 +22,7 @@ public:
TabSeparatedRowInputStream(
ReadBuffer & istr_, const Block & header_, bool with_names_, bool with_types_, const FormatSettings & format_settings);
bool read(MutableColumns & columns) override;
bool read(MutableColumns & columns, RowReadExtension &) override;
void readPrefix() override;
bool allowSyncAfterError() const override { return true; }
void syncAfterError() override;
......
......@@ -37,7 +37,7 @@ ValuesRowInputStream::ValuesRowInputStream(ReadBuffer & istr_, const Block & hea
}
bool ValuesRowInputStream::read(MutableColumns & columns)
bool ValuesRowInputStream::read(MutableColumns & columns, RowReadExtension &)
{
size_t num_columns = columns.size();
......
......@@ -23,7 +23,7 @@ public:
*/
ValuesRowInputStream(ReadBuffer & istr_, const Block & header_, const Context & context_, const FormatSettings & format_settings);
bool read(MutableColumns & columns) override;
bool read(MutableColumns & columns, RowReadExtension &) override;
private:
ReadBuffer & istr;
......
......@@ -37,7 +37,7 @@ static ASTPtr requiredExpressions(Block & block, const NamesAndTypesList & requi
void evaluateMissingDefaults(Block & block,
const NamesAndTypesList & required_columns,
const ColumnDefaults & column_defaults,
const Context & context, bool with_block_copy)
const Context & context, bool save_unneded_columns)
{
if (column_defaults.empty())
return;
......@@ -46,7 +46,7 @@ void evaluateMissingDefaults(Block & block,
if (!default_expr_list)
return;
if (!with_block_copy)
if (!save_unneded_columns)
{
auto syntax_result = SyntaxAnalyzer(context, {}).analyze(default_expr_list, block.getNamesAndTypesList());
ExpressionAnalyzer{default_expr_list, syntax_result, context}.getActions(true)->execute(block);
......@@ -56,7 +56,6 @@ void evaluateMissingDefaults(Block & block,
/** ExpressionAnalyzer eliminates "unused" columns, in order to ensure their safety
* we are going to operate on a copy instead of the original block */
Block copy_block{block};
/// evaluate default values for defaulted columns
auto syntax_result = SyntaxAnalyzer(context, {}).analyze(default_expr_list, block.getNamesAndTypesList());
ExpressionAnalyzer{default_expr_list, syntax_result, context}.getActions(true)->execute(copy_block);
......
......@@ -12,9 +12,10 @@ class Context;
class NamesAndTypesList;
struct ColumnDefault;
///
void evaluateMissingDefaults(Block & block,
const NamesAndTypesList & required_columns,
const std::unordered_map<std::string, ColumnDefault> & column_defaults,
const Context & context, bool with_block_copy = true);
const Context & context, bool save_unneded_columns = true);
}
#include <iomanip>
#include <Storages/ColumnDefault.h>
#include <Storages/ColumnsDescription.h>
#include <Storages/IStorage.h>
......
......@@ -2173,7 +2173,8 @@ String MergeTreeData::getPartitionIDFromQuery(const ASTPtr & ast, const Context
ValuesRowInputStream input_stream(buf, partition_key_sample, context, format_settings);
MutableColumns columns = partition_key_sample.cloneEmptyColumns();
if (!input_stream.read(columns))
RowReadExtension unused;
if (!input_stream.read(columns, unused))
throw Exception(
"Could not parse partition value: `" + partition_ast.fields_str.toString() + "`",
ErrorCodes::INVALID_PARTITION_VALUE);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册