diff --git a/dbms/src/Interpreters/InJoinSubqueriesPreprocessor.cpp b/dbms/src/Interpreters/InJoinSubqueriesPreprocessor.cpp
index 2e08e2edb11efe41ea91c9ade663ac7e730adb69..a9ea245b7cfd6af261332962892cb9cabdef3c1c 100644
--- a/dbms/src/Interpreters/InJoinSubqueriesPreprocessor.cpp
+++ b/dbms/src/Interpreters/InJoinSubqueriesPreprocessor.cpp
@@ -209,9 +209,6 @@ void InJoinSubqueriesPreprocessor::process(ASTSelectQuery * query) const
 
 bool InJoinSubqueriesPreprocessor::hasAtLeastTwoShards(const IStorage & table) const
 {
-    if (!table.isRemote())
-        return false;
-
     const StorageDistributed * distributed = typeid_cast<const StorageDistributed *>(&table);
     if (!distributed)
         return false;
diff --git a/dbms/src/Interpreters/InterpreterSelectQuery.cpp b/dbms/src/Interpreters/InterpreterSelectQuery.cpp
index f44db61d0bf3325457d7527526dc51553a02da86..ffa68aa16fef6a26e3f01833c0c8b7f43984ea94 100644
--- a/dbms/src/Interpreters/InterpreterSelectQuery.cpp
+++ b/dbms/src/Interpreters/InterpreterSelectQuery.cpp
@@ -838,24 +838,13 @@ QueryProcessingStage::Enum InterpreterSelectQuery::executeFetchColumns()
     if (max_streams > 1 && !is_remote)
         max_streams *= settings.max_streams_to_max_threads_ratio;
 
-    ASTPtr actual_query_ptr;
-    if (storage->isRemote())
-    {
-        /// In case of a remote query, we send only SELECT, which will be executed.
-        actual_query_ptr = query.cloneFirstSelect();
-    }
-    else
-        actual_query_ptr = query_ptr;
-
     /// PREWHERE optimization
     {
         auto optimize_prewhere = [&](auto & merge_tree)
         {
-            const ASTSelectQuery & actual_select = typeid_cast<const ASTSelectQuery &>(*actual_query_ptr);
-
             /// Try transferring some condition from WHERE to PREWHERE if enabled and viable
-            if (settings.optimize_move_to_prewhere && actual_select.where_expression && !actual_select.prewhere_expression && !actual_select.final())
-                MergeTreeWhereOptimizer{actual_query_ptr, context, merge_tree.getData(), required_columns, log};
+            if (settings.optimize_move_to_prewhere && query.where_expression && !query.prewhere_expression && !query.final())
+                MergeTreeWhereOptimizer{query_ptr, context, merge_tree.getData(), required_columns, log};
         };
 
         if (const StorageMergeTree * merge_tree = typeid_cast<const StorageMergeTree *>(storage.get()))
@@ -864,8 +853,7 @@ QueryProcessingStage::Enum InterpreterSelectQuery::executeFetchColumns()
             optimize_prewhere(*merge_tree);
     }
 
-    streams = storage->read(required_columns, actual_query_ptr,
-        context, from_stage, max_block_size, max_streams);
+    streams = storage->read(required_columns, query_ptr, context, from_stage, max_block_size, max_streams);
 
     if (alias_actions)
     {
@@ -1316,11 +1304,6 @@ void InterpreterSelectQuery::executeLimit()
 
 void InterpreterSelectQuery::executeSubqueriesInSetsAndJoins(SubqueriesForSets & subqueries_for_sets)
 {
-    /// If the query is not distributed, then remove the creation of temporary tables from subqueries (intended for sending to remote servers).
-    if (!(storage && storage->isRemote()))
-        for (auto & elem : subqueries_for_sets)
-            elem.second.table.reset();
-
     const Settings & settings = context.getSettingsRef();
 
     executeUnion();
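Note on the InJoinSubqueriesPreprocessor hunk above: the removed isRemote() early return was only a shortcut, since the downcast to StorageDistributed is the real gate and rejects every other storage. With this change a Merge table over a Distributed table also reports isRemote() == true while still not being a StorageDistributed, so remoteness no longer implies "Distributed" and the pre-check adds nothing. A minimal standalone sketch of that reasoning, with toy types and dynamic_cast standing in for ClickHouse's IStorage hierarchy and typeid_cast (the shard_count member is illustrative only, not the real cluster lookup):

    // Toy hierarchy standing in for IStorage / StorageDistributed / StorageMerge;
    // dynamic_cast plays the role of ClickHouse's typeid_cast here.
    struct Storage
    {
        virtual ~Storage() = default;
        virtual bool isRemote() const { return false; }
    };

    struct Distributed : Storage
    {
        bool isRemote() const override { return true; }
        int shard_count = 2;  // illustrative; the real check consults the cluster
    };

    struct MergeOverDistributed : Storage
    {
        bool isRemote() const override { return true; }  // remote, but not Distributed
    };

    // The downcast alone decides: any non-Distributed storage, remote or not,
    // yields false, so the removed isRemote() pre-check was redundant.
    bool hasAtLeastTwoShards(const Storage & table)
    {
        const auto * distributed = dynamic_cast<const Distributed *>(&table);
        if (!distributed)
            return false;
        return distributed->shard_count >= 2;
    }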
diff --git a/dbms/src/Parsers/ASTSelectQuery.cpp b/dbms/src/Parsers/ASTSelectQuery.cpp
index 85699a7ef42b95ce2a86137ba7a713b47e1905e7..1c1ceb958e2e7b125c0d224467599e0efa90b279 100644
--- a/dbms/src/Parsers/ASTSelectQuery.cpp
+++ b/dbms/src/Parsers/ASTSelectQuery.cpp
@@ -173,7 +173,7 @@ ASTPtr ASTSelectQuery::clone() const
     return ptr;
 }
 
-ASTPtr ASTSelectQuery::cloneFirstSelect() const
+std::shared_ptr<ASTSelectQuery> ASTSelectQuery::cloneFirstSelect() const
 {
     auto res = cloneImpl(false);
     res->prev_union_all = nullptr;
diff --git a/dbms/src/Parsers/ASTSelectQuery.h b/dbms/src/Parsers/ASTSelectQuery.h
index 35a4effda88133cbacda11dac7033955329667c5..7f56e49e60ec4442c43584bb18b48931114c03e7 100644
--- a/dbms/src/Parsers/ASTSelectQuery.h
+++ b/dbms/src/Parsers/ASTSelectQuery.h
@@ -39,7 +39,7 @@ public:
     ASTPtr clone() const override;
 
     /// Get a deep copy of the first SELECT query tree.
-    ASTPtr cloneFirstSelect() const;
+    std::shared_ptr<ASTSelectQuery> cloneFirstSelect() const;
 
 private:
     std::shared_ptr<ASTSelectQuery> cloneImpl(bool traverse_union_all) const;
diff --git a/dbms/src/Storages/StorageDistributed.cpp b/dbms/src/Storages/StorageDistributed.cpp
index cae28ed6de4719b385d4584700e73eb9c4170808..181ec2ecedbf4383b0a280d9b46f997dc7c215a3 100644
--- a/dbms/src/Storages/StorageDistributed.cpp
+++ b/dbms/src/Storages/StorageDistributed.cpp
@@ -60,8 +60,8 @@ namespace
    /// Creates a copy of query, changes database and table names.
    ASTPtr rewriteSelectQuery(const ASTPtr & query, const std::string & database, const std::string & table)
    {
-        auto modified_query_ast = query->clone();
-        typeid_cast<ASTSelectQuery &>(*modified_query_ast).replaceDatabaseAndTable(database, table);
+        auto modified_query_ast = typeid_cast<const ASTSelectQuery &>(*query).cloneFirstSelect();
+        modified_query_ast->replaceDatabaseAndTable(database, table);
        return modified_query_ast;
    }
 
diff --git a/dbms/src/Storages/StorageMerge.cpp b/dbms/src/Storages/StorageMerge.cpp
index 785285b2ded61c483bd3dfcd3781dfb13ddf2eee..1f74d75f59df11c17f47231e00e8f99154ba34d2 100644
--- a/dbms/src/Storages/StorageMerge.cpp
+++ b/dbms/src/Storages/StorageMerge.cpp
@@ -50,6 +50,19 @@ StorageMerge::StorageMerge(
 {
 }
 
+bool StorageMerge::isRemote() const
+{
+    auto database = context.getDatabase(source_database);
+    auto iterator = database->getIterator();
+
+    while (iterator->isValid())
+    {
+        if (table_name_regexp.match(iterator->name()) && iterator->table()->isRemote())
+            return true;
+        iterator->next();
+    }
+    return false;
+}
+
 NameAndTypePair StorageMerge::getColumn(const String & column_name) const
 {
     auto type = VirtualColumnFactory::tryGetType(column_name);
diff --git a/dbms/src/Storages/StorageMerge.h b/dbms/src/Storages/StorageMerge.h
index 2245e196aea93a31e67f6c15bebea2f5a34a5724..05827090ed1c3d910075fd0901d2675d125ee759 100644
--- a/dbms/src/Storages/StorageMerge.h
+++ b/dbms/src/Storages/StorageMerge.h
@@ -20,6 +20,8 @@ public:
     std::string getName() const override { return "Merge"; }
     std::string getTableName() const override { return name; }
 
+    bool isRemote() const override;
+
     /// The check is delayed to the read method. It checks the support of the tables used.
     bool supportsSampling() const override { return true; }
     bool supportsPrewhere() const override { return true; }
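The new StorageMerge::isRemote() simply scans the source database and reports the Merge table as remote as soon as any table matched by its name regexp is itself remote (for example a Distributed table). A rough self-contained model of that scan, using std::regex_search in place of ClickHouse's regexp matcher and a plain vector in place of the database iterator (TableEntry and mergeIsRemote are illustrative names, not ClickHouse APIs):

    #include <regex>
    #include <string>
    #include <vector>

    // Toy listing of a database: table name plus whether that table is remote
    // (what IStorage::isRemote() reports; true for a Distributed table).
    struct TableEntry
    {
        std::string name;
        bool is_remote;
    };

    // Same shape as the new StorageMerge::isRemote(): remote if any matching
    // table is remote, false otherwise.
    bool mergeIsRemote(const std::vector<TableEntry> & database, const std::regex & table_name_regexp)
    {
        for (const auto & table : database)
            if (std::regex_search(table.name, table_name_regexp) && table.is_remote)
                return true;
        return false;
    }

    // Example:
    // mergeIsRemote({{"local_table", false}, {"distributed_table", true}},
    //               std::regex("^distributed_table"));  // -> true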
diff --git a/dbms/tests/integration/helpers/cluster.py b/dbms/tests/integration/helpers/cluster.py
index 5c23a6a3d561c146b83575e7821e80da5510f40a..a0c516c62217e407151ea52eb20d5b97c3e74be4 100644
--- a/dbms/tests/integration/helpers/cluster.py
+++ b/dbms/tests/integration/helpers/cluster.py
@@ -113,7 +113,6 @@ class ClickHouseCluster:
 
         for instance in self.instances.values():
             instance.docker_client = None
-            instance.docker_id = None
             instance.ip_address = None
             instance.client = None
 
diff --git a/dbms/tests/integration/test_merge_table_over_distributed/__init__.py b/dbms/tests/integration/test_merge_table_over_distributed/__init__.py
new file mode 100644
index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391
diff --git a/dbms/tests/integration/test_merge_table_over_distributed/configs/remote_servers.xml b/dbms/tests/integration/test_merge_table_over_distributed/configs/remote_servers.xml
new file mode 100644
index 0000000000000000000000000000000000000000..ebce4697529df10edc0fd206bed97aa8be9fa771
--- /dev/null
+++ b/dbms/tests/integration/test_merge_table_over_distributed/configs/remote_servers.xml
@@ -0,0 +1,18 @@
+<yandex>
+    <remote_servers>
+        <test_cluster>
+            <shard>
+                <replica>
+                    <host>node1</host>
+                    <port>9000</port>
+                </replica>
+            </shard>
+            <shard>
+                <replica>
+                    <host>node2</host>
+                    <port>9000</port>
+                </replica>
+            </shard>
+        </test_cluster>
+    </remote_servers>
+</yandex>
diff --git a/dbms/tests/integration/test_merge_table_over_distributed/test.py b/dbms/tests/integration/test_merge_table_over_distributed/test.py
new file mode 100644
index 0000000000000000000000000000000000000000..3e17cc17f664d0b0a99a454aa6d7381f066047a8
--- /dev/null
+++ b/dbms/tests/integration/test_merge_table_over_distributed/test.py
@@ -0,0 +1,48 @@
+from contextlib import contextmanager
+
+import pytest
+
+from helpers.cluster import ClickHouseCluster
+
+cluster = ClickHouseCluster(__file__)
+
+node1 = cluster.add_instance('node1', main_configs=['configs/remote_servers.xml'])
+node2 = cluster.add_instance('node2', main_configs=['configs/remote_servers.xml'])
+
+@pytest.fixture(scope="module")
+def started_cluster():
+    try:
+        cluster.start()
+
+        for node in (node1, node2):
+            node.query('''
+CREATE TABLE local_table(id UInt32, val String) ENGINE = TinyLog;
+''')
+
+        node1.query("INSERT INTO local_table VALUES (1, 'node1')")
+        node2.query("INSERT INTO local_table VALUES (2, 'node2')")
+
+        node1.query('''
+CREATE TABLE distributed_table(id UInt32, val String) ENGINE = Distributed(test_cluster, default, local_table);
+CREATE TABLE merge_table(id UInt32, val String) ENGINE = Merge(default, '^distributed_table')
+''')
+
+        yield cluster
+
+    finally:
+        cluster.shutdown()
+
+
+def test_global_in(started_cluster):
+    assert node1.query("SELECT val FROM distributed_table WHERE id GLOBAL IN (SELECT toUInt32(3 - id) FROM local_table)").rstrip() \
+        == 'node2'
+
+    assert node1.query("SELECT val FROM merge_table WHERE id GLOBAL IN (SELECT toUInt32(3 - id) FROM local_table)").rstrip() \
+        == 'node2'
+
+
+if __name__ == '__main__':
+    with contextmanager(started_cluster)() as cluster:
+        for name, instance in cluster.instances.items():
+            print name, instance.ip_address
+        raw_input("Cluster created, press any key to destroy...")
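For context on what the test above exercises: with the StorageDistributed change earlier in this diff, the query sent to each shard is now built by cloning only the first SELECT of the query (dropping any UNION ALL tail) and retargeting it at the shard-local database and table. A rough standalone model of that rewrite, with a toy struct instead of ASTSelectQuery (Select, next_union_all and rewriteForShard are illustrative names only):

    #include <memory>
    #include <string>

    // Toy stand-in for a SELECT query that may be linked into a UNION ALL chain.
    struct Select
    {
        std::string database;
        std::string table;
        std::shared_ptr<Select> next_union_all;
    };

    // Copy only this SELECT, detach it from the chain, and point it at the
    // shard-local database and table before shipping it to a remote server.
    std::shared_ptr<Select> rewriteForShard(const Select & query, const std::string & database, const std::string & table)
    {
        auto rewritten = std::make_shared<Select>(query);
        rewritten->next_union_all = nullptr;
        rewritten->database = database;
        rewritten->table = table;
        return rewritten;
    }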