未验证 提交 fd3abbe9 编写于 作者: A alesapin 提交者: GitHub

Merge pull request #4935 from zhang2014/feature/support_system_replicas

Support system replicas queries for distributed
......@@ -13,6 +13,7 @@ namespace ActionLocks
extern const StorageActionBlockType PartsFetch = 2;
extern const StorageActionBlockType PartsSend = 3;
extern const StorageActionBlockType ReplicationQueue = 4;
extern const StorageActionBlockType DistributedSend = 5;
}
......
......@@ -15,6 +15,7 @@
#include <Interpreters/PartLog.h>
#include <Interpreters/QueryThreadLog.h>
#include <Databases/IDatabase.h>
#include <Storages/StorageDistributed.h>
#include <Storages/StorageReplicatedMergeTree.h>
#include <Storages/StorageFactory.h>
#include <Parsers/ASTSystemQuery.h>
......@@ -22,6 +23,7 @@
#include <Parsers/ASTCreateQuery.h>
#include <csignal>
#include <algorithm>
#include "InterpreterSystemQuery.h"
namespace DB
......@@ -42,6 +44,7 @@ namespace ActionLocks
extern StorageActionBlockType PartsFetch;
extern StorageActionBlockType PartsSend;
extern StorageActionBlockType ReplicationQueue;
extern StorageActionBlockType DistributedSend;
}
......@@ -194,9 +197,18 @@ BlockIO InterpreterSystemQuery::execute()
case Type::START_REPLICATION_QUEUES:
startStopAction(context, query, ActionLocks::ReplicationQueue, true);
break;
case Type::STOP_DISTRIBUTED_SENDS:
startStopAction(context, query, ActionLocks::DistributedSend, false);
break;
case Type::START_DISTRIBUTED_SENDS:
startStopAction(context, query, ActionLocks::DistributedSend, true);
break;
case Type::SYNC_REPLICA:
syncReplica(query);
break;
case Type::FLUSH_DISTRIBUTED:
flushDistributed(query);
break;
case Type::RESTART_REPLICAS:
restartReplicas(system_context);
break;
......@@ -303,11 +315,21 @@ void InterpreterSystemQuery::syncReplica(ASTSystemQuery & query)
StoragePtr table = context.getTable(database_name, table_name);
auto table_replicated = dynamic_cast<StorageReplicatedMergeTree *>(table.get());
if (!table_replicated)
if (auto storage_replicated = dynamic_cast<StorageReplicatedMergeTree *>(table.get()))
storage_replicated->waitForShrinkingQueueSize(0, context.getSettingsRef().receive_timeout.value.milliseconds());
else
throw Exception("Table " + database_name + "." + table_name + " is not replicated", ErrorCodes::BAD_ARGUMENTS);
}
table_replicated->waitForShrinkingQueueSize(0, context.getSettingsRef().receive_timeout.value.milliseconds());
/// Handles the FLUSH_DISTRIBUTED system query for a single table.
///
/// The database is taken from `query.target_database` when it is non-empty, otherwise the
/// context's current database is used. The table must be a StorageDistributed; its
/// flushClusterNodesAllData() is then invoked.
///
/// Throws Exception with ErrorCodes::BAD_ARGUMENTS if the table is not a StorageDistributed.
void InterpreterSystemQuery::flushDistributed(ASTSystemQuery & query)
{
    String database_name = !query.target_database.empty() ? query.target_database : context.getCurrentDatabase();
    const String & table_name = query.target_table;

    /// Keep the owning pointer in a named local for the whole call. If getTable() were called
    /// inside the `if` condition, the returned temporary would be destroyed as soon as the
    /// condition's initializer finished, leaving only a raw pointer while the flush is running.
    auto table = context.getTable(database_name, table_name);

    if (auto storage_distributed = dynamic_cast<StorageDistributed *>(table.get()))
        storage_distributed->flushClusterNodesAllData();
    else
        throw Exception("Table " + database_name + "." + table_name + " is not distributed", ErrorCodes::BAD_ARGUMENTS);
}
......
......@@ -31,6 +31,7 @@ private:
void restartReplicas(Context & system_context);
void syncReplica(ASTSystemQuery & query);
void flushDistributed(ASTSystemQuery & query);
};
......
......@@ -41,6 +41,8 @@ const char * ASTSystemQuery::typeToString(Type type)
return "RESTART REPLICA";
case Type::SYNC_REPLICA:
return "SYNC REPLICA";
case Type::FLUSH_DISTRIBUTED:
return "FLUSH DISTRIBUTED";
case Type::RELOAD_DICTIONARY:
return "RELOAD DICTIONARY";
case Type::RELOAD_DICTIONARIES:
......@@ -65,6 +67,10 @@ const char * ASTSystemQuery::typeToString(Type type)
return "STOP REPLICATION QUEUES";
case Type::START_REPLICATION_QUEUES:
return "START REPLICATION QUEUES";
case Type::STOP_DISTRIBUTED_SENDS:
return "STOP DISTRIBUTED SENDS";
case Type::START_DISTRIBUTED_SENDS:
return "START DISTRIBUTED SENDS";
case Type::FLUSH_LOGS:
return "FLUSH LOGS";
default:
......@@ -99,12 +105,14 @@ void ASTSystemQuery::formatImpl(const FormatSettings & settings, FormatState &,
|| type == Type::STOP_REPLICATED_SENDS
|| type == Type::START_REPLICATED_SENDS
|| type == Type::STOP_REPLICATION_QUEUES
|| type == Type::START_REPLICATION_QUEUES)
|| type == Type::START_REPLICATION_QUEUES
|| type == Type::STOP_DISTRIBUTED_SENDS
|| type == Type::START_DISTRIBUTED_SENDS)
{
if (!target_table.empty())
print_database_table();
}
else if (type == Type::RESTART_REPLICA || type == Type::SYNC_REPLICA)
else if (type == Type::RESTART_REPLICA || type == Type::SYNC_REPLICA || type == Type::FLUSH_DISTRIBUTED)
{
print_database_table();
}
......
......@@ -40,6 +40,9 @@ public:
STOP_REPLICATION_QUEUES,
START_REPLICATION_QUEUES,
FLUSH_LOGS,
FLUSH_DISTRIBUTED,
STOP_DISTRIBUTED_SENDS,
START_DISTRIBUTED_SENDS,
END
};
......
......@@ -49,6 +49,7 @@ bool ParserSystemQuery::parseImpl(IParser::Pos & pos, ASTPtr & node, Expected &
case Type::RESTART_REPLICA:
case Type::SYNC_REPLICA:
case Type::FLUSH_DISTRIBUTED:
if (!parseDatabaseAndTableName(pos, expected, res->target_database, res->target_table))
return false;
break;
......@@ -61,6 +62,8 @@ bool ParserSystemQuery::parseImpl(IParser::Pos & pos, ASTPtr & node, Expected &
case Type::START_REPLICATED_SENDS:
case Type::STOP_REPLICATION_QUEUES:
case Type::START_REPLICATION_QUEUES:
case Type::STOP_DISTRIBUTED_SENDS:
case Type::START_DISTRIBUTED_SENDS:
parseDatabaseAndTableName(pos, expected, res->target_database, res->target_table);
break;
......
......@@ -30,6 +30,7 @@ namespace DB
namespace ErrorCodes
{
extern const int ABORTED;
extern const int INCORRECT_FILE_NAME;
extern const int CHECKSUM_DOESNT_MATCH;
extern const int TOO_LARGE_SIZE_COMPRESSED;
......@@ -58,12 +59,14 @@ namespace
}
StorageDistributedDirectoryMonitor::StorageDistributedDirectoryMonitor(StorageDistributed & storage, const std::string & name, const ConnectionPoolPtr & pool)
StorageDistributedDirectoryMonitor::StorageDistributedDirectoryMonitor(
StorageDistributed & storage, const std::string & name, const ConnectionPoolPtr & pool, ActionBlocker & monitor_blocker)
: storage(storage), pool{pool}, path{storage.path + name + '/'}
, current_batch_file_path{path + "current_batch.txt"}
, default_sleep_time{storage.global_context.getSettingsRef().distributed_directory_monitor_sleep_time_ms.totalMilliseconds()}
, sleep_time{default_sleep_time}
, log{&Logger::get(getLoggerName())}
, monitor_blocker(monitor_blocker)
{
const Settings & settings = storage.global_context.getSettingsRef();
should_batch_inserts = settings.distributed_directory_monitor_batch_inserts;
......@@ -85,6 +88,14 @@ StorageDistributedDirectoryMonitor::~StorageDistributedDirectoryMonitor()
}
}
/// Runs one processFiles() pass on the calling thread while holding `mutex`.
/// Does nothing if `quit` is already set. The result of processFiles() is ignored.
void StorageDistributedDirectoryMonitor::flushAllData()
{
    if (quit)
        return;

    std::unique_lock lock{mutex};
    processFiles();
}
void StorageDistributedDirectoryMonitor::shutdownAndDropAllData()
{
......@@ -114,18 +125,25 @@ void StorageDistributedDirectoryMonitor::run()
{
auto do_sleep = true;
try
if (!monitor_blocker.isCancelled())
{
do_sleep = !findFiles();
try
{
do_sleep = !processFiles();
}
catch (...)
{
do_sleep = true;
++error_count;
sleep_time = std::min(
std::chrono::milliseconds{Int64(default_sleep_time.count() * std::exp2(error_count))},
std::chrono::milliseconds{max_sleep_time});
tryLogCurrentException(getLoggerName().data());
}
}
catch (...)
else
{
do_sleep = true;
++error_count;
sleep_time = std::min(
std::chrono::milliseconds{Int64(default_sleep_time.count() * std::exp2(error_count))},
std::chrono::milliseconds{max_sleep_time});
tryLogCurrentException(getLoggerName().data());
LOG_DEBUG(log, "Skipping send data over distributed table.");
}
if (do_sleep)
......@@ -174,7 +192,7 @@ ConnectionPoolPtr StorageDistributedDirectoryMonitor::createPool(const std::stri
}
bool StorageDistributedDirectoryMonitor::findFiles()
bool StorageDistributedDirectoryMonitor::processFiles()
{
std::map<UInt64, std::string> files;
......
......@@ -19,15 +19,19 @@ namespace DB
class StorageDistributedDirectoryMonitor
{
public:
StorageDistributedDirectoryMonitor(StorageDistributed & storage, const std::string & name, const ConnectionPoolPtr & pool);
StorageDistributedDirectoryMonitor(
StorageDistributed & storage, const std::string & name, const ConnectionPoolPtr & pool, ActionBlocker & monitor_blocker);
~StorageDistributedDirectoryMonitor();
static ConnectionPoolPtr createPool(const std::string & name, const StorageDistributed & storage);
void flushAllData();
void shutdownAndDropAllData();
private:
void run();
bool findFiles();
bool processFiles();
void processFile(const std::string & file_path);
void processFilesWithBatching(const std::map<UInt64, std::string> & files);
......@@ -57,6 +61,7 @@ private:
std::mutex mutex;
std::condition_variable cond;
Logger * log;
ActionBlocker & monitor_blocker;
ThreadFromGlobalPool thread{&StorageDistributedDirectoryMonitor::run, this};
/// Read insert query and insert settings for backward compatible.
......
......@@ -65,6 +65,10 @@ namespace ErrorCodes
extern const int TOO_MANY_ROWS;
}
namespace ActionLocks
{
extern const StorageActionBlockType DistributedSend;
}
namespace
{
......@@ -427,7 +431,7 @@ void StorageDistributed::createDirectoryMonitors()
void StorageDistributed::requireDirectoryMonitor(const std::string & name)
{
std::lock_guard lock(cluster_nodes_mutex);
cluster_nodes_data[name].requireDirectoryMonitor(name, *this);
cluster_nodes_data[name].requireDirectoryMonitor(name, *this, monitors_blocker);
}
ConnectionPoolPtr StorageDistributed::requireConnectionPool(const std::string & name)
......@@ -454,11 +458,17 @@ void StorageDistributed::ClusterNodeData::requireConnectionPool(const std::strin
conneciton_pool = StorageDistributedDirectoryMonitor::createPool(name, storage);
}
void StorageDistributed::ClusterNodeData::requireDirectoryMonitor(const std::string & name, StorageDistributed & storage)
void StorageDistributed::ClusterNodeData::requireDirectoryMonitor(
const std::string & name, StorageDistributed & storage, ActionBlocker & monitor_blocker)
{
requireConnectionPool(name, storage);
if (!directory_monitor)
directory_monitor = std::make_unique<StorageDistributedDirectoryMonitor>(storage, name, conneciton_pool);
directory_monitor = std::make_unique<StorageDistributedDirectoryMonitor>(storage, name, conneciton_pool, monitor_blocker);
}
/// Asks this node's directory monitor to flush its data.
///
/// `directory_monitor` is created lazily by requireDirectoryMonitor(), whereas
/// requireConnectionPool() only creates the connection pool. A node entry may therefore
/// exist without a monitor; in that case there is nothing to flush and the call is a no-op.
void StorageDistributed::ClusterNodeData::flushAllData()
{
    if (directory_monitor)
        directory_monitor->flushAllData();
}
void StorageDistributed::ClusterNodeData::shutdownAndDropAllData()
......@@ -499,6 +509,22 @@ ClusterPtr StorageDistributed::skipUnusedShards(ClusterPtr cluster, const Select
return cluster->getClusterWithMultipleShards({shards.begin(), shards.end()});
}
/// Returns the lock produced by `monitors_blocker.cancel()` when `type` is
/// ActionLocks::DistributedSend. Any other action type yields a default-constructed ActionLock.
ActionLock StorageDistributed::getActionLock(StorageActionBlockType type)
{
    if (type != ActionLocks::DistributedSend)
        return {};

    return monitors_blocker.cancel();
}
/// Calls flushAllData() on every entry of `cluster_nodes_data`, one after another,
/// while holding `cluster_nodes_mutex` for the duration of the loop.
void StorageDistributed::flushClusterNodesAllData()
{
    std::lock_guard lock(cluster_nodes_mutex);

    /// TODO: Maybe it should be executed in parallel
    for (auto & name_and_node : cluster_nodes_data)
        name_and_node.second.flushAllData();
}
void registerStorageDistributed(StorageFactory & factory)
{
......
......@@ -11,6 +11,7 @@
#include <Interpreters/ExpressionActions.h>
#include <Parsers/ASTFunction.h>
#include <common/logger_useful.h>
#include <Common/ActionBlocker.h>
namespace DB
......@@ -105,8 +106,11 @@ public:
/// ensure connection pool creation and return it
ConnectionPoolPtr requireConnectionPool(const std::string & name);
void flushClusterNodesAllData();
ClusterPtr getCluster() const;
ActionLock getActionLock(StorageActionBlockType type) override;
String table_name;
String remote_database;
......@@ -135,7 +139,9 @@ public:
/// Creates connection_pool if not exists.
void requireConnectionPool(const std::string & name, const StorageDistributed & storage);
/// Creates directory_monitor if not exists.
void requireDirectoryMonitor(const std::string & name, StorageDistributed & storage);
void requireDirectoryMonitor(const std::string & name, StorageDistributed & storage, ActionBlocker & monitor_blocker);
void flushAllData();
void shutdownAndDropAllData();
};
......@@ -145,6 +151,8 @@ public:
/// Used for global monotonic ordering of files to send.
SimpleIncrement file_names_increment;
ActionBlocker monitors_blocker;
protected:
StorageDistributed(
const String & database_name,
......
......@@ -370,6 +370,11 @@ void StorageMaterializedView::checkPartitionCanBeDropped(const ASTPtr & partitio
target_table->checkPartitionCanBeDropped(partition);
}
/// Forwards the request to the target table when this view has an inner table.
/// Otherwise returns a default-constructed ActionLock.
ActionLock StorageMaterializedView::getActionLock(StorageActionBlockType type)
{
    if (!has_inner_table)
        return ActionLock{};

    return getTargetTable()->getActionLock(type);
}
void registerStorageMaterializedView(StorageFactory & factory)
{
factory.registerStorage("MaterializedView", [](const StorageFactory::Arguments & args)
......
......@@ -51,6 +51,8 @@ public:
StoragePtr getTargetTable() const;
StoragePtr tryGetTargetTable() const;
ActionLock getActionLock(StorageActionBlockType type) override;
BlockInputStreams read(
const Names & column_names,
const SelectQueryInfo & query_info,
......
......@@ -14,7 +14,7 @@ Don't use Docker from your system repository.
* [pip](https://pypi.python.org/pypi/pip). To install: `sudo apt-get install python-pip`
* [py.test](https://docs.pytest.org/) testing framework. To install: `sudo -H pip install pytest`
* [docker-compose](https://docs.docker.com/compose/) and additional python libraries. To install: `sudo -H pip install docker-compose docker dicttoxml kazoo PyMySQL psycopg2 pymongo tzlocal kafka-python`
* [docker-compose](https://docs.docker.com/compose/) and additional python libraries. To install: `sudo -H pip install docker-compose docker dicttoxml kazoo PyMySQL psycopg2 pymongo tzlocal kafka-python protobuf`
(strongly discouraged) If you really want to use OS packages on modern Debian/Ubuntu instead of "pip": `sudo apt install -y docker docker-compose python-pytest python-dicttoxml python-docker python-pymysql python-pymongo python-tzlocal python-kazoo python-psycopg2 python-kafka`
......
<yandex>
<remote_servers>
<test_cluster>
<shard>
<replica>
<host>node1</host>
<port>9000</port>
</replica>
</shard>
<shard>
<replica>
<host>node2</host>
<port>9000</port>
</replica>
</shard>
</test_cluster>
</remote_servers>
</yandex>
from contextlib import contextmanager
import pytest
from helpers.cluster import ClickHouseCluster
cluster = ClickHouseCluster(__file__)
node1 = cluster.add_instance('node1', main_configs=['configs/remote_servers.xml'])
node2 = cluster.add_instance('node2', main_configs=['configs/remote_servers.xml'])
@pytest.fixture(scope="module")
def started_cluster():
    """Module-scoped fixture: start the cluster, create the test tables, yield the cluster.

    `local_table` is created on both nodes and `distributed_table` on node1 only.
    The cluster is shut down in the `finally` clause.
    """
    try:
        cluster.start()

        for instance in (node1, node2):
            instance.query('''CREATE TABLE local_table(id UInt32, val String) ENGINE = MergeTree ORDER BY id;''')

        node1.query('''CREATE TABLE distributed_table(id UInt32, val String) ENGINE = Distributed(test_cluster, default, local_table, id);''')

        yield cluster
    finally:
        cluster.shutdown()
def test_start_and_stop_replica_send(started_cluster):
    """Check SYSTEM STOP/START DISTRIBUTED SENDS together with SYSTEM FLUSH DISTRIBUTED.

    With sends stopped, two rows are inserted through the distributed table and only one
    is expected to be visible. After sends are started again and the table is flushed,
    both rows are expected.
    """
    def visible_rows():
        return node1.query("SELECT COUNT() FROM distributed_table").rstrip()

    node1.query("SYSTEM STOP DISTRIBUTED SENDS distributed_table;")

    node1.query("INSERT INTO distributed_table VALUES (0, 'node1')")
    node1.query("INSERT INTO distributed_table VALUES (1, 'node2')")

    # Write only to this node when stop distributed sends
    assert visible_rows() == '1'

    node1.query("SYSTEM START DISTRIBUTED SENDS distributed_table;")
    node1.query("SYSTEM FLUSH DISTRIBUTED distributed_table;")
    assert visible_rows() == '2'
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册