提交 d9ce96f8 编写于 作者: V Vitaliy Lyudvichenko 提交者: alexey-milovidov

Fixed test and misspellings. [#CLICKHOUSE-3207]

上级 f815498e
......@@ -33,10 +33,6 @@
#include <Common/isLocalAddress.h>
#include <Poco/Timestamp.h>
#include <ext/scope_guard.h>
#include <experimental/optional>
namespace DB
{
......@@ -209,12 +205,12 @@ static bool isSupportedAlterType(int type)
ASTAlterQuery::DROP_PARTITION
};
return supported_alter_types.count(type);
return supported_alter_types.count(type) != 0;
}
DDLWorker::DDLWorker(const std::string & zk_root_dir, Context & context_, const Poco::Util::AbstractConfiguration * config, const String & prefix)
: context(context_)
: context(context_), log(&Logger::get("DDLWorker"))
{
queue_dir = zk_root_dir;
if (queue_dir.back() == '/')
......@@ -222,8 +218,8 @@ DDLWorker::DDLWorker(const std::string & zk_root_dir, Context & context_, const
if (config)
{
task_max_lifetime = config->getUInt64(prefix + "task_max_lifetime", task_max_lifetime);
cleanup_delay_period = config->getUInt64(prefix + "cleanup_delay_period", cleanup_delay_period);
task_max_lifetime = config->getUInt64(prefix + "task_max_lifetime", static_cast<UInt64>(task_max_lifetime));
cleanup_delay_period = config->getUInt64(prefix + "cleanup_delay_period", static_cast<UInt64>(cleanup_delay_period));
max_tasks_in_queue = std::max(1UL, config->getUInt64(prefix + "max_tasks_in_queue ", max_tasks_in_queue));
}
......@@ -622,12 +618,12 @@ void DDLWorker::processTaskAlter(
if (execute_once_on_replica)
{
/// The following code can perform ALTER twice if
/// current server aquires lock, executes replicated alter,
/// losts zookeeper connection and doesn't have time to create /executed node, second server executes replicated alter again
/// The following code can perform ALTER twice if:
/// current server acquires the lock, executes replicated alter,
/// loses zookeeper connection and doesn't have time to create /executed node, second server executes replicated alter again
/// To avoid this problem alter() method of replicated tables should be changed and takes into account ddl query id tag.
if (!context.getSettingsRef().distributed_ddl_allow_replicated_alter)
throw Exception("Distributed DDL alters don't work properly yet", ErrorCodes::NOT_IMPLEMENTED);
throw Exception("Distributed DDL alters for replicated tables don't work properly yet", ErrorCodes::NOT_IMPLEMENTED);
/// Generate unique name for shard node, it will be used to execute the query by only single host
/// Shard node name has format 'replica_name1,replica_name2,...,replica_nameN'
......@@ -646,7 +642,7 @@ void DDLWorker::processTaskAlter(
String is_executed_path = shard_path + "/executed";
zookeeper->createAncestors(shard_path + "/");
bool alter_executed_by_replica = false;
bool alter_executed_by_any_replica = false;
{
auto lock = createSimpleZooKeeperLock(zookeeper, shard_path, "lock", task.host_id_str);
std::mt19937 rng(std::hash<String>{}(task.host_id_str) + reinterpret_cast<intptr_t>(&rng));
......@@ -655,7 +651,7 @@ void DDLWorker::processTaskAlter(
{
if (zookeeper->exists(is_executed_path))
{
alter_executed_by_replica = true;
alter_executed_by_any_replica = true;
break;
}
......@@ -665,12 +661,12 @@ void DDLWorker::processTaskAlter(
if (execute_on_leader_replica && task.execution_status.code == ErrorCodes::NOT_IMPLEMENTED)
{
/// TODO: it is ok to recieve exception "host is not leader"
/// TODO: it is ok to receive exception "host is not leader"
}
zookeeper->create(is_executed_path, task.host_id_str, zkutil::CreateMode::Persistent);
lock->unlock();
alter_executed_by_replica = true;
alter_executed_by_any_replica = true;
break;
}
......@@ -678,7 +674,7 @@ void DDLWorker::processTaskAlter(
}
}
if (!alter_executed_by_replica)
if (!alter_executed_by_any_replica)
task.execution_status = ExecutionStatus(ErrorCodes::NOT_IMPLEMENTED, "Cannot enqueue replicated DDL query");
}
else
......@@ -691,8 +687,8 @@ void DDLWorker::processTaskAlter(
void DDLWorker::cleanupQueue()
{
/// Both ZK and Poco use Unix epoch
size_t current_time_seconds = Poco::Timestamp().epochTime();
constexpr size_t zookeeper_time_resolution = 1000;
Int64 current_time_seconds = Poco::Timestamp().epochTime();
constexpr Int64 zookeeper_time_resolution = 1000;
// Too early to check
if (last_cleanup_time_seconds && current_time_seconds < last_cleanup_time_seconds + cleanup_delay_period)
......@@ -719,7 +715,8 @@ void DDLWorker::cleanupQueue()
try
{
/// To avoid concurrent checks and cleans
/// Usage of the lock is not necessary now (tryRemoveRecursive correctly removes node in a presence of concurrent cleaners)
/// But the lock will be required to implement system.distributed_ddl_queue table
auto lock = createSimpleZooKeeperLock(zookeeper, node_path, "lock", host_fqdn_id);
if (!lock->tryLock())
continue;
......@@ -890,6 +887,9 @@ void DDLWorker::run()
LOG_ERROR(log, "Unexpected ZooKeeper error: " << getCurrentExceptionMessage(true) << ". Terminating...");
throw;
}
/// Unlock the processing just in case
event_queue_updated->set();
}
catch (...)
{
......@@ -1056,17 +1056,28 @@ private:
};
BlockIO executeDDLQueryOnCluster(const ASTPtr & query_ptr, Context & context)
BlockIO executeDDLQueryOnCluster(const ASTPtr & query_ptr_, Context & context)
{
const auto query = dynamic_cast<const ASTQueryWithOnCluster *>(query_ptr.get());
ASTPtr query_ptr;
/// Remove FORMAT ... INTO OUTFILE if exists
if (dynamic_cast<const ASTQueryWithOutput *>(query_ptr_.get()))
{
query_ptr = query_ptr_->clone();
auto query_with_output = dynamic_cast<ASTQueryWithOutput *>(query_ptr.get());
query_with_output->out_file = nullptr;
query_with_output->format = nullptr;
}
else
query_ptr = query_ptr_;
auto query = dynamic_cast<const ASTQueryWithOnCluster *>(query_ptr.get());
if (!query)
{
throw Exception("Distributed execution is not supported for such DDL queries",
ErrorCodes::NOT_IMPLEMENTED);
throw Exception("Distributed execution is not supported for such DDL queries", ErrorCodes::NOT_IMPLEMENTED);
}
auto query_alter = dynamic_cast<const ASTAlterQuery *>(query_ptr.get());
if (query_alter)
if (auto query_alter = dynamic_cast<const ASTAlterQuery *>(query_ptr.get()))
{
for (const auto & param : query_alter->parameters)
{
......
......@@ -57,28 +57,24 @@ private:
bool tryExecuteQuery(const String & query, const DDLTask & task, ExecutionStatus & status);
/// Checks and cleanups queue's nodes
void cleanupQueue();
/// Init task node
void createStatusDirs(const std::string & node_name);
ASTPtr getRewrittenQuery(const DDLLogEntry & node);
void run();
private:
Context & context;
Logger * log = &Logger::get("DDLWorker");
Logger * log;
std::string host_fqdn; /// current host domain name
std::string host_fqdn_id; /// host_name:port
std::string queue_dir; /// dir with queue of queries
std::string master_dir; /// dir with queries was initiated by the server
/// Last task that was skipped or sucesfully executed
/// Name of last task that was skipped or successfully executed
std::string last_processed_task_name;
std::shared_ptr<zkutil::ZooKeeper> zookeeper;
......@@ -91,12 +87,12 @@ private:
std::atomic<bool> stop_flag{false};
std::thread thread;
size_t last_cleanup_time_seconds = 0;
Int64 last_cleanup_time_seconds = 0;
/// Cleaning starts after new node event is received if the last cleaning wasn't made sooner than N seconds ago
size_t cleanup_delay_period = 60; // minute (in seconds)
Int64 cleanup_delay_period = 60; // minute (in seconds)
/// Delete node if its age is greater than that
size_t task_max_lifetime = 7 * 24 * 60 * 60; // week (in seconds)
Int64 task_max_lifetime = 7 * 24 * 60 * 60; // week (in seconds)
/// How many tasks could be in the queue
size_t max_tasks_in_queue = 1000;
......
......@@ -3,6 +3,6 @@
<path>/clickhouse/task_queue/ddl</path>
<max_tasks_in_queue>10</max_tasks_in_queue>
<task_max_lifetime>3600</task_max_lifetime>
<cleanup_delay_period>1</cleanup_delay_period>
<cleanup_delay_period>5</cleanup_delay_period>
</distributed_ddl>
</yandex>
\ No newline at end of file
......@@ -141,9 +141,9 @@ def test_create_view(started_cluster):
ddl_check_query(instance, "CREATE VIEW test.super_simple_view ON CLUSTER 'cluster' AS SELECT * FROM system.numbers FORMAT TSV")
ddl_check_query(instance, "CREATE MATERIALIZED VIEW test.simple_mat_view ON CLUSTER 'cluster' ENGINE = Memory AS SELECT * FROM system.numbers FORMAT TSV")
ddl_check_query(instance, "DROP TABLE test.simple_mat_view ON CLUSTER 'cluster' FORMAT TSV")
ddl_check_query(instance, "DROP TABLE test.super_simple_view2 ON CLUSTER 'cluster' FORMAT TSV")
ddl_check_query(instance, "DROP TABLE IF EXISTS test.super_simple_view2 ON CLUSTER 'cluster' FORMAT TSV")
ddl_check_query(instance, "CREATE TABLE test.super_simple (i Int8) ON CLUSTER 'cluster'")
ddl_check_query(instance, "CREATE TABLE test.super_simple ON CLUSTER 'cluster' (i Int8) ENGINE = Memory")
ddl_check_query(instance, "RENAME TABLE test.super_simple TO test.super_simple2 ON CLUSTER 'cluster' FORMAT TSV")
ddl_check_query(instance, "DROP TABLE test.super_simple2 ON CLUSTER 'cluster'")
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册