提交 d52f6888 编写于 作者: A Alexey Milovidov

Attempt to fix regression: missing support for remote tables in IN section...

Attempt to fix regression: missing support for remote tables in IN section when querying Distributed tables [#CLICKHOUSE-2]
上级 59716044
......@@ -61,6 +61,8 @@
#include <DataTypes/DataTypeFunction.h>
#include <Functions/FunctionsMiscellaneous.h>
#include <Core/iostream_debug_helpers.h>
namespace DB
{
......@@ -2438,6 +2440,9 @@ bool ExpressionAnalyzer::appendWhere(ExpressionActionsChain & chain, bool only_t
initChain(chain, source_columns);
ExpressionActionsChain::Step & step = chain.steps.back();
DUMP(only_types);
DUMP(StackTrace().toString());
step.required_output.push_back(select_query->where_expression->getColumnName());
getRootActions(select_query->where_expression, only_types, false, step.actions);
......
......@@ -676,7 +676,8 @@ QueryProcessingStage::Enum InterpreterSelectQuery::executeFetchColumns(Pipeline
pipeline.streams = storage->read(required_columns, query_info, context, from_stage, max_block_size, max_streams);
if (pipeline.streams.empty())
pipeline.streams.emplace_back(std::make_shared<NullBlockInputStream>(storage->getSampleBlockForColumns(required_columns)));
pipeline.streams.emplace_back(std::make_shared<NullBlockInputStream>(
storage->analyze(required_columns, query_info, context, from_stage)));
pipeline.transform([&](auto & stream)
{
......
......@@ -180,6 +180,20 @@ public:
throw Exception("Method read is not supported by storage " + getName(), ErrorCodes::NOT_IMPLEMENTED);
}
/** Without executing a query determine,
* what structure of result and what query processed stage
* we will get if we will call 'read' method.
*/
virtual Block analyze(
const Names & column_names,
const SelectQueryInfo & /*query_info*/,
const Context & /*context*/,
QueryProcessingStage::Enum & processed_stage)
{
processed_stage = QueryProcessingStage::FetchColumns;
return getSampleBlockForColumns(column_names);
}
/** Writes the data to a table.
* Receives a description of the query, which can contain information about the data write method.
* Returns an object by which you can write data sequentially.
......
......@@ -197,6 +197,26 @@ BlockInputStreams StorageDistributed::read(
}
Block StorageDistributed::analyze(
const Names & /*column_names*/,
const SelectQueryInfo & query_info,
const Context & context,
QueryProcessingStage::Enum & processed_stage)
{
auto cluster = getCluster();
const Settings & settings = context.getSettingsRef();
size_t result_size = (cluster->getRemoteShardCount() * settings.max_parallel_replicas) + cluster->getLocalShardCount();
processed_stage = result_size == 1 || settings.distributed_group_by_no_merge
? QueryProcessingStage::Complete
: QueryProcessingStage::WithMergeableState;
return materializeBlock(InterpreterSelectQuery(query_info.query, context, {}, processed_stage).getSampleBlock());
}
BlockOutputStreamPtr StorageDistributed::write(const ASTPtr & query, const Settings & settings)
{
auto cluster = (owned_cluster) ? owned_cluster : context.getCluster(cluster_name);
......
......@@ -60,6 +60,12 @@ public:
size_t max_block_size,
unsigned num_streams) override;
Block analyze(
const Names & column_names,
const SelectQueryInfo & query_info,
const Context & context,
QueryProcessingStage::Enum & processed_stage) override;
BlockOutputStreamPtr write(const ASTPtr & query, const Settings & settings) override;
void drop() override {}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册