Commit e4785aeb, authored by Vitaliy Lyudvichenko, committed by alexey-milovidov

Simplified clean queue logic. Added more tests. [#CLICKHOUSE-3128]

Parent c2e84d6d
......@@ -258,7 +258,7 @@ bool ExecutionStatus::tryDeserializeText(const std::string & data)
ExecutionStatus ExecutionStatus::fromCurrentException(const std::string & start_of_message)
{
String msg = start_of_message.empty() ? "" : (start_of_message + ": " + getCurrentExceptionMessage(false, true));
String msg = (start_of_message.empty() ? "" : (start_of_message + ": ")) + getCurrentExceptionMessage(false, true);
return ExecutionStatus(getCurrentExceptionCode(), msg);
}
......
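The one-line change in ExecutionStatus::fromCurrentException moves the exception text outside the ternary, so it is no longer dropped when start_of_message is empty. A minimal Python sketch (illustrative only, not ClickHouse code) of the before/after behaviour:

def old_msg(prefix, exc_text):
    # old expression: the whole message, including exc_text, lived inside the ternary
    return "" if not prefix else prefix + ": " + exc_text

def new_msg(prefix, exc_text):
    # new expression: only the prefix part is conditional; exc_text is always appended
    return ("" if not prefix else prefix + ": ") + exc_text

assert old_msg("", "ZooKeeper error") == ""                  # the exception text was silently lost
assert new_msg("", "ZooKeeper error") == "ZooKeeper error"   # now it is preserved
assert new_msg("Cannot enqueue query", "ZooKeeper error") == "Cannot enqueue query: ZooKeeper error"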
......@@ -138,3 +138,8 @@ void Lock::unlockOrMoveIfFailed(std::vector<zkutil::Lock> & failed_to_unlock_loc
}
}
void Lock::unlockAssumeLockNodeRemovedManually()
{
locked.reset(nullptr);
}
......@@ -60,6 +60,7 @@ namespace zkutil
void unlock();
void unlockOrMoveIfFailed(std::vector<zkutil::Lock> & failed_to_unlock_locks);
void unlockAssumeLockNodeRemovedManually();
bool tryLock();
......
......@@ -95,7 +95,7 @@ public:
/// Throw an exception if something went wrong.
std::string create(const std::string & path, const std::string & data, int32_t mode);
/// Doesn not throw in the following cases:
/// Does not throw in the following cases:
/// * The parent for the created node does not exist
/// * The parent is ephemeral.
/// * The node already exists.
......
......@@ -381,6 +381,7 @@ namespace ErrorCodes
extern const int CANNOT_PARSE_UUID = 376;
extern const int ILLEGAL_SYNTAX_FOR_DATA_TYPE = 377;
extern const int DATA_TYPE_CANNOT_HAVE_ARGUMENTS = 378;
extern const int UNKNOWN_STATUS_OF_DISTRIBUTED_DDL_TASK = 379;
extern const int KEEPER_EXCEPTION = 999;
extern const int POCO_EXCEPTION = 1000;
......
......@@ -49,8 +49,9 @@ namespace ErrorCodes
extern const int INCONSISTENT_TABLE_ACCROSS_SHARDS;
extern const int INCONSISTENT_CLUSTER_DEFINITION;
extern const int TIMEOUT_EXCEEDED;
extern const int UNFINISHED;
extern const int UNKNOWN_TYPE_OF_QUERY;
extern const int UNFINISHED;
extern const int UNKNOWN_STATUS_OF_DISTRIBUTED_DDL_TASK;
}
......@@ -189,6 +190,15 @@ struct DDLTask
};
static std::unique_ptr<zkutil::Lock> createSimpleZooKeeperLock(
std::shared_ptr<zkutil::ZooKeeper> & zookeeper, const String & lock_prefix, const String & lock_name, const String & lock_message)
{
auto zookeeper_holder = std::make_shared<zkutil::ZooKeeperHolder>();
zookeeper_holder->initFromInstance(zookeeper);
return std::make_unique<zkutil::Lock>(std::move(zookeeper_holder), lock_prefix, lock_name, lock_message);
}
static bool isSupportedAlterType(int type)
{
static const std::unordered_set<int> supported_alter_types{
......@@ -214,6 +224,7 @@ DDLWorker::DDLWorker(const std::string & zk_root_dir, Context & context_, const
{
task_max_lifetime = config->getUInt64(prefix + "task_max_lifetime", task_max_lifetime);
cleanup_delay_period = config->getUInt64(prefix + "cleanup_delay_period", cleanup_delay_period);
max_tasks_in_queue = std::max(1UL, config->getUInt64(prefix + "max_tasks_in_queue", max_tasks_in_queue));
}
host_fqdn = getFQDNOrHostName();
......@@ -301,18 +312,24 @@ bool DDLWorker::initAndCheckTask(const String & entry_name)
}
static void filterAndSortQueueNodes(Strings & all_nodes)
{
all_nodes.erase(std::remove_if(all_nodes.begin(), all_nodes.end(), [] (const String & s) { return !startsWith(s, "query-"); }), all_nodes.end());
std::sort(all_nodes.begin(), all_nodes.end());
}
void DDLWorker::processTasks()
{
LOG_DEBUG(log, "Processing tasks");
Strings queue_nodes = zookeeper->getChildren(queue_dir, nullptr, event_queue_updated);
queue_nodes.erase(std::remove_if(queue_nodes.begin(), queue_nodes.end(), [&] (const String & s) { return !startsWith(s, "query-"); }), queue_nodes.end());
filterAndSortQueueNodes(queue_nodes);
if (queue_nodes.empty())
return;
bool server_startup = last_processed_task_name.empty();
std::sort(queue_nodes.begin(), queue_nodes.end());
auto begin_node = server_startup
? queue_nodes.begin()
: std::upper_bound(queue_nodes.begin(), queue_nodes.end(), last_processed_task_name);
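The filter-sort-resume logic above is compact; a Python sketch (illustrative, with bisect.bisect_right playing the role of std::upper_bound) of how the worker resumes after last_processed_task_name:

import bisect

def tasks_to_process(children, last_processed_task_name):
    # keep only query-* nodes and sort them, as filterAndSortQueueNodes does
    queue_nodes = sorted(n for n in children if n.startswith("query-"))
    if not last_processed_task_name:           # server startup: start from the beginning
        return queue_nodes
    begin = bisect.bisect_right(queue_nodes, last_processed_task_name)  # std::upper_bound analogue
    return queue_nodes[begin:]                 # strictly newer tasks only

print(tasks_to_process(["query-0000000002", "lock", "query-0000000001", "query-0000000003"],
                       "query-0000000002"))    # ['query-0000000003']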
......@@ -434,7 +451,7 @@ void DDLWorker::parseQueryAndResolveHost(DDLTask & task)
return;
LOG_WARNING(log, "Not found the exact match of host " << task.host_id.readableString() << " from task " << task.entry_name
<< " in " << " cluster " << task.cluster_name << " definition. Will try to find it using host name resolving.");
<< " in cluster " << task.cluster_name << " definition. Will try to find it using host name resolving.");
bool found_via_resolving = false;
for (size_t shard_num = 0; shard_num < shards.size(); ++shard_num)
......@@ -505,13 +522,26 @@ void DDLWorker::processTask(DDLTask & task)
{
LOG_DEBUG(log, "Processing task " << task.entry_name << " (" << task.entry.query << ")");
createStatusDirs(task.entry_path);
String dummy;
String active_node_path = task.entry_path + "/active/" + task.host_id_str;
auto code = zookeeper->tryCreate(active_node_path, "", zkutil::CreateMode::Ephemeral);
if (code != ZOK && code != ZNODEEXISTS)
String finished_node_path = task.entry_path + "/finished/" + task.host_id_str;
auto code = zookeeper->tryCreateWithRetries(active_node_path, "", zkutil::CreateMode::Ephemeral, dummy);
if (code == ZOK || code == ZNODEEXISTS)
{
// Ok
}
else if (code == ZNONODE)
{
/// There is no parent
createStatusDirs(task.entry_path);
if (ZOK != zookeeper->tryCreateWithRetries(active_node_path, "", zkutil::CreateMode::Ephemeral, dummy))
throw zkutil::KeeperException(code, active_node_path);
}
else
throw zkutil::KeeperException(code, active_node_path);
if (!task.was_executed)
{
try
......@@ -548,13 +578,10 @@ void DDLWorker::processTask(DDLTask & task)
/// Delete active flag and create finish flag
zkutil::Ops ops;
ops.emplace_back(std::make_unique<zkutil::Op::Remove>(task.entry_path + "/active/" + task.host_id_str, -1));
ops.emplace_back(std::make_unique<zkutil::Op::Create>(task.entry_path + "/finished/" + task.host_id_str,
task.execution_status.serializeText(), zookeeper->getDefaultACL(), zkutil::CreateMode::Persistent));
code = zookeeper->tryMulti(ops);
if (code != ZOK)
throw zkutil::KeeperException("Cannot commit executed task to ZooKeeper " + task.entry_name, code);
ops.emplace_back(std::make_unique<zkutil::Op::Remove>(active_node_path, -1));
ops.emplace_back(std::make_unique<zkutil::Op::Create>(finished_node_path, task.execution_status.serializeText(),
zookeeper->getDefaultACL(), zkutil::CreateMode::Persistent));
zookeeper->multi(ops);
}
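The sequence above (ephemeral "active" marker, lazy creation of the status directories on ZNONODE, then an atomic active-to-finished swap via multi) maps onto any ZooKeeper client. A hedged kazoo-based Python sketch of the same pattern, with execute() standing in for the actual query execution; this is not the ClickHouse implementation:

from kazoo.client import KazooClient
from kazoo.exceptions import NodeExistsError, NoNodeError

def process_task(zk: KazooClient, entry_path: str, host_id: str, execute) -> None:
    active = entry_path + "/active/" + host_id
    finished = entry_path + "/finished/" + host_id
    try:
        zk.create(active, b"", ephemeral=True)     # mark this host as currently executing the task
    except NodeExistsError:
        pass                                       # we already marked ourselves active earlier
    except NoNodeError:                            # status dirs are missing: create them, retry once
        zk.ensure_path(entry_path + "/active")
        zk.ensure_path(entry_path + "/finished")
        zk.create(active, b"", ephemeral=True)
    status = execute()                             # run the DDL query, return serialized ExecutionStatus
    tx = zk.transaction()                          # atomically drop the active flag and publish the result
    tx.delete(active)
    tx.create(finished, status.encode())
    tx.commit()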
......@@ -617,10 +644,7 @@ void DDLWorker::processTaskAlter(
bool alter_executed_by_replica = false;
{
auto zookeeper_holder = std::make_shared<zkutil::ZooKeeperHolder>();
zookeeper_holder->initFromInstance(zookeeper);
zkutil::Lock lock(zookeeper_holder, shard_path, "lock", task.host_id_str);
auto lock = createSimpleZooKeeperLock(zookeeper, shard_path, "lock", task.host_id_str);
std::mt19937 rng(std::hash<String>{}(task.host_id_str) + reinterpret_cast<intptr_t>(&rng));
for (int num_tries = 0; num_tries < 10; ++num_tries)
......@@ -631,7 +655,7 @@ void DDLWorker::processTaskAlter(
break;
}
if (lock.tryLock())
if (lock->tryLock())
{
tryExecuteQuery(rewritten_query, task, task.execution_status);
......@@ -641,7 +665,7 @@ void DDLWorker::processTaskAlter(
}
zookeeper->create(is_executed_path, task.host_id_str, zkutil::CreateMode::Persistent);
lock.unlock();
lock->unlock();
alter_executed_by_replica = true;
break;
}
......@@ -660,7 +684,7 @@ void DDLWorker::processTaskAlter(
}
void DDLWorker::cleanupQueue(const Strings * node_names_to_check)
void DDLWorker::cleanupQueue()
{
/// Both ZK and Poco use Unix epoch
size_t current_time_seconds = Poco::Timestamp().epochTime();
......@@ -674,111 +698,70 @@ void DDLWorker::cleanupQueue(const Strings * node_names_to_check)
LOG_DEBUG(log, "Cleaning queue");
Strings node_names_fetched = node_names_to_check ? Strings{} : zookeeper->getChildren(queue_dir);
const Strings & node_names = (node_names_to_check) ? *node_names_to_check : node_names_fetched;
Strings queue_nodes = zookeeper->getChildren(queue_dir);
filterAndSortQueueNodes(queue_nodes);
size_t num_outdated_nodes = (queue_nodes.size() > max_tasks_in_queue) ? queue_nodes.size() - max_tasks_in_queue : 0;
auto first_non_outdated_node = queue_nodes.begin() + num_outdated_nodes;
for (const String & node_name : node_names)
for (auto it = queue_nodes.cbegin(); it < queue_nodes.cend(); ++it)
{
String node_name = *it;
String node_path = queue_dir + "/" + node_name;
String lock_path = node_path + "/lock_write"; /// per-node lock to avoid concurrent cleaning
bool node_was_deleted = false;
auto delete_node = [&] ()
{
Strings childs = zookeeper->getChildren(node_path);
for (const String & child : childs)
{
if (child != "lock_write")
zookeeper->removeRecursive(node_path + "/" + child);
}
String lock_path = node_path + "/lock";
zkutil::Ops ops;
ops.emplace_back(std::make_unique<zkutil::Op::Remove>(lock_path, -1));
ops.emplace_back(std::make_unique<zkutil::Op::Remove>(node_path, -1));
zookeeper->multi(ops);
node_was_deleted = true;
};
zkutil::Stat stat;
String dummy;
try
{
zkutil::Ops ops;
ops.emplace_back(std::make_unique<zkutil::Op::Check>(node_path, -1));
ops.emplace_back(std::make_unique<zkutil::Op::Create>(lock_path, host_fqdn_id, zookeeper->getDefaultACL(),
zkutil::CreateMode::Ephemeral));
auto code = zookeeper->tryMulti(ops);
if (code != ZOK)
/// To avoid concurrent checks and cleanups
auto lock = createSimpleZooKeeperLock(zookeeper, node_path, "lock", host_fqdn_id);
if (!lock->tryLock())
continue;
auto delete_node = [&] ()
{
if (code == ZNONODE)
{
/// Task node was deleted
continue;
}
else if (code == ZNODEEXISTS)
Strings childs = zookeeper->getChildren(node_path);
for (const String & child : childs)
{
/// Is it our lock?
String owner;
if (!zookeeper->tryGet(lock_path, owner) || owner != host_fqdn_id)
continue;
if (child != "lock")
zookeeper->tryRemoveRecursive(node_path + "/" + child);
}
else
throw zkutil::KeeperException(code);
}
SCOPE_EXIT({
if (!node_was_deleted && !zookeeper->expired())
{
try
{
zookeeper->tryRemoveWithRetries(lock_path, -1);
}
catch (...)
{
tryLogCurrentException(log, "Can't remove lock for cleaning");
}
}
});
/// Remove the lock node and its parent atomically
zkutil::Ops ops;
ops.emplace_back(std::make_unique<zkutil::Op::Remove>(lock_path, -1));
ops.emplace_back(std::make_unique<zkutil::Op::Remove>(node_path, -1));
zookeeper->multi(ops);
zkutil::Stat stat;
String node_data = zookeeper->get(node_path, &stat);
lock->unlockAssumeLockNodeRemovedManually();
};
size_t zookeeper_time_seconds = stat.mtime / zookeeper_time_resolution;
if (zookeeper_time_seconds + task_max_lifetime < current_time_seconds)
/// Skip if there are active nodes (this is only a weak guard)
if (zookeeper->tryGet(node_path + "/active", dummy, &stat) && stat.numChildren > 0)
continue;
/// Delete if there are too many (more than max_tasks_in_queue) tasks in the queue
if (it < first_non_outdated_node)
{
size_t lifetime_seconds = current_time_seconds - zookeeper_time_seconds;
LOG_INFO(log, "Lifetime of task " << node_name << " (" << lifetime_seconds << " sec.) is expired, deleting it");
LOG_INFO(log, "Task " << node_name << " is outdated, deleting it");
delete_node();
continue;
}
Strings active_hosts = zookeeper->getChildren(node_path + "/active");
if (!active_hosts.empty())
continue;
Strings finished_hosts = zookeeper->getChildren(node_path + "/finished");
DDLLogEntry entry;
entry.parse(node_data);
/// Not all nodes were finished
if (finished_hosts.size() < entry.hosts.size())
continue;
zookeeper->get(node_path, &stat);
size_t zookeeper_time_seconds = stat.mtime / zookeeper_time_resolution;
/// Could be childs that are not from host list
bool all_finished = true;
NameSet finished_hosts_set(finished_hosts.begin(), finished_hosts.end());
for (const HostID & host : entry.hosts)
/// Delete if the node lifetime (task_max_lifetime) is expired
if (zookeeper_time_seconds + task_max_lifetime < current_time_seconds)
{
if (!finished_hosts_set.count(host.toString()))
{
all_finished = false;
break;
}
}
size_t lifetime_seconds = current_time_seconds - zookeeper_time_seconds;
LOG_INFO(log, "Lifetime of task " << node_name << " (" << lifetime_seconds << " sec.) is expired, deleting it");
if (all_finished)
{
LOG_INFO(log, "Task " << node_name << " had been executed by each host, deleting it");
delete_node();
continue;
}
}
catch (...)
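The simplified cleanup boils down to two deletion criteria applied to the sorted queue. A pure-logic Python sketch (the node mtimes and the active-children check are assumed to be supplied by the caller; not the ClickHouse code) of which nodes get removed:

def nodes_to_delete(children, mtime, has_active_children, now,
                    max_tasks_in_queue=1000, task_max_lifetime=7 * 24 * 60 * 60):
    queue_nodes = sorted(n for n in children if n.startswith("query-"))
    num_outdated = max(0, len(queue_nodes) - max_tasks_in_queue)
    doomed = []
    for i, name in enumerate(queue_nodes):
        if has_active_children(name):                 # weak guard: skip tasks still being executed
            continue
        if i < num_outdated:                          # queue exceeds max_tasks_in_queue: drop the oldest
            doomed.append(name)
        elif mtime(name) + task_max_lifetime < now:   # task is older than task_max_lifetime
            doomed.append(name)
    return doomed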
......@@ -806,13 +789,35 @@ void DDLWorker::createStatusDirs(const std::string & node_path)
String DDLWorker::enqueueQuery(DDLLogEntry & entry)
{
if (entry.hosts.empty())
return {};
throw Exception("Empty host list in a distributed DDL task", ErrorCodes::LOGICAL_ERROR);
String query_path_prefix = queue_dir + "/query-";
zookeeper->createAncestors(query_path_prefix);
String node_path = zookeeper->create(query_path_prefix, entry.toString(), zkutil::CreateMode::PersistentSequential);
createStatusDirs(node_path);
String node_path;
try
{
node_path = zookeeper->create(query_path_prefix, entry.toString(), zkutil::CreateMode::PersistentSequential);
}
catch (const zkutil::KeeperException & e)
{
/// TODO: This condition could be relaxed with additional post-checks
if (e.isTemporaryError())
throw Exception("Unknown status of distributed DDL task", ErrorCodes::UNKNOWN_STATUS_OF_DISTRIBUTED_DDL_TASK);
throw;
}
/// Optional step
try
{
createStatusDirs(node_path);
}
catch (...)
{
LOG_INFO(log, "An error occurred while creating auxiliary ZooKeeper directories in " << node_path << " . They will be created later"
<< ". Error : " << getCurrentExceptionMessage(true));
}
return node_path;
}
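The new error handling distinguishes "definitely failed" from "may or may not have been enqueued": a temporary ZooKeeper error during the sequential create is reported as UNKNOWN_STATUS_OF_DISTRIBUTED_DDL_TASK, and the status directories are created on a best-effort basis. A hedged kazoo-based Python sketch of the same pattern (not the ClickHouse code):

from kazoo.client import KazooClient
from kazoo.exceptions import ConnectionLoss, OperationTimeoutError

def enqueue_query(zk: KazooClient, queue_dir: str, entry: bytes) -> str:
    zk.ensure_path(queue_dir)
    try:
        node_path = zk.create(queue_dir + "/query-", entry, sequence=True)
    except (ConnectionLoss, OperationTimeoutError):
        # the node may have been created anyway, so the task status is unknown rather than failed
        raise RuntimeError("Unknown status of distributed DDL task")
    try:                                        # optional step: workers can create the dirs later
        zk.ensure_path(node_path + "/active")
        zk.ensure_path(node_path + "/finished")
    except Exception:
        pass
    return node_path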
......
......@@ -59,7 +59,7 @@ private:
/// Checks and cleanups queue's nodes
void cleanupQueue(const Strings * node_names_to_check = nullptr);
void cleanupQueue();
void createStatusDirs(const std::string & node_name);
......@@ -93,10 +93,12 @@ private:
size_t last_cleanup_time_seconds = 0;
/// Delete node if its age is greater than that
size_t task_max_lifetime = 7 * 24 * 60 * 60; // week (in seconds)
/// Cleaning starts after a new node event is received, and only if the previous cleaning was performed more than N seconds ago
size_t cleanup_delay_period = 60; // minute (in seconds)
/// Delete node if its age is greater than that
size_t task_max_lifetime = 7 * 24 * 60 * 60; // week (in seconds)
/// The maximum number of tasks kept in the queue
size_t max_tasks_in_queue = 1000;
friend class DDLQueryStatusInputSream;
friend class DDLTask;
......
......@@ -8,7 +8,7 @@ class Client:
def __init__(self, host, port=9000, command='/usr/bin/clickhouse-client'):
self.host = host
self.port = port
self.command = [command, '--host', self.host, '--port', str(self.port)]
self.command = [command, '--host', self.host, '--port', str(self.port), '--stacktrace']
def query(self, sql, stdin=None, timeout=None):
......
......@@ -49,7 +49,8 @@ class ClickHouseCluster:
self.is_up = False
def add_instance(self, name, config_dir=None, main_configs=[], user_configs=[], macroses={}, with_zookeeper=False, clickhouse_path_dir=None):
def add_instance(self, name, config_dir=None, main_configs=[], user_configs=[], macroses={}, with_zookeeper=False,
clickhouse_path_dir=None, hostname=None):
"""Add an instance to the cluster.
name - the name of the instance directory and the value of the 'instance' macro in ClickHouse.
......@@ -65,7 +66,10 @@ class ClickHouseCluster:
if name in self.instances:
raise Exception("Can\'t add instance `%s': there is already an instance with the same name!" % name)
instance = ClickHouseInstance(self, self.base_dir, name, config_dir, main_configs, user_configs, macroses, with_zookeeper, self.base_configs_dir, self.server_bin_path, clickhouse_path_dir)
instance = ClickHouseInstance(
self, self.base_dir, name, config_dir, main_configs, user_configs, macroses, with_zookeeper,
self.base_configs_dir, self.server_bin_path, clickhouse_path_dir, hostname=hostname)
self.instances[name] = instance
self.base_cmd.extend(['--file', instance.docker_compose_path])
if with_zookeeper and not self.with_zookeeper:
......@@ -135,7 +139,7 @@ version: '2'
services:
{name}:
image: ubuntu:14.04
hostname: {name}
hostname: {hostname}
user: '{uid}'
volumes:
- {binary_path}:/usr/bin/clickhouse:ro
......@@ -153,12 +157,13 @@ services:
class ClickHouseInstance:
def __init__(
self, cluster, base_path, name, custom_config_dir, custom_main_configs, custom_user_configs, macroses,
with_zookeeper, base_configs_dir, server_bin_path, clickhouse_path_dir):
with_zookeeper, base_configs_dir, server_bin_path, clickhouse_path_dir, hostname=None):
self.name = name
self.base_cmd = cluster.base_cmd[:]
self.docker_id = cluster.get_instance_docker_id(self.name)
self.cluster = cluster
self.hostname = hostname if hostname is not None else self.name
self.custom_config_dir = p.abspath(p.join(base_path, custom_config_dir)) if custom_config_dir else None
self.custom_main_config_paths = [p.abspath(p.join(base_path, c)) for c in custom_main_configs]
......@@ -187,6 +192,16 @@ class ClickHouseInstance:
return self.client.get_query_request(*args, **kwargs)
def exec_in_container(self, cmd, **kwargs):
container = self.get_docker_handle()
handle = self.docker_client.api.exec_create(container.id, cmd, **kwargs)
output = self.docker_client.api.exec_start(handle).decode('utf8')
exit_code = self.docker_client.api.exec_inspect(handle)['ExitCode']
if exit_code:
raise Exception('Cmd {} failed! Return code {}. Output {}'.format(' '.join(cmd), exit_code, output))
return output
def get_docker_handle(self):
return self.docker_client.containers.get(self.docker_id)
......@@ -301,6 +316,7 @@ class ClickHouseInstance:
with open(self.docker_compose_path, 'w') as docker_compose:
docker_compose.write(DOCKER_COMPOSE_TEMPLATE.format(
name=self.name,
hostname=self.hostname,
uid=os.getuid(),
binary_path=self.server_bin_path,
configs_dir=configs_dir,
......
......@@ -56,6 +56,15 @@ class PartitionManager:
rule = self._iptables_rules.pop()
_NetworkManager.get().delete_iptables_rule(**rule)
def pop_rules(self):
res = self._iptables_rules[:]
self.heal_all()
return res
def push_rules(self, rules):
for rule in rules:
self._add_rule(rule)
@staticmethod
def _check_instance(instance):
......@@ -77,6 +86,18 @@ class PartitionManager:
self.heal_all()
class PartitionManagerDisbaler:
def __init__(self, manager):
self.manager = manager
self.rules = self.manager.pop_rules()
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.manager.push_rules(self.rules)
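pop_rules()/push_rules() and the PartitionManagerDisbaler wrapper let a test temporarily lift all injected network faults and restore them afterwards. An illustrative usage sketch (cluster, instance and ddl_check_query as defined elsewhere in these tests):

# Run a DDL query with the random ZK packet drops temporarily disabled; the rules are restored on exit.
with PartitionManagerDisbaler(cluster.pm_random_drops):
    ddl_check_query(instance, "CREATE DATABASE IF NOT EXISTS test ON CLUSTER 'cluster'")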
class _NetworkManager:
"""Execute commands inside a container with access to network settings.
......
<yandex>
<remote_servers>
<cluster>
<shard>
<internal_replication>true</internal_replication>
<replica>
<host>ch1</host>
<port>9000</port>
</replica>
<replica>
<host>ch2</host>
<port>9000</port>
</replica>
</shard>
<shard>
<internal_replication>true</internal_replication>
<replica>
<host>ch3</host>
<port>9000</port>
</replica>
<replica>
<host>ch4</host>
<port>9000</port>
</replica>
</shard>
</cluster>
</remote_servers>
</yandex>
\ No newline at end of file
<yandex>
<remote_servers>
<cluster2>
<shard>
<internal_replication>true</internal_replication>
<replica>
<host>ch1</host>
<port>9000</port>
<default_database>default</default_database>
</replica>
<replica>
<host>ch2</host>
<port>9000</port>
<default_database>test2</default_database>
</replica>
</shard>
<shard>
<internal_replication>true</internal_replication>
<replica>
<host>ch3</host>
<port>9000</port>
<default_database>default</default_database>
</replica>
<replica>
<host>ch4</host>
<port>9000</port>
<default_database>test2</default_database>
</replica>
</shard>
</cluster2>
</remote_servers>
</yandex>
\ No newline at end of file
<yandex>
<remote_servers>
<cluster_no_replicas>
<shard>
<internal_replication>false</internal_replication>
<replica>
<host>ch1</host>
<port>9000</port>
</replica>
</shard>
<shard>
<internal_replication>false</internal_replication>
<replica>
<host>ch2</host>
<port>9000</port>
</replica>
</shard>
<shard>
<internal_replication>false</internal_replication>
<replica>
<host>ch3</host>
<port>9000</port>
</replica>
</shard>
<shard>
<internal_replication>false</internal_replication>
<replica>
<host>ch4</host>
<port>9000</port>
</replica>
</shard>
</cluster_no_replicas>
</remote_servers>
</yandex>
\ No newline at end of file
<yandex>
<remote_servers>
<cluster_without_replication>
<shard>
<internal_replication>false</internal_replication>
<replica>
<host>ch1</host>
<port>9000</port>
</replica>
<replica>
<host>ch2</host>
<port>9000</port>
</replica>
</shard>
<shard>
<internal_replication>false</internal_replication>
<replica>
<host>ch3</host>
<port>9000</port>
</replica>
<replica>
<host>ch4</host>
<port>9000</port>
</replica>
</shard>
</cluster_without_replication>
</remote_servers>
</yandex>
\ No newline at end of file
<yandex>
<remote_servers>
<!-- Main cluster -->
<cluster>
<shard>
<internal_replication>true</internal_replication>
<replica>
<host>ch1</host>
<port>9000</port>
</replica>
<replica>
<host>ch2</host>
<port>9000</port>
</replica>
</shard>
<shard>
<internal_replication>true</internal_replication>
<replica>
<host>ch3</host>
<port>9000</port>
</replica>
<replica>
<host>ch4</host>
<port>9000</port>
</replica>
</shard>
</cluster>
<!-- Cluster with specified default database -->
<cluster2>
<shard>
<internal_replication>true</internal_replication>
<replica>
<host>ch1</host>
<port>9000</port>
<default_database>default</default_database>
</replica>
<replica>
<host>ch2</host>
<port>9000</port>
<default_database>test2</default_database>
</replica>
</shard>
<shard>
<internal_replication>true</internal_replication>
<replica>
<host>ch3</host>
<port>9000</port>
<default_database>default</default_database>
</replica>
<replica>
<host>ch4</host>
<port>9000</port>
<default_database>test2</default_database>
</replica>
</shard>
</cluster2>
<!-- Cluster without replicas -->
<cluster_no_replicas>
<shard>
<internal_replication>false</internal_replication>
<replica>
<host>ch1</host>
<port>9000</port>
</replica>
</shard>
<shard>
<internal_replication>false</internal_replication>
<replica>
<host>ch2</host>
<port>9000</port>
</replica>
</shard>
<shard>
<internal_replication>false</internal_replication>
<replica>
<host>ch3</host>
<port>9000</port>
</replica>
</shard>
<shard>
<internal_replication>false</internal_replication>
<replica>
<host>ch4</host>
<port>9000</port>
</replica>
</shard>
</cluster_no_replicas>
<!-- Cluster without internal replication -->
<cluster_without_replication>
<shard>
<internal_replication>false</internal_replication>
<replica>
<host>ch1</host>
<port>9000</port>
</replica>
<replica>
<host>ch2</host>
<port>9000</port>
</replica>
</shard>
<shard>
<internal_replication>false</internal_replication>
<replica>
<host>ch3</host>
<port>9000</port>
</replica>
<replica>
<host>ch4</host>
<port>9000</port>
</replica>
</shard>
</cluster_without_replication>
</remote_servers>
</yandex>
\ No newline at end of file
<yandex>
<distributed_ddl>
<path>/clickhouse/task_queue/ddl</path>
<max_tasks_in_queue>10</max_tasks_in_queue>
<task_max_lifetime>3600</task_max_lifetime>
<cleanup_delay_period>1</cleanup_delay_period>
</distributed_ddl>
</yandex>
\ No newline at end of file
......@@ -4,7 +4,7 @@ import datetime
import pytest
from helpers.cluster import ClickHouseCluster
from helpers.network import PartitionManager
from helpers.network import PartitionManager, PartitionManagerDisbaler
from helpers.test_tools import TSV
......@@ -17,9 +17,9 @@ def check_all_hosts_sucesfully_executed(tsv_content, num_hosts=None):
codes = [l[2] for l in M]
messages = [l[3] for l in M]
assert len(hosts) == num_hosts and len(set(hosts)) == num_hosts, tsv_content
assert len(set(codes)) == 1, tsv_content
assert codes[0] == "0", tsv_content
assert len(hosts) == num_hosts and len(set(hosts)) == num_hosts, "\n" + tsv_content
assert len(set(codes)) == 1, "\n" + tsv_content
assert codes[0] == "0", "\n" + tsv_content
def ddl_check_query(instance, query, num_hosts=None):
......@@ -50,8 +50,19 @@ TEST_REPLICATED_ALTERS=True
cluster = ClickHouseCluster(__file__)
@pytest.fixture(scope="module")
def started_cluster():
def replace_domains_to_ip_addresses_in_cluster_config(instances_to_replace):
clusters_config = open(p.join(cluster.base_dir, 'configs/config.d/clusters.xml')).read()
for inst_name, inst in cluster.instances.items():
clusters_config = clusters_config.replace(inst_name, str(inst.ip_address))
for inst_name in instances_to_replace:
inst = cluster.instances[inst_name]
cluster.instances[inst_name].exec_in_container(['bash', '-c', 'echo "$NEW_CONFIG" > /etc/clickhouse-server/config.d/clusters.xml'], environment={"NEW_CONFIG": clusters_config}, privileged=True)
# print cluster.instances[inst_name].exec_in_container(['cat', "/etc/clickhouse-server/config.d/clusters.xml"])
def init_cluster(cluster):
try:
for i in xrange(4):
cluster.add_instance(
......@@ -62,15 +73,19 @@ def started_cluster():
cluster.start()
# Replace config files to test the ability to specify hosts in both DNS-name and IP formats
replace_domains_to_ip_addresses_in_cluster_config(['ch1', 'ch3'])
# Select a sacrifice instance to test CONNECTION_LOSS and server failures on it
sacrifice = cluster.instances['ch4']
cluster.pm_random_drops = PartitionManager()
cluster.pm_random_drops._add_rule({'probability': 0.05, 'destination': sacrifice.ip_address, 'source_port': 2181, 'action': 'REJECT --reject-with tcp-reset'})
cluster.pm_random_drops._add_rule({'probability': 0.05, 'source': sacrifice.ip_address, 'destination_port': 2181, 'action': 'REJECT --reject-with tcp-reset'})
cluster.pm_random_drops._add_rule({'probability': 0.01, 'destination': sacrifice.ip_address, 'source_port': 2181, 'action': 'REJECT --reject-with tcp-reset'})
cluster.pm_random_drops._add_rule({'probability': 0.01, 'source': sacrifice.ip_address, 'destination_port': 2181, 'action': 'REJECT --reject-with tcp-reset'})
# Initialize databases and service tables
instance = cluster.instances['ch1']
instance.query("SELECT 1")
ddl_check_query(instance, """
CREATE TABLE IF NOT EXISTS all_tables ON CLUSTER 'cluster_no_replicas'
(database String, name String, engine String, metadata_modification_time DateTime)
......@@ -79,20 +94,31 @@ CREATE TABLE IF NOT EXISTS all_tables ON CLUSTER 'cluster_no_replicas'
ddl_check_query(instance, "CREATE DATABASE IF NOT EXISTS test ON CLUSTER 'cluster'")
except Exception as e:
print e
raise
@pytest.fixture(scope="module")
def started_cluster():
try:
init_cluster(cluster)
yield cluster
instance = cluster.instances['ch1']
ddl_check_query(instance, "DROP DATABASE test ON CLUSTER 'cluster'")
ddl_check_query(instance, "DROP DATABASE IF EXISTS test2 ON CLUSTER 'cluster'")
finally:
# Remove iptables rules for sacrifice instance
cluster.pm_random_drops.heal_all()
# Check query log to ensure that DDL queries are not executed twice
time.sleep(1.5)
for instance in cluster.instances.values():
ddl_check_there_are_no_dublicates(instance)
finally:
# Remove iptables rules for sacrifice instance
cluster.pm_random_drops.heal_all()
#cluster.shutdown()
......@@ -163,10 +189,14 @@ def test_replicated_alters(started_cluster):
if not TEST_REPLICATED_ALTERS:
return
# Temporarily disable random ZK packet drops, they might break the creation of ReplicatedMergeTree replicas
firewall_drops_rules = cluster.pm_random_drops.pop_rules()
ddl_check_query(instance, """
CREATE TABLE IF NOT EXISTS merge ON CLUSTER cluster (p Date, i Int32)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/{layer}-{shard}/hits', '{replica}', p, p, 1)
""")
ddl_check_query(instance, """
CREATE TABLE IF NOT EXISTS all_merge_32 ON CLUSTER cluster (p Date, i Int32)
ENGINE = Distributed(cluster, default, merge, i)
......@@ -200,6 +230,10 @@ ENGINE = Distributed(cluster, default, merge, i)
assert TSV(instance.query("SELECT i, s FROM all_merge_64 ORDER BY i")) == TSV(''.join(['{}\t{}\n'.format(x,x) for x in xrange(4)]))
ddl_check_query(instance, "DROP TABLE merge ON CLUSTER cluster")
# Enable random ZK packet drops
cluster.pm_random_drops.push_rules(firewall_drops_rules)
ddl_check_query(instance, "DROP TABLE all_merge_32 ON CLUSTER cluster")
ddl_check_query(instance, "DROP TABLE all_merge_64 ON CLUSTER cluster")
......