Commit 614e0d9b authored by Alexander Tokmakov

just another fix for ddl worker

Parent e8df9971
@@ -320,6 +320,8 @@ std::unique_ptr<Context> DatabaseReplicatedTask::makeQueryContext(Context & from
String DDLTaskBase::getLogEntryName(UInt32 log_entry_number)
{
/// Sequential counter in ZooKeeper is Int32.
assert(log_entry_number < std::numeric_limits<Int32>::max());
constexpr size_t seq_node_digits = 10;
String number = toString(log_entry_number);
String name = "query-" + String(seq_node_digits - number.size(), '0') + number;
@@ -330,7 +332,9 @@ UInt32 DDLTaskBase::getLogEntryNumber(const String & log_entry_name)
{
constexpr const char * name = "query-";
assert(startsWith(log_entry_name, name));
return parse<UInt32>(log_entry_name.substr(strlen(name)));
UInt32 num = parse<UInt32>(log_entry_name.substr(strlen(name)));
assert(num < std::numeric_limits<Int32>::max());
return num;
}
void ZooKeeperMetadataTransaction::commit()
......
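The zero padding to seq_node_digits = 10 above is what makes the lexicographic order of entry names match their numeric order, which filterAndSortQueueNodes and the lower_bound/upper_bound calls in scheduleTasks rely on. A minimal standalone sketch of the naming round-trip, using std::string and std::stoul instead of ClickHouse's String/toString/parse helpers:

```cpp
#include <cassert>
#include <cstdint>
#include <cstring>
#include <limits>
#include <string>

// Standalone re-implementation of the naming scheme from the diff above.
std::string getLogEntryName(uint32_t log_entry_number)
{
    /// Sequential counter in ZooKeeper is Int32.
    assert(log_entry_number < std::numeric_limits<int32_t>::max());
    constexpr size_t seq_node_digits = 10;
    std::string number = std::to_string(log_entry_number);
    return "query-" + std::string(seq_node_digits - number.size(), '0') + number;
}

uint32_t getLogEntryNumber(const std::string & log_entry_name)
{
    constexpr const char * name = "query-";
    assert(log_entry_name.compare(0, strlen(name), name) == 0);
    uint32_t num = static_cast<uint32_t>(std::stoul(log_entry_name.substr(strlen(name))));
    assert(num < std::numeric_limits<int32_t>::max());
    return num;
}

int main()
{
    // Padding keeps lexicographic order equal to numeric order.
    assert(getLogEntryName(42) == "query-0000000042");
    assert(getLogEntryNumber(getLogEntryName(42)) == 42);
    assert(getLogEntryName(9) < getLogEntryName(10));  // would be false without padding
}
```

Without the padding, "query-10" would sort before "query-9" and the begin_node selection in scheduleTasks below would pick the wrong starting point.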
@@ -48,6 +48,7 @@ namespace ErrorCodes
extern const int MEMORY_LIMIT_EXCEEDED;
}
constexpr const char * TASK_PROCESSED_OUT_REASON = "Task has been already processed";
namespace
{
@@ -290,7 +291,7 @@ DDLTaskPtr DDLWorker::initAndCheckTask(const String & entry_name, String & out_r
if (zookeeper->exists(task->getFinishedNodePath()))
{
out_reason = "Task has been already processed";
out_reason = TASK_PROCESSED_OUT_REASON;
return {};
}
@@ -311,51 +312,117 @@ void DDLWorker::scheduleTasks(bool reinitialized)
auto zookeeper = tryGetZooKeeper();
/// Main thread of DDLWorker was restarted, probably due to lost connection with ZooKeeper.
/// We have some unfinished tasks. To avoid duplication of some queries, try to write execution status.
/// We have some unfinished tasks.
/// To avoid duplicating some queries, we should try to write the execution status again.
/// To avoid skipping entries that were not executed, we should be careful when choosing the begin_node to start from.
/// NOTE: It does not protect from all cases of query duplication, see also comments in processTask(...)
if (reinitialized)
{
for (auto & task : current_tasks)
if (current_tasks.empty())
LOG_TRACE(log, "Don't have unfinished tasks after restarting");
else
LOG_INFO(log, "Have {} unfinished tasks, will check them", current_tasks.size());
assert(current_tasks.size() <= pool_size + (worker_pool != nullptr));
auto task_it = current_tasks.begin();
while (task_it != current_tasks.end())
{
if (task->was_executed)
auto & task = *task_it;
if (task->completely_processed)
{
assert(task->was_executed);
/// Status must be written (but finished/ node may not exist if entry was deleted).
/// If someone is deleting entry concurrently, then /active status dir must not exist.
assert(zookeeper->exists(task->getFinishedNodePath()) || !zookeeper->exists(fs::path(task->entry_path) / "active"));
++task_it;
}
else if (task->was_executed)
{
bool task_still_exists = zookeeper->exists(task->entry_path);
/// Connection was lost on attempt to write status. Will retry.
bool status_written = zookeeper->exists(task->getFinishedNodePath());
if (!status_written && task_still_exists)
{
/// You might think that the following condition is redundant, because status_written implies completely_processed.
/// But it's wrong. It's possible that (!task->completely_processed && status_written)
/// if ZooKeeper successfully received and processed our request
/// but we lost connection while waiting for the response.
/// Yeah, distributed systems are a zoo.
if (status_written)
task->completely_processed = true;
else
processTask(*task, zookeeper);
}
++task_it;
}
else
{
/// We didn't even execute the query, so let's just remove it.
/// We will try to read the task again and execute it from the beginning.
if (!first_failed_task_name || task->entry_name < *first_failed_task_name)
first_failed_task_name = task->entry_name;
task_it = current_tasks.erase(task_it);
}
}
}
Strings queue_nodes = zookeeper->getChildren(queue_dir, nullptr, queue_updated_event);
filterAndSortQueueNodes(queue_nodes);
if (queue_nodes.empty())
{
LOG_TRACE(log, "No tasks to schedule");
return;
}
else if (max_tasks_in_queue < queue_nodes.size())
if (max_tasks_in_queue < queue_nodes.size())
cleanup_event->set();
/// Detect queue start, using:
/// - skipped tasks
/// - in memory tasks (that are currently active)
/// - in memory tasks (that are currently active or were finished recently)
/// - failed tasks (that should be processed again)
auto begin_node = queue_nodes.begin();
UInt64 last_task_id = 0;
if (!current_tasks.empty())
if (first_failed_task_name)
{
auto & last_task = current_tasks.back();
last_task_id = DDLTaskBase::getLogEntryNumber(last_task->entry_name);
begin_node = std::upper_bound(queue_nodes.begin(), queue_nodes.end(), last_task->entry_name);
/// If we had failed tasks, then we should start from the first failed task.
assert(reinitialized);
begin_node = std::lower_bound(queue_nodes.begin(), queue_nodes.end(), first_failed_task_name);
}
if (last_skipped_entry_name)
else
{
UInt64 last_skipped_entry_id = DDLTaskBase::getLogEntryNumber(*last_skipped_entry_name);
if (last_skipped_entry_id > last_task_id)
begin_node = std::upper_bound(queue_nodes.begin(), queue_nodes.end(), *last_skipped_entry_name);
/// We had no failed tasks. Let's just choose the maximum entry we have previously seen.
String last_task_name;
if (!current_tasks.empty())
last_task_name = current_tasks.back()->entry_name;
if (last_skipped_entry_name && last_task_name < *last_skipped_entry_name)
last_task_name = *last_skipped_entry_name;
begin_node = std::upper_bound(queue_nodes.begin(), queue_nodes.end(), last_task_name);
}
if (begin_node == queue_nodes.end())
LOG_DEBUG(log, "No tasks to schedule");
else
LOG_DEBUG(log, "Will schedule {} tasks starting from {}", std::distance(begin_node, queue_nodes.end()), *begin_node);
/// Let's ensure that it's exactly the first task we should process.
/// Maybe such asserts are too paranoid and excessive,
/// but it's easy enough to break DDLWorker in a very unobvious way by making some minor change in code.
[[maybe_unused]] bool have_no_tasks_info = !first_failed_task_name && current_tasks.empty() && !last_skipped_entry_name;
assert(have_no_tasks_info || queue_nodes.end() == std::find_if(queue_nodes.begin(), queue_nodes.end(), [&](const String & entry_name)
{
/// We should return true if some invariants are violated.
String reason;
auto task = initAndCheckTask(entry_name, reason, zookeeper);
bool maybe_currently_processing = current_tasks.end() != std::find_if(current_tasks.begin(), current_tasks.end(), [&](const auto & t)
{
return t->entry_name == entry_name;
});
/// begin_node is something like a log pointer
if (begin_node == queue_nodes.end() || entry_name < *begin_node)
{
/// Return true if entry should be scheduled.
/// There is a minor race condition: initAndCheckTask(...) may return a non-null task
/// if someone is deleting an outdated entry right now (including its finished/ nodes), so we also check the active/ status dir.
bool maybe_concurrently_deleting = task && !zookeeper->exists(fs::path(task->entry_path) / "active");
return task && !maybe_concurrently_deleting && !maybe_currently_processing;
}
else
{
/// Return true if entry should not be scheduled.
bool processed = !task && reason == TASK_PROCESSED_OUT_REASON;
return processed || maybe_currently_processing;
}
}));
for (auto it = begin_node; it != queue_nodes.end() && !stop_flag; ++it)
{
String entry_name = *it;
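The begin_node selection above has two cases: if some task failed before execution, std::lower_bound keeps that entry in the scheduled range so it is retried; otherwise std::upper_bound of the maximum entry already seen (last saved task or last skipped entry) excludes everything up to and including it. A self-contained sketch with hypothetical entry names:

```cpp
#include <algorithm>
#include <cassert>
#include <optional>
#include <string>
#include <vector>

int main()
{
    // Sorted queue as produced by filterAndSortQueueNodes (hypothetical entries).
    std::vector<std::string> queue_nodes = {
        "query-0000000005", "query-0000000006", "query-0000000007", "query-0000000008"};

    // Case 1: a task failed before execution. lower_bound includes it, so it is retried.
    std::optional<std::string> first_failed_task_name = "query-0000000006";
    auto begin_node = std::lower_bound(queue_nodes.begin(), queue_nodes.end(), *first_failed_task_name);
    assert(*begin_node == "query-0000000006");

    // Case 2: no failures. upper_bound on the maximum entry we have already seen
    // (last saved task or last skipped entry) excludes it, so scheduling continues after it.
    std::string last_task_name = "query-0000000006";
    std::optional<std::string> last_skipped_entry_name = "query-0000000007";
    if (last_skipped_entry_name && last_task_name < *last_skipped_entry_name)
        last_task_name = *last_skipped_entry_name;
    begin_node = std::upper_bound(queue_nodes.begin(), queue_nodes.end(), last_task_name);
    assert(*begin_node == "query-0000000008");
}
```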
@@ -391,8 +458,18 @@ void DDLWorker::scheduleTasks(bool reinitialized)
DDLTaskBase & DDLWorker::saveTask(DDLTaskPtr && task)
{
current_tasks.remove_if([](const DDLTaskPtr & t) { return t->completely_processed.load(); });
assert(current_tasks.size() <= pool_size);
/// Tasks are scheduled and executed in main thread <==> Parallel execution is disabled
assert((worker_pool != nullptr) == (1 < pool_size));
/// Parallel execution is disabled ==> All previous tasks either failed to start or have finished,
/// so the current tasks list must be empty when we are ready to process a new one.
assert(worker_pool || current_tasks.empty());
/// Parallel execution is enabled ==> Not more than pool_size tasks are currently executing.
/// Note: If current_tasks.size() == pool_size, then all worker threads are busy,
/// so we will wait on worker_pool->scheduleOrThrowOnError(...)
assert(!worker_pool || current_tasks.size() <= pool_size);
current_tasks.emplace_back(std::move(task));
if (first_failed_task_name && *first_failed_task_name == current_tasks.back()->entry_name)
first_failed_task_name.reset();
return *current_tasks.back();
}
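The asserts at the top of saveTask tie together pool_size, the presence of worker_pool, and the size of current_tasks. A standalone restatement of that invariant as a predicate (the function and parameter names are illustrative, not part of the ClickHouse code):

```cpp
#include <cassert>
#include <cstddef>

// worker_pool_exists mirrors (worker_pool != nullptr) in the real code.
bool saveTaskInvariantHolds(bool worker_pool_exists, size_t pool_size, size_t current_tasks_size)
{
    // Parallel execution is enabled exactly when pool_size > 1.
    if (worker_pool_exists != (1 < pool_size))
        return false;
    // Sequential mode: the previous task must already be gone from the list.
    if (!worker_pool_exists && current_tasks_size != 0)
        return false;
    // Parallel mode: at most pool_size tasks can be in flight.
    if (worker_pool_exists && current_tasks_size > pool_size)
        return false;
    return true;
}

int main()
{
    assert(saveTaskInvariantHolds(false, 1, 0));   // sequential, list empty
    assert(!saveTaskInvariantHolds(false, 1, 1));  // sequential, stale task left behind
    assert(saveTaskInvariantHolds(true, 4, 4));    // parallel, pool full => caller will block on schedule
    assert(!saveTaskInvariantHolds(true, 4, 5));   // parallel, too many in-flight tasks
}
```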
@@ -479,10 +556,15 @@ void DDLWorker::updateMaxDDLEntryID(const String & entry_name)
void DDLWorker::processTask(DDLTaskBase & task, const ZooKeeperPtr & zookeeper)
{
LOG_DEBUG(log, "Processing task {} ({})", task.entry_name, task.entry.query);
assert(!task.completely_processed);
String active_node_path = task.getActiveNodePath();
String finished_node_path = task.getFinishedNodePath();
/// Step 1: Create ephemeral node in active/ status dir.
/// It allows other hosts to understand that task is currently executing (useful for system.distributed_ddl_queue)
/// and protects the task from concurrent deletion.
/// It will tryRemove(...) on exception
auto active_node = zkutil::EphemeralNodeHolder::existing(active_node_path, *zookeeper);
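zkutil::EphemeralNodeHolder behaves as an RAII guard: the active/ node exists while the holder is alive and is removed, best effort, when the holder is destroyed, including during stack unwinding if query execution throws. A simplified sketch of that pattern; FakeZooKeeper is a stand-in and does not reflect the real zkutil::ZooKeeper API:

```cpp
#include <iostream>
#include <stdexcept>
#include <string>
#include <utility>

// Stand-in for a ZooKeeper client; the real code uses zkutil::ZooKeeper.
struct FakeZooKeeper
{
    void createEphemeral(const std::string & path) { std::cout << "create " << path << "\n"; }
    void tryRemove(const std::string & path) { std::cout << "remove " << path << "\n"; }
};

// RAII holder in the spirit of zkutil::EphemeralNodeHolder: the node exists while
// the holder is alive and is removed (best effort) when it goes out of scope.
class EphemeralNodeHolder
{
public:
    EphemeralNodeHolder(FakeZooKeeper & zk_, std::string path_, bool create)
        : zk(zk_), path(std::move(path_))
    {
        if (create)  // mirrors create(...) vs existing(...) in the real holder
            zk.createEphemeral(path);
    }

    ~EphemeralNodeHolder()
    {
        try { zk.tryRemove(path); } catch (...) { /* best effort, never throw from a destructor */ }
    }

private:
    FakeZooKeeper & zk;
    std::string path;
};

int main()
{
    FakeZooKeeper zk;
    try
    {
        EphemeralNodeHolder active(zk, "/ddl/query-0000000001/active/host:9000", /*create=*/true);
        throw std::runtime_error("query failed");  // the node is still removed during unwinding
    }
    catch (const std::exception & e)
    {
        std::cout << "caught: " << e.what() << "\n";
    }
}
```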
@@ -498,7 +580,21 @@ void DDLWorker::processTask(DDLTaskBase & task, const ZooKeeperPtr & zookeeper)
/// Status dirs were not created in enqueueQuery(...) or someone is removing entry
if (create_active_res == Coordination::Error::ZNONODE)
{
assert(dynamic_cast<DatabaseReplicatedTask *>(&task) == nullptr);
if (task.was_executed)
{
/// Special case:
/// Task was executed (and we are trying to write status after connection loss) ==> Status dirs were previously created.
/// (Status dirs were previously created AND active/ does not exist) ==> Task was removed.
/// We cannot write status, but it's not required anymore, because no one will try to execute it again.
/// So we consider task as completely processed.
LOG_WARNING(log, "Task {} is executed, but looks like entry {} was deleted, cannot write status", task.entry_name, task.entry_path);
task.completely_processed = true;
return;
}
createStatusDirs(task.entry_path, zookeeper);
}
if (create_active_res == Coordination::Error::ZNODEEXISTS)
{
@@ -508,7 +604,7 @@ void DDLWorker::processTask(DDLTaskBase & task, const ZooKeeperPtr & zookeeper)
String dummy;
if (zookeeper->tryGet(active_node_path, dummy, nullptr, eph_node_disappeared))
{
constexpr int timeout_ms = 5000;
constexpr int timeout_ms = 30 * 1000;
if (!eph_node_disappeared->tryWait(timeout_ms))
throw Exception(ErrorCodes::LOGICAL_ERROR, "Ephemeral node {} still exists, "
"probably it's owned by someone else", active_node_path);
@@ -518,6 +614,7 @@ void DDLWorker::processTask(DDLTaskBase & task, const ZooKeeperPtr & zookeeper)
zookeeper->create(active_node_path, {}, zkutil::CreateMode::Ephemeral);
}
/// Step 2: Execute query from the task.
if (!task.was_executed)
{
/// If table and database engine supports it, they will execute task.ops by their own in a single transaction
......@@ -588,6 +685,7 @@ void DDLWorker::processTask(DDLTaskBase & task, const ZooKeeperPtr & zookeeper)
updateMaxDDLEntryID(task.entry_name);
/// Step 3: Create node in finished/ status dir and write execution status.
/// FIXME: if server fails right here, the task will be executed twice. We need WAL here.
/// NOTE: If ZooKeeper connection is lost here, we will try again to write query status.
/// NOTE: If both table and database are replicated, task is executed in single ZK transaction.
......
@@ -123,6 +123,7 @@ protected:
/// Save state of executed task to avoid duplicate execution on ZK error
std::optional<String> last_skipped_entry_name;
std::optional<String> first_failed_task_name;
std::list<DDLTaskPtr> current_tasks;
std::shared_ptr<Poco::Event> queue_updated_event = std::make_shared<Poco::Event>();
......