提交 43a18b16 编写于 作者: A alexey-milovidov 提交者: GitHub

Merge pull request #883 from yandex/fix-merge-global-in

Fix GLOBAL IN for Merge table over Distributed tables
...@@ -209,9 +209,6 @@ void InJoinSubqueriesPreprocessor::process(ASTSelectQuery * query) const ...@@ -209,9 +209,6 @@ void InJoinSubqueriesPreprocessor::process(ASTSelectQuery * query) const
bool InJoinSubqueriesPreprocessor::hasAtLeastTwoShards(const IStorage & table) const bool InJoinSubqueriesPreprocessor::hasAtLeastTwoShards(const IStorage & table) const
{ {
if (!table.isRemote())
return false;
const StorageDistributed * distributed = typeid_cast<const StorageDistributed *>(&table); const StorageDistributed * distributed = typeid_cast<const StorageDistributed *>(&table);
if (!distributed) if (!distributed)
return false; return false;
......
...@@ -838,24 +838,13 @@ QueryProcessingStage::Enum InterpreterSelectQuery::executeFetchColumns() ...@@ -838,24 +838,13 @@ QueryProcessingStage::Enum InterpreterSelectQuery::executeFetchColumns()
if (max_streams > 1 && !is_remote) if (max_streams > 1 && !is_remote)
max_streams *= settings.max_streams_to_max_threads_ratio; max_streams *= settings.max_streams_to_max_threads_ratio;
ASTPtr actual_query_ptr;
if (storage->isRemote())
{
/// In case of a remote query, we send only SELECT, which will be executed.
actual_query_ptr = query.cloneFirstSelect();
}
else
actual_query_ptr = query_ptr;
/// PREWHERE optimization /// PREWHERE optimization
{ {
auto optimize_prewhere = [&](auto & merge_tree) auto optimize_prewhere = [&](auto & merge_tree)
{ {
const ASTSelectQuery & actual_select = typeid_cast<const ASTSelectQuery &>(*actual_query_ptr);
/// Try transferring some condition from WHERE to PREWHERE if enabled and viable /// Try transferring some condition from WHERE to PREWHERE if enabled and viable
if (settings.optimize_move_to_prewhere && actual_select.where_expression && !actual_select.prewhere_expression && !actual_select.final()) if (settings.optimize_move_to_prewhere && query.where_expression && !query.prewhere_expression && !query.final())
MergeTreeWhereOptimizer{actual_query_ptr, context, merge_tree.getData(), required_columns, log}; MergeTreeWhereOptimizer{query_ptr, context, merge_tree.getData(), required_columns, log};
}; };
if (const StorageMergeTree * merge_tree = typeid_cast<const StorageMergeTree *>(storage.get())) if (const StorageMergeTree * merge_tree = typeid_cast<const StorageMergeTree *>(storage.get()))
...@@ -864,8 +853,7 @@ QueryProcessingStage::Enum InterpreterSelectQuery::executeFetchColumns() ...@@ -864,8 +853,7 @@ QueryProcessingStage::Enum InterpreterSelectQuery::executeFetchColumns()
optimize_prewhere(*merge_tree); optimize_prewhere(*merge_tree);
} }
streams = storage->read(required_columns, actual_query_ptr, streams = storage->read(required_columns, query_ptr, context, from_stage, max_block_size, max_streams);
context, from_stage, max_block_size, max_streams);
if (alias_actions) if (alias_actions)
{ {
...@@ -1316,11 +1304,6 @@ void InterpreterSelectQuery::executeLimit() ...@@ -1316,11 +1304,6 @@ void InterpreterSelectQuery::executeLimit()
void InterpreterSelectQuery::executeSubqueriesInSetsAndJoins(SubqueriesForSets & subqueries_for_sets) void InterpreterSelectQuery::executeSubqueriesInSetsAndJoins(SubqueriesForSets & subqueries_for_sets)
{ {
/// If the query is not distributed, then remove the creation of temporary tables from subqueries (intended for sending to remote servers).
if (!(storage && storage->isRemote()))
for (auto & elem : subqueries_for_sets)
elem.second.table.reset();
const Settings & settings = context.getSettingsRef(); const Settings & settings = context.getSettingsRef();
executeUnion(); executeUnion();
......
...@@ -173,7 +173,7 @@ ASTPtr ASTSelectQuery::clone() const ...@@ -173,7 +173,7 @@ ASTPtr ASTSelectQuery::clone() const
return ptr; return ptr;
} }
ASTPtr ASTSelectQuery::cloneFirstSelect() const std::shared_ptr<ASTSelectQuery> ASTSelectQuery::cloneFirstSelect() const
{ {
auto res = cloneImpl(false); auto res = cloneImpl(false);
res->prev_union_all = nullptr; res->prev_union_all = nullptr;
......
...@@ -39,7 +39,7 @@ public: ...@@ -39,7 +39,7 @@ public:
ASTPtr clone() const override; ASTPtr clone() const override;
/// Get a deep copy of the first SELECT query tree. /// Get a deep copy of the first SELECT query tree.
ASTPtr cloneFirstSelect() const; std::shared_ptr<ASTSelectQuery> cloneFirstSelect() const;
private: private:
std::shared_ptr<ASTSelectQuery> cloneImpl(bool traverse_union_all) const; std::shared_ptr<ASTSelectQuery> cloneImpl(bool traverse_union_all) const;
......
...@@ -60,8 +60,8 @@ namespace ...@@ -60,8 +60,8 @@ namespace
/// Creates a copy of query, changes database and table names. /// Creates a copy of query, changes database and table names.
ASTPtr rewriteSelectQuery(const ASTPtr & query, const std::string & database, const std::string & table) ASTPtr rewriteSelectQuery(const ASTPtr & query, const std::string & database, const std::string & table)
{ {
auto modified_query_ast = query->clone(); auto modified_query_ast = typeid_cast<const ASTSelectQuery &>(*query).cloneFirstSelect();
typeid_cast<ASTSelectQuery &>(*modified_query_ast).replaceDatabaseAndTable(database, table); modified_query_ast->replaceDatabaseAndTable(database, table);
return modified_query_ast; return modified_query_ast;
} }
......
...@@ -50,6 +50,19 @@ StorageMerge::StorageMerge( ...@@ -50,6 +50,19 @@ StorageMerge::StorageMerge(
{ {
} }
/// Returns true if at least one table in `source_database` whose name matches
/// `table_name_regexp` reports itself as remote; returns false otherwise.
bool StorageMerge::isRemote() const
{
    auto database = context.getDatabase(source_database);
    auto iterator = database->getIterator();

    while (iterator->isValid())
    {
        if (table_name_regexp.match(iterator->name()) && iterator->table()->isRemote())
            return true;

        /// Advance to the next table. Without this step the loop would spin forever
        /// as soon as the current table does not match or is not remote.
        iterator->next();
    }

    return false;
}
NameAndTypePair StorageMerge::getColumn(const String & column_name) const NameAndTypePair StorageMerge::getColumn(const String & column_name) const
{ {
auto type = VirtualColumnFactory::tryGetType(column_name); auto type = VirtualColumnFactory::tryGetType(column_name);
......
...@@ -20,6 +20,8 @@ public: ...@@ -20,6 +20,8 @@ public:
std::string getName() const override { return "Merge"; } std::string getName() const override { return "Merge"; }
std::string getTableName() const override { return name; } std::string getTableName() const override { return name; }
bool isRemote() const override;
/// The check is delayed to the read method. It checks the support of the tables used. /// The check is delayed to the read method. It checks the support of the tables used.
bool supportsSampling() const override { return true; } bool supportsSampling() const override { return true; }
bool supportsPrewhere() const override { return true; } bool supportsPrewhere() const override { return true; }
......
...@@ -113,7 +113,6 @@ class ClickHouseCluster: ...@@ -113,7 +113,6 @@ class ClickHouseCluster:
for instance in self.instances.values(): for instance in self.instances.values():
instance.docker_client = None instance.docker_client = None
instance.docker_id = None
instance.ip_address = None instance.ip_address = None
instance.client = None instance.client = None
......
<!--
Cluster definition used by the test: `test_cluster` consists of two shards,
each with a single replica (node1:9000 and node2:9000).
-->
<yandex>
<remote_servers>
<test_cluster>
<shard>
<replica>
<host>node1</host>
<port>9000</port>
</replica>
</shard>
<shard>
<replica>
<host>node2</host>
<port>9000</port>
</replica>
</shard>
</test_cluster>
</remote_servers>
</yandex>
from contextlib import contextmanager
import pytest
from helpers.cluster import ClickHouseCluster
# Test cluster with two instances; both load the same remote_servers.xml config.
cluster = ClickHouseCluster(__file__)
node1 = cluster.add_instance('node1', main_configs=['configs/remote_servers.xml'])
node2 = cluster.add_instance('node2', main_configs=['configs/remote_servers.xml'])
@pytest.fixture(scope="module")
def started_cluster():
    """Module-scoped fixture: start the cluster, create the test tables and yield the cluster.

    The cluster is shut down in the ``finally`` clause, whether setup succeeded or not.
    """
    try:
        cluster.start()

        # Every node gets its own local table.
        create_local_table = '''
CREATE TABLE local_table(id UInt32, val String) ENGINE = TinyLog;
'''
        for instance in (node1, node2):
            instance.query(create_local_table)

        # One distinguishing row per node.
        node1.query("INSERT INTO local_table VALUES (1, 'node1')")
        node2.query("INSERT INTO local_table VALUES (2, 'node2')")

        # Only node1 gets the Distributed table and the Merge table whose
        # pattern matches it.
        create_distributed_and_merge = '''
CREATE TABLE distributed_table(id UInt32, val String) ENGINE = Distributed(test_cluster, default, local_table);
CREATE TABLE merge_table(id UInt32, val String) ENGINE = Merge(default, '^distributed_table')
'''
        node1.query(create_distributed_and_merge)

        yield cluster

    finally:
        cluster.shutdown()
def test_global_in(started_cluster):
    """Check GLOBAL IN on node1 against the Distributed table and then the Merge table.

    Both queries are expected to return the single value 'node2'.
    """
    via_distributed = node1.query("SELECT val FROM distributed_table WHERE id GLOBAL IN (SELECT toUInt32(3 - id) FROM local_table)").rstrip()
    assert via_distributed == 'node2'

    via_merge = node1.query("SELECT val FROM merge_table WHERE id GLOBAL IN (SELECT toUInt32(3 - id) FROM local_table)").rstrip()
    assert via_merge == 'node2'
if __name__ == '__main__':
with contextmanager(started_cluster)() as cluster:
for name, instance in cluster.instances.items():
print name, instance.ip_address
raw_input("Cluster created, press any key to destroy...")
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册