diff --git a/dbms/src/Interpreters/ActionLocksManager.cpp b/dbms/src/Interpreters/ActionLocksManager.cpp index 6fa447419254c42a822489006202d4728dadf8ba..1f9329f85a9a177471df17b83f8692325132acf2 100644 --- a/dbms/src/Interpreters/ActionLocksManager.cpp +++ b/dbms/src/Interpreters/ActionLocksManager.cpp @@ -13,6 +13,7 @@ namespace ActionLocks extern const StorageActionBlockType PartsFetch = 2; extern const StorageActionBlockType PartsSend = 3; extern const StorageActionBlockType ReplicationQueue = 4; + extern const StorageActionBlockType DistributedSend = 5; } diff --git a/dbms/src/Interpreters/InterpreterSystemQuery.cpp b/dbms/src/Interpreters/InterpreterSystemQuery.cpp index 15d15fc4f14edbbe9f0027da582e15ec0b33edbb..3b00e20ee4c90197c6477dc953f16df97593a892 100644 --- a/dbms/src/Interpreters/InterpreterSystemQuery.cpp +++ b/dbms/src/Interpreters/InterpreterSystemQuery.cpp @@ -15,6 +15,7 @@ #include #include #include +#include #include #include #include @@ -22,6 +23,7 @@ #include #include #include +#include "InterpreterSystemQuery.h" namespace DB @@ -42,6 +44,7 @@ namespace ActionLocks extern StorageActionBlockType PartsFetch; extern StorageActionBlockType PartsSend; extern StorageActionBlockType ReplicationQueue; + extern StorageActionBlockType DistributedSend; } @@ -194,9 +197,18 @@ BlockIO InterpreterSystemQuery::execute() case Type::START_REPLICATION_QUEUES: startStopAction(context, query, ActionLocks::ReplicationQueue, true); break; + case Type::STOP_DISTRIBUTED_SENDS: + startStopAction(context, query, ActionLocks::DistributedSend, false); + break; + case Type::START_DISTRIBUTED_SENDS: + startStopAction(context, query, ActionLocks::DistributedSend, true); + break; case Type::SYNC_REPLICA: syncReplica(query); break; + case Type::FLUSH_DISTRIBUTED: + flushDistributed(query); + break; case Type::RESTART_REPLICAS: restartReplicas(system_context); break; @@ -303,11 +315,21 @@ void InterpreterSystemQuery::syncReplica(ASTSystemQuery & query) StoragePtr table = context.getTable(database_name, table_name); - auto table_replicated = dynamic_cast(table.get()); - if (!table_replicated) + if (auto storage_replicated = dynamic_cast(table.get())) + storage_replicated->waitForShrinkingQueueSize(0, context.getSettingsRef().receive_timeout.value.milliseconds()); + else throw Exception("Table " + database_name + "." + table_name + " is not replicated", ErrorCodes::BAD_ARGUMENTS); +} - table_replicated->waitForShrinkingQueueSize(0, context.getSettingsRef().receive_timeout.value.milliseconds()); +void InterpreterSystemQuery::flushDistributed(ASTSystemQuery & query) +{ + String database_name = !query.target_database.empty() ? query.target_database : context.getCurrentDatabase(); + String & table_name = query.target_table; + + if (auto storage_distributed = dynamic_cast(context.getTable(database_name, table_name).get())) + storage_distributed->flushClusterNodesAllData(); + else + throw Exception("Table " + database_name + "." + table_name + " is not distributed", ErrorCodes::BAD_ARGUMENTS); } diff --git a/dbms/src/Interpreters/InterpreterSystemQuery.h b/dbms/src/Interpreters/InterpreterSystemQuery.h index 65a6b13884b11f326d3c743f514d68b18ddfa73e..31945745c1eddd7a7e5c6e6074aefab7f68393b2 100644 --- a/dbms/src/Interpreters/InterpreterSystemQuery.h +++ b/dbms/src/Interpreters/InterpreterSystemQuery.h @@ -31,6 +31,7 @@ private: void restartReplicas(Context & system_context); void syncReplica(ASTSystemQuery & query); + void flushDistributed(ASTSystemQuery & query); }; diff --git a/dbms/src/Parsers/ASTSystemQuery.cpp b/dbms/src/Parsers/ASTSystemQuery.cpp index 1f49453df4838f14aed21fb2f37ae3459a277c86..699dd9d0f54dce4bdc3b89e8541307795ce7dfa2 100644 --- a/dbms/src/Parsers/ASTSystemQuery.cpp +++ b/dbms/src/Parsers/ASTSystemQuery.cpp @@ -41,6 +41,8 @@ const char * ASTSystemQuery::typeToString(Type type) return "RESTART REPLICA"; case Type::SYNC_REPLICA: return "SYNC REPLICA"; + case Type::FLUSH_DISTRIBUTED: + return "FLUSH DISTRIBUTED"; case Type::RELOAD_DICTIONARY: return "RELOAD DICTIONARY"; case Type::RELOAD_DICTIONARIES: @@ -65,6 +67,10 @@ const char * ASTSystemQuery::typeToString(Type type) return "STOP REPLICATION QUEUES"; case Type::START_REPLICATION_QUEUES: return "START REPLICATION QUEUES"; + case Type::STOP_DISTRIBUTED_SENDS: + return "STOP DISTRIBUTED SENDS"; + case Type::START_DISTRIBUTED_SENDS: + return "START DISTRIBUTED SENDS"; case Type::FLUSH_LOGS: return "FLUSH LOGS"; default: @@ -99,12 +105,14 @@ void ASTSystemQuery::formatImpl(const FormatSettings & settings, FormatState &, || type == Type::STOP_REPLICATED_SENDS || type == Type::START_REPLICATED_SENDS || type == Type::STOP_REPLICATION_QUEUES - || type == Type::START_REPLICATION_QUEUES) + || type == Type::START_REPLICATION_QUEUES + || type == Type::STOP_DISTRIBUTED_SENDS + || type == Type::START_DISTRIBUTED_SENDS) { if (!target_table.empty()) print_database_table(); } - else if (type == Type::RESTART_REPLICA || type == Type::SYNC_REPLICA) + else if (type == Type::RESTART_REPLICA || type == Type::SYNC_REPLICA || type == Type::FLUSH_DISTRIBUTED) { print_database_table(); } diff --git a/dbms/src/Parsers/ASTSystemQuery.h b/dbms/src/Parsers/ASTSystemQuery.h index d32a5dd08dab3157f4ee0fc054d869ec07d406ef..0ff8228e2e0dafcb9996e5e5322ccb157e29ea0c 100644 --- a/dbms/src/Parsers/ASTSystemQuery.h +++ b/dbms/src/Parsers/ASTSystemQuery.h @@ -40,6 +40,9 @@ public: STOP_REPLICATION_QUEUES, START_REPLICATION_QUEUES, FLUSH_LOGS, + FLUSH_DISTRIBUTED, + STOP_DISTRIBUTED_SENDS, + START_DISTRIBUTED_SENDS, END }; diff --git a/dbms/src/Parsers/ParserSystemQuery.cpp b/dbms/src/Parsers/ParserSystemQuery.cpp index e3431c50be5ca62417705033886e72320ebc32aa..333613e951213a81e2a7a8c22203c8505803f99a 100644 --- a/dbms/src/Parsers/ParserSystemQuery.cpp +++ b/dbms/src/Parsers/ParserSystemQuery.cpp @@ -49,6 +49,7 @@ bool ParserSystemQuery::parseImpl(IParser::Pos & pos, ASTPtr & node, Expected & case Type::RESTART_REPLICA: case Type::SYNC_REPLICA: + case Type::FLUSH_DISTRIBUTED: if (!parseDatabaseAndTableName(pos, expected, res->target_database, res->target_table)) return false; break; @@ -61,6 +62,8 @@ bool ParserSystemQuery::parseImpl(IParser::Pos & pos, ASTPtr & node, Expected & case Type::START_REPLICATED_SENDS: case Type::STOP_REPLICATION_QUEUES: case Type::START_REPLICATION_QUEUES: + case Type::STOP_DISTRIBUTED_SENDS: + case Type::START_DISTRIBUTED_SENDS: parseDatabaseAndTableName(pos, expected, res->target_database, res->target_table); break; diff --git a/dbms/src/Storages/Distributed/DirectoryMonitor.cpp b/dbms/src/Storages/Distributed/DirectoryMonitor.cpp index 35eca10bbfa12209000506b9ce86fde50e2e5d2b..ff780f6f7b7164f491e79e76a8b39f98d5354834 100644 --- a/dbms/src/Storages/Distributed/DirectoryMonitor.cpp +++ b/dbms/src/Storages/Distributed/DirectoryMonitor.cpp @@ -30,6 +30,7 @@ namespace DB namespace ErrorCodes { + extern const int ABORTED; extern const int INCORRECT_FILE_NAME; extern const int CHECKSUM_DOESNT_MATCH; extern const int TOO_LARGE_SIZE_COMPRESSED; @@ -58,12 +59,14 @@ namespace } -StorageDistributedDirectoryMonitor::StorageDistributedDirectoryMonitor(StorageDistributed & storage, const std::string & name, const ConnectionPoolPtr & pool) +StorageDistributedDirectoryMonitor::StorageDistributedDirectoryMonitor( + StorageDistributed & storage, const std::string & name, const ConnectionPoolPtr & pool, ActionBlocker & monitor_blocker) : storage(storage), pool{pool}, path{storage.path + name + '/'} , current_batch_file_path{path + "current_batch.txt"} , default_sleep_time{storage.global_context.getSettingsRef().distributed_directory_monitor_sleep_time_ms.totalMilliseconds()} , sleep_time{default_sleep_time} , log{&Logger::get(getLoggerName())} + , monitor_blocker(monitor_blocker) { const Settings & settings = storage.global_context.getSettingsRef(); should_batch_inserts = settings.distributed_directory_monitor_batch_inserts; @@ -85,6 +88,14 @@ StorageDistributedDirectoryMonitor::~StorageDistributedDirectoryMonitor() } } +void StorageDistributedDirectoryMonitor::flushAllData() +{ + if (!quit) + { + std::unique_lock lock{mutex}; + processFiles(); + } +} void StorageDistributedDirectoryMonitor::shutdownAndDropAllData() { @@ -114,18 +125,25 @@ void StorageDistributedDirectoryMonitor::run() { auto do_sleep = true; - try + if (!monitor_blocker.isCancelled()) { - do_sleep = !findFiles(); + try + { + do_sleep = !processFiles(); + } + catch (...) + { + do_sleep = true; + ++error_count; + sleep_time = std::min( + std::chrono::milliseconds{Int64(default_sleep_time.count() * std::exp2(error_count))}, + std::chrono::milliseconds{max_sleep_time}); + tryLogCurrentException(getLoggerName().data()); + } } - catch (...) + else { - do_sleep = true; - ++error_count; - sleep_time = std::min( - std::chrono::milliseconds{Int64(default_sleep_time.count() * std::exp2(error_count))}, - std::chrono::milliseconds{max_sleep_time}); - tryLogCurrentException(getLoggerName().data()); + LOG_DEBUG(log, "Skipping send data over distributed table."); } if (do_sleep) @@ -174,7 +192,7 @@ ConnectionPoolPtr StorageDistributedDirectoryMonitor::createPool(const std::stri } -bool StorageDistributedDirectoryMonitor::findFiles() +bool StorageDistributedDirectoryMonitor::processFiles() { std::map files; diff --git a/dbms/src/Storages/Distributed/DirectoryMonitor.h b/dbms/src/Storages/Distributed/DirectoryMonitor.h index 2c95947355d22fded3b185e795fd2500d6636276..9416db9be2c870eeebe95133d9d5ccf15c21d993 100644 --- a/dbms/src/Storages/Distributed/DirectoryMonitor.h +++ b/dbms/src/Storages/Distributed/DirectoryMonitor.h @@ -19,15 +19,19 @@ namespace DB class StorageDistributedDirectoryMonitor { public: - StorageDistributedDirectoryMonitor(StorageDistributed & storage, const std::string & name, const ConnectionPoolPtr & pool); + StorageDistributedDirectoryMonitor( + StorageDistributed & storage, const std::string & name, const ConnectionPoolPtr & pool, ActionBlocker & monitor_blocker); + ~StorageDistributedDirectoryMonitor(); static ConnectionPoolPtr createPool(const std::string & name, const StorageDistributed & storage); + void flushAllData(); + void shutdownAndDropAllData(); private: void run(); - bool findFiles(); + bool processFiles(); void processFile(const std::string & file_path); void processFilesWithBatching(const std::map & files); @@ -57,6 +61,7 @@ private: std::mutex mutex; std::condition_variable cond; Logger * log; + ActionBlocker & monitor_blocker; ThreadFromGlobalPool thread{&StorageDistributedDirectoryMonitor::run, this}; /// Read insert query and insert settings for backward compatible. diff --git a/dbms/src/Storages/StorageDistributed.cpp b/dbms/src/Storages/StorageDistributed.cpp index 4440d2b96ee142cf89e1f2b202f84ee4479c6193..826407e059991e8caa30563ef6d46ccb80f1576b 100644 --- a/dbms/src/Storages/StorageDistributed.cpp +++ b/dbms/src/Storages/StorageDistributed.cpp @@ -65,6 +65,10 @@ namespace ErrorCodes extern const int TOO_MANY_ROWS; } +namespace ActionLocks +{ + extern const StorageActionBlockType DistributedSend; +} namespace { @@ -427,7 +431,7 @@ void StorageDistributed::createDirectoryMonitors() void StorageDistributed::requireDirectoryMonitor(const std::string & name) { std::lock_guard lock(cluster_nodes_mutex); - cluster_nodes_data[name].requireDirectoryMonitor(name, *this); + cluster_nodes_data[name].requireDirectoryMonitor(name, *this, monitors_blocker); } ConnectionPoolPtr StorageDistributed::requireConnectionPool(const std::string & name) @@ -454,11 +458,17 @@ void StorageDistributed::ClusterNodeData::requireConnectionPool(const std::strin conneciton_pool = StorageDistributedDirectoryMonitor::createPool(name, storage); } -void StorageDistributed::ClusterNodeData::requireDirectoryMonitor(const std::string & name, StorageDistributed & storage) +void StorageDistributed::ClusterNodeData::requireDirectoryMonitor( + const std::string & name, StorageDistributed & storage, ActionBlocker & monitor_blocker) { requireConnectionPool(name, storage); if (!directory_monitor) - directory_monitor = std::make_unique(storage, name, conneciton_pool); + directory_monitor = std::make_unique(storage, name, conneciton_pool, monitor_blocker); +} + +void StorageDistributed::ClusterNodeData::flushAllData() +{ + directory_monitor->flushAllData(); } void StorageDistributed::ClusterNodeData::shutdownAndDropAllData() @@ -499,6 +509,22 @@ ClusterPtr StorageDistributed::skipUnusedShards(ClusterPtr cluster, const Select return cluster->getClusterWithMultipleShards({shards.begin(), shards.end()}); } +ActionLock StorageDistributed::getActionLock(StorageActionBlockType type) +{ + if (type == ActionLocks::DistributedSend) + return monitors_blocker.cancel(); + return {}; +} + +void StorageDistributed::flushClusterNodesAllData() +{ + std::lock_guard lock(cluster_nodes_mutex); + + /// TODO: Maybe it should be executed in parallel + for (auto it = cluster_nodes_data.begin(); it != cluster_nodes_data.end(); ++it) + it->second.flushAllData(); +} + void registerStorageDistributed(StorageFactory & factory) { diff --git a/dbms/src/Storages/StorageDistributed.h b/dbms/src/Storages/StorageDistributed.h index 404a9a7265e9e31e135ed909623c4fc4fc87ce3b..fee3ba78d8d8bbe723612f512f66a86ce5e323b5 100644 --- a/dbms/src/Storages/StorageDistributed.h +++ b/dbms/src/Storages/StorageDistributed.h @@ -11,6 +11,7 @@ #include #include #include +#include namespace DB @@ -105,8 +106,11 @@ public: /// ensure connection pool creation and return it ConnectionPoolPtr requireConnectionPool(const std::string & name); + void flushClusterNodesAllData(); + ClusterPtr getCluster() const; + ActionLock getActionLock(StorageActionBlockType type) override; String table_name; String remote_database; @@ -135,7 +139,9 @@ public: /// Creates connection_pool if not exists. void requireConnectionPool(const std::string & name, const StorageDistributed & storage); /// Creates directory_monitor if not exists. - void requireDirectoryMonitor(const std::string & name, StorageDistributed & storage); + void requireDirectoryMonitor(const std::string & name, StorageDistributed & storage, ActionBlocker & monitor_blocker); + + void flushAllData(); void shutdownAndDropAllData(); }; @@ -145,6 +151,8 @@ public: /// Used for global monotonic ordering of files to send. SimpleIncrement file_names_increment; + ActionBlocker monitors_blocker; + protected: StorageDistributed( const String & database_name, diff --git a/dbms/src/Storages/StorageMaterializedView.cpp b/dbms/src/Storages/StorageMaterializedView.cpp index 0779132c37efe72224f56dcd15748121290a7d23..c843312e3d7aae13ea079640f90743d6405cbdef 100644 --- a/dbms/src/Storages/StorageMaterializedView.cpp +++ b/dbms/src/Storages/StorageMaterializedView.cpp @@ -370,6 +370,11 @@ void StorageMaterializedView::checkPartitionCanBeDropped(const ASTPtr & partitio target_table->checkPartitionCanBeDropped(partition); } +ActionLock StorageMaterializedView::getActionLock(StorageActionBlockType type) +{ + return has_inner_table ? getTargetTable()->getActionLock(type) : ActionLock{}; +} + void registerStorageMaterializedView(StorageFactory & factory) { factory.registerStorage("MaterializedView", [](const StorageFactory::Arguments & args) diff --git a/dbms/src/Storages/StorageMaterializedView.h b/dbms/src/Storages/StorageMaterializedView.h index 8214875528d6efb8761150191cd90c460bd73cc1..5b14b90d77b1b9d5b1607dbae7dd779c8f453d94 100644 --- a/dbms/src/Storages/StorageMaterializedView.h +++ b/dbms/src/Storages/StorageMaterializedView.h @@ -51,6 +51,8 @@ public: StoragePtr getTargetTable() const; StoragePtr tryGetTargetTable() const; + ActionLock getActionLock(StorageActionBlockType type) override; + BlockInputStreams read( const Names & column_names, const SelectQueryInfo & query_info, diff --git a/dbms/tests/integration/README.md b/dbms/tests/integration/README.md index 6643ed6dc9118b8fe0da9d47d4f28de2bed62c01..b22753313f69728092b8e9286b8495d3b889b8cf 100644 --- a/dbms/tests/integration/README.md +++ b/dbms/tests/integration/README.md @@ -14,7 +14,7 @@ Don't use Docker from your system repository. * [pip](https://pypi.python.org/pypi/pip). To install: `sudo apt-get install python-pip` * [py.test](https://docs.pytest.org/) testing framework. To install: `sudo -H pip install pytest` -* [docker-compose](https://docs.docker.com/compose/) and additional python libraries. To install: `sudo -H pip install docker-compose docker dicttoxml kazoo PyMySQL psycopg2 pymongo tzlocal kafka-python` +* [docker-compose](https://docs.docker.com/compose/) and additional python libraries. To install: `sudo -H pip install docker-compose docker dicttoxml kazoo PyMySQL psycopg2 pymongo tzlocal kafka-python protobuf` (highly not recommended) If you really want to use OS packages on modern debian/ubuntu instead of "pip": `sudo apt install -y docker docker-compose python-pytest python-dicttoxml python-docker python-pymysql python-pymongo python-tzlocal python-kazoo python-psycopg2 python-kafka` diff --git a/dbms/tests/integration/test_distributed_system_query/__init__.py b/dbms/tests/integration/test_distributed_system_query/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/dbms/tests/integration/test_distributed_system_query/configs/remote_servers.xml b/dbms/tests/integration/test_distributed_system_query/configs/remote_servers.xml new file mode 100644 index 0000000000000000000000000000000000000000..ebce4697529df10edc0fd206bed97aa8be9fa771 --- /dev/null +++ b/dbms/tests/integration/test_distributed_system_query/configs/remote_servers.xml @@ -0,0 +1,18 @@ + + + + + + node1 + 9000 + + + + + node2 + 9000 + + + + + diff --git a/dbms/tests/integration/test_distributed_system_query/test.py b/dbms/tests/integration/test_distributed_system_query/test.py new file mode 100644 index 0000000000000000000000000000000000000000..c6e28c440341bc86aa6972a0b78fc4b035de9d19 --- /dev/null +++ b/dbms/tests/integration/test_distributed_system_query/test.py @@ -0,0 +1,41 @@ +from contextlib import contextmanager + +import pytest + +from helpers.cluster import ClickHouseCluster + +cluster = ClickHouseCluster(__file__) + +node1 = cluster.add_instance('node1', main_configs=['configs/remote_servers.xml']) +node2 = cluster.add_instance('node2', main_configs=['configs/remote_servers.xml']) + + +@pytest.fixture(scope="module") +def started_cluster(): + try: + cluster.start() + + for node in (node1, node2): + node.query('''CREATE TABLE local_table(id UInt32, val String) ENGINE = MergeTree ORDER BY id;''') + + node1.query('''CREATE TABLE distributed_table(id UInt32, val String) ENGINE = Distributed(test_cluster, default, local_table, id);''') + + yield cluster + + finally: + cluster.shutdown() + + +def test_start_and_stop_replica_send(started_cluster): + node1.query("SYSTEM STOP DISTRIBUTED SENDS distributed_table;") + + node1.query("INSERT INTO distributed_table VALUES (0, 'node1')") + node1.query("INSERT INTO distributed_table VALUES (1, 'node2')") + + # Write only to this node when stop distributed sends + assert node1.query("SELECT COUNT() FROM distributed_table").rstrip() == '1' + + node1.query("SYSTEM START DISTRIBUTED SENDS distributed_table;") + node1.query("SYSTEM FLUSH DISTRIBUTED distributed_table;") + assert node1.query("SELECT COUNT() FROM distributed_table").rstrip() == '2' +