提交 1f7a798c 编写于 作者: N Nikolai Kochetov

Add flag to set that set is created.

上级 ddcbc2f9
......@@ -146,6 +146,9 @@ void CreatingSetsBlockInputStream::createOne(SubqueryForSet & subquery)
}
}
if (subquery.set)
subquery.set->finishInsert();
if (table_out)
table_out->writeSuffix();
......
......@@ -1188,8 +1188,9 @@ bool ExpressionActions::checkColumnIsAlwaysFalse(const String & column_name) con
/// Check has column in (empty set).
String set_to_check;
for (auto & action : actions)
for (auto it = actions.rbegin(); it != actions.rend(); ++it)
{
auto & action = *it;
if (action.type == action.APPLY_FUNCTION && action.function_base)
{
auto name = action.function_base->getName();
......@@ -1198,6 +1199,7 @@ bool ExpressionActions::checkColumnIsAlwaysFalse(const String & column_name) con
&& action.argument_names.size() > 1)
{
set_to_check = action.argument_names[1];
break;
}
}
}
......@@ -1210,7 +1212,7 @@ bool ExpressionActions::checkColumnIsAlwaysFalse(const String & column_name) con
{
if (auto * column_set = typeid_cast<const ColumnSet *>(action.added_column.get()))
{
if (column_set->getData()->getTotalRowCount() == 0)
if (column_set->getData()->isCreated() && column_set->getData()->empty())
return true;
}
}
......
......@@ -249,6 +249,8 @@ void SelectQueryExpressionAnalyzer::tryMakeSetForIndexFromSubquery(const ASTPtr
if (!set->insertFromBlock(block))
return;
}
set->finishInsert();
res.in->readSuffix();
prepared_sets[set_key] = std::move(set);
......
......@@ -291,6 +291,7 @@ void Set::createFromAST(const DataTypes & types, ASTPtr node, const Context & co
Block block = header.cloneWithColumns(std::move(columns));
insertFromBlock(block);
finishInsert();
}
......
......@@ -56,6 +56,10 @@ public:
/// Returns false, if some limit was exceeded and no need to insert more data.
bool insertFromBlock(const Block & block);
/// Call after all blocks were inserted. To get the information that set is already created.
void finishInsert() { is_created = true; }
bool isCreated() const { return is_created; }
/** For columns of 'block', check belonging of corresponding rows to the set.
* Return UInt8 column with the result.
......@@ -107,6 +111,9 @@ private:
/// Do we need to additionally store all elements of the set in explicit form for subsequent use for index.
bool fill_set_elements;
/// Check if set contains all the data.
bool is_created = false;
/// If in the left part columns contains the same types as the elements of the set.
void executeOrdinary(
const ColumnRawPtrs & key_columns,
......
......@@ -141,6 +141,9 @@ void CreatingSetsTransform::work()
auto finishCurrentSubquery = [&]()
{
if (subquery.set)
subquery.set->finishInsert();
if (table_out)
table_out->writeSuffix();
......
......@@ -64,7 +64,8 @@ FilterTransform::FilterTransform(
IProcessor::Status FilterTransform::prepare()
{
if (constant_filter_description.always_false)
if (constant_filter_description.always_false
|| expression->checkColumnIsAlwaysFalse(filter_column_name))
{
input.close();
output.finish();
......@@ -83,18 +84,6 @@ void FilterTransform::removeFilterIfNeed(Chunk & chunk)
void FilterTransform::transform(Chunk & chunk)
{
if (!initialized)
{
initialized = true;
/// Cannot check this in prepare. Because in prepare columns for set may be not created yet.
if (expression->checkColumnIsAlwaysFalse(filter_column_name))
{
stopReading();
chunk = Chunk(getOutputPort().getHeader().getColumns(), 0);
return;
}
}
size_t num_rows_before_filtration = chunk.getNumRows();
auto columns = chunk.detachColumns();
......
......@@ -36,8 +36,6 @@ private:
/// Header after expression, but before removing filter column.
Block transformed_header;
bool initialized = false;
void removeFilterIfNeed(Chunk & chunk);
};
......
......@@ -70,6 +70,7 @@ void SetOrJoinBlockOutputStream::write(const Block & block)
void SetOrJoinBlockOutputStream::writeSuffix()
{
table.finishInsert();
backup_stream.flush();
compressed_backup_buf.next();
backup_buf.next();
......@@ -123,6 +124,7 @@ StorageSet::StorageSet(
void StorageSet::insertBlock(const Block & block) { set->insertFromBlock(block); }
void StorageSet::finishInsert() { set->finishInsert(); }
size_t StorageSet::getSize() const { return set->getTotalRowCount(); }
......@@ -180,8 +182,11 @@ void StorageSetOrJoinBase::restoreFromFile(const String & file_path)
NativeBlockInputStream backup_stream(compressed_backup_buf, 0);
backup_stream.readPrefix();
while (Block block = backup_stream.read())
insertBlock(block);
finishInsert();
backup_stream.readSuffix();
/// TODO Add speed, compressed bytes, data volume in memory, compression ratio ... Generalize all statistics logging in project.
......
......@@ -50,6 +50,8 @@ private:
/// Insert the block into the state.
virtual void insertBlock(const Block & block) = 0;
/// Call after all blocks were inserted.
virtual void finishInsert() = 0;
virtual size_t getSize() const = 0;
};
......@@ -75,6 +77,7 @@ private:
SetPtr set;
void insertBlock(const Block & block) override;
void finishInsert() override;
size_t getSize() const override;
protected:
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册