提交 b7e53208 编写于 作者: N Nikolai Kochetov

Fix tests.

上级 92c937db
......@@ -887,38 +887,10 @@ SetPtr ActionsMatcher::makeSet(const ASTFunction & node, Data & data, bool no_su
* in the subquery_for_set object, this subquery is set as source and the temporary table _data1 as the table.
* - this function shows the expression IN_data1.
*/
if (!subquery_for_set.source && data.no_storage_or_local)
if (subquery_for_set.source.empty() && data.no_storage_or_local)
{
auto interpreter = interpretSubquery(right_in_operand, data.context, data.subquery_depth, {});
subquery_for_set.source = std::make_shared<LazyBlockInputStream>(
interpreter->getSampleBlock(), [interpreter]() mutable { return interpreter->execute().getInputStream(); });
/** Why is LazyBlockInputStream used?
*
* The fact is that when processing a query of the form
* SELECT ... FROM remote_test WHERE column GLOBAL IN (subquery),
* if the distributed remote_test table contains localhost as one of the servers,
* the query will be interpreted locally again (and not sent over TCP, as in the case of a remote server).
*
* The query execution pipeline will be:
* CreatingSets
* subquery execution, filling the temporary table with _data1 (1)
* CreatingSets
* reading from the table _data1, creating the set (2)
* read from the table subordinate to remote_test.
*
* (The second part of the pipeline under CreateSets is a reinterpretation of the query inside StorageDistributed,
* the query differs in that the database name and tables are replaced with subordinates, and the subquery is replaced with _data1.)
*
* But when creating the pipeline, when creating the source (2), it will be found that the _data1 table is empty
* (because the query has not started yet), and empty source will be returned as the source.
* And then, when the query is executed, an empty set will be created in step (2).
*
* Therefore, we make the initialization of step (2) lazy
* - so that it does not occur until step (1) is completed, on which the table will be populated.
*
* Note: this solution is not very good, you need to think better.
*/
subquery_for_set.source = QueryPipeline::getPipe(interpreter->execute().pipeline);
}
subquery_for_set.set = set;
......
......@@ -64,13 +64,20 @@ TemporaryTableHolder::TemporaryTableHolder(
const Context & context_,
const ColumnsDescription & columns,
const ConstraintsDescription & constraints,
const ASTPtr & query)
const ASTPtr & query,
bool create_for_global_subquery)
: TemporaryTableHolder
(
context_,
[&](const StorageID & table_id)
{
return StorageMemory::create(table_id, ColumnsDescription{columns}, ConstraintsDescription{constraints});
auto storage = StorageMemory::create(
table_id, ColumnsDescription{columns}, ConstraintsDescription{constraints});
if (create_for_global_subquery)
storage->delayReadForGlobalSubqueries();
return storage;
},
query
)
......
......@@ -78,7 +78,8 @@ struct TemporaryTableHolder : boost::noncopyable
const Context & context,
const ColumnsDescription & columns,
const ConstraintsDescription & constraints,
const ASTPtr & query = {});
const ASTPtr & query = {},
bool create_for_global_subquery = false);
TemporaryTableHolder(TemporaryTableHolder && rhs);
TemporaryTableHolder & operator = (TemporaryTableHolder && rhs);
......
......@@ -583,7 +583,7 @@ JoinPtr SelectQueryExpressionAnalyzer::makeTableJoin(const ASTTablesInSelectQuer
ExpressionActionsPtr joined_block_actions = createJoinedBlockActions(context, analyzedJoin());
Names original_right_columns;
if (!subquery_for_join.source)
if (subquery_for_join.source.empty())
{
NamesWithAliases required_columns_with_aliases = analyzedJoin().getRequiredColumns(
joined_block_actions->getSampleBlock(), joined_block_actions->getRequiredColumns());
......
......@@ -103,7 +103,9 @@ public:
Block sample = interpreter->getSampleBlock();
NamesAndTypesList columns = sample.getNamesAndTypesList();
auto external_storage_holder = std::make_shared<TemporaryTableHolder>(context, ColumnsDescription{columns}, ConstraintsDescription{});
auto external_storage_holder = std::make_shared<TemporaryTableHolder>(
context, ColumnsDescription{columns}, ConstraintsDescription{}, nullptr,
/*create_for_global_subquery*/ true);
StoragePtr external_storage = external_storage_holder->getTable();
/** We replace the subquery with the name of the temporary table.
......@@ -134,7 +136,7 @@ public:
ast = database_and_table_name;
external_tables[external_table_name] = external_storage_holder;
subqueries_for_sets[external_table_name].source = interpreter->execute().getInputStream();
subqueries_for_sets[external_table_name].source = QueryPipeline::getPipe(interpreter->execute().pipeline);
subqueries_for_sets[external_table_name].table = external_storage;
/** NOTE If it was written IN tmp_table - the existing temporary (but not external) table,
......
......@@ -1833,7 +1833,7 @@ void InterpreterSelectQuery::executeSubqueriesInSetsAndJoins(QueryPlan & query_p
auto creating_sets = std::make_unique<CreatingSetsStep>(
query_plan.getCurrentDataStream(),
subqueries_for_sets,
std::move(subqueries_for_sets),
SizeLimits(settings.max_rows_to_transfer, settings.max_bytes_to_transfer, settings.transfer_overflow_mode),
*context);
......
......@@ -14,6 +14,14 @@ IAccumulatingTransform::IAccumulatingTransform(Block input_header, Block output_
{
}
/// Appends one extra input port that reuses the header of the main input port
/// and returns a pointer to the newly created port.
/// Throws LOGICAL_ERROR if the transform already has more than one input.
InputPort * IAccumulatingTransform::addTotalsPort()
{
    const bool totals_port_exists = inputs.size() > 1;
    if (totals_port_exists)
        throw Exception("Totals port was already added to IAccumulatingTransform", ErrorCodes::LOGICAL_ERROR);

    auto & totals_port = inputs.emplace_back(getInputPort().getHeader(), this);
    return &totals_port;
}
IAccumulatingTransform::Status IAccumulatingTransform::prepare()
{
/// Check can output.
......@@ -42,6 +50,21 @@ IAccumulatingTransform::Status IAccumulatingTransform::prepare()
/// Generate output block.
if (input.isFinished())
{
/// Read from totals port if has it.
if (inputs.size() > 1)
{
auto & totals_input = inputs.back();
if (!totals_input.isFinished())
{
totals_input.setNeeded();
if (!totals_input.hasData())
return Status::NeedData;
totals = totals_input.pull();
totals_input.close();
}
}
finished_input = true;
return Status::Ready;
}
......
......@@ -18,6 +18,7 @@ protected:
Chunk current_input_chunk;
Chunk current_output_chunk;
Chunk totals;
bool has_input = false;
bool finished_input = false;
bool finished_generate = false;
......@@ -34,6 +35,7 @@ public:
Status prepare() override;
void work() override;
InputPort * addTotalsPort();
InputPort & getInputPort() { return input; }
OutputPort & getOutputPort() { return output; }
......
......@@ -240,16 +240,13 @@ void QueryPipeline::addCreatingSetsTransform(SubqueriesForSets subqueries_for_se
source.collected_processors = nullptr;
resize(1);
pipe = Pipe::unitePipes({std::move(pipe), std::move(source)}, collected_processors);
/// Order is important for concat. Connect manually.
pipe.transform([&](OutputPortRawPtrs ports) -> Processors
{
auto concat = std::make_shared<ConcatProcessor>(getHeader(), 2);
connect(*ports.front(), concat->getInputs().front());
connect(*ports.back(), concat->getInputs().back());
return { std::move(concat) };
});
Pipes pipes;
pipes.emplace_back(std::move(source));
pipes.emplace_back(std::move(pipe));
pipe = Pipe::unitePipes(std::move(pipes), collected_processors);
pipe.addTransform(std::make_shared<ConcatProcessor>(getHeader(), 2));
}
void QueryPipeline::setOutputFormat(ProcessorPtr output)
......@@ -324,9 +321,6 @@ void QueryPipeline::setProgressCallback(const ProgressCallback & callback)
{
if (auto * source = dynamic_cast<ISourceWithProgress *>(processor.get()))
source->setProgressCallback(callback);
if (auto * source = typeid_cast<CreatingSetsTransform *>(processor.get()))
source->setProgressCallback(callback);
}
}
......@@ -338,9 +332,6 @@ void QueryPipeline::setProcessListElement(QueryStatus * elem)
{
if (auto * source = dynamic_cast<ISourceWithProgress *>(processor.get()))
source->setProcessListElement(elem);
if (auto * source = typeid_cast<CreatingSetsTransform *>(processor.get()))
source->setProcessListElement(elem);
}
}
......
......@@ -33,35 +33,6 @@ CreatingSetsTransform::CreatingSetsTransform(
{
}
/// Creates an additional input port with the same header as the main input port
/// and returns its address.
/// Throws LOGICAL_ERROR if the transform already has more than one input.
InputPort * CreatingSetsTransform::addTotalsPort()
{
    const bool totals_port_exists = inputs.size() > 1;
    if (totals_port_exists)
        throw Exception("Totals port was already added to CreatingSetsTransform", ErrorCodes::LOGICAL_ERROR);

    auto & totals_port = inputs.emplace_back(getInputPort().getHeader(), this);
    return &totals_port;
}
/// Runs the base-class prepare() first. Once it reports Finished and an extra
/// (last) input port exists, pulls one chunk from that port and passes it to
/// the subquery as totals before closing the port.
IProcessor::Status CreatingSetsTransform::prepare()
{
    const auto base_status = IAccumulatingTransform::prepare();

    /// Nothing extra to do unless the base part is finished and there is a second input.
    const bool should_read_totals = base_status == IProcessor::Status::Finished && inputs.size() > 1;
    if (!should_read_totals)
        return base_status;

    auto & totals_port = inputs.back();
    if (totals_port.isFinished())
        return IProcessor::Status::Finished;

    totals_port.setNeeded();
    if (!totals_port.hasData())
        return IProcessor::Status::NeedData;

    /// Rebuild a block from the pulled chunk using the main input header.
    auto totals_chunk = totals_port.pull();
    subquery.setTotals(getInputPort().getHeader().cloneWithColumns(totals_chunk.detachColumns()));
    totals_port.close();

    return base_status;
}
void CreatingSetsTransform::work()
{
if (!is_initialized)
......@@ -110,6 +81,12 @@ void CreatingSetsTransform::finishSubquery()
{
LOG_DEBUG(log, "Subquery has empty result.");
}
if (totals)
subquery.setTotals(getInputPort().getHeader().cloneWithColumns(totals.detachColumns()));
else
/// Set empty totals anyway, it is needed for MergeJoin.
subquery.setTotals({});
}
void CreatingSetsTransform::init()
......@@ -166,7 +143,6 @@ Chunk CreatingSetsTransform::generate()
table_out->writeSuffix();
finishSubquery();
finished = true;
return {};
}
......
......@@ -12,10 +12,10 @@ class QueryStatus;
struct Progress;
using ProgressCallback = std::function<void(const Progress & progress)>;
/// This processor creates sets during execution.
/// This processor creates a set during execution.
/// Don't return any data. Sets are created when Finish status is returned.
/// In general, several work() methods need to be called to finish.
/// TODO: several independent processors can be created for each subquery. Make subquery a piece of pipeline.
/// An independent processor is created for each subquery.
class CreatingSetsTransform : public IAccumulatingTransform
{
public:
......@@ -28,16 +28,10 @@ public:
String getName() const override { return "CreatingSetsTransform"; }
Status prepare() override;
void work() override;
void consume(Chunk chunk) override;
Chunk generate() override;
InputPort * addTotalsPort();
protected:
bool finished = false;
private:
SubqueryForSet subquery;
......
......@@ -38,11 +38,31 @@ public:
{
}
/// Switches the source into delayed mode: the block iterator and the number of blocks
/// are (re)initialized from `data_` on the first call to generate(), under `mutex_`.
/// This allows reading data which was inserted into the memory table AFTER Storage::read was called.
/// This hack is needed for global subqueries.
///
/// Only the raw pointers are stored here; they are reset to nullptr after the first generate() call.
/// NOTE(review): both pointers presumably must stay valid until that first read - confirm against the caller.
void delayInitialization(BlocksList * data_, std::mutex * mutex_)
{
data = data_;
mutex = mutex_;
}
String getName() const override { return "Memory"; }
protected:
Chunk generate() override
{
if (data)
{
std::lock_guard guard(*mutex);
current_it = data->begin();
num_blocks = data->size();
is_finished = num_blocks == 0;
data = nullptr;
mutex = nullptr;
}
if (is_finished)
{
return {};
......@@ -71,8 +91,11 @@ private:
Names column_names;
BlocksList::iterator current_it;
size_t current_block_idx = 0;
const size_t num_blocks;
size_t num_blocks;
bool is_finished = false;
BlocksList * data = nullptr;
std::mutex * mutex = nullptr;
};
......@@ -123,6 +146,21 @@ Pipe StorageMemory::read(
std::lock_guard lock(mutex);
if (delay_read_for_global_subqueries)
{
/// Note: for a global subquery we use a single source.
/// Mainly, the reason is that at this point the table is empty,
/// and we don't know how many blocks are going to be inserted into it.
///
/// It may seem suboptimal, but actually data from such a table is used to fill
/// a set for IN or a hash table for JOIN, which can't be done concurrently.
/// Since no other manipulation with the data is done, multiple sources shouldn't give any benefit.
auto source = std::make_shared<MemorySource>(column_names, data.begin(), data.size(), *this, metadata_snapshot);
source->delayInitialization(&data, &mutex);
return Pipe(std::move(source));
}
size_t size = data.size();
if (num_streams > size)
......
......@@ -48,12 +48,51 @@ public:
std::optional<UInt64> totalRows() const override;
std::optional<UInt64> totalBytes() const override;
/** Delays initialization of StorageMemory::read() until the first read actually happens.
* Usually, for code like this:
*
* auto out = StorageMemory::write();
* auto in = StorageMemory::read();
* out->write(new_data);
*
* `new_data` won't appear in `in`.
* However, if delayReadForGlobalSubqueries is called, the first read from `in` will check for new_data and return it.
*
*
* Why is delayReadForGlobalSubqueries needed?
*
* The fact is that when processing a query of the form
* SELECT ... FROM remote_test WHERE column GLOBAL IN (subquery),
* if the distributed remote_test table contains localhost as one of the servers,
* the query will be interpreted locally again (and not sent over TCP, as in the case of a remote server).
*
* The query execution pipeline will be:
* CreatingSets
* subquery execution, filling the temporary table with _data1 (1)
* CreatingSets
* reading from the table _data1, creating the set (2)
* read from the table subordinate to remote_test.
*
* (The second part of the pipeline under CreateSets is a reinterpretation of the query inside StorageDistributed,
* the query differs in that the database name and tables are replaced with subordinates, and the subquery is replaced with _data1.)
*
* But when creating the pipeline, when creating the source (2), it will be found that the _data1 table is empty
* (because the query has not started yet), and empty source will be returned as the source.
* And then, when the query is executed, an empty set will be created in step (2).
*
* Therefore, we make the initialization of step (2) delayed
* - so that it does not occur until step (1) is completed, on which the table will be populated.
*/
void delayReadForGlobalSubqueries() { delay_read_for_global_subqueries = true; }
private:
/// The data itself. `list` - so that when inserted to the end, the existing iterators are not invalidated.
BlocksList data;
mutable std::mutex mutex;
bool delay_read_for_global_subqueries = false;
protected:
StorageMemory(const StorageID & table_id_, ColumnsDescription columns_description_, ConstraintsDescription constraints_);
};
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册