From 3a7d00b66c1ffab6247eb93149ec5a265a300f89 Mon Sep 17 00:00:00 2001 From: Hongqin-Li Date: Mon, 14 Nov 2022 13:08:00 +0000 Subject: [PATCH] Support delay schedule for ddl scheduler --- src/rootserver/ddl_task/ob_ddl_scheduler.cpp | 7 ++++--- src/rootserver/ddl_task/ob_ddl_task.cpp | 14 ++++++++++++++ src/rootserver/ddl_task/ob_ddl_task.h | 9 +++++++-- src/sql/engine/expr/ob_expr_to_outfile_row.cpp | 6 +++--- 4 files changed, 28 insertions(+), 8 deletions(-) diff --git a/src/rootserver/ddl_task/ob_ddl_scheduler.cpp b/src/rootserver/ddl_task/ob_ddl_scheduler.cpp index 3005790e7a..ea1009891a 100644 --- a/src/rootserver/ddl_task/ob_ddl_scheduler.cpp +++ b/src/rootserver/ddl_task/ob_ddl_scheduler.cpp @@ -328,15 +328,16 @@ void ObDDLScheduler::run1() } else if (OB_ISNULL(task)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("error unexpected, task must not be NULL", K(ret)); - } else if (task == first_retry_task) { + } else if (task == first_retry_task || !task->need_schedule()) { // add the task back to the queue if (OB_FAIL(task_queue_.add_task_to_last(task))) { - STORAGE_LOG(ERROR, "fail to add task to last, which should not happen", K(ret), K(*task)); + STORAGE_LOG(ERROR, "fail to add task to last", K(ret), K(*task)); } break; } else { ObCurTraceId::set(task->get_trace_id()); - task->process(); + int task_ret = task->process(); + task->calc_next_schedule_ts(task_ret); if (task->need_retry() && !has_set_stop()) { if (OB_FAIL(task_queue_.add_task_to_last(task))) { STORAGE_LOG(ERROR, "fail to add task to last, which should not happen", K(ret), K(*task)); diff --git a/src/rootserver/ddl_task/ob_ddl_task.cpp b/src/rootserver/ddl_task/ob_ddl_task.cpp index 8dc5ed1515..85430f6a7a 100644 --- a/src/rootserver/ddl_task/ob_ddl_task.cpp +++ b/src/rootserver/ddl_task/ob_ddl_task.cpp @@ -548,6 +548,20 @@ int ObDDLTask::batch_release_snapshot( return ret; } +void ObDDLTask::calc_next_schedule_ts(int ret_code) +{ + if (OB_TIMEOUT == ret_code) { + const int64_t SEC = 1000000; + delay_schedule_time_ = std::min(delay_schedule_time_ * 6/5 + SEC/10, 30*SEC); + const int64_t max_dt = delay_schedule_time_; + const int64_t min_dt = std::max(0L, max_dt - 3*SEC); + next_schedule_ts_ = ObTimeUtility::current_time() + ObRandom::rand(min_dt, max_dt); + } else { + delay_schedule_time_ = 0; + } + return; +} + #ifdef ERRSIM int ObDDLTask::check_errsim_error() { diff --git a/src/rootserver/ddl_task/ob_ddl_task.h b/src/rootserver/ddl_task/ob_ddl_task.h index 40a09e0119..64bfddfdc5 100644 --- a/src/rootserver/ddl_task/ob_ddl_task.h +++ b/src/rootserver/ddl_task/ob_ddl_task.h @@ -247,7 +247,8 @@ public: task_type_(task_type), trace_id_(), tenant_id_(0), object_id_(0), schema_version_(0), target_object_id_(0), task_status_(share::ObDDLTaskStatus::PREPARE), snapshot_version_(0), ret_code_(OB_SUCCESS), task_id_(0), parent_task_id_(0), parent_task_key_(), task_version_(0), parallelism_(0), - allocator_(lib::ObLabel("DdlTask")), compat_mode_(lib::Worker::CompatMode::INVALID), err_code_occurence_cnt_(0) + allocator_(lib::ObLabel("DdlTask")), compat_mode_(lib::Worker::CompatMode::INVALID), err_code_occurence_cnt_(0), + delay_schedule_time_(0), next_schedule_ts_(0) {} virtual ~ObDDLTask() {} virtual int process() = 0; @@ -293,6 +294,8 @@ public: const common::ObIArray &tablet_ids); void set_sys_task_id(const TraceId &sys_task_id) { sys_task_id_ = sys_task_id; } const TraceId &get_sys_task_id() const { return sys_task_id_; } + void calc_next_schedule_ts(int ret_code); + bool need_schedule() { return next_schedule_ts_ <= ObTimeUtility::current_time(); } #ifdef ERRSIM int check_errsim_error(); #endif @@ -302,7 +305,7 @@ public: K(target_object_id_), K(task_status_), K(snapshot_version_), K_(ret_code), K_(task_id), K_(parent_task_id), K_(parent_task_key), K_(task_version), K_(parallelism), K_(ddl_stmt_str), K_(compat_mode), - K_(sys_task_id), K_(err_code_occurence_cnt)); + K_(sys_task_id), K_(err_code_occurence_cnt), K_(next_schedule_ts), K_(delay_schedule_time)); protected: virtual bool is_error_need_retry(const int ret_code) { @@ -333,6 +336,8 @@ protected: lib::Worker::CompatMode compat_mode_; TraceId sys_task_id_; int64_t err_code_occurence_cnt_; // occurence count for all error return codes not in white list. + int64_t delay_schedule_time_; + int64_t next_schedule_ts_; }; enum ColChecksumStat diff --git a/src/sql/engine/expr/ob_expr_to_outfile_row.cpp b/src/sql/engine/expr/ob_expr_to_outfile_row.cpp index 96850cbed6..b3fde5f45e 100644 --- a/src/sql/engine/expr/ob_expr_to_outfile_row.cpp +++ b/src/sql/engine/expr/ob_expr_to_outfile_row.cpp @@ -223,9 +223,9 @@ int ObExprToOutfileRow::extract_fisrt_wchar_from_varhcar(const ObObj &obj, int32 // 2. The FIELDS [OPTIONALLY] ENCLOSED BY character. // 3. The first character of the FIELDS TERMINATED BY and LINES TERMINATED BY values, // if the ENCLOSED BY character is empty or unspecified. -// 4. ASCII 0 (what is actually written following the escape character is ASCII 0, not a -// zero-valued byte). -// 5. If the FIELDS ESCAPED BY character is empty, no characters are escaped and NULL is output +// 4. ASCII 0 (what is actually written following the escape character is ASCII 0, not a +// zero-valued byte). +// 5. If the FIELDS ESCAPED BY character is empty, no characters are escaped and NULL is output // as NULL, not \N. int ObExprToOutfileRow::print_field(char *buf, const int64_t buf_len, int64_t &pos, const ObObj &obj, ObExprOutFileInfo &out_info) -- GitLab